From 71d1aee23567663be612a0c98e879213f4706e83 Mon Sep 17 00:00:00 2001 From: Michael Mikovsky <77305074+Astatin3@users.noreply.github.com> Date: Sat, 9 May 2026 13:40:21 -0600 Subject: [PATCH] Reduce leaf send call actions --- API.md | 18 +- .../src/protocol/tree/endpoint/core.rs | 2 +- unshell-protocol/src/protocol/tree/hook.rs | 2 +- unshell-runtime/src/node/packet.rs | 58 ++- unshell-runtime/src/node/runtime.rs | 356 +++++++++++++++++- 5 files changed, 410 insertions(+), 26 deletions(-) diff --git a/API.md b/API.md index cac2536..65d4814 100644 --- a/API.md +++ b/API.md @@ -329,9 +329,9 @@ connection closes or unregisters ## Known Gaps In The Current Branch -- `LeafAction::SendHookData` is reduced by `NodeRuntime`; other action variants - are still unsupported and must remain queued when encountered. -- Local outbound calls through the runtime are not implemented. +- `LeafAction::SendCall` and `LeafAction::SendHookData` are reduced by + `NodeRuntime`; hook fault and connection action variants are still unsupported + and must remain queued when encountered. - Hook fault actions through the runtime are not implemented. - Connection actions through the runtime are not implemented. - Disconnect does not yet clean hooks, sessions, route state, and queued effects. @@ -342,11 +342,11 @@ connection closes or unregisters Implement the next narrow leaf-action path: -1. Apply queued `LeafAction::SendCall` through endpoint packet state. -2. Preserve hook reservation and routing failure semantics without dropping - unprocessed actions. -3. Add tests proving a local leaf can initiate an outbound call and receive the - response through the existing dispatch path. +1. Apply queued `LeafAction::FailHook` through endpoint packet state. +2. Preserve pending/active hook cleanup semantics without dropping unprocessed + actions. +3. Keep connection registration actions queued until runtime-owned disconnect + cleanup can update connections, routes, hooks, and queued effects atomically. That slice should continue the one-variant-at-a-time reducer approach without -implementing hook faults or connection actions early. +implementing connection actions early. diff --git a/unshell-protocol/src/protocol/tree/endpoint/core.rs b/unshell-protocol/src/protocol/tree/endpoint/core.rs index d858f04..cacf552 100644 --- a/unshell-protocol/src/protocol/tree/endpoint/core.rs +++ b/unshell-protocol/src/protocol/tree/endpoint/core.rs @@ -311,7 +311,7 @@ pub trait Endpoint { /// let endpoint = ProtocolEndpoint::new(vec!["worker".into()], Some(Vec::new()), Vec::new(), Vec::new()); /// let _ = endpoint; /// ``` -#[derive(Debug, Default)] +#[derive(Debug, Clone, Default)] pub struct ProtocolEndpoint { pub(crate) local_id: Option, pub(crate) path: Vec, diff --git a/unshell-protocol/src/protocol/tree/hook.rs b/unshell-protocol/src/protocol/tree/hook.rs index 368b4b4..7b7b5f2 100644 --- a/unshell-protocol/src/protocol/tree/hook.rs +++ b/unshell-protocol/src/protocol/tree/hook.rs @@ -130,7 +130,7 @@ pub struct HookConflict; /// }).unwrap(); /// assert_eq!(hooks.pending_len(), 1); /// ``` -#[derive(Debug, Default)] +#[derive(Debug, Clone, Default)] pub struct HookTable { pending: BTreeMap, active: BTreeMap, diff --git a/unshell-runtime/src/node/packet.rs b/unshell-runtime/src/node/packet.rs index a076fe9..b1d3eaa 100644 --- a/unshell-runtime/src/node/packet.rs +++ b/unshell-runtime/src/node/packet.rs @@ -5,7 +5,10 @@ //! into packet-only and runtime-owned layers. The wrapper does not own transport //! handles, does not dispatch leaves, and does not make admission decisions. -use unshell_protocol::{FrameBytes, tree::Endpoint as ProtocolEndpointTrait}; +use unshell_protocol::{ + CallMessage, FrameBytes, PacketHeader, PacketType, tree::Endpoint as ProtocolEndpointTrait, + validate_call, validate_header, validate_procedure_id, +}; pub use unshell_protocol::tree::{ ChildRoute, EndpointError, EndpointOutcome, HookKey, Ingress, LeafSpec, LocalEvent, @@ -32,7 +35,7 @@ pub trait PacketProcessor { /// This is a compatibility shell around [`ProtocolEndpoint`]. It exists so new /// runtime code can depend on `unshell_runtime::node::EndpointState` while the /// old protocol-tree endpoint remains the source of truth for packet invariants. -#[derive(Debug, Default)] +#[derive(Clone, Debug, Default)] pub struct EndpointState { endpoint: ProtocolEndpoint, } @@ -87,6 +90,57 @@ impl EndpointState { .send_data(dst_path, hook_id, procedure_id, data, end_hook) } + /// Builds and routes one call packet through the wrapped endpoint state. + pub fn send_call( + &mut self, + dst_path: alloc::vec::Vec, + dst_leaf: Option, + procedure_id: alloc::string::String, + response_hook_id: Option, + data: alloc::vec::Vec, + ) -> Result { + self.endpoint + .send_call(dst_path, dst_leaf, procedure_id, response_hook_id, data) + } + + /// Validates an outbound call request before allocating response hook state. + pub fn validate_call_request( + &self, + dst_path: &[alloc::string::String], + dst_leaf: Option<&alloc::string::String>, + procedure_id: &str, + data: &[u8], + expects_response: bool, + ) -> Result<(), EndpointError> { + validate_procedure_id(procedure_id)?; + + let header = PacketHeader { + packet_type: PacketType::Call, + src_path: self.endpoint.path().to_vec(), + dst_path: dst_path.to_vec(), + dst_leaf: dst_leaf.cloned(), + hook_id: None, + }; + let call = CallMessage { + procedure_id: procedure_id.into(), + data: data.to_vec(), + response_hook: expects_response.then(|| unshell_protocol::HookTarget { + hook_id: 1, + return_path: self.endpoint.path().to_vec(), + }), + }; + + validate_header(&header)?; + validate_call(&header, &call)?; + Ok(()) + } + + /// Allocates a response hook id scoped to this endpoint path. + #[must_use] + pub fn allocate_hook_id(&mut self) -> u64 { + self.endpoint.allocate_hook_id() + } + /// Consumes the wrapper and returns the underlying protocol endpoint. #[must_use] pub fn into_endpoint(self) -> ProtocolEndpoint { diff --git a/unshell-runtime/src/node/runtime.rs b/unshell-runtime/src/node/runtime.rs index 6c98c85..605175a 100644 --- a/unshell-runtime/src/node/runtime.rs +++ b/unshell-runtime/src/node/runtime.rs @@ -3,7 +3,8 @@ //! This first slice owns transport and connection metadata, derives ingress from //! registered connections, delegates packet invariants to [`EndpointState`], and //! queues concrete runtime effects. Leaf action reduction is intentionally -//! narrow: this slice only turns hook-data replies into endpoint outcomes. +//! narrow: this slice only turns outbound calls and hook-data replies into +//! endpoint outcomes. use crate::alloc::{string::String, vec::Vec}; use crate::connections::{ @@ -543,9 +544,10 @@ where /// Reduces queued leaf actions through endpoint packet state. /// - /// Only [`LeafAction::SendHookData`] is implemented in this slice. Unsupported - /// actions stop reduction and remain queued with all later actions so callers - /// can retry after a future runtime gains support. + /// [`LeafAction::SendCall`] and [`LeafAction::SendHookData`] are implemented + /// in this slice. Unsupported actions stop reduction and remain queued with + /// all later actions so callers can retry after a future runtime gains + /// support. pub fn reduce_leaf_actions(&mut self) -> Result> { let mut reduced = 0usize; let mut retained = Vec::new(); @@ -553,6 +555,64 @@ where while let Some((leaf_id, action)) = pending.next() { match action { + LeafAction::SendCall(call) => { + let original_action = LeafAction::SendCall(call.clone()); + let route = self.endpoint.route_decision(&call.dst_path); + if route_requires_connection(route) + && self.connection_for_route(route).is_none() + { + retained.push((leaf_id, original_action)); + retained.extend(pending); + self.leaf_actions = retained; + return Err(NodeRuntimeError::MissingRouteConnection); + } + + if let Err(error) = self.endpoint.validate_call_request( + &call.dst_path, + call.dst_leaf.as_ref(), + &call.procedure_id, + &call.payload, + call.expects_response, + ) { + retained.push((leaf_id, original_action)); + retained.extend(pending); + self.leaf_actions = retained; + return Err(NodeRuntimeError::Endpoint(error)); + } + + // Allocate only after transport availability is known. A + // failed preflight must leave the queued call retryable + // without consuming a hook id or reserving pending hook state. + let endpoint_checkpoint = self.endpoint.clone(); + let response_hook_id = call + .expects_response + .then(|| self.endpoint.allocate_hook_id()); + let outcome = match self.endpoint.send_call( + call.dst_path, + call.dst_leaf, + call.procedure_id, + response_hook_id, + call.payload, + ) { + Ok(outcome) => outcome, + Err(error) => { + self.endpoint = endpoint_checkpoint; + retained.push((leaf_id, original_action)); + retained.extend(pending); + self.leaf_actions = retained; + return Err(NodeRuntimeError::Endpoint(error)); + } + }; + + if let Err(error) = self.apply_outcome(outcome) { + self.endpoint = endpoint_checkpoint; + retained.push((leaf_id, original_action)); + retained.extend(pending); + self.leaf_actions = retained; + return Err(error); + } + reduced += 1; + } LeafAction::SendHookData(data) => { let original_action = LeafAction::SendHookData(data.clone()); let route = self.endpoint.route_decision(&data.dst_path); @@ -1476,6 +1536,279 @@ mod tests { assert!(data.end_hook); } + #[test] + fn leaf_send_call_reduces_to_child_transport_frame() { + let child = ConnectionId::new(1); + let mut connections = Connections::new(); + connections.push(Connection::registered( + child, + ConnectionDirection::Child, + vec![String::from("agent"), String::from("worker")], + ConnectionGeneration::INITIAL, + )); + + let leaf_id = crate::leaf::LeafId::new(String::from("org.example.v1.client")); + let endpoint = ProtocolEndpoint::new( + vec![String::from("agent")], + None, + vec![ChildRoute::registered(vec![ + String::from("agent"), + String::from("worker"), + ])], + Vec::new(), + ); + let mut runtime = NodeRuntime::new( + EndpointState::new(endpoint), + connections, + RecordingTransport::default(), + ); + runtime.leaf_actions.push(( + leaf_id, + LeafAction::SendCall(OutboundCall { + dst_path: vec![String::from("agent"), String::from("worker")], + dst_leaf: Some(String::from("org.example.v1.echo")), + procedure_id: String::from("org.example.v1.echo.invoke"), + payload: vec![4, 5, 6], + expects_response: false, + }), + )); + + let reduced = runtime.reduce_leaf_actions().expect("call reduces"); + let outcome = runtime.tick(TickBudget::default()).expect("tick flushes"); + + assert_eq!(reduced, 1); + assert!(runtime.leaf_actions().is_empty()); + assert_eq!(outcome.outbound_frames, 1); + assert_eq!(runtime.transport().sent.len(), 1); + assert_eq!(runtime.transport().sent[0].0, child); + let parsed = decode_frame(&runtime.transport().sent[0].1).expect("sent call decodes"); + let header = parsed.header(); + assert_eq!(header.packet_type, PacketType::Call); + assert_eq!(header.src_path, [String::from("agent")]); + assert_eq!( + header.dst_path, + [String::from("agent"), String::from("worker")] + ); + assert_eq!(header.dst_leaf.as_deref(), Some("org.example.v1.echo")); + let call = parsed.deserialize_call().expect("payload is call"); + assert_eq!(call.procedure_id, "org.example.v1.echo.invoke"); + assert_eq!(call.data, [4, 5, 6]); + assert!(call.response_hook.is_none()); + } + + #[test] + fn expected_response_send_call_preflights_route_and_uses_retry_hook() { + let child = ConnectionId::new(1); + let mut connections = Connections::new(); + connections.push(Connection::connected(child, ConnectionGeneration::INITIAL)); + + let leaf_id = crate::leaf::LeafId::new(String::from("org.example.v1.client")); + let endpoint = ProtocolEndpoint::new( + vec![String::from("agent")], + None, + vec![ChildRoute::registered(vec![ + String::from("agent"), + String::from("worker"), + ])], + Vec::new(), + ); + let mut runtime = NodeRuntime::new( + EndpointState::new(endpoint), + connections, + RecordingTransport::default(), + ); + runtime.leaf_actions.push(( + leaf_id, + LeafAction::SendCall(OutboundCall { + dst_path: vec![String::from("agent"), String::from("worker")], + dst_leaf: Some(String::from("org.example.v1.echo")), + procedure_id: String::from("org.example.v1.echo.invoke"), + payload: vec![], + expects_response: true, + }), + )); + + let error = runtime + .reduce_leaf_actions() + .expect_err("missing child connection is reported"); + + assert!(matches!(error, NodeRuntimeError::MissingRouteConnection)); + assert_eq!(runtime.leaf_actions().len(), 1); + assert!(runtime.effects().is_empty()); + + runtime + .register_child_connection( + child, + vec![String::from("agent"), String::from("worker")], + ConnectionGeneration::INITIAL, + ) + .expect("child route restored"); + let reduced = runtime + .reduce_leaf_actions() + .expect("retry reduces after route exists"); + let outcome = runtime.tick(TickBudget::default()).expect("tick flushes"); + + assert_eq!(reduced, 1); + assert_eq!(outcome.outbound_frames, 1); + let parsed = decode_frame(&runtime.transport().sent[0].1).expect("sent call decodes"); + let call = parsed.deserialize_call().expect("payload is call"); + assert_eq!( + call.response_hook, + Some(HookTarget { + hook_id: 1, + return_path: vec![String::from("agent")], + }) + ); + + let response = encode_packet( + &PacketHeader { + packet_type: PacketType::Data, + src_path: vec![String::from("agent"), String::from("worker")], + dst_path: vec![String::from("agent")], + dst_leaf: None, + hook_id: Some(1), + }, + &unshell_protocol::DataMessage { + procedure_id: String::from("org.example.v1.echo.invoke"), + data: vec![9], + end_hook: true, + }, + ) + .expect("response encodes"); + runtime + .receive_frame(child, response) + .expect("response hook is accepted"); + + assert!( + matches!(runtime.effects()[0], RuntimeEffect::Local(LocalEvent::Data { ref hook_key, .. }) if hook_key.hook_id == 1) + ); + } + + #[test] + fn invalid_send_call_does_not_affect_next_response_hook_id() { + let child = ConnectionId::new(1); + let mut connections = Connections::new(); + connections.push(Connection::registered( + child, + ConnectionDirection::Child, + vec![String::from("agent"), String::from("worker")], + ConnectionGeneration::INITIAL, + )); + + let leaf_id = crate::leaf::LeafId::new(String::from("org.example.v1.client")); + let endpoint = ProtocolEndpoint::new( + vec![String::from("agent")], + None, + vec![ChildRoute::registered(vec![ + String::from("agent"), + String::from("worker"), + ])], + Vec::new(), + ); + let mut runtime = NodeRuntime::new( + EndpointState::new(endpoint), + connections, + RecordingTransport::default(), + ); + runtime.leaf_actions.push(( + leaf_id.clone(), + LeafAction::SendCall(OutboundCall { + dst_path: vec![String::from("agent"), String::from("worker")], + dst_leaf: Some(String::from("org.example.v1.echo")), + procedure_id: String::new(), + payload: vec![], + expects_response: false, + }), + )); + + let error = runtime + .reduce_leaf_actions() + .expect_err("invalid procedure is rejected"); + + assert!(matches!(error, NodeRuntimeError::Endpoint(_))); + assert_eq!(runtime.leaf_actions().len(), 1); + runtime.leaf_actions.clear(); + runtime.leaf_actions.push(( + leaf_id, + LeafAction::SendCall(OutboundCall { + dst_path: vec![String::from("agent"), String::from("worker")], + dst_leaf: Some(String::from("org.example.v1.echo")), + procedure_id: String::from("org.example.v1.echo.invoke"), + payload: vec![], + expects_response: true, + }), + )); + + runtime.reduce_leaf_actions().expect("valid retry reduces"); + runtime.tick(TickBudget::default()).expect("tick flushes"); + + let parsed = decode_frame(&runtime.transport().sent[0].1).expect("sent call decodes"); + let call = parsed.deserialize_call().expect("payload is call"); + assert_eq!( + call.response_hook, + Some(HookTarget { + hook_id: 1, + return_path: vec![String::from("agent")], + }) + ); + } + + #[test] + fn failed_leaf_send_call_routing_retains_failed_and_remaining_actions() { + let child = ConnectionId::new(1); + let mut connections = Connections::new(); + connections.push(Connection::connected(child, ConnectionGeneration::INITIAL)); + + let leaf_id = crate::leaf::LeafId::new(String::from("org.example.v1.client")); + let endpoint = ProtocolEndpoint::new( + vec![String::from("agent")], + None, + vec![ChildRoute::registered(vec![ + String::from("agent"), + String::from("worker"), + ])], + Vec::new(), + ); + let mut runtime = NodeRuntime::new( + EndpointState::new(endpoint), + connections, + RecordingTransport::default(), + ); + runtime.leaf_actions.push(( + leaf_id.clone(), + LeafAction::SendCall(OutboundCall { + dst_path: vec![String::from("agent"), String::from("worker")], + dst_leaf: Some(String::from("org.example.v1.echo")), + procedure_id: String::from("org.example.v1.echo.invoke"), + payload: vec![], + expects_response: true, + }), + )); + runtime.leaf_actions.push(( + leaf_id, + LeafAction::FailHook { + hook_id: 7, + fault: ProtocolFault::INTERNAL_ERROR, + }, + )); + + let error = runtime + .reduce_leaf_actions() + .expect_err("missing child connection is reported"); + + assert!(matches!(error, NodeRuntimeError::MissingRouteConnection)); + assert_eq!(runtime.leaf_actions().len(), 2); + assert!(matches!( + runtime.leaf_actions()[0].1, + LeafAction::SendCall(_) + )); + assert!(matches!( + runtime.leaf_actions()[1].1, + LeafAction::FailHook { .. } + )); + assert!(runtime.effects().is_empty()); + } + #[test] fn unsupported_leaf_action_is_reported_and_retained() { let leaf_id = crate::leaf::LeafId::new(String::from("org.example.v1.echo")); @@ -1491,13 +1824,10 @@ mod tests { ); runtime.leaf_actions.push(( leaf_id.clone(), - LeafAction::SendCall(OutboundCall { - dst_path: vec![], - dst_leaf: None, - procedure_id: String::from("org.example.v1.echo.invoke"), - payload: vec![], - expects_response: false, - }), + LeafAction::FailHook { + hook_id: 7, + fault: ProtocolFault::INTERNAL_ERROR, + }, )); runtime.leaf_actions.push(( leaf_id.clone(), @@ -1513,12 +1843,12 @@ mod tests { assert!(matches!( error, NodeRuntimeError::UnsupportedLeafAction { ref leaf_id, action } - if leaf_id.as_str() == "org.example.v1.echo" && action == "SendCall" + if leaf_id.as_str() == "org.example.v1.echo" && action == "FailHook" )); assert_eq!(runtime.leaf_actions().len(), 2); assert!(matches!( runtime.leaf_actions()[0].1, - LeafAction::SendCall(_) + LeafAction::FailHook { .. } )); assert!(matches!( runtime.leaf_actions()[1].1,