mirror of
https://github.com/Astatin3/unshell.git
synced 2026-06-08 22:38:01 -06:00
Rewrite protocol flow around compiled routing
Compile routing prefixes once per endpoint, restore minimal pending-to-active hook transitions, and route call/data/fault packets from the header before decoding payloads for local delivery only. Document the remaining protocol-level pressure points in src/protocol/PROTOCOL_CHANGES.md.
This commit is contained in:
@@ -5,13 +5,13 @@
|
||||
|
||||
use alloc::{collections::BTreeSet, string::String, vec::Vec};
|
||||
|
||||
use crate::protocol::tree::ActiveHook;
|
||||
use crate::protocol::tree::{ActiveHook, HookKey};
|
||||
use crate::protocol::{
|
||||
CallMessage, DataMessage, FrameBytes, HookTarget, PacketHeader, PacketType, ValidationError,
|
||||
encode_packet, validate_call, validate_header, validate_procedure_id,
|
||||
};
|
||||
|
||||
use super::super::RouteDecision;
|
||||
use super::super::{CompiledRoutes, RouteDecision};
|
||||
use super::core::{ChildRoute, EndpointError, EndpointOutcome, ProtocolEndpoint};
|
||||
use crate::protocol::tree::LeafSpec;
|
||||
|
||||
@@ -63,6 +63,8 @@ impl ProtocolEndpoint {
|
||||
peer_path: header.dst_path.clone(),
|
||||
procedure_id: call.procedure_id.clone(),
|
||||
dst_leaf: header.dst_leaf.clone(),
|
||||
local_ended: false,
|
||||
peer_ended: false,
|
||||
})
|
||||
.is_err()
|
||||
{
|
||||
@@ -114,9 +116,15 @@ impl ProtocolEndpoint {
|
||||
children: Vec<ChildRoute>,
|
||||
leaves: Vec<LeafSpec>,
|
||||
) -> Self {
|
||||
let registered_children = children
|
||||
.iter()
|
||||
.filter(|child| child.state == super::core::ConnectionState::Registered)
|
||||
.map(|child| child.path.clone())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
Self {
|
||||
routing: CompiledRoutes::new(&path, ®istered_children, parent_path.is_some()),
|
||||
path,
|
||||
parent_path,
|
||||
children,
|
||||
leaves: leaves
|
||||
.into_iter()
|
||||
@@ -202,6 +210,13 @@ impl ProtocolEndpoint {
|
||||
) -> Result<EndpointOutcome, EndpointError> {
|
||||
let (header, message) = self.prepare_data(dst_path, hook_id, procedure_id, data, end_hook)?;
|
||||
|
||||
if end_hook {
|
||||
let key = HookKey::new(self.path.clone(), hook_id);
|
||||
if self.hooks.mark_local_end(&key) {
|
||||
self.hooks.remove_active(&key);
|
||||
}
|
||||
}
|
||||
|
||||
match self.decide_route(&header.dst_path) {
|
||||
RouteDecision::Local => self.handle_local_data(header, message),
|
||||
route => Ok(EndpointOutcome::forward(route, encode_packet(&header, &message)?)),
|
||||
|
||||
@@ -16,7 +16,7 @@ use crate::protocol::{
|
||||
CallMessage, DataMessage, FaultMessage, FrameBytes, FrameError, PacketHeader, ValidationError,
|
||||
};
|
||||
|
||||
use super::super::{HookTable, RouteDecision};
|
||||
use super::super::{CompiledRoutes, HookTable, RouteDecision};
|
||||
|
||||
/// Local connection state used for child route eligibility.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
@@ -182,8 +182,8 @@ pub trait Endpoint {
|
||||
#[derive(Debug, Default)]
|
||||
pub struct ProtocolEndpoint {
|
||||
pub(crate) path: Vec<String>,
|
||||
pub(crate) parent_path: Option<Vec<String>>,
|
||||
pub(crate) children: Vec<ChildRoute>,
|
||||
pub(crate) routing: CompiledRoutes,
|
||||
pub(crate) leaves: BTreeMap<String, LeafSpec>,
|
||||
pub(crate) endpoint_procedures: BTreeSet<String>,
|
||||
pub(crate) hooks: HookTable,
|
||||
|
||||
@@ -9,7 +9,7 @@ use crate::protocol::{
|
||||
DataMessage, FaultMessage, PacketHeader, PacketType, ProtocolFault, encode_packet,
|
||||
};
|
||||
|
||||
use super::super::{HookKey, RouteDecision, route_destination};
|
||||
use super::super::{HookKey, RouteDecision};
|
||||
use super::core::{EndpointError, EndpointOutcome, Ingress, LocalEvent, ProtocolEndpoint};
|
||||
|
||||
impl ProtocolEndpoint {
|
||||
@@ -23,6 +23,7 @@ impl ProtocolEndpoint {
|
||||
return Ok(EndpointOutcome::dropped());
|
||||
};
|
||||
|
||||
self.hooks.remove_pending(&key);
|
||||
self.hooks.remove_active(&key);
|
||||
|
||||
let header = PacketHeader {
|
||||
@@ -82,7 +83,7 @@ impl ProtocolEndpoint {
|
||||
return Ok(EndpointOutcome::dropped());
|
||||
}
|
||||
|
||||
if message.end_hook {
|
||||
if message.end_hook && self.hooks.mark_peer_end(&key) {
|
||||
self.hooks.remove_active(&key);
|
||||
}
|
||||
|
||||
@@ -100,7 +101,16 @@ impl ProtocolEndpoint {
|
||||
header.hook_id.expect("validated"),
|
||||
&header.src_path,
|
||||
) else {
|
||||
return Ok(EndpointOutcome::dropped());
|
||||
let key = HookKey::new(self.path.clone(), header.hook_id.expect("validated"));
|
||||
let matches_pending = self
|
||||
.hooks
|
||||
.pending(&key)
|
||||
.is_some_and(|pending| pending.caller_src_path == header.src_path);
|
||||
if !matches_pending {
|
||||
return Ok(EndpointOutcome::dropped());
|
||||
}
|
||||
self.hooks.remove_pending(&key);
|
||||
return Ok(EndpointOutcome::event(LocalEvent::Fault { header, message }));
|
||||
};
|
||||
|
||||
self.hooks.remove_active(&key);
|
||||
@@ -110,18 +120,7 @@ impl ProtocolEndpoint {
|
||||
|
||||
/// Chooses the next hop using the protocol's longest-prefix routing rule.
|
||||
pub(crate) fn decide_route(&self, dst_path: &[String]) -> RouteDecision {
|
||||
let child_paths = self
|
||||
.children
|
||||
.iter()
|
||||
.filter(|child| child.state == super::core::ConnectionState::Registered)
|
||||
.map(|child| &child.path);
|
||||
|
||||
route_destination(
|
||||
&self.path,
|
||||
child_paths,
|
||||
self.parent_path.is_some(),
|
||||
dst_path,
|
||||
)
|
||||
self.routing.route(dst_path)
|
||||
}
|
||||
|
||||
/// Validates whether a source path is attributable to the ingress side.
|
||||
|
||||
@@ -4,11 +4,12 @@
|
||||
//! the `Call`, `Data`, and `Fault` sections of `PROTOCOL.md`.
|
||||
|
||||
use crate::protocol::{
|
||||
CallMessage, PacketType, ProtocolFault, decode_frame, introspection::INTROSPECTION_PROCEDURE_ID,
|
||||
validate_call, validate_header,
|
||||
CallMessage, PacketType, ProtocolFault, decode_frame, deserialize_archived_bytes,
|
||||
introspection::INTROSPECTION_PROCEDURE_ID, validate_call, validate_header,
|
||||
};
|
||||
use crate::protocol::types::{ArchivedCallMessage, ArchivedDataMessage, ArchivedFaultMessage};
|
||||
|
||||
use super::super::{ActiveHook, HookKey, RouteDecision};
|
||||
use super::super::{HookKey, PendingHook, RouteDecision};
|
||||
use super::core::{
|
||||
Endpoint, EndpointError, EndpointOutcome, Ingress, LocalEvent, ProtocolEndpoint,
|
||||
};
|
||||
@@ -57,18 +58,26 @@ impl ProtocolEndpoint {
|
||||
|
||||
if let Some(hook) = &message.response_hook
|
||||
&& hook.return_path != self.path
|
||||
&& self
|
||||
{
|
||||
if self
|
||||
.hooks
|
||||
.insert_active(ActiveHook {
|
||||
.insert_pending(PendingHook {
|
||||
return_path: hook.return_path.clone(),
|
||||
hook_id: hook.hook_id,
|
||||
peer_path: header.src_path.clone(),
|
||||
caller_src_path: header.src_path.clone(),
|
||||
procedure_id: message.procedure_id.clone(),
|
||||
dst_leaf: header.dst_leaf.clone(),
|
||||
})
|
||||
.is_err()
|
||||
{
|
||||
return self.emit_fault_if_possible(key, ProtocolFault::INTERNAL_ERROR);
|
||||
{
|
||||
return self.emit_fault_if_possible(key, ProtocolFault::INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
if let Some(key) = &key
|
||||
&& self.hooks.activate_pending(key).is_none()
|
||||
{
|
||||
return self.emit_fault_if_possible(Some(key.clone()), ProtocolFault::INTERNAL_ERROR);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(EndpointOutcome::event(LocalEvent::Call { header, message }))
|
||||
@@ -95,12 +104,10 @@ impl Endpoint for ProtocolEndpoint {
|
||||
|
||||
match header.packet_type {
|
||||
PacketType::Call => {
|
||||
let message = parsed.deserialize_call()?;
|
||||
if !matches!(ingress, Ingress::Parent | Ingress::Local) {
|
||||
return Ok(EndpointOutcome::dropped());
|
||||
}
|
||||
|
||||
validate_call(header, &message)?;
|
||||
match self.decide_route(&header.dst_path) {
|
||||
RouteDecision::Child(index) => {
|
||||
Ok(EndpointOutcome::forward(RouteDecision::Child(index), frame))
|
||||
@@ -109,14 +116,24 @@ impl Endpoint for ProtocolEndpoint {
|
||||
Ok(EndpointOutcome::forward(RouteDecision::Parent, frame))
|
||||
}
|
||||
RouteDecision::Drop => Ok(EndpointOutcome::dropped()),
|
||||
RouteDecision::Local => self.handle_local_call(parsed.deserialize_header(), message),
|
||||
RouteDecision::Local => {
|
||||
let (header, payload) = parsed.into_parts();
|
||||
let message =
|
||||
deserialize_archived_bytes::<ArchivedCallMessage, CallMessage>(payload)?;
|
||||
validate_call(&header, &message)?;
|
||||
self.handle_local_call(header, message)
|
||||
}
|
||||
}
|
||||
}
|
||||
PacketType::Data => {
|
||||
match self.decide_route(&header.dst_path) {
|
||||
RouteDecision::Local => {
|
||||
let message = parsed.deserialize_data()?;
|
||||
self.handle_local_data(parsed.deserialize_header(), message)
|
||||
let (header, payload) = parsed.into_parts();
|
||||
let message = deserialize_archived_bytes::<
|
||||
ArchivedDataMessage,
|
||||
crate::protocol::DataMessage,
|
||||
>(payload)?;
|
||||
self.handle_local_data(header, message)
|
||||
}
|
||||
RouteDecision::Child(index) => {
|
||||
Ok(EndpointOutcome::forward(RouteDecision::Child(index), frame))
|
||||
@@ -130,8 +147,12 @@ impl Endpoint for ProtocolEndpoint {
|
||||
PacketType::Fault => {
|
||||
match self.decide_route(&header.dst_path) {
|
||||
RouteDecision::Local => {
|
||||
let message = parsed.deserialize_fault()?;
|
||||
self.handle_local_fault(parsed.deserialize_header(), message)
|
||||
let (header, payload) = parsed.into_parts();
|
||||
let message = deserialize_archived_bytes::<
|
||||
ArchivedFaultMessage,
|
||||
crate::protocol::FaultMessage,
|
||||
>(payload)?;
|
||||
self.handle_local_fault(header, message)
|
||||
}
|
||||
RouteDecision::Child(index) => {
|
||||
Ok(EndpointOutcome::forward(RouteDecision::Child(index), frame))
|
||||
|
||||
Reference in New Issue
Block a user