From da9166daf0a7ff05c4b621b124681f63b5ce48ce Mon Sep 17 00:00:00 2001 From: Michael Mikovsky <77305074+Astatin3@users.noreply.github.com> Date: Sat, 9 May 2026 13:53:49 -0600 Subject: [PATCH] Reduce leaf fail hook actions --- API.md | 17 +- .../src/protocol/tree/endpoint/hooks.rs | 25 ++ unshell-protocol/src/protocol/tree/hook.rs | 19 + unshell-runtime/src/node/packet.rs | 19 +- unshell-runtime/src/node/runtime.rs | 354 ++++++++++++++++-- 5 files changed, 395 insertions(+), 39 deletions(-) diff --git a/API.md b/API.md index 65d4814..4ab300b 100644 --- a/API.md +++ b/API.md @@ -329,10 +329,9 @@ connection closes or unregisters ## Known Gaps In The Current Branch -- `LeafAction::SendCall` and `LeafAction::SendHookData` are reduced by - `NodeRuntime`; hook fault and connection action variants are still unsupported +- `LeafAction::SendCall`, `LeafAction::SendHookData`, and `LeafAction::FailHook` + are reduced by `NodeRuntime`; connection action variants are still unsupported and must remain queued when encountered. -- Hook fault actions through the runtime are not implemented. - Connection actions through the runtime are not implemented. - Disconnect does not yet clean hooks, sessions, route state, and queued effects. - Child ingress still allocates because the existing `Ingress::Child` owns a @@ -340,13 +339,13 @@ connection closes or unregisters ## Next Implementation Slice -Implement the next narrow leaf-action path: +Implement the next narrow connection-action path: -1. Apply queued `LeafAction::FailHook` through endpoint packet state. -2. Preserve pending/active hook cleanup semantics without dropping unprocessed - actions. -3. Keep connection registration actions queued until runtime-owned disconnect +1. Keep connection registration actions queued until runtime-owned disconnect cleanup can update connections, routes, hooks, and queued effects atomically. +2. Add connection registration reduction only when route, connection, hook, and + queued-effect cleanup can be updated as one runtime transaction. +3. Preserve FIFO retry semantics for unsupported or failed connection actions. That slice should continue the one-variant-at-a-time reducer approach without -implementing connection actions early. +implementing disconnect cleanup early. diff --git a/unshell-protocol/src/protocol/tree/endpoint/hooks.rs b/unshell-protocol/src/protocol/tree/endpoint/hooks.rs index 81e9512..de6353f 100644 --- a/unshell-protocol/src/protocol/tree/endpoint/hooks.rs +++ b/unshell-protocol/src/protocol/tree/endpoint/hooks.rs @@ -10,6 +10,31 @@ use super::super::{HookKey, RouteDecision}; use super::core::{EndpointError, EndpointOutcome, Ingress, LocalEvent, ProtocolEndpoint}; impl ProtocolEndpoint { + /// Returns the route that would carry a locally generated hook fault for `hook_id`. + /// + /// The method does not mutate hook state. Runtime owners use it to preflight transport + /// availability before calling [`fail_hook`](Self::fail_hook), which removes hook state when + /// the fault is emitted. + #[must_use] + pub fn hook_fault_route(&self, hook_id: u64) -> Option { + self.hooks + .key_for_hook_id(hook_id) + .map(|key| self.decide_route(&key.return_path)) + } + + /// Terminates a locally known hook with a protocol fault. + /// + /// Unknown hooks are treated as an intentional drop. Known hooks are removed before the fault + /// is routed so no further local data can be emitted after the terminal fault. + pub fn fail_hook( + &mut self, + hook_id: u64, + fault: ProtocolFault, + ) -> Result { + let key = self.hooks.key_for_hook_id(hook_id); + self.emit_fault_if_possible(key, fault) + } + pub(crate) fn emit_fault_if_possible( &mut self, key: Option, diff --git a/unshell-protocol/src/protocol/tree/hook.rs b/unshell-protocol/src/protocol/tree/hook.rs index 7b7b5f2..b099dd2 100644 --- a/unshell-protocol/src/protocol/tree/hook.rs +++ b/unshell-protocol/src/protocol/tree/hook.rs @@ -324,6 +324,25 @@ impl HookTable { Some(active) } + /// Returns a hook key matching `hook_id`, preferring active hooks over pending hooks. + /// + /// This is intentionally a narrow bridge for current leaf APIs that identify a hook only by + /// id. Hook ids are protocol-scoped by host path, so future APIs should pass the full + /// [`HookKey`] when leaf dispatch exposes it. + #[must_use] + pub fn key_for_hook_id(&self, hook_id: u64) -> Option { + self.active + .keys() + .find(|key| key.hook_id == hook_id) + .cloned() + .or_else(|| { + self.pending + .keys() + .find(|key| key.hook_id == hook_id) + .cloned() + }) + } + /// Returns the pending hook for `key`, if present. /// /// # Example diff --git a/unshell-runtime/src/node/packet.rs b/unshell-runtime/src/node/packet.rs index b1d3eaa..9502bc8 100644 --- a/unshell-runtime/src/node/packet.rs +++ b/unshell-runtime/src/node/packet.rs @@ -6,8 +6,8 @@ //! handles, does not dispatch leaves, and does not make admission decisions. use unshell_protocol::{ - CallMessage, FrameBytes, PacketHeader, PacketType, tree::Endpoint as ProtocolEndpointTrait, - validate_call, validate_header, validate_procedure_id, + CallMessage, FrameBytes, PacketHeader, PacketType, ProtocolFault, + tree::Endpoint as ProtocolEndpointTrait, validate_call, validate_header, validate_procedure_id, }; pub use unshell_protocol::tree::{ @@ -90,6 +90,21 @@ impl EndpointState { .send_data(dst_path, hook_id, procedure_id, data, end_hook) } + /// Returns the route that would carry a terminal hook fault, if the hook is known. + #[must_use] + pub fn hook_fault_route(&self, hook_id: u64) -> Option { + self.endpoint.hook_fault_route(hook_id) + } + + /// Terminates a known hook with a protocol fault, or drops unknown hook ids. + pub fn fail_hook( + &mut self, + hook_id: u64, + fault: ProtocolFault, + ) -> Result { + self.endpoint.fail_hook(hook_id, fault) + } + /// Builds and routes one call packet through the wrapped endpoint state. pub fn send_call( &mut self, diff --git a/unshell-runtime/src/node/runtime.rs b/unshell-runtime/src/node/runtime.rs index 605175a..e6ca326 100644 --- a/unshell-runtime/src/node/runtime.rs +++ b/unshell-runtime/src/node/runtime.rs @@ -3,8 +3,7 @@ //! This first slice owns transport and connection metadata, derives ingress from //! registered connections, delegates packet invariants to [`EndpointState`], and //! queues concrete runtime effects. Leaf action reduction is intentionally -//! narrow: this slice only turns outbound calls and hook-data replies into -//! endpoint outcomes. +//! narrow and grows one action family at a time. use crate::alloc::{string::String, vec::Vec}; use crate::connections::{ @@ -544,10 +543,10 @@ where /// Reduces queued leaf actions through endpoint packet state. /// - /// [`LeafAction::SendCall`] and [`LeafAction::SendHookData`] are implemented - /// in this slice. Unsupported actions stop reduction and remain queued with - /// all later actions so callers can retry after a future runtime gains - /// support. + /// [`LeafAction::SendCall`], [`LeafAction::SendHookData`], and + /// [`LeafAction::FailHook`] are implemented in this slice. Unsupported + /// actions stop reduction and remain queued with all later actions so callers + /// can retry after a future runtime gains support. pub fn reduce_leaf_actions(&mut self) -> Result> { let mut reduced = 0usize; let mut retained = Vec::new(); @@ -649,6 +648,40 @@ where } reduced += 1; } + LeafAction::FailHook { hook_id, fault } => { + let original_action = LeafAction::FailHook { hook_id, fault }; + if let Some(route) = self.endpoint.hook_fault_route(hook_id) + && (matches!(route, RouteDecision::Drop) + || (route_requires_connection(route) + && self.connection_for_route(route).is_none())) + { + retained.push((leaf_id, original_action)); + retained.extend(pending); + self.leaf_actions = retained; + return Err(NodeRuntimeError::MissingRouteConnection); + } + + let endpoint_checkpoint = self.endpoint.clone(); + let outcome = match self.endpoint.fail_hook(hook_id, fault) { + Ok(outcome) => outcome, + Err(error) => { + self.endpoint = endpoint_checkpoint; + retained.push((leaf_id, original_action)); + retained.extend(pending); + self.leaf_actions = retained; + return Err(NodeRuntimeError::Endpoint(error)); + } + }; + + if let Err(error) = self.apply_outcome(outcome) { + self.endpoint = endpoint_checkpoint; + retained.push((leaf_id, original_action)); + retained.extend(pending); + self.leaf_actions = retained; + return Err(error); + } + reduced += 1; + } unsupported => { let action_name = leaf_action_name(&unsupported); retained.push((leaf_id.clone(), unsupported)); @@ -825,6 +858,7 @@ mod tests { use crate::transport::Transport; use unshell_protocol::tree::{ ChildRoute, EndpointError, IncomingCall, LeafSpec, LocalEvent, ProtocolEndpoint, + RouteDecision, }; use unshell_protocol::{ CallMessage, FrameBytes, HookTarget, PacketHeader, PacketType, ProtocolFault, decode_frame, @@ -1536,6 +1570,82 @@ mod tests { assert!(data.end_hook); } + #[test] + fn leaf_fail_hook_reduces_to_parent_fault_frame() { + let parent = ConnectionId::new(1); + let mut connections = Connections::new(); + connections.push(Connection::registered( + parent, + ConnectionDirection::Parent, + vec![], + ConnectionGeneration::INITIAL, + )); + + let leaf_name = "org.example.v1.echo"; + let endpoint = ProtocolEndpoint::new( + vec![String::from("agent")], + Some(vec![]), + vec![], + vec![LeafSpec { + name: String::from(leaf_name), + procedures: vec![String::from("org.example.v1.echo.invoke")], + }], + ); + let frame = encode_packet( + &PacketHeader { + packet_type: PacketType::Call, + src_path: vec![], + dst_path: vec![String::from("agent")], + dst_leaf: Some(String::from(leaf_name)), + hook_id: None, + }, + &CallMessage { + procedure_id: String::from("org.example.v1.echo.invoke"), + data: vec![9], + response_hook: Some(HookTarget { + hook_id: 7, + return_path: vec![], + }), + }, + ) + .expect("frame encodes"); + let calls = Rc::new(RefCell::new(Vec::new())); + let mut runtime = NodeRuntime::new( + EndpointState::new(endpoint), + connections, + RecordingTransport::default(), + ); + runtime.register_leaf(RecordingLeaf::new(leaf_name, Rc::clone(&calls))); + runtime + .receive_frame(parent, frame) + .expect("call activates hook"); + runtime.dispatch_local_effects().expect("dispatch succeeds"); + runtime.leaf_actions.clear(); + runtime.leaf_actions.push(( + crate::leaf::LeafId::new(String::from(leaf_name)), + LeafAction::FailHook { + hook_id: 7, + fault: ProtocolFault::INTERNAL_ERROR, + }, + )); + + let reduced = runtime.reduce_leaf_actions().expect("fault reduces"); + let outcome = runtime.tick(TickBudget::default()).expect("tick flushes"); + + assert_eq!(reduced, 1); + assert!(runtime.leaf_actions().is_empty()); + assert_eq!(outcome.outbound_frames, 1); + assert_eq!(runtime.transport().sent.len(), 1); + assert_eq!(runtime.transport().sent[0].0, parent); + let parsed = decode_frame(&runtime.transport().sent[0].1).expect("fault decodes"); + assert_eq!(parsed.header().packet_type, PacketType::Fault); + assert_eq!(parsed.header().src_path, [String::from("agent")]); + assert_eq!(parsed.header().dst_path, Vec::::new()); + assert_eq!(parsed.header().hook_id, Some(7)); + let fault = parsed.deserialize_fault().expect("payload is fault"); + assert_eq!(fault.fault, ProtocolFault::INTERNAL_ERROR); + } + #[test] fn leaf_send_call_reduces_to_child_transport_frame() { let child = ConnectionId::new(1); @@ -1810,7 +1920,7 @@ mod tests { } #[test] - fn unsupported_leaf_action_is_reported_and_retained() { + fn unsupported_connection_action_is_reported_and_retained() { let leaf_id = crate::leaf::LeafId::new(String::from("org.example.v1.echo")); let mut runtime = NodeRuntime::new( EndpointState::new(ProtocolEndpoint::new( @@ -1822,13 +1932,6 @@ mod tests { Connections::new(), RecordingTransport::default(), ); - runtime.leaf_actions.push(( - leaf_id.clone(), - LeafAction::FailHook { - hook_id: 7, - fault: ProtocolFault::INTERNAL_ERROR, - }, - )); runtime.leaf_actions.push(( leaf_id.clone(), LeafAction::Connection(ConnectionAction::Unregister { @@ -1843,15 +1946,11 @@ mod tests { assert!(matches!( error, NodeRuntimeError::UnsupportedLeafAction { ref leaf_id, action } - if leaf_id.as_str() == "org.example.v1.echo" && action == "FailHook" + if leaf_id.as_str() == "org.example.v1.echo" && action == "Connection" )); - assert_eq!(runtime.leaf_actions().len(), 2); + assert_eq!(runtime.leaf_actions().len(), 1); assert!(matches!( runtime.leaf_actions()[0].1, - LeafAction::FailHook { .. } - )); - assert!(matches!( - runtime.leaf_actions()[1].1, LeafAction::Connection(_) )); } @@ -1939,26 +2038,225 @@ mod tests { runtime .register_parent_connection(parent, vec![], ConnectionGeneration::INITIAL) .expect("parent route restored"); - let retry_error = runtime + let reduced = runtime .reduce_leaf_actions() - .expect_err("later unsupported action is still reported"); + .expect("remaining supported actions reduce"); + + assert_eq!(reduced, 2); + assert!(runtime.leaf_actions().is_empty()); + assert!(matches!( + runtime.effects()[0], + RuntimeEffect::SendFrame { connection, .. } if connection == parent + )); + assert!(matches!( + runtime.effects()[1], + RuntimeEffect::SendFrame { connection, .. } if connection == parent + )); + } + + #[test] + fn missing_fail_hook_route_preserves_action_and_hook_for_retry() { + let parent = ConnectionId::new(1); + let mut connections = Connections::new(); + connections.push(Connection::registered( + parent, + ConnectionDirection::Parent, + vec![], + ConnectionGeneration::INITIAL, + )); + + let leaf_name = "org.example.v1.echo"; + let endpoint = ProtocolEndpoint::new( + vec![String::from("agent")], + Some(vec![]), + vec![], + vec![LeafSpec { + name: String::from(leaf_name), + procedures: vec![String::from("org.example.v1.echo.invoke")], + }], + ); + let frame = encode_packet( + &PacketHeader { + packet_type: PacketType::Call, + src_path: vec![], + dst_path: vec![String::from("agent")], + dst_leaf: Some(String::from(leaf_name)), + hook_id: None, + }, + &CallMessage { + procedure_id: String::from("org.example.v1.echo.invoke"), + data: vec![], + response_hook: Some(HookTarget { + hook_id: 7, + return_path: vec![], + }), + }, + ) + .expect("frame encodes"); + let calls = Rc::new(RefCell::new(Vec::new())); + let mut runtime = NodeRuntime::new( + EndpointState::new(endpoint), + connections, + RecordingTransport::default(), + ); + runtime.register_leaf(RecordingLeaf::new(leaf_name, Rc::clone(&calls))); + runtime + .receive_frame(parent, frame) + .expect("call activates hook"); + runtime.dispatch_local_effects().expect("dispatch succeeds"); + runtime.leaf_actions.clear(); + runtime.leaf_actions.push(( + crate::leaf::LeafId::new(String::from(leaf_name)), + LeafAction::FailHook { + hook_id: 7, + fault: ProtocolFault::INTERNAL_ERROR, + }, + )); + runtime.leaf_actions.push(( + crate::leaf::LeafId::new(String::from(leaf_name)), + LeafAction::Connection(ConnectionAction::Unregister { connection: parent }), + )); + runtime + .connections + .get_mut(parent) + .expect("parent connection exists") + .set_state(ConnectionState::Connected { + generation: ConnectionGeneration::INITIAL, + }); + + let error = runtime + .reduce_leaf_actions() + .expect_err("missing route connection is reported"); + + assert!(matches!(error, NodeRuntimeError::MissingRouteConnection)); + assert_eq!(runtime.leaf_actions().len(), 2); + assert!(matches!( + runtime.leaf_actions()[0].1, + LeafAction::FailHook { .. } + )); + assert!(matches!( + runtime.leaf_actions()[1].1, + LeafAction::Connection(_) + )); + assert!(runtime.effects().is_empty()); + + runtime + .register_parent_connection(parent, vec![], ConnectionGeneration::INITIAL) + .expect("parent route restored"); + let error = runtime + .reduce_leaf_actions() + .expect_err("retry faults hook then stops at connection action"); + let outcome = runtime.tick(TickBudget::default()).expect("tick flushes"); assert!(matches!( - retry_error, + error, NodeRuntimeError::UnsupportedLeafAction { - action: "FailHook", + action: "Connection", .. } )); assert_eq!(runtime.leaf_actions().len(), 1); assert!(matches!( runtime.leaf_actions()[0].1, - LeafAction::FailHook { .. } + LeafAction::Connection(_) )); - assert!(matches!( - runtime.effects()[0], - RuntimeEffect::SendFrame { connection, .. } if connection == parent + assert_eq!(outcome.outbound_frames, 1); + let parsed = decode_frame(&runtime.transport().sent[0].1).expect("fault decodes"); + assert_eq!(parsed.header().packet_type, PacketType::Fault); + assert_eq!(parsed.header().hook_id, Some(7)); + } + + #[test] + fn dropped_fail_hook_route_preserves_action_and_hook_for_retry() { + let parent = ConnectionId::new(1); + let mut connections = Connections::new(); + connections.push(Connection::registered( + parent, + ConnectionDirection::Parent, + vec![], + ConnectionGeneration::INITIAL, )); + + let leaf_name = "org.example.v1.echo"; + let endpoint = ProtocolEndpoint::new( + vec![String::from("agent")], + Some(vec![]), + vec![], + vec![LeafSpec { + name: String::from(leaf_name), + procedures: vec![String::from("org.example.v1.echo.invoke")], + }], + ); + let frame = encode_packet( + &PacketHeader { + packet_type: PacketType::Call, + src_path: vec![], + dst_path: vec![String::from("agent")], + dst_leaf: Some(String::from(leaf_name)), + hook_id: None, + }, + &CallMessage { + procedure_id: String::from("org.example.v1.echo.invoke"), + data: vec![], + response_hook: Some(HookTarget { + hook_id: 7, + return_path: vec![], + }), + }, + ) + .expect("frame encodes"); + let calls = Rc::new(RefCell::new(Vec::new())); + let mut runtime = NodeRuntime::new( + EndpointState::new(endpoint), + connections, + RecordingTransport::default(), + ); + runtime.register_leaf(RecordingLeaf::new(leaf_name, Rc::clone(&calls))); + runtime + .receive_frame(parent, frame) + .expect("call activates hook with dropped return path"); + assert!(matches!(runtime.effects()[0], RuntimeEffect::Local(_))); + runtime.dispatch_local_effects().expect("dispatch succeeds"); + runtime + .endpoint + .endpoint_mut() + .set_parent_path(None) + .expect("parent route removes"); + assert_eq!( + runtime.endpoint.hook_fault_route(7), + Some(RouteDecision::Drop) + ); + runtime.leaf_actions.clear(); + runtime.leaf_actions.push(( + crate::leaf::LeafId::new(String::from(leaf_name)), + LeafAction::FailHook { + hook_id: 7, + fault: ProtocolFault::INTERNAL_ERROR, + }, + )); + + let error = runtime + .reduce_leaf_actions() + .expect_err("dropped fault route is reported before mutation"); + + assert!(matches!(error, NodeRuntimeError::MissingRouteConnection)); + assert_eq!(runtime.leaf_actions().len(), 1); + assert!(runtime.effects().is_empty()); + + runtime + .register_parent_connection(parent, vec![], ConnectionGeneration::INITIAL) + .expect("parent route restored"); + let reduced = runtime + .reduce_leaf_actions() + .expect("retained fault retries after route is restored"); + let outcome = runtime.tick(TickBudget::default()).expect("tick flushes"); + + assert_eq!(reduced, 1); + assert_eq!(outcome.outbound_frames, 1); + assert_eq!(runtime.transport().sent[0].0, parent); + let parsed = decode_frame(&runtime.transport().sent[0].1).expect("fault decodes"); + assert_eq!(parsed.header().packet_type, PacketType::Fault); + assert_eq!(parsed.header().hook_id, Some(7)); } #[test]