Add router-aware endpoint topology APIs

This commit is contained in:
Michael Mikovsky
2026-04-26 16:13:28 -06:00
parent 99d1097f2a
commit 371f3ae492
9 changed files with 669 additions and 54 deletions
+64 -6
View File
@@ -64,6 +64,12 @@ struct CallArm {
dispatch_tokens: TokenStream, dispatch_tokens: TokenStream,
} }
#[derive(Clone, Copy)]
enum EndpointArgKind {
Shared,
Mutable,
}
pub(crate) fn expand_procedures( pub(crate) fn expand_procedures(
attr: ProceduresAttributes, attr: ProceduresAttributes,
mut item: ItemImpl, mut item: ItemImpl,
@@ -118,6 +124,7 @@ pub(crate) fn expand_procedures(
fn dispatch_call( fn dispatch_call(
&mut self, &mut self,
endpoint: &mut ::unshell::protocol::tree::ProtocolEndpoint,
call: ::unshell::protocol::tree::IncomingCall, call: ::unshell::protocol::tree::IncomingCall,
) -> ::core::result::Result< ) -> ::core::result::Result<
::unshell::protocol::tree::CallReply, ::unshell::protocol::tree::CallReply,
@@ -150,7 +157,9 @@ fn expand_call_arm(method: &ImplItemFn) -> Result<CallArm> {
.filter(|input| !matches!(input, FnArg::Receiver(_))) .filter(|input| !matches!(input, FnArg::Receiver(_)))
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let invocation = expand_invocation(method_name, &inputs)?; let (endpoint_arg, inputs) = split_endpoint_arg(&inputs)?;
let invocation = expand_invocation(method_name, endpoint_arg, &inputs)?;
let return_value = expand_return_conversion(&method.sig.output, quote! { __unshell_result })?; let return_value = expand_return_conversion(&method.sig.output, quote! { __unshell_result })?;
Ok(CallArm { Ok(CallArm {
@@ -164,9 +173,18 @@ fn expand_call_arm(method: &ImplItemFn) -> Result<CallArm> {
}) })
} }
fn expand_invocation(method_name: &Ident, inputs: &[&FnArg]) -> Result<TokenStream> { fn expand_invocation(
method_name: &Ident,
endpoint_arg: Option<EndpointArgKind>,
inputs: &[&FnArg],
) -> Result<TokenStream> {
let endpoint_prefix = endpoint_arg.map(endpoint_arg_tokens);
if inputs.is_empty() { if inputs.is_empty() {
return Ok(quote! { self.#method_name() }); return Ok(if let Some(prefix) = endpoint_prefix {
quote! { self.#method_name(#prefix) }
} else {
quote! { self.#method_name() }
});
} }
if inputs.len() == 1 { if inputs.len() == 1 {
@@ -199,7 +217,7 @@ fn expand_invocation(method_name: &Ident, inputs: &[&FnArg]) -> Result<TokenStre
hook.hook_id, hook.hook_id,
)), )),
}; };
self.#method_name(__unshell_call) self.#method_name(#endpoint_prefix __unshell_call)
}}); }});
} }
@@ -208,7 +226,7 @@ fn expand_invocation(method_name: &Ident, inputs: &[&FnArg]) -> Result<TokenStre
call.message.data.as_slice(), call.message.data.as_slice(),
) )
.map_err(::unshell::protocol::tree::DispatchError::Decode)?; .map_err(::unshell::protocol::tree::DispatchError::Decode)?;
self.#method_name(__unshell_input) self.#method_name(#endpoint_prefix __unshell_input)
}}); }});
} }
@@ -231,10 +249,50 @@ fn expand_invocation(method_name: &Ident, inputs: &[&FnArg]) -> Result<TokenStre
call.message.data.as_slice(), call.message.data.as_slice(),
) )
.map_err(::unshell::protocol::tree::DispatchError::Decode)?; .map_err(::unshell::protocol::tree::DispatchError::Decode)?;
self.#method_name(#(#vars),*) self.#method_name(#endpoint_prefix #(#vars),*)
}}) }})
} }
fn split_endpoint_arg<'a>(inputs: &[&'a FnArg]) -> Result<(Option<EndpointArgKind>, Vec<&'a FnArg>)> {
let Some(first) = inputs.first() else {
return Ok((None, Vec::new()));
};
let Some(kind) = endpoint_arg_kind(first)? else {
return Ok((None, inputs.to_vec()));
};
Ok((Some(kind), inputs[1..].to_vec()))
}
fn endpoint_arg_kind(arg: &FnArg) -> Result<Option<EndpointArgKind>> {
let FnArg::Typed(PatType { ty, .. }) = arg else {
return Ok(None);
};
let Type::Reference(reference) = ty.as_ref() else {
return Ok(None);
};
let Type::Path(type_path) = reference.elem.as_ref() else {
return Ok(None);
};
let Some(segment) = type_path.path.segments.last() else {
return Ok(None);
};
if segment.ident != "ProtocolEndpoint" {
return Ok(None);
}
Ok(Some(if reference.mutability.is_some() {
EndpointArgKind::Mutable
} else {
EndpointArgKind::Shared
}))
}
fn endpoint_arg_tokens(kind: EndpointArgKind) -> TokenStream {
match kind {
EndpointArgKind::Shared => quote! { &*endpoint, },
EndpointArgKind::Mutable => quote! { endpoint, },
}
}
fn expand_return_conversion(return_type: &ReturnType, value: TokenStream) -> Result<TokenStream> { fn expand_return_conversion(return_type: &ReturnType, value: TokenStream) -> Result<TokenStream> {
match return_type { match return_type {
ReturnType::Default => Ok(quote! { ReturnType::Default => Ok(quote! {
+192
View File
@@ -105,3 +105,195 @@ fn leaf_runtime_dispatches_generated_call_procedure() {
.expect("typed response should decode"); .expect("typed response should decode");
assert_eq!(response.text, "echo: hello"); assert_eq!(response.text, "echo: hello");
} }
#[derive(Default)]
struct TopologyLeaf;
#[leaf(
id = "org.example.v1.topology",
endpoint_struct = TopologyLeaf,
procedures = ["add_child", "remove_child", "connections"]
)]
struct Topology;
#[derive(Archive, Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
struct ChildRequest {
child_path: Vec<String>,
}
#[derive(Archive, Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
struct ConnectionsReply {
parent: Option<Vec<String>>,
children: Vec<Vec<String>>,
}
#[procedures(error = Infallible)]
impl TopologyLeaf {
#[call]
fn add_child(
&mut self,
endpoint: &mut ProtocolEndpoint,
request: ChildRequest,
) -> ConnectionsReply {
endpoint
.upsert_child_route(ChildRoute::registered(request.child_path))
.expect("topology mutation should satisfy direct-child invariants");
ConnectionsReply {
parent: endpoint.parent_path().map(<[String]>::to_vec),
children: endpoint
.child_routes()
.iter()
.map(|child| child.path.clone())
.collect(),
}
}
#[call]
fn remove_child(
&mut self,
endpoint: &mut ProtocolEndpoint,
request: ChildRequest,
) -> ConnectionsReply {
endpoint.remove_child_route(&request.child_path);
ConnectionsReply {
parent: endpoint.parent_path().map(<[String]>::to_vec),
children: endpoint
.child_routes()
.iter()
.map(|child| child.path.clone())
.collect(),
}
}
#[call]
fn connections(&mut self, endpoint: &ProtocolEndpoint) -> ConnectionsReply {
ConnectionsReply {
parent: endpoint.parent_path().map(<[String]>::to_vec),
children: endpoint
.child_routes()
.iter()
.map(|child| child.path.clone())
.collect(),
}
}
}
impl CallLeaf for TopologyLeaf {
type Error = Infallible;
}
#[test]
fn generated_call_procedure_can_query_and_mutate_endpoint_topology() {
let endpoint = ProtocolEndpoint::new(
path(&["agent"]),
Some(Vec::new()),
Vec::new(),
vec![TopologyLeaf::protocol_leaf_spec()],
);
let mut runtime = LeafRuntime::new(endpoint, TopologyLeaf);
let mut controller = ProtocolEndpoint::new(
Vec::new(),
None,
vec![ChildRoute::registered(path(&["agent"]))],
Vec::new(),
);
let add_hook = controller.allocate_hook_id();
let add_child = controller
.send_call(
path(&["agent"]),
Some(TopologyLeaf::protocol_leaf_name()),
TopologyLeaf::protocol_procedure_id("add_child").expect("suffix should resolve"),
Some(add_hook),
encode_call_reply(&ChildRequest {
child_path: path(&["agent", "child"]),
})
.expect("request should encode"),
)
.expect("call should encode");
let EndpointOutcome::Forward {
frame: add_child_frame,
..
} = add_child
else {
panic!("controller should forward add-child call");
};
let add_outcome = runtime
.receive(&Ingress::Parent, add_child_frame)
.expect("runtime should mutate topology");
let [response] = add_outcome.frames.as_slice() else {
panic!("expected add-child response frame");
};
let parsed = decode_frame(response).expect("response should decode");
let reply = decode_call_input::<ConnectionsReply>(
parsed
.deserialize_data()
.expect("reply data should decode")
.data
.as_slice(),
)
.expect("typed reply should decode");
assert_eq!(reply.parent, Some(Vec::new()));
assert_eq!(reply.children, vec![path(&["agent", "child"])]);
assert_eq!(runtime.endpoint().child_routes().len(), 1);
let list_hook = controller.allocate_hook_id();
let list = controller
.send_call(
path(&["agent"]),
Some(TopologyLeaf::protocol_leaf_name()),
TopologyLeaf::protocol_procedure_id("connections").expect("suffix should resolve"),
Some(list_hook),
encode_call_reply(&()).expect("unit request should encode"),
)
.expect("list call should encode");
let EndpointOutcome::Forward {
frame: list_frame, ..
} = list
else {
panic!("controller should forward connections call");
};
let list_outcome = runtime
.receive(&Ingress::Parent, list_frame)
.expect("runtime should return topology snapshot");
let [list_response] = list_outcome.frames.as_slice() else {
panic!("expected connections response frame");
};
let list_reply = decode_call_input::<ConnectionsReply>(
decode_frame(list_response)
.expect("response should decode")
.deserialize_data()
.expect("data should deserialize")
.data
.as_slice(),
)
.expect("typed reply should decode");
assert_eq!(list_reply.children, vec![path(&["agent", "child"])]);
let remove_hook = controller.allocate_hook_id();
let remove = controller
.send_call(
path(&["agent"]),
Some(TopologyLeaf::protocol_leaf_name()),
TopologyLeaf::protocol_procedure_id("remove_child")
.expect("suffix should resolve"),
Some(remove_hook),
encode_call_reply(&ChildRequest {
child_path: path(&["agent", "child"]),
})
.expect("request should encode"),
)
.expect("remove call should encode");
let EndpointOutcome::Forward {
frame: remove_frame,
..
} = remove
else {
panic!("controller should forward remove-child call");
};
runtime
.receive(&Ingress::Parent, remove_frame)
.expect("runtime should prune topology");
assert!(runtime.endpoint().child_routes().is_empty());
}
+146 -33
View File
@@ -11,7 +11,9 @@ use crate::protocol::{
use super::{ use super::{
Endpoint, EndpointError, HookKey, Ingress, LocalEvent, ProtocolEndpoint, ProtocolLeaf, Endpoint, EndpointError, HookKey, Ingress, LocalEvent, ProtocolEndpoint, ProtocolLeaf,
RouteDecision, RouterLeaf,
}; };
use super::endpoint::ForwardedFrame;
/// One typed incoming `Call` passed to a leaf procedure. /// One typed incoming `Call` passed to a leaf procedure.
/// ///
@@ -366,6 +368,28 @@ pub struct RuntimeOutcome {
pub dropped: bool, pub dropped: bool,
} }
/// Frames emitted by the runtime together with their chosen next hops.
///
/// What it is: the router-oriented variant of [`RuntimeOutcome`], preserving the
/// `RouteDecision` for every emitted frame.
///
/// Why it exists: transport-owning leaves need to know whether each frame should
/// go to the parent or to a specific child, not just the encoded bytes.
///
/// # Example
/// ```rust
/// use unshell::protocol::tree::RoutedRuntimeOutcome;
/// let outcome = RoutedRuntimeOutcome::default();
/// assert!(outcome.forwarded.is_empty());
/// ```
#[derive(Debug, Default, Clone)]
pub struct RoutedRuntimeOutcome {
/// Forwarded frames paired with the route chosen by the endpoint runtime.
pub forwarded: Vec<ForwardedFrame>,
/// Whether the endpoint dropped the incoming packet.
pub dropped: bool,
}
impl<L> LeafRuntime<L> { impl<L> LeafRuntime<L> {
/// Builds a runtime from one endpoint and one leaf instance. /// Builds a runtime from one endpoint and one leaf instance.
#[must_use] #[must_use]
@@ -453,8 +477,32 @@ where
ingress: &Ingress, ingress: &Ingress,
frame: FrameBytes, frame: FrameBytes,
) -> Result<RuntimeOutcome, LeafRuntimeError<<L as CallLeaf>::Error>> { ) -> Result<RuntimeOutcome, LeafRuntimeError<<L as CallLeaf>::Error>> {
let routed = self.receive_routed(ingress, frame)?;
Ok(RuntimeOutcome {
frames: routed
.forwarded
.into_iter()
.map(|forwarded| forwarded.frame)
.collect(),
dropped: routed.dropped,
})
}
/// Delivers one inbound frame while preserving route decisions for emitted traffic.
///
/// # Example
/// ```rust
/// # use unshell::protocol::tree::{LeafRuntime, ProtocolEndpoint};
/// # struct ExampleLeaf;
/// # let _ = core::marker::PhantomData::<LeafRuntime<ExampleLeaf>>;
/// ```
pub fn receive_routed(
&mut self,
ingress: &Ingress,
frame: FrameBytes,
) -> Result<RoutedRuntimeOutcome, LeafRuntimeError<<L as CallLeaf>::Error>> {
let outcome = self.endpoint.receive(ingress, frame)?; let outcome = self.endpoint.receive(ingress, frame)?;
self.process_endpoint_outcome(outcome) self.process_endpoint_outcome_routed(outcome)
} }
/// Polls the leaf for locally-generated hook traffic and routes any emitted frames. /// Polls the leaf for locally-generated hook traffic and routes any emitted frames.
@@ -466,21 +514,45 @@ where
/// # let _ = core::marker::PhantomData::<LeafRuntime<ExampleLeaf>>; /// # let _ = core::marker::PhantomData::<LeafRuntime<ExampleLeaf>>;
/// ``` /// ```
pub fn poll(&mut self) -> Result<RuntimeOutcome, LeafRuntimeError<<L as CallLeaf>::Error>> { pub fn poll(&mut self) -> Result<RuntimeOutcome, LeafRuntimeError<<L as CallLeaf>::Error>> {
let outgoing = self.leaf.poll().map_err(LeafRuntimeError::Leaf)?; let routed = self.poll_routed()?;
self.emit_outgoing(outgoing) Ok(RuntimeOutcome {
frames: routed
.forwarded
.into_iter()
.map(|forwarded| forwarded.frame)
.collect(),
dropped: routed.dropped,
})
} }
fn process_endpoint_outcome( /// Polls the leaf while preserving route decisions for emitted traffic.
///
/// # Example
/// ```rust
/// # use unshell::protocol::tree::{LeafRuntime, ProtocolEndpoint};
/// # struct ExampleLeaf;
/// # let _ = core::marker::PhantomData::<LeafRuntime<ExampleLeaf>>;
/// ```
pub fn poll_routed(
&mut self,
) -> Result<RoutedRuntimeOutcome, LeafRuntimeError<<L as CallLeaf>::Error>> {
let outgoing = self.leaf.poll().map_err(LeafRuntimeError::Leaf)?;
self.emit_outgoing_routed(outgoing)
}
fn process_endpoint_outcome_routed(
&mut self, &mut self,
outcome: crate::protocol::tree::EndpointOutcome, outcome: crate::protocol::tree::EndpointOutcome,
) -> Result<RuntimeOutcome, LeafRuntimeError<<L as CallLeaf>::Error>> { ) -> Result<RoutedRuntimeOutcome, LeafRuntimeError<<L as CallLeaf>::Error>> {
match outcome { match outcome {
crate::protocol::tree::EndpointOutcome::Forward { frame, .. } => Ok(RuntimeOutcome { crate::protocol::tree::EndpointOutcome::Forward { route, frame } => {
frames: vec![frame], Ok(RoutedRuntimeOutcome {
forwarded: vec![ForwardedFrame { route, frame }],
dropped: false, dropped: false,
}), })
crate::protocol::tree::EndpointOutcome::Dropped => Ok(RuntimeOutcome { }
frames: Vec::new(), crate::protocol::tree::EndpointOutcome::Dropped => Ok(RoutedRuntimeOutcome {
forwarded: Vec::new(),
dropped: true, dropped: true,
}), }),
crate::protocol::tree::EndpointOutcome::Local(event) => self.process_local_event(event), crate::protocol::tree::EndpointOutcome::Local(event) => self.process_local_event(event),
@@ -490,7 +562,7 @@ where
fn process_local_event( fn process_local_event(
&mut self, &mut self,
event: LocalEvent, event: LocalEvent,
) -> Result<RuntimeOutcome, LeafRuntimeError<<L as CallLeaf>::Error>> { ) -> Result<RoutedRuntimeOutcome, LeafRuntimeError<<L as CallLeaf>::Error>> {
match event { match event {
LocalEvent::Call { header, message } => self.process_local_call(header, message), LocalEvent::Call { header, message } => self.process_local_call(header, message),
LocalEvent::Data { LocalEvent::Data {
@@ -510,7 +582,7 @@ where
&mut self, &mut self,
header: PacketHeader, header: PacketHeader,
message: CallMessage, message: CallMessage,
) -> Result<RuntimeOutcome, LeafRuntimeError<<L as CallLeaf>::Error>> { ) -> Result<RoutedRuntimeOutcome, LeafRuntimeError<<L as CallLeaf>::Error>> {
let CallMessage { let CallMessage {
procedure_id, procedure_id,
data, data,
@@ -528,19 +600,16 @@ where
}, },
}; };
match self.leaf.dispatch_call(incoming) { match self.leaf.dispatch_call(&mut self.endpoint, incoming) {
Ok(CallReply::Reply(bytes)) => { Ok(CallReply::Reply(bytes)) => {
let frames = if let Some(hook) = response_hook { let frames = if let Some(hook) = response_hook {
self.send_reply_data(hook, procedure_id, bytes, true)? self.send_reply_data(hook, procedure_id, bytes, true)?
} else { } else {
Vec::new() RoutedRuntimeOutcome::default()
}; };
Ok(RuntimeOutcome { Ok(frames)
frames,
dropped: false,
})
} }
Ok(CallReply::NoReply) => Ok(RuntimeOutcome::default()), Ok(CallReply::NoReply) => Ok(RoutedRuntimeOutcome::default()),
Err(error) => { Err(error) => {
// Dispatch failures still emit a protocol fault for the remote caller when a // Dispatch failures still emit a protocol fault for the remote caller when a
// response hook exists, even though the local runtime also surfaces the error. // response hook exists, even though the local runtime also surfaces the error.
@@ -555,7 +624,7 @@ where
header: PacketHeader, header: PacketHeader,
message: DataMessage, message: DataMessage,
hook_key: HookKey, hook_key: HookKey,
) -> Result<RuntimeOutcome, LeafRuntimeError<<L as CallLeaf>::Error>> { ) -> Result<RoutedRuntimeOutcome, LeafRuntimeError<<L as CallLeaf>::Error>> {
let outgoing = self let outgoing = self
.leaf .leaf
.on_data(IncomingData { .on_data(IncomingData {
@@ -564,7 +633,7 @@ where
hook_key, hook_key,
}) })
.map_err(LeafRuntimeError::Leaf)?; .map_err(LeafRuntimeError::Leaf)?;
self.emit_outgoing(outgoing) self.emit_outgoing_routed(outgoing)
} }
fn process_local_fault( fn process_local_fault(
@@ -572,7 +641,7 @@ where
header: PacketHeader, header: PacketHeader,
message: crate::protocol::FaultMessage, message: crate::protocol::FaultMessage,
hook_key: HookKey, hook_key: HookKey,
) -> Result<RuntimeOutcome, LeafRuntimeError<<L as CallLeaf>::Error>> { ) -> Result<RoutedRuntimeOutcome, LeafRuntimeError<<L as CallLeaf>::Error>> {
self.leaf self.leaf
.on_fault(IncomingFault { .on_fault(IncomingFault {
header, header,
@@ -580,14 +649,14 @@ where
hook_key, hook_key,
}) })
.map_err(LeafRuntimeError::Leaf)?; .map_err(LeafRuntimeError::Leaf)?;
Ok(RuntimeOutcome::default()) Ok(RoutedRuntimeOutcome::default())
} }
fn emit_outgoing( fn emit_outgoing_routed(
&mut self, &mut self,
outgoing: Vec<OutgoingData>, outgoing: Vec<OutgoingData>,
) -> Result<RuntimeOutcome, LeafRuntimeError<<L as CallLeaf>::Error>> { ) -> Result<RoutedRuntimeOutcome, LeafRuntimeError<<L as CallLeaf>::Error>> {
let mut runtime = RuntimeOutcome::default(); let mut runtime = RoutedRuntimeOutcome::default();
for packet in outgoing { for packet in outgoing {
let endpoint_outcome = self.endpoint.send_data( let endpoint_outcome = self.endpoint.send_data(
packet.dst_path, packet.dst_path,
@@ -597,8 +666,8 @@ where
packet.end_hook, packet.end_hook,
)?; )?;
runtime runtime
.frames .forwarded
.extend(self.process_endpoint_outcome(endpoint_outcome)?.frames); .extend(self.process_endpoint_outcome_routed(endpoint_outcome)?.forwarded);
} }
Ok(runtime) Ok(runtime)
} }
@@ -609,7 +678,7 @@ where
procedure_id: String, procedure_id: String,
bytes: Vec<u8>, bytes: Vec<u8>,
end_hook: bool, end_hook: bool,
) -> Result<Vec<FrameBytes>, LeafRuntimeError<<L as CallLeaf>::Error>> { ) -> Result<RoutedRuntimeOutcome, LeafRuntimeError<<L as CallLeaf>::Error>> {
let endpoint_outcome = self.endpoint.send_data( let endpoint_outcome = self.endpoint.send_data(
hook.return_path, hook.return_path,
hook.hook_id, hook.hook_id,
@@ -617,21 +686,65 @@ where
bytes, bytes,
end_hook, end_hook,
)?; )?;
Ok(self.process_endpoint_outcome(endpoint_outcome)?.frames) self.process_endpoint_outcome_routed(endpoint_outcome)
} }
fn emit_internal_fault_if_possible( fn emit_internal_fault_if_possible(
&mut self, &mut self,
hook: Option<&HookTarget>, hook: Option<&HookTarget>,
) -> Result<Vec<FrameBytes>, LeafRuntimeError<<L as CallLeaf>::Error>> { ) -> Result<RoutedRuntimeOutcome, LeafRuntimeError<<L as CallLeaf>::Error>> {
let Some(hook) = hook else { let Some(hook) = hook else {
return Ok(Vec::new()); return Ok(RoutedRuntimeOutcome::default());
}; };
let key = HookKey::new(hook.return_path.clone(), hook.hook_id); let key = HookKey::new(hook.return_path.clone(), hook.hook_id);
let outcome = self let outcome = self
.endpoint .endpoint
.emit_fault_if_possible(Some(key), ProtocolFault::INTERNAL_ERROR)?; .emit_fault_if_possible(Some(key), ProtocolFault::INTERNAL_ERROR)?;
Ok(self.process_endpoint_outcome(outcome)?.frames) self.process_endpoint_outcome_routed(outcome)
}
}
impl<L> LeafRuntime<L>
where
L: CallLeaf + super::CallProcedures<Error = <L as CallLeaf>::Error> + RouterLeaf,
{
/// Sends previously forwarded frames through the router leaf's parent/child links.
///
/// What it is: a small transport bridge from endpoint route decisions to the
/// leaf-owned connections.
///
/// Why it exists: router leaves should be able to reuse the normal protocol
/// runtime and still own the concrete forwarding mechanism.
///
/// # Example
/// ```rust
/// # use unshell::protocol::tree::{LeafRuntime, ProtocolEndpoint};
/// # struct ExampleLeaf;
/// # let _ = core::marker::PhantomData::<LeafRuntime<ExampleLeaf>>;
/// ```
pub fn route_forwarded(
&mut self,
forwarded: Vec<ForwardedFrame>,
) -> Result<(), <L as RouterLeaf>::RouteError> {
for forwarded in forwarded {
match forwarded.route {
RouteDecision::Parent => {
self.leaf
.route_to_parent(self.endpoint.path(), forwarded.frame)?;
}
RouteDecision::Child(index) => {
let child_path = self
.endpoint
.child_routes()
.get(index)
.map(|child| child.path.clone())
.unwrap_or_default();
self.leaf.route_to_child(&child_path, forwarded.frame)?;
}
RouteDecision::Local | RouteDecision::Drop => {}
}
}
Ok(())
} }
} }
@@ -128,6 +128,7 @@ impl ProtocolEndpoint {
children: Vec<ChildRoute>, children: Vec<ChildRoute>,
leaves: Vec<LeafSpec>, leaves: Vec<LeafSpec>,
) -> Self { ) -> Self {
let has_parent = parent_path.is_some();
let registered_child_paths = children let registered_child_paths = children
.iter() .iter()
.filter(|child| child.registered) .filter(|child| child.registered)
@@ -136,7 +137,8 @@ impl ProtocolEndpoint {
Self { Self {
local_id: None, local_id: None,
routing: CompiledRoutes::new(&path, &registered_child_paths, parent_path.is_some()), parent_path,
routing: CompiledRoutes::new(&path, &registered_child_paths, has_parent),
path, path,
children, children,
leaves: leaves leaves: leaves
@@ -194,6 +196,129 @@ impl ProtocolEndpoint {
self.local_id.as_deref() self.local_id.as_deref()
} }
/// Returns the absolute path of this endpoint's direct parent, if one exists.
///
/// What it is: the currently configured one-hop parent boundary for this
/// endpoint.
///
/// Why it exists: router-style leaves need to expose and inspect the tree edge
/// they use for upstream traffic.
///
/// # Example
/// ```rust
/// use unshell::protocol::tree::ProtocolEndpoint;
/// let endpoint = ProtocolEndpoint::new(vec!["worker".into()], Some(Vec::new()), Vec::new(), Vec::new());
/// assert_eq!(endpoint.parent_path(), Some([].as_slice()));
/// ```
pub fn parent_path(&self) -> Option<&[String]> {
self.parent_path.as_deref()
}
/// Returns the direct child routes currently known to this endpoint.
///
/// What it is: the local routing-table inputs for direct descendants.
///
/// Why it exists: management leaves often need to inspect or mirror the child
/// topology they are controlling.
///
/// # Example
/// ```rust
/// use unshell::protocol::tree::{ChildRoute, ProtocolEndpoint};
/// let endpoint = ProtocolEndpoint::new(
/// vec!["root".into()],
/// None,
/// vec![ChildRoute::registered(vec!["root".into(), "child".into()])],
/// Vec::new(),
/// );
/// assert_eq!(endpoint.child_routes().len(), 1);
/// ```
pub fn child_routes(&self) -> &[ChildRoute] {
&self.children
}
/// Replaces the configured direct parent path and recompiles local routing.
///
/// What it is: the supported way to attach or detach this endpoint from its
/// upstream boundary.
///
/// Why it exists: a router leaf should be able to promote or remove its parent
/// connection without rebuilding the entire endpoint.
///
/// # Example
/// ```rust
/// use unshell::protocol::tree::ProtocolEndpoint;
/// let mut endpoint = ProtocolEndpoint::new(vec!["root".into(), "worker".into()], Some(vec!["root".into()]), Vec::new(), Vec::new());
/// endpoint.set_parent_path(None)?;
/// assert!(endpoint.parent_path().is_none());
/// # Ok::<(), unshell::protocol::tree::EndpointError>(())
/// ```
pub fn set_parent_path(
&mut self,
parent_path: Option<Vec<String>>,
) -> Result<(), EndpointError> {
if let Some(path) = parent_path.as_deref() {
self.validate_direct_parent_path(path)?;
}
self.parent_path = parent_path;
self.rebuild_routing();
Ok(())
}
/// Inserts or updates one direct child route and recompiles local routing.
///
/// What it is: the supported mutation API for the endpoint's child list.
///
/// Why it exists: router-management leaves need one invariant-preserving way to
/// reflect child connection changes into path routing.
///
/// # Example
/// ```rust
/// use unshell::protocol::tree::{ChildRoute, ProtocolEndpoint};
/// let mut endpoint = ProtocolEndpoint::new(vec!["root".into()], None, Vec::new(), Vec::new());
/// endpoint.upsert_child_route(ChildRoute::registered(vec!["root".into(), "child".into()]))?;
/// assert_eq!(endpoint.child_routes().len(), 1);
/// # Ok::<(), unshell::protocol::tree::EndpointError>(())
/// ```
pub fn upsert_child_route(&mut self, route: ChildRoute) -> Result<(), EndpointError> {
self.validate_direct_child_path(&route.path)?;
if let Some(existing) = self.children.iter_mut().find(|child| child.path == route.path) {
*existing = route;
} else {
self.children.push(route);
}
self.rebuild_routing();
Ok(())
}
/// Removes one direct child route by absolute path and recompiles local routing.
///
/// What it is: the supported mutation API for pruning a direct descendant.
///
/// Why it exists: connection-management leaves need to tear down routes without
/// mutating the endpoint internals directly.
///
/// # Example
/// ```rust
/// use unshell::protocol::tree::{ChildRoute, ProtocolEndpoint};
/// let mut endpoint = ProtocolEndpoint::new(
/// vec!["root".into()],
/// None,
/// vec![ChildRoute::registered(vec!["root".into(), "child".into()])],
/// Vec::new(),
/// );
/// assert!(endpoint.remove_child_route(&[String::from("root"), String::from("child")]));
/// assert!(endpoint.child_routes().is_empty());
/// ```
pub fn remove_child_route(&mut self, path: &[String]) -> bool {
let original_len = self.children.len();
self.children.retain(|child| child.path != path);
let removed = self.children.len() != original_len;
if removed {
self.rebuild_routing();
}
removed
}
/// Registers a procedure that is handled directly by the endpoint. /// Registers a procedure that is handled directly by the endpoint.
/// ///
/// Endpoint-level procedures exist for protocol services that are not attached to one leaf, /// Endpoint-level procedures exist for protocol services that are not attached to one leaf,
@@ -230,6 +355,43 @@ impl ProtocolEndpoint {
self.hooks.allocate_hook_id(&self.path) self.hooks.allocate_hook_id(&self.path)
} }
fn rebuild_routing(&mut self) {
let registered_child_paths = self
.children
.iter()
.filter(|child| child.registered)
.map(|child| child.path.clone())
.collect::<Vec<_>>();
self.routing = CompiledRoutes::new(
&self.path,
&registered_child_paths,
self.parent_path.is_some(),
);
}
fn validate_direct_parent_path(&self, parent_path: &[String]) -> Result<(), EndpointError> {
let Some((_, expected_parent)) = self.path.split_last() else {
return Err(EndpointError::Validation(ValidationError::TopologyInvariant(
"root endpoints cannot declare a parent path",
)));
};
if parent_path != expected_parent {
return Err(EndpointError::Validation(ValidationError::TopologyInvariant(
"parent path must equal the direct path prefix of this endpoint",
)));
}
Ok(())
}
fn validate_direct_child_path(&self, child_path: &[String]) -> Result<(), EndpointError> {
if child_path.len() != self.path.len() + 1 || !child_path.starts_with(&self.path) {
return Err(EndpointError::Validation(ValidationError::TopologyInvariant(
"child path must be one direct descendant of this endpoint",
)));
}
Ok(())
}
/// Encodes a call frame without routing it through the local endpoint. /// Encodes a call frame without routing it through the local endpoint.
/// ///
/// This exists for callers that want a fully encoded outbound frame while handling transport /// This exists for callers that want a fully encoded outbound frame while handling transport
@@ -177,6 +177,33 @@ pub enum EndpointOutcome {
Dropped, Dropped,
} }
/// One framed packet together with the next hop selected by endpoint routing.
///
/// What it is: a transport-ready frame paired with the resolved direction the
/// endpoint chose for it.
///
/// Why it exists: high-level runtimes often flatten forwarded traffic down to raw
/// bytes, but router-host leaves need the route decision so they can choose the
/// correct parent or child connection.
///
/// # Example
/// ```rust
/// use unshell::protocol::FrameBytes;
/// use unshell::protocol::tree::{ForwardedFrame, RouteDecision};
/// let forwarded = ForwardedFrame {
/// route: RouteDecision::Parent,
/// frame: FrameBytes::new(),
/// };
/// assert!(matches!(forwarded.route, RouteDecision::Parent));
/// ```
#[derive(Debug, Clone)]
pub struct ForwardedFrame {
/// The next hop selected by the endpoint runtime.
pub route: RouteDecision,
/// The encoded protocol frame to send over that hop.
pub frame: FrameBytes,
}
/// Error surfaced while validating or encoding protocol frames. /// Error surfaced while validating or encoding protocol frames.
/// ///
/// This exists so endpoint callers can preserve the distinction between malformed wire/archive /// This exists so endpoint callers can preserve the distinction between malformed wire/archive
@@ -288,6 +315,7 @@ pub trait Endpoint {
pub struct ProtocolEndpoint { pub struct ProtocolEndpoint {
pub(crate) local_id: Option<String>, pub(crate) local_id: Option<String>,
pub(crate) path: Vec<String>, pub(crate) path: Vec<String>,
pub(crate) parent_path: Option<Vec<String>>,
pub(crate) children: Vec<ChildRoute>, pub(crate) children: Vec<ChildRoute>,
pub(crate) routing: CompiledRoutes, pub(crate) routing: CompiledRoutes,
pub(crate) leaves: BTreeMap<String, LeafSpec>, pub(crate) leaves: BTreeMap<String, LeafSpec>,
@@ -11,6 +11,6 @@ mod introspection;
mod receive; mod receive;
pub use core::{ pub use core::{
ChildRoute, Endpoint, EndpointError, EndpointOutcome, Ingress, LeafSpec, LocalEvent, ChildRoute, Endpoint, EndpointError, EndpointOutcome, ForwardedFrame, Ingress, LeafSpec, LocalEvent,
ProtocolEndpoint, ProtocolEndpoint,
}; };
+61 -3
View File
@@ -7,7 +7,9 @@
use alloc::{string::String, vec::Vec}; use alloc::{string::String, vec::Vec};
use super::LeafSpec; use crate::protocol::FrameBytes;
use super::{ChildRoute, LeafSpec, ProtocolEndpoint};
/// Static metadata for one application-defined protocol leaf. /// Static metadata for one application-defined protocol leaf.
/// ///
@@ -167,7 +169,7 @@ pub trait LeafBinding: ProtocolLeaf {
/// impl CallProcedures for ExampleLeaf { /// impl CallProcedures for ExampleLeaf {
/// type Error = core::convert::Infallible; /// type Error = core::convert::Infallible;
/// fn procedure_suffixes() -> &'static [&'static str] { &["invoke"] } /// fn procedure_suffixes() -> &'static [&'static str] { &["invoke"] }
/// fn dispatch_call(&mut self, _call: IncomingCall) -> Result<unshell::protocol::tree::CallReply, DispatchError<Self::Error>> { /// fn dispatch_call(&mut self, _endpoint: &mut unshell::protocol::tree::ProtocolEndpoint, _call: IncomingCall) -> Result<unshell::protocol::tree::CallReply, DispatchError<Self::Error>> {
/// Ok(unshell::protocol::tree::CallReply::NoReply) /// Ok(unshell::protocol::tree::CallReply::NoReply)
/// } /// }
/// } /// }
@@ -191,7 +193,7 @@ pub trait CallProcedures: LeafDeclaration {
/// impl CallProcedures for ExampleLeaf { /// impl CallProcedures for ExampleLeaf {
/// type Error = core::convert::Infallible; /// type Error = core::convert::Infallible;
/// fn procedure_suffixes() -> &'static [&'static str] { &["invoke"] } /// fn procedure_suffixes() -> &'static [&'static str] { &["invoke"] }
/// fn dispatch_call(&mut self, _call: IncomingCall) -> Result<unshell::protocol::tree::CallReply, DispatchError<Self::Error>> { /// fn dispatch_call(&mut self, _endpoint: &mut unshell::protocol::tree::ProtocolEndpoint, _call: IncomingCall) -> Result<unshell::protocol::tree::CallReply, DispatchError<Self::Error>> {
/// Ok(unshell::protocol::tree::CallReply::NoReply) /// Ok(unshell::protocol::tree::CallReply::NoReply)
/// } /// }
/// } /// }
@@ -199,10 +201,66 @@ pub trait CallProcedures: LeafDeclaration {
/// ``` /// ```
fn dispatch_call( fn dispatch_call(
&mut self, &mut self,
endpoint: &mut ProtocolEndpoint,
call: crate::protocol::tree::IncomingCall, call: crate::protocol::tree::IncomingCall,
) -> Result<crate::protocol::tree::CallReply, crate::protocol::tree::DispatchError<Self::Error>>; ) -> Result<crate::protocol::tree::CallReply, crate::protocol::tree::DispatchError<Self::Error>>;
} }
/// Router-facing transport hooks for leaves that own parent/child connections.
///
/// What it is: an opt-in trait for leaves that want to act as the transport layer
/// for one endpoint's forwarded traffic.
///
/// Why it exists: ordinary leaves only need validated local events, but a router
/// leaf also needs to know its active parent/children and where to physically send
/// frames chosen by the endpoint's routing logic.
///
/// # Example
/// ```rust
/// use unshell::protocol::FrameBytes;
/// use unshell::protocol::tree::{ChildRoute, RouterLeaf};
/// #[derive(Default)]
/// struct DemoRouter {
/// parent: Option<Vec<String>>,
/// children: Vec<ChildRoute>,
/// }
/// impl unshell::protocol::tree::ProtocolLeaf for DemoRouter {
/// fn leaf_name() -> String { "org.example.v1.router".into() }
/// }
/// impl RouterLeaf for DemoRouter {
/// type RouteError = core::convert::Infallible;
///
/// fn parent_path(&self) -> Option<&[String]> { self.parent.as_deref() }
/// fn child_routes(&self) -> &[ChildRoute] { &self.children }
/// fn route_to_parent(&mut self, _local_path: &[String], _frame: FrameBytes) -> Result<(), Self::RouteError> { Ok(()) }
/// fn route_to_child(&mut self, _child_path: &[String], _frame: FrameBytes) -> Result<(), Self::RouteError> { Ok(()) }
/// }
/// ```
pub trait RouterLeaf: ProtocolLeaf {
/// Transport-specific error surfaced while handing a frame to the chosen link.
type RouteError;
/// Returns the currently connected direct parent path, if any.
fn parent_path(&self) -> Option<&[String]>;
/// Returns the currently connected direct child routes.
fn child_routes(&self) -> &[ChildRoute];
/// Sends one routed frame toward the direct parent connection.
fn route_to_parent(
&mut self,
local_path: &[String],
frame: FrameBytes,
) -> Result<(), Self::RouteError>;
/// Sends one routed frame toward the chosen direct child connection.
fn route_to_child(
&mut self,
child_path: &[String],
frame: FrameBytes,
) -> Result<(), Self::RouteError>;
}
/// Builds one canonical dotted leaf id from crate-local metadata plus optional /// Builds one canonical dotted leaf id from crate-local metadata plus optional
/// user overrides. /// user overrides.
/// ///
+4 -3
View File
@@ -16,8 +16,8 @@ mod routing;
pub use call::{ pub use call::{
Call, CallLeaf, CallReply, CallResult, DispatchError, IncomingCall, IncomingData, Call, CallLeaf, CallReply, CallResult, DispatchError, IncomingCall, IncomingData,
IncomingFault, LeafRuntime, LeafRuntimeError, OutgoingData, RuntimeOutcome, decode_call_input, IncomingFault, LeafRuntime, LeafRuntimeError, OutgoingData, RoutedRuntimeOutcome,
encode_call_reply, RuntimeOutcome, decode_call_input, encode_call_reply,
}; };
pub use endpoint::{ pub use endpoint::{
ChildRoute, Endpoint, EndpointError, EndpointOutcome, Ingress, LeafSpec, LocalEvent, ChildRoute, Endpoint, EndpointError, EndpointOutcome, Ingress, LeafSpec, LocalEvent,
@@ -25,7 +25,8 @@ pub use endpoint::{
}; };
pub use hook::{ActiveHook, HookConflict, HookKey, HookTable, PendingHook}; pub use hook::{ActiveHook, HookConflict, HookKey, HookTable, PendingHook};
pub use leaf::{ pub use leaf::{
CallProcedures, LeafBinding, LeafDeclaration, ProtocolLeaf, derive_leaf_name, leaf_spec_of, CallProcedures, LeafBinding, LeafDeclaration, ProtocolLeaf, RouterLeaf, derive_leaf_name,
leaf_spec_of,
}; };
pub use procedure::{ pub use procedure::{
Procedure, ProcedureEffect, ProcedureMetadata, ProcedureRuntime, ProcedureRuntimeError, Procedure, ProcedureEffect, ProcedureMetadata, ProcedureRuntime, ProcedureRuntimeError,
@@ -32,6 +32,8 @@ pub enum ValidationError {
CallInvariant(&'static str), CallInvariant(&'static str),
/// A hook lifecycle transition would break protocol state invariants. /// A hook lifecycle transition would break protocol state invariants.
HookInvariant(&'static str), HookInvariant(&'static str),
/// One endpoint-topology update would break local tree invariants.
TopologyInvariant(&'static str),
/// A hook id collided with existing endpoint-local state. /// A hook id collided with existing endpoint-local state.
InvalidHookId, InvalidHookId,
} }
@@ -43,6 +45,7 @@ impl fmt::Display for ValidationError {
Self::ProcedureId(message) => write!(f, "invalid procedure id: {message}"), Self::ProcedureId(message) => write!(f, "invalid procedure id: {message}"),
Self::CallInvariant(message) => write!(f, "invalid call: {message}"), Self::CallInvariant(message) => write!(f, "invalid call: {message}"),
Self::HookInvariant(message) => write!(f, "invalid hook state: {message}"), Self::HookInvariant(message) => write!(f, "invalid hook state: {message}"),
Self::TopologyInvariant(message) => write!(f, "invalid topology: {message}"),
Self::InvalidHookId => f.write_str("invalid hook identifier"), Self::InvalidHookId => f.write_str("invalid hook identifier"),
} }
} }