From 371f3ae492053b68afb8bb04515f966f17e697c5 Mon Sep 17 00:00:00 2001 From: Michael Mikovsky <77305074+Astatin3@users.noreply.github.com> Date: Sun, 26 Apr 2026 16:13:28 -0600 Subject: [PATCH] Add router-aware endpoint topology APIs --- unshell-macros/src/procedures.rs | 70 ++++++- unshell-protocol/src/protocol/tests/call.rs | 192 ++++++++++++++++++ unshell-protocol/src/protocol/tree/call.rs | 181 +++++++++++++---- .../src/protocol/tree/endpoint/builders.rs | 164 ++++++++++++++- .../src/protocol/tree/endpoint/core.rs | 28 +++ .../src/protocol/tree/endpoint/mod.rs | 2 +- unshell-protocol/src/protocol/tree/leaf.rs | 76 ++++++- unshell-protocol/src/protocol/tree/mod.rs | 7 +- unshell-protocol/src/protocol/validation.rs | 3 + 9 files changed, 669 insertions(+), 54 deletions(-) diff --git a/unshell-macros/src/procedures.rs b/unshell-macros/src/procedures.rs index aa05e42..4d23fc8 100644 --- a/unshell-macros/src/procedures.rs +++ b/unshell-macros/src/procedures.rs @@ -64,6 +64,12 @@ struct CallArm { dispatch_tokens: TokenStream, } +#[derive(Clone, Copy)] +enum EndpointArgKind { + Shared, + Mutable, +} + pub(crate) fn expand_procedures( attr: ProceduresAttributes, mut item: ItemImpl, @@ -118,6 +124,7 @@ pub(crate) fn expand_procedures( fn dispatch_call( &mut self, + endpoint: &mut ::unshell::protocol::tree::ProtocolEndpoint, call: ::unshell::protocol::tree::IncomingCall, ) -> ::core::result::Result< ::unshell::protocol::tree::CallReply, @@ -150,7 +157,9 @@ fn expand_call_arm(method: &ImplItemFn) -> Result { .filter(|input| !matches!(input, FnArg::Receiver(_))) .collect::>(); - let invocation = expand_invocation(method_name, &inputs)?; + let (endpoint_arg, inputs) = split_endpoint_arg(&inputs)?; + + let invocation = expand_invocation(method_name, endpoint_arg, &inputs)?; let return_value = expand_return_conversion(&method.sig.output, quote! { __unshell_result })?; Ok(CallArm { @@ -164,9 +173,18 @@ fn expand_call_arm(method: &ImplItemFn) -> Result { }) } -fn expand_invocation(method_name: &Ident, inputs: &[&FnArg]) -> Result { +fn expand_invocation( + method_name: &Ident, + endpoint_arg: Option, + inputs: &[&FnArg], +) -> Result { + let endpoint_prefix = endpoint_arg.map(endpoint_arg_tokens); if inputs.is_empty() { - return Ok(quote! { self.#method_name() }); + return Ok(if let Some(prefix) = endpoint_prefix { + quote! { self.#method_name(#prefix) } + } else { + quote! { self.#method_name() } + }); } if inputs.len() == 1 { @@ -199,7 +217,7 @@ fn expand_invocation(method_name: &Ident, inputs: &[&FnArg]) -> Result Result Result(inputs: &[&'a FnArg]) -> Result<(Option, Vec<&'a FnArg>)> { + let Some(first) = inputs.first() else { + return Ok((None, Vec::new())); + }; + let Some(kind) = endpoint_arg_kind(first)? else { + return Ok((None, inputs.to_vec())); + }; + Ok((Some(kind), inputs[1..].to_vec())) +} + +fn endpoint_arg_kind(arg: &FnArg) -> Result> { + let FnArg::Typed(PatType { ty, .. }) = arg else { + return Ok(None); + }; + let Type::Reference(reference) = ty.as_ref() else { + return Ok(None); + }; + let Type::Path(type_path) = reference.elem.as_ref() else { + return Ok(None); + }; + let Some(segment) = type_path.path.segments.last() else { + return Ok(None); + }; + if segment.ident != "ProtocolEndpoint" { + return Ok(None); + } + Ok(Some(if reference.mutability.is_some() { + EndpointArgKind::Mutable + } else { + EndpointArgKind::Shared + })) +} + +fn endpoint_arg_tokens(kind: EndpointArgKind) -> TokenStream { + match kind { + EndpointArgKind::Shared => quote! { &*endpoint, }, + EndpointArgKind::Mutable => quote! { endpoint, }, + } +} + fn expand_return_conversion(return_type: &ReturnType, value: TokenStream) -> Result { match return_type { ReturnType::Default => Ok(quote! { diff --git a/unshell-protocol/src/protocol/tests/call.rs b/unshell-protocol/src/protocol/tests/call.rs index 0560ce2..de25565 100644 --- a/unshell-protocol/src/protocol/tests/call.rs +++ b/unshell-protocol/src/protocol/tests/call.rs @@ -105,3 +105,195 @@ fn leaf_runtime_dispatches_generated_call_procedure() { .expect("typed response should decode"); assert_eq!(response.text, "echo: hello"); } + +#[derive(Default)] +struct TopologyLeaf; + +#[leaf( + id = "org.example.v1.topology", + endpoint_struct = TopologyLeaf, + procedures = ["add_child", "remove_child", "connections"] +)] +struct Topology; + +#[derive(Archive, Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] +struct ChildRequest { + child_path: Vec, +} + +#[derive(Archive, Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] +struct ConnectionsReply { + parent: Option>, + children: Vec>, +} + +#[procedures(error = Infallible)] +impl TopologyLeaf { + #[call] + fn add_child( + &mut self, + endpoint: &mut ProtocolEndpoint, + request: ChildRequest, + ) -> ConnectionsReply { + endpoint + .upsert_child_route(ChildRoute::registered(request.child_path)) + .expect("topology mutation should satisfy direct-child invariants"); + ConnectionsReply { + parent: endpoint.parent_path().map(<[String]>::to_vec), + children: endpoint + .child_routes() + .iter() + .map(|child| child.path.clone()) + .collect(), + } + } + + #[call] + fn remove_child( + &mut self, + endpoint: &mut ProtocolEndpoint, + request: ChildRequest, + ) -> ConnectionsReply { + endpoint.remove_child_route(&request.child_path); + ConnectionsReply { + parent: endpoint.parent_path().map(<[String]>::to_vec), + children: endpoint + .child_routes() + .iter() + .map(|child| child.path.clone()) + .collect(), + } + } + + #[call] + fn connections(&mut self, endpoint: &ProtocolEndpoint) -> ConnectionsReply { + ConnectionsReply { + parent: endpoint.parent_path().map(<[String]>::to_vec), + children: endpoint + .child_routes() + .iter() + .map(|child| child.path.clone()) + .collect(), + } + } +} + +impl CallLeaf for TopologyLeaf { + type Error = Infallible; +} + +#[test] +fn generated_call_procedure_can_query_and_mutate_endpoint_topology() { + let endpoint = ProtocolEndpoint::new( + path(&["agent"]), + Some(Vec::new()), + Vec::new(), + vec![TopologyLeaf::protocol_leaf_spec()], + ); + let mut runtime = LeafRuntime::new(endpoint, TopologyLeaf); + + let mut controller = ProtocolEndpoint::new( + Vec::new(), + None, + vec![ChildRoute::registered(path(&["agent"]))], + Vec::new(), + ); + + let add_hook = controller.allocate_hook_id(); + let add_child = controller + .send_call( + path(&["agent"]), + Some(TopologyLeaf::protocol_leaf_name()), + TopologyLeaf::protocol_procedure_id("add_child").expect("suffix should resolve"), + Some(add_hook), + encode_call_reply(&ChildRequest { + child_path: path(&["agent", "child"]), + }) + .expect("request should encode"), + ) + .expect("call should encode"); + let EndpointOutcome::Forward { + frame: add_child_frame, + .. + } = add_child + else { + panic!("controller should forward add-child call"); + }; + let add_outcome = runtime + .receive(&Ingress::Parent, add_child_frame) + .expect("runtime should mutate topology"); + let [response] = add_outcome.frames.as_slice() else { + panic!("expected add-child response frame"); + }; + let parsed = decode_frame(response).expect("response should decode"); + let reply = decode_call_input::( + parsed + .deserialize_data() + .expect("reply data should decode") + .data + .as_slice(), + ) + .expect("typed reply should decode"); + assert_eq!(reply.parent, Some(Vec::new())); + assert_eq!(reply.children, vec![path(&["agent", "child"])]); + assert_eq!(runtime.endpoint().child_routes().len(), 1); + + let list_hook = controller.allocate_hook_id(); + let list = controller + .send_call( + path(&["agent"]), + Some(TopologyLeaf::protocol_leaf_name()), + TopologyLeaf::protocol_procedure_id("connections").expect("suffix should resolve"), + Some(list_hook), + encode_call_reply(&()).expect("unit request should encode"), + ) + .expect("list call should encode"); + let EndpointOutcome::Forward { + frame: list_frame, .. + } = list + else { + panic!("controller should forward connections call"); + }; + let list_outcome = runtime + .receive(&Ingress::Parent, list_frame) + .expect("runtime should return topology snapshot"); + let [list_response] = list_outcome.frames.as_slice() else { + panic!("expected connections response frame"); + }; + let list_reply = decode_call_input::( + decode_frame(list_response) + .expect("response should decode") + .deserialize_data() + .expect("data should deserialize") + .data + .as_slice(), + ) + .expect("typed reply should decode"); + assert_eq!(list_reply.children, vec![path(&["agent", "child"])]); + + let remove_hook = controller.allocate_hook_id(); + let remove = controller + .send_call( + path(&["agent"]), + Some(TopologyLeaf::protocol_leaf_name()), + TopologyLeaf::protocol_procedure_id("remove_child") + .expect("suffix should resolve"), + Some(remove_hook), + encode_call_reply(&ChildRequest { + child_path: path(&["agent", "child"]), + }) + .expect("request should encode"), + ) + .expect("remove call should encode"); + let EndpointOutcome::Forward { + frame: remove_frame, + .. + } = remove + else { + panic!("controller should forward remove-child call"); + }; + runtime + .receive(&Ingress::Parent, remove_frame) + .expect("runtime should prune topology"); + assert!(runtime.endpoint().child_routes().is_empty()); +} diff --git a/unshell-protocol/src/protocol/tree/call.rs b/unshell-protocol/src/protocol/tree/call.rs index 47673db..56587a5 100644 --- a/unshell-protocol/src/protocol/tree/call.rs +++ b/unshell-protocol/src/protocol/tree/call.rs @@ -11,7 +11,9 @@ use crate::protocol::{ use super::{ Endpoint, EndpointError, HookKey, Ingress, LocalEvent, ProtocolEndpoint, ProtocolLeaf, + RouteDecision, RouterLeaf, }; +use super::endpoint::ForwardedFrame; /// One typed incoming `Call` passed to a leaf procedure. /// @@ -366,6 +368,28 @@ pub struct RuntimeOutcome { pub dropped: bool, } +/// Frames emitted by the runtime together with their chosen next hops. +/// +/// What it is: the router-oriented variant of [`RuntimeOutcome`], preserving the +/// `RouteDecision` for every emitted frame. +/// +/// Why it exists: transport-owning leaves need to know whether each frame should +/// go to the parent or to a specific child, not just the encoded bytes. +/// +/// # Example +/// ```rust +/// use unshell::protocol::tree::RoutedRuntimeOutcome; +/// let outcome = RoutedRuntimeOutcome::default(); +/// assert!(outcome.forwarded.is_empty()); +/// ``` +#[derive(Debug, Default, Clone)] +pub struct RoutedRuntimeOutcome { + /// Forwarded frames paired with the route chosen by the endpoint runtime. + pub forwarded: Vec, + /// Whether the endpoint dropped the incoming packet. + pub dropped: bool, +} + impl LeafRuntime { /// Builds a runtime from one endpoint and one leaf instance. #[must_use] @@ -453,8 +477,32 @@ where ingress: &Ingress, frame: FrameBytes, ) -> Result::Error>> { + let routed = self.receive_routed(ingress, frame)?; + Ok(RuntimeOutcome { + frames: routed + .forwarded + .into_iter() + .map(|forwarded| forwarded.frame) + .collect(), + dropped: routed.dropped, + }) + } + + /// Delivers one inbound frame while preserving route decisions for emitted traffic. + /// + /// # Example + /// ```rust + /// # use unshell::protocol::tree::{LeafRuntime, ProtocolEndpoint}; + /// # struct ExampleLeaf; + /// # let _ = core::marker::PhantomData::>; + /// ``` + pub fn receive_routed( + &mut self, + ingress: &Ingress, + frame: FrameBytes, + ) -> Result::Error>> { let outcome = self.endpoint.receive(ingress, frame)?; - self.process_endpoint_outcome(outcome) + self.process_endpoint_outcome_routed(outcome) } /// Polls the leaf for locally-generated hook traffic and routes any emitted frames. @@ -466,21 +514,45 @@ where /// # let _ = core::marker::PhantomData::>; /// ``` pub fn poll(&mut self) -> Result::Error>> { - let outgoing = self.leaf.poll().map_err(LeafRuntimeError::Leaf)?; - self.emit_outgoing(outgoing) + let routed = self.poll_routed()?; + Ok(RuntimeOutcome { + frames: routed + .forwarded + .into_iter() + .map(|forwarded| forwarded.frame) + .collect(), + dropped: routed.dropped, + }) } - fn process_endpoint_outcome( + /// Polls the leaf while preserving route decisions for emitted traffic. + /// + /// # Example + /// ```rust + /// # use unshell::protocol::tree::{LeafRuntime, ProtocolEndpoint}; + /// # struct ExampleLeaf; + /// # let _ = core::marker::PhantomData::>; + /// ``` + pub fn poll_routed( + &mut self, + ) -> Result::Error>> { + let outgoing = self.leaf.poll().map_err(LeafRuntimeError::Leaf)?; + self.emit_outgoing_routed(outgoing) + } + + fn process_endpoint_outcome_routed( &mut self, outcome: crate::protocol::tree::EndpointOutcome, - ) -> Result::Error>> { + ) -> Result::Error>> { match outcome { - crate::protocol::tree::EndpointOutcome::Forward { frame, .. } => Ok(RuntimeOutcome { - frames: vec![frame], - dropped: false, - }), - crate::protocol::tree::EndpointOutcome::Dropped => Ok(RuntimeOutcome { - frames: Vec::new(), + crate::protocol::tree::EndpointOutcome::Forward { route, frame } => { + Ok(RoutedRuntimeOutcome { + forwarded: vec![ForwardedFrame { route, frame }], + dropped: false, + }) + } + crate::protocol::tree::EndpointOutcome::Dropped => Ok(RoutedRuntimeOutcome { + forwarded: Vec::new(), dropped: true, }), crate::protocol::tree::EndpointOutcome::Local(event) => self.process_local_event(event), @@ -490,7 +562,7 @@ where fn process_local_event( &mut self, event: LocalEvent, - ) -> Result::Error>> { + ) -> Result::Error>> { match event { LocalEvent::Call { header, message } => self.process_local_call(header, message), LocalEvent::Data { @@ -510,7 +582,7 @@ where &mut self, header: PacketHeader, message: CallMessage, - ) -> Result::Error>> { + ) -> Result::Error>> { let CallMessage { procedure_id, data, @@ -528,19 +600,16 @@ where }, }; - match self.leaf.dispatch_call(incoming) { + match self.leaf.dispatch_call(&mut self.endpoint, incoming) { Ok(CallReply::Reply(bytes)) => { let frames = if let Some(hook) = response_hook { self.send_reply_data(hook, procedure_id, bytes, true)? } else { - Vec::new() + RoutedRuntimeOutcome::default() }; - Ok(RuntimeOutcome { - frames, - dropped: false, - }) + Ok(frames) } - Ok(CallReply::NoReply) => Ok(RuntimeOutcome::default()), + Ok(CallReply::NoReply) => Ok(RoutedRuntimeOutcome::default()), Err(error) => { // Dispatch failures still emit a protocol fault for the remote caller when a // response hook exists, even though the local runtime also surfaces the error. @@ -555,7 +624,7 @@ where header: PacketHeader, message: DataMessage, hook_key: HookKey, - ) -> Result::Error>> { + ) -> Result::Error>> { let outgoing = self .leaf .on_data(IncomingData { @@ -564,7 +633,7 @@ where hook_key, }) .map_err(LeafRuntimeError::Leaf)?; - self.emit_outgoing(outgoing) + self.emit_outgoing_routed(outgoing) } fn process_local_fault( @@ -572,7 +641,7 @@ where header: PacketHeader, message: crate::protocol::FaultMessage, hook_key: HookKey, - ) -> Result::Error>> { + ) -> Result::Error>> { self.leaf .on_fault(IncomingFault { header, @@ -580,14 +649,14 @@ where hook_key, }) .map_err(LeafRuntimeError::Leaf)?; - Ok(RuntimeOutcome::default()) + Ok(RoutedRuntimeOutcome::default()) } - fn emit_outgoing( + fn emit_outgoing_routed( &mut self, outgoing: Vec, - ) -> Result::Error>> { - let mut runtime = RuntimeOutcome::default(); + ) -> Result::Error>> { + let mut runtime = RoutedRuntimeOutcome::default(); for packet in outgoing { let endpoint_outcome = self.endpoint.send_data( packet.dst_path, @@ -597,8 +666,8 @@ where packet.end_hook, )?; runtime - .frames - .extend(self.process_endpoint_outcome(endpoint_outcome)?.frames); + .forwarded + .extend(self.process_endpoint_outcome_routed(endpoint_outcome)?.forwarded); } Ok(runtime) } @@ -609,7 +678,7 @@ where procedure_id: String, bytes: Vec, end_hook: bool, - ) -> Result, LeafRuntimeError<::Error>> { + ) -> Result::Error>> { let endpoint_outcome = self.endpoint.send_data( hook.return_path, hook.hook_id, @@ -617,21 +686,65 @@ where bytes, end_hook, )?; - Ok(self.process_endpoint_outcome(endpoint_outcome)?.frames) + self.process_endpoint_outcome_routed(endpoint_outcome) } fn emit_internal_fault_if_possible( &mut self, hook: Option<&HookTarget>, - ) -> Result, LeafRuntimeError<::Error>> { + ) -> Result::Error>> { let Some(hook) = hook else { - return Ok(Vec::new()); + return Ok(RoutedRuntimeOutcome::default()); }; let key = HookKey::new(hook.return_path.clone(), hook.hook_id); let outcome = self .endpoint .emit_fault_if_possible(Some(key), ProtocolFault::INTERNAL_ERROR)?; - Ok(self.process_endpoint_outcome(outcome)?.frames) + self.process_endpoint_outcome_routed(outcome) + } +} + +impl LeafRuntime +where + L: CallLeaf + super::CallProcedures::Error> + RouterLeaf, +{ + /// Sends previously forwarded frames through the router leaf's parent/child links. + /// + /// What it is: a small transport bridge from endpoint route decisions to the + /// leaf-owned connections. + /// + /// Why it exists: router leaves should be able to reuse the normal protocol + /// runtime and still own the concrete forwarding mechanism. + /// + /// # Example + /// ```rust + /// # use unshell::protocol::tree::{LeafRuntime, ProtocolEndpoint}; + /// # struct ExampleLeaf; + /// # let _ = core::marker::PhantomData::>; + /// ``` + pub fn route_forwarded( + &mut self, + forwarded: Vec, + ) -> Result<(), ::RouteError> { + for forwarded in forwarded { + match forwarded.route { + RouteDecision::Parent => { + self.leaf + .route_to_parent(self.endpoint.path(), forwarded.frame)?; + } + RouteDecision::Child(index) => { + let child_path = self + .endpoint + .child_routes() + .get(index) + .map(|child| child.path.clone()) + .unwrap_or_default(); + self.leaf.route_to_child(&child_path, forwarded.frame)?; + } + RouteDecision::Local | RouteDecision::Drop => {} + } + } + Ok(()) } } diff --git a/unshell-protocol/src/protocol/tree/endpoint/builders.rs b/unshell-protocol/src/protocol/tree/endpoint/builders.rs index ef2ca33..1c721cc 100644 --- a/unshell-protocol/src/protocol/tree/endpoint/builders.rs +++ b/unshell-protocol/src/protocol/tree/endpoint/builders.rs @@ -128,6 +128,7 @@ impl ProtocolEndpoint { children: Vec, leaves: Vec, ) -> Self { + let has_parent = parent_path.is_some(); let registered_child_paths = children .iter() .filter(|child| child.registered) @@ -136,7 +137,8 @@ impl ProtocolEndpoint { Self { local_id: None, - routing: CompiledRoutes::new(&path, ®istered_child_paths, parent_path.is_some()), + parent_path, + routing: CompiledRoutes::new(&path, ®istered_child_paths, has_parent), path, children, leaves: leaves @@ -194,6 +196,129 @@ impl ProtocolEndpoint { self.local_id.as_deref() } + /// Returns the absolute path of this endpoint's direct parent, if one exists. + /// + /// What it is: the currently configured one-hop parent boundary for this + /// endpoint. + /// + /// Why it exists: router-style leaves need to expose and inspect the tree edge + /// they use for upstream traffic. + /// + /// # Example + /// ```rust + /// use unshell::protocol::tree::ProtocolEndpoint; + /// let endpoint = ProtocolEndpoint::new(vec!["worker".into()], Some(Vec::new()), Vec::new(), Vec::new()); + /// assert_eq!(endpoint.parent_path(), Some([].as_slice())); + /// ``` + pub fn parent_path(&self) -> Option<&[String]> { + self.parent_path.as_deref() + } + + /// Returns the direct child routes currently known to this endpoint. + /// + /// What it is: the local routing-table inputs for direct descendants. + /// + /// Why it exists: management leaves often need to inspect or mirror the child + /// topology they are controlling. + /// + /// # Example + /// ```rust + /// use unshell::protocol::tree::{ChildRoute, ProtocolEndpoint}; + /// let endpoint = ProtocolEndpoint::new( + /// vec!["root".into()], + /// None, + /// vec![ChildRoute::registered(vec!["root".into(), "child".into()])], + /// Vec::new(), + /// ); + /// assert_eq!(endpoint.child_routes().len(), 1); + /// ``` + pub fn child_routes(&self) -> &[ChildRoute] { + &self.children + } + + /// Replaces the configured direct parent path and recompiles local routing. + /// + /// What it is: the supported way to attach or detach this endpoint from its + /// upstream boundary. + /// + /// Why it exists: a router leaf should be able to promote or remove its parent + /// connection without rebuilding the entire endpoint. + /// + /// # Example + /// ```rust + /// use unshell::protocol::tree::ProtocolEndpoint; + /// let mut endpoint = ProtocolEndpoint::new(vec!["root".into(), "worker".into()], Some(vec!["root".into()]), Vec::new(), Vec::new()); + /// endpoint.set_parent_path(None)?; + /// assert!(endpoint.parent_path().is_none()); + /// # Ok::<(), unshell::protocol::tree::EndpointError>(()) + /// ``` + pub fn set_parent_path( + &mut self, + parent_path: Option>, + ) -> Result<(), EndpointError> { + if let Some(path) = parent_path.as_deref() { + self.validate_direct_parent_path(path)?; + } + self.parent_path = parent_path; + self.rebuild_routing(); + Ok(()) + } + + /// Inserts or updates one direct child route and recompiles local routing. + /// + /// What it is: the supported mutation API for the endpoint's child list. + /// + /// Why it exists: router-management leaves need one invariant-preserving way to + /// reflect child connection changes into path routing. + /// + /// # Example + /// ```rust + /// use unshell::protocol::tree::{ChildRoute, ProtocolEndpoint}; + /// let mut endpoint = ProtocolEndpoint::new(vec!["root".into()], None, Vec::new(), Vec::new()); + /// endpoint.upsert_child_route(ChildRoute::registered(vec!["root".into(), "child".into()]))?; + /// assert_eq!(endpoint.child_routes().len(), 1); + /// # Ok::<(), unshell::protocol::tree::EndpointError>(()) + /// ``` + pub fn upsert_child_route(&mut self, route: ChildRoute) -> Result<(), EndpointError> { + self.validate_direct_child_path(&route.path)?; + if let Some(existing) = self.children.iter_mut().find(|child| child.path == route.path) { + *existing = route; + } else { + self.children.push(route); + } + self.rebuild_routing(); + Ok(()) + } + + /// Removes one direct child route by absolute path and recompiles local routing. + /// + /// What it is: the supported mutation API for pruning a direct descendant. + /// + /// Why it exists: connection-management leaves need to tear down routes without + /// mutating the endpoint internals directly. + /// + /// # Example + /// ```rust + /// use unshell::protocol::tree::{ChildRoute, ProtocolEndpoint}; + /// let mut endpoint = ProtocolEndpoint::new( + /// vec!["root".into()], + /// None, + /// vec![ChildRoute::registered(vec!["root".into(), "child".into()])], + /// Vec::new(), + /// ); + /// assert!(endpoint.remove_child_route(&[String::from("root"), String::from("child")])); + /// assert!(endpoint.child_routes().is_empty()); + /// ``` + pub fn remove_child_route(&mut self, path: &[String]) -> bool { + let original_len = self.children.len(); + self.children.retain(|child| child.path != path); + let removed = self.children.len() != original_len; + if removed { + self.rebuild_routing(); + } + removed + } + /// Registers a procedure that is handled directly by the endpoint. /// /// Endpoint-level procedures exist for protocol services that are not attached to one leaf, @@ -230,6 +355,43 @@ impl ProtocolEndpoint { self.hooks.allocate_hook_id(&self.path) } + fn rebuild_routing(&mut self) { + let registered_child_paths = self + .children + .iter() + .filter(|child| child.registered) + .map(|child| child.path.clone()) + .collect::>(); + self.routing = CompiledRoutes::new( + &self.path, + ®istered_child_paths, + self.parent_path.is_some(), + ); + } + + fn validate_direct_parent_path(&self, parent_path: &[String]) -> Result<(), EndpointError> { + let Some((_, expected_parent)) = self.path.split_last() else { + return Err(EndpointError::Validation(ValidationError::TopologyInvariant( + "root endpoints cannot declare a parent path", + ))); + }; + if parent_path != expected_parent { + return Err(EndpointError::Validation(ValidationError::TopologyInvariant( + "parent path must equal the direct path prefix of this endpoint", + ))); + } + Ok(()) + } + + fn validate_direct_child_path(&self, child_path: &[String]) -> Result<(), EndpointError> { + if child_path.len() != self.path.len() + 1 || !child_path.starts_with(&self.path) { + return Err(EndpointError::Validation(ValidationError::TopologyInvariant( + "child path must be one direct descendant of this endpoint", + ))); + } + Ok(()) + } + /// Encodes a call frame without routing it through the local endpoint. /// /// This exists for callers that want a fully encoded outbound frame while handling transport diff --git a/unshell-protocol/src/protocol/tree/endpoint/core.rs b/unshell-protocol/src/protocol/tree/endpoint/core.rs index fa28e05..d858f04 100644 --- a/unshell-protocol/src/protocol/tree/endpoint/core.rs +++ b/unshell-protocol/src/protocol/tree/endpoint/core.rs @@ -177,6 +177,33 @@ pub enum EndpointOutcome { Dropped, } +/// One framed packet together with the next hop selected by endpoint routing. +/// +/// What it is: a transport-ready frame paired with the resolved direction the +/// endpoint chose for it. +/// +/// Why it exists: high-level runtimes often flatten forwarded traffic down to raw +/// bytes, but router-host leaves need the route decision so they can choose the +/// correct parent or child connection. +/// +/// # Example +/// ```rust +/// use unshell::protocol::FrameBytes; +/// use unshell::protocol::tree::{ForwardedFrame, RouteDecision}; +/// let forwarded = ForwardedFrame { +/// route: RouteDecision::Parent, +/// frame: FrameBytes::new(), +/// }; +/// assert!(matches!(forwarded.route, RouteDecision::Parent)); +/// ``` +#[derive(Debug, Clone)] +pub struct ForwardedFrame { + /// The next hop selected by the endpoint runtime. + pub route: RouteDecision, + /// The encoded protocol frame to send over that hop. + pub frame: FrameBytes, +} + /// Error surfaced while validating or encoding protocol frames. /// /// This exists so endpoint callers can preserve the distinction between malformed wire/archive @@ -288,6 +315,7 @@ pub trait Endpoint { pub struct ProtocolEndpoint { pub(crate) local_id: Option, pub(crate) path: Vec, + pub(crate) parent_path: Option>, pub(crate) children: Vec, pub(crate) routing: CompiledRoutes, pub(crate) leaves: BTreeMap, diff --git a/unshell-protocol/src/protocol/tree/endpoint/mod.rs b/unshell-protocol/src/protocol/tree/endpoint/mod.rs index 0e3a03b..0f402ba 100644 --- a/unshell-protocol/src/protocol/tree/endpoint/mod.rs +++ b/unshell-protocol/src/protocol/tree/endpoint/mod.rs @@ -11,6 +11,6 @@ mod introspection; mod receive; pub use core::{ - ChildRoute, Endpoint, EndpointError, EndpointOutcome, Ingress, LeafSpec, LocalEvent, + ChildRoute, Endpoint, EndpointError, EndpointOutcome, ForwardedFrame, Ingress, LeafSpec, LocalEvent, ProtocolEndpoint, }; diff --git a/unshell-protocol/src/protocol/tree/leaf.rs b/unshell-protocol/src/protocol/tree/leaf.rs index 8cf9e08..ca87621 100644 --- a/unshell-protocol/src/protocol/tree/leaf.rs +++ b/unshell-protocol/src/protocol/tree/leaf.rs @@ -7,7 +7,9 @@ use alloc::{string::String, vec::Vec}; -use super::LeafSpec; +use crate::protocol::FrameBytes; + +use super::{ChildRoute, LeafSpec, ProtocolEndpoint}; /// Static metadata for one application-defined protocol leaf. /// @@ -167,7 +169,7 @@ pub trait LeafBinding: ProtocolLeaf { /// impl CallProcedures for ExampleLeaf { /// type Error = core::convert::Infallible; /// fn procedure_suffixes() -> &'static [&'static str] { &["invoke"] } -/// fn dispatch_call(&mut self, _call: IncomingCall) -> Result> { +/// fn dispatch_call(&mut self, _endpoint: &mut unshell::protocol::tree::ProtocolEndpoint, _call: IncomingCall) -> Result> { /// Ok(unshell::protocol::tree::CallReply::NoReply) /// } /// } @@ -188,21 +190,77 @@ pub trait CallProcedures: LeafDeclaration { /// use unshell::protocol::tree::{CallProcedures, DispatchError, IncomingCall, ProtocolLeaf}; /// struct ExampleLeaf; /// impl ProtocolLeaf for ExampleLeaf { fn leaf_name() -> String { "org.example.v1.echo".into() } } - /// impl CallProcedures for ExampleLeaf { - /// type Error = core::convert::Infallible; - /// fn procedure_suffixes() -> &'static [&'static str] { &["invoke"] } - /// fn dispatch_call(&mut self, _call: IncomingCall) -> Result> { - /// Ok(unshell::protocol::tree::CallReply::NoReply) - /// } - /// } +/// impl CallProcedures for ExampleLeaf { +/// type Error = core::convert::Infallible; +/// fn procedure_suffixes() -> &'static [&'static str] { &["invoke"] } +/// fn dispatch_call(&mut self, _endpoint: &mut unshell::protocol::tree::ProtocolEndpoint, _call: IncomingCall) -> Result> { +/// Ok(unshell::protocol::tree::CallReply::NoReply) +/// } +/// } /// # let _ = ExampleLeaf; /// ``` fn dispatch_call( &mut self, + endpoint: &mut ProtocolEndpoint, call: crate::protocol::tree::IncomingCall, ) -> Result>; } +/// Router-facing transport hooks for leaves that own parent/child connections. +/// +/// What it is: an opt-in trait for leaves that want to act as the transport layer +/// for one endpoint's forwarded traffic. +/// +/// Why it exists: ordinary leaves only need validated local events, but a router +/// leaf also needs to know its active parent/children and where to physically send +/// frames chosen by the endpoint's routing logic. +/// +/// # Example +/// ```rust +/// use unshell::protocol::FrameBytes; +/// use unshell::protocol::tree::{ChildRoute, RouterLeaf}; +/// #[derive(Default)] +/// struct DemoRouter { +/// parent: Option>, +/// children: Vec, +/// } +/// impl unshell::protocol::tree::ProtocolLeaf for DemoRouter { +/// fn leaf_name() -> String { "org.example.v1.router".into() } +/// } +/// impl RouterLeaf for DemoRouter { +/// type RouteError = core::convert::Infallible; +/// +/// fn parent_path(&self) -> Option<&[String]> { self.parent.as_deref() } +/// fn child_routes(&self) -> &[ChildRoute] { &self.children } +/// fn route_to_parent(&mut self, _local_path: &[String], _frame: FrameBytes) -> Result<(), Self::RouteError> { Ok(()) } +/// fn route_to_child(&mut self, _child_path: &[String], _frame: FrameBytes) -> Result<(), Self::RouteError> { Ok(()) } +/// } +/// ``` +pub trait RouterLeaf: ProtocolLeaf { + /// Transport-specific error surfaced while handing a frame to the chosen link. + type RouteError; + + /// Returns the currently connected direct parent path, if any. + fn parent_path(&self) -> Option<&[String]>; + + /// Returns the currently connected direct child routes. + fn child_routes(&self) -> &[ChildRoute]; + + /// Sends one routed frame toward the direct parent connection. + fn route_to_parent( + &mut self, + local_path: &[String], + frame: FrameBytes, + ) -> Result<(), Self::RouteError>; + + /// Sends one routed frame toward the chosen direct child connection. + fn route_to_child( + &mut self, + child_path: &[String], + frame: FrameBytes, + ) -> Result<(), Self::RouteError>; +} + /// Builds one canonical dotted leaf id from crate-local metadata plus optional /// user overrides. /// diff --git a/unshell-protocol/src/protocol/tree/mod.rs b/unshell-protocol/src/protocol/tree/mod.rs index f147f30..0bd9485 100644 --- a/unshell-protocol/src/protocol/tree/mod.rs +++ b/unshell-protocol/src/protocol/tree/mod.rs @@ -16,8 +16,8 @@ mod routing; pub use call::{ Call, CallLeaf, CallReply, CallResult, DispatchError, IncomingCall, IncomingData, - IncomingFault, LeafRuntime, LeafRuntimeError, OutgoingData, RuntimeOutcome, decode_call_input, - encode_call_reply, + IncomingFault, LeafRuntime, LeafRuntimeError, OutgoingData, RoutedRuntimeOutcome, + RuntimeOutcome, decode_call_input, encode_call_reply, }; pub use endpoint::{ ChildRoute, Endpoint, EndpointError, EndpointOutcome, Ingress, LeafSpec, LocalEvent, @@ -25,7 +25,8 @@ pub use endpoint::{ }; pub use hook::{ActiveHook, HookConflict, HookKey, HookTable, PendingHook}; pub use leaf::{ - CallProcedures, LeafBinding, LeafDeclaration, ProtocolLeaf, derive_leaf_name, leaf_spec_of, + CallProcedures, LeafBinding, LeafDeclaration, ProtocolLeaf, RouterLeaf, derive_leaf_name, + leaf_spec_of, }; pub use procedure::{ Procedure, ProcedureEffect, ProcedureMetadata, ProcedureRuntime, ProcedureRuntimeError, diff --git a/unshell-protocol/src/protocol/validation.rs b/unshell-protocol/src/protocol/validation.rs index 22cee53..f36e254 100644 --- a/unshell-protocol/src/protocol/validation.rs +++ b/unshell-protocol/src/protocol/validation.rs @@ -32,6 +32,8 @@ pub enum ValidationError { CallInvariant(&'static str), /// A hook lifecycle transition would break protocol state invariants. HookInvariant(&'static str), + /// One endpoint-topology update would break local tree invariants. + TopologyInvariant(&'static str), /// A hook id collided with existing endpoint-local state. InvalidHookId, } @@ -43,6 +45,7 @@ impl fmt::Display for ValidationError { Self::ProcedureId(message) => write!(f, "invalid procedure id: {message}"), Self::CallInvariant(message) => write!(f, "invalid call: {message}"), Self::HookInvariant(message) => write!(f, "invalid hook state: {message}"), + Self::TopologyInvariant(message) => write!(f, "invalid topology: {message}"), Self::InvalidHookId => f.write_str("invalid hook identifier"), } }