diff --git a/API.md b/API.md index f20b8cf..cac2536 100644 --- a/API.md +++ b/API.md @@ -168,6 +168,10 @@ impl NodeRuntime { pub fn dispatch_local_effects(&mut self) -> Result>; + pub fn reduce_leaf_actions(&mut self) -> Result> + where + T: Transport; + pub fn drain_leaf_actions(&mut self) -> impl Iterator; } @@ -325,21 +329,24 @@ connection closes or unregisters ## Known Gaps In The Current Branch -- `LeafAction` values are queued by `LeafContext` but not yet applied by - `NodeRuntime`. +- `LeafAction::SendHookData` is reduced by `NodeRuntime`; other action variants + are still unsupported and must remain queued when encountered. - Local outbound calls through the runtime are not implemented. +- Hook fault actions through the runtime are not implemented. +- Connection actions through the runtime are not implemented. - Disconnect does not yet clean hooks, sessions, route state, and queued effects. - Child ingress still allocates because the existing `Ingress::Child` owns a `Vec`. ## Next Implementation Slice -Implement one narrow end-to-end path: +Implement the next narrow leaf-action path: -1. Apply queued `LeafAction::SendHookData` through endpoint packet state. -2. Route the produced frame through `Transport`. -3. Add tests proving a leaf reply is framed and - sent through a registered connection. +1. Apply queued `LeafAction::SendCall` through endpoint packet state. +2. Preserve hook reservation and routing failure semantics without dropping + unprocessed actions. +3. Add tests proving a local leaf can initiate an outbound call and receive the + response through the existing dispatch path. -That slice forces the real architecture to work without overbuilding the rest of -the migration. +That slice should continue the one-variant-at-a-time reducer approach without +implementing hook faults or connection actions early. diff --git a/unshell-protocol/src/protocol/tree/endpoint/hooks.rs b/unshell-protocol/src/protocol/tree/endpoint/hooks.rs index b226428..81e9512 100644 --- a/unshell-protocol/src/protocol/tree/endpoint/hooks.rs +++ b/unshell-protocol/src/protocol/tree/endpoint/hooks.rs @@ -133,10 +133,19 @@ impl ProtocolEndpoint { Ok(EndpointOutcome::Dropped) } - pub(crate) fn decide_route(&self, dst_path: &[String]) -> RouteDecision { + /// Returns the current route decision for an absolute destination path. + /// + /// Runtime owners use this to validate transport availability before invoking + /// endpoint operations that also mutate hook state. + #[must_use] + pub fn route_decision(&self, dst_path: &[String]) -> RouteDecision { self.routing.route(dst_path) } + pub(crate) fn decide_route(&self, dst_path: &[String]) -> RouteDecision { + self.route_decision(dst_path) + } + /// Returns whether one `src_path` is topologically valid for the ingress side that delivered /// the frame. /// diff --git a/unshell-runtime/src/node/packet.rs b/unshell-runtime/src/node/packet.rs index dcaacc6..a076fe9 100644 --- a/unshell-runtime/src/node/packet.rs +++ b/unshell-runtime/src/node/packet.rs @@ -68,6 +68,25 @@ impl EndpointState { &mut self.endpoint } + /// Returns the endpoint's current route decision for an absolute path. + #[must_use] + pub fn route_decision(&self, dst_path: &[alloc::string::String]) -> RouteDecision { + self.endpoint.route_decision(dst_path) + } + + /// Builds and routes one hook-data packet through the wrapped endpoint state. + pub fn send_hook_data( + &mut self, + dst_path: alloc::vec::Vec, + hook_id: u64, + procedure_id: alloc::string::String, + data: alloc::vec::Vec, + end_hook: bool, + ) -> Result { + self.endpoint + .send_data(dst_path, hook_id, procedure_id, data, end_hook) + } + /// Consumes the wrapper and returns the underlying protocol endpoint. #[must_use] pub fn into_endpoint(self) -> ProtocolEndpoint { diff --git a/unshell-runtime/src/node/runtime.rs b/unshell-runtime/src/node/runtime.rs index 86b2d2c..6c98c85 100644 --- a/unshell-runtime/src/node/runtime.rs +++ b/unshell-runtime/src/node/runtime.rs @@ -2,8 +2,8 @@ //! //! This first slice owns transport and connection metadata, derives ingress from //! registered connections, delegates packet invariants to [`EndpointState`], and -//! queues concrete runtime effects. Leaf action application is intentionally not -//! implemented in this slice. +//! queues concrete runtime effects. Leaf action reduction is intentionally +//! narrow: this slice only turns hook-data replies into endpoint outcomes. use crate::alloc::{string::String, vec::Vec}; use crate::connections::{ @@ -65,6 +65,13 @@ pub enum NodeRuntimeError { Endpoint(EndpointError), /// Transport send, receive, or flush failed. Transport(TransportError), + /// A queued leaf action is not implemented by this runtime slice. + UnsupportedLeafAction { + /// Leaf id that requested the action. + leaf_id: LeafId, + /// Stable action name for diagnostics. + action: &'static str, + }, } /// Error returned when a leaf callback rejects a local event. @@ -107,6 +114,13 @@ where Self::MissingRouteConnection => f.write_str("route has no registered connection"), Self::Endpoint(error) => write!(f, "{error}"), Self::Transport(error) => write!(f, "{error}"), + Self::UnsupportedLeafAction { leaf_id, action } => { + write!( + f, + "leaf {} requested unsupported action {action}", + leaf_id.as_str() + ) + } } } } @@ -343,9 +357,6 @@ impl NodeRuntime { } /// Returns leaf actions queued by dispatched callbacks. - /// - /// These actions are intentionally only retained here; reducing them into - /// endpoint packets or connection changes belongs to a later runtime slice. #[must_use] pub fn leaf_actions(&self) -> &[(LeafId, LeafAction)] { &self.leaf_actions @@ -530,29 +541,76 @@ where self.apply_outcome(outcome) } - fn apply_outcome( - &mut self, - outcome: EndpointOutcome, - ) -> Result<(), NodeRuntimeError> { - match outcome { - EndpointOutcome::Forward { route, frame } => self.queue_forward(route, frame), - EndpointOutcome::Local(event) => { - self.effects.push(RuntimeEffect::Local(event)); - Ok(()) - } - EndpointOutcome::Dropped => { - self.effects.push(RuntimeEffect::Dropped); - Ok(()) + /// Reduces queued leaf actions through endpoint packet state. + /// + /// Only [`LeafAction::SendHookData`] is implemented in this slice. Unsupported + /// actions stop reduction and remain queued with all later actions so callers + /// can retry after a future runtime gains support. + pub fn reduce_leaf_actions(&mut self) -> Result> { + let mut reduced = 0usize; + let mut retained = Vec::new(); + let mut pending = core::mem::take(&mut self.leaf_actions).into_iter(); + + while let Some((leaf_id, action)) = pending.next() { + match action { + LeafAction::SendHookData(data) => { + let original_action = LeafAction::SendHookData(data.clone()); + let route = self.endpoint.route_decision(&data.dst_path); + if route_requires_connection(route) + && self.connection_for_route(route).is_none() + { + retained.push((leaf_id, original_action)); + retained.extend(pending); + self.leaf_actions = retained; + return Err(NodeRuntimeError::MissingRouteConnection); + } + + let outcome = match self.endpoint.send_hook_data( + data.dst_path, + data.hook_id, + data.procedure_id, + data.payload, + data.end_hook, + ) { + Ok(outcome) => outcome, + Err(error) => { + retained.push((leaf_id, original_action)); + retained.extend(pending); + self.leaf_actions = retained; + return Err(NodeRuntimeError::Endpoint(error)); + } + }; + + if let Err(error) = self.apply_outcome(outcome) { + retained.push((leaf_id, original_action)); + retained.extend(pending); + self.leaf_actions = retained; + return Err(error); + } + reduced += 1; + } + unsupported => { + let action_name = leaf_action_name(&unsupported); + retained.push((leaf_id.clone(), unsupported)); + retained.extend(pending); + self.leaf_actions = retained; + return Err(NodeRuntimeError::UnsupportedLeafAction { + leaf_id, + action: action_name, + }); + } } } + + self.leaf_actions = retained; + Ok(reduced) } - fn queue_forward( - &mut self, + fn connection_for_route( + &self, route: RouteDecision, - frame: FrameBytes, - ) -> Result<(), NodeRuntimeError> { - let (connection, generation) = match route { + ) -> Option<(ConnectionId, ConnectionGeneration)> { + match route { RouteDecision::Parent => self .connections .registered_by_direction(ConnectionDirection::Parent) @@ -582,7 +640,33 @@ where }), RouteDecision::Local | RouteDecision::Drop => None, } - .ok_or(NodeRuntimeError::MissingRouteConnection)?; + } + + fn apply_outcome( + &mut self, + outcome: EndpointOutcome, + ) -> Result<(), NodeRuntimeError> { + match outcome { + EndpointOutcome::Forward { route, frame } => self.queue_forward(route, frame), + EndpointOutcome::Local(event) => { + self.effects.push(RuntimeEffect::Local(event)); + Ok(()) + } + EndpointOutcome::Dropped => { + self.effects.push(RuntimeEffect::Dropped); + Ok(()) + } + } + } + + fn queue_forward( + &mut self, + route: RouteDecision, + frame: FrameBytes, + ) -> Result<(), NodeRuntimeError> { + let (connection, generation) = self + .connection_for_route(route) + .ok_or(NodeRuntimeError::MissingRouteConnection)?; self.effects.push(RuntimeEffect::SendFrame { connection, @@ -649,6 +733,19 @@ fn local_event_leaf_name(event: &LocalEvent) -> Option<&str> { } } +fn leaf_action_name(action: &LeafAction) -> &'static str { + match action { + LeafAction::SendCall(_) => "SendCall", + LeafAction::SendHookData(_) => "SendHookData", + LeafAction::FailHook { .. } => "FailHook", + LeafAction::Connection(_) => "Connection", + } +} + +const fn route_requires_connection(route: RouteDecision) -> bool { + matches!(route, RouteDecision::Parent | RouteDecision::Child(_)) +} + #[cfg(test)] mod tests { use core::cell::RefCell; @@ -659,16 +756,20 @@ mod tests { use crate::alloc::vec; use crate::alloc::vec::Vec; use crate::connections::{ - Connection, ConnectionDirection, ConnectionGeneration, ConnectionId, Connections, + Connection, ConnectionDirection, ConnectionGeneration, ConnectionId, ConnectionState, + Connections, }; - use crate::context::{LeafAction, OutboundHookData}; + use crate::context::{ConnectionAction, LeafAction, OutboundCall, OutboundHookData}; use crate::effects::RuntimeEffect; use crate::leaf::{Leaf, LeafCapabilities, LeafPermissions}; use crate::transport::Transport; use unshell_protocol::tree::{ ChildRoute, EndpointError, IncomingCall, LeafSpec, LocalEvent, ProtocolEndpoint, }; - use unshell_protocol::{CallMessage, FrameBytes, PacketHeader, PacketType, encode_packet}; + use unshell_protocol::{ + CallMessage, FrameBytes, HookTarget, PacketHeader, PacketType, ProtocolFault, decode_frame, + encode_packet, + }; use super::{EndpointState, NodeRuntime, NodeRuntimeError, TickBudget}; @@ -1304,6 +1405,232 @@ mod tests { assert!(runtime.transport().sent.is_empty()); } + #[test] + fn leaf_hook_data_reduces_to_parent_transport_frame() { + let parent = ConnectionId::new(1); + let mut connections = Connections::new(); + connections.push(Connection::registered( + parent, + ConnectionDirection::Parent, + vec![], + ConnectionGeneration::INITIAL, + )); + + let leaf_name = "org.example.v1.echo"; + let endpoint = ProtocolEndpoint::new( + vec![String::from("agent")], + Some(vec![]), + vec![], + vec![LeafSpec { + name: String::from(leaf_name), + procedures: vec![String::from("org.example.v1.echo.invoke")], + }], + ); + let frame = encode_packet( + &PacketHeader { + packet_type: PacketType::Call, + src_path: vec![], + dst_path: vec![String::from("agent")], + dst_leaf: Some(String::from(leaf_name)), + hook_id: None, + }, + &CallMessage { + procedure_id: String::from("org.example.v1.echo.invoke"), + data: vec![9], + response_hook: Some(HookTarget { + hook_id: 7, + return_path: vec![], + }), + }, + ) + .expect("frame encodes"); + let calls = Rc::new(RefCell::new(Vec::new())); + let mut runtime = NodeRuntime::new( + EndpointState::new(endpoint), + connections, + RecordingTransport::default(), + ); + runtime.register_leaf(RecordingLeaf::new(leaf_name, Rc::clone(&calls))); + + runtime + .receive_frame(parent, frame) + .expect("frame processes"); + runtime.dispatch_local_effects().expect("dispatch succeeds"); + let reduced = runtime.reduce_leaf_actions().expect("hook data reduces"); + let outcome = runtime.tick(TickBudget::default()).expect("tick flushes"); + + assert_eq!(reduced, 1); + assert!(runtime.leaf_actions().is_empty()); + assert_eq!(outcome.outbound_frames, 1); + assert_eq!(runtime.transport().sent.len(), 1); + assert_eq!(runtime.transport().sent[0].0, parent); + let parsed = decode_frame(&runtime.transport().sent[0].1).expect("sent data decodes"); + let header = parsed.header(); + assert_eq!(header.packet_type, PacketType::Data); + assert_eq!(header.src_path, [String::from("agent")]); + assert_eq!(header.dst_path, Vec::::new()); + assert_eq!(header.hook_id, Some(7)); + let data = parsed.deserialize_data().expect("payload is data"); + assert_eq!(data.procedure_id, "org.example.v1.echo.invoke"); + assert_eq!(data.data, [1, 2, 3]); + assert!(data.end_hook); + } + + #[test] + fn unsupported_leaf_action_is_reported_and_retained() { + let leaf_id = crate::leaf::LeafId::new(String::from("org.example.v1.echo")); + let mut runtime = NodeRuntime::new( + EndpointState::new(ProtocolEndpoint::new( + vec![String::from("agent")], + Some(vec![]), + vec![], + vec![], + )), + Connections::new(), + RecordingTransport::default(), + ); + runtime.leaf_actions.push(( + leaf_id.clone(), + LeafAction::SendCall(OutboundCall { + dst_path: vec![], + dst_leaf: None, + procedure_id: String::from("org.example.v1.echo.invoke"), + payload: vec![], + expects_response: false, + }), + )); + runtime.leaf_actions.push(( + leaf_id.clone(), + LeafAction::Connection(ConnectionAction::Unregister { + connection: ConnectionId::new(99), + }), + )); + + let error = runtime + .reduce_leaf_actions() + .expect_err("unsupported action is reported"); + + assert!(matches!( + error, + NodeRuntimeError::UnsupportedLeafAction { ref leaf_id, action } + if leaf_id.as_str() == "org.example.v1.echo" && action == "SendCall" + )); + assert_eq!(runtime.leaf_actions().len(), 2); + assert!(matches!( + runtime.leaf_actions()[0].1, + LeafAction::SendCall(_) + )); + assert!(matches!( + runtime.leaf_actions()[1].1, + LeafAction::Connection(_) + )); + } + + #[test] + fn failed_leaf_hook_data_routing_retains_failed_and_remaining_actions() { + let parent = ConnectionId::new(1); + let mut connections = Connections::new(); + connections.push(Connection::registered( + parent, + ConnectionDirection::Parent, + vec![], + ConnectionGeneration::INITIAL, + )); + + let leaf_name = "org.example.v1.echo"; + let endpoint = ProtocolEndpoint::new( + vec![String::from("agent")], + Some(vec![]), + vec![], + vec![LeafSpec { + name: String::from(leaf_name), + procedures: vec![String::from("org.example.v1.echo.invoke")], + }], + ); + let frame = encode_packet( + &PacketHeader { + packet_type: PacketType::Call, + src_path: vec![], + dst_path: vec![String::from("agent")], + dst_leaf: Some(String::from(leaf_name)), + hook_id: None, + }, + &CallMessage { + procedure_id: String::from("org.example.v1.echo.invoke"), + data: vec![], + response_hook: Some(HookTarget { + hook_id: 7, + return_path: vec![], + }), + }, + ) + .expect("frame encodes"); + let calls = Rc::new(RefCell::new(Vec::new())); + let mut runtime = NodeRuntime::new( + EndpointState::new(endpoint), + connections, + RecordingTransport::default(), + ); + runtime.register_leaf(RecordingLeaf::new(leaf_name, Rc::clone(&calls))); + runtime + .receive_frame(parent, frame) + .expect("frame processes and activates response hook"); + runtime.dispatch_local_effects().expect("dispatch succeeds"); + runtime.leaf_actions.push(( + crate::leaf::LeafId::new(String::from(leaf_name)), + LeafAction::FailHook { + hook_id: 7, + fault: ProtocolFault::INTERNAL_ERROR, + }, + )); + runtime + .connections + .get_mut(parent) + .expect("parent connection exists") + .set_state(ConnectionState::Connected { + generation: ConnectionGeneration::INITIAL, + }); + + let error = runtime + .reduce_leaf_actions() + .expect_err("missing route connection is reported"); + + assert!(matches!(error, NodeRuntimeError::MissingRouteConnection)); + assert_eq!(runtime.leaf_actions().len(), 2); + assert!(matches!( + runtime.leaf_actions()[0].1, + LeafAction::SendHookData(_) + )); + assert!(matches!( + runtime.leaf_actions()[1].1, + LeafAction::FailHook { .. } + )); + + runtime + .register_parent_connection(parent, vec![], ConnectionGeneration::INITIAL) + .expect("parent route restored"); + let retry_error = runtime + .reduce_leaf_actions() + .expect_err("later unsupported action is still reported"); + + assert!(matches!( + retry_error, + NodeRuntimeError::UnsupportedLeafAction { + action: "FailHook", + .. + } + )); + assert_eq!(runtime.leaf_actions().len(), 1); + assert!(matches!( + runtime.leaf_actions()[0].1, + LeafAction::FailHook { .. } + )); + assert!(matches!( + runtime.effects()[0], + RuntimeEffect::SendFrame { connection, .. } if connection == parent + )); + } + #[test] fn unmatched_local_event_remains_queued() { let mut runtime = NodeRuntime::new(