Reduce leaf hook data actions

This commit is contained in:
Michael Mikovsky
2026-05-09 13:25:16 -06:00
parent 97f3e305bb
commit a68e86ef6d
4 changed files with 399 additions and 37 deletions
+16 -9
View File
@@ -168,6 +168,10 @@ impl<T, LeafError> NodeRuntime<T, LeafError> {
pub fn dispatch_local_effects(&mut self) -> Result<usize, LeafDispatchError<LeafError>>; pub fn dispatch_local_effects(&mut self) -> Result<usize, LeafDispatchError<LeafError>>;
pub fn reduce_leaf_actions(&mut self) -> Result<usize, NodeRuntimeError<T::Error>>
where
T: Transport;
pub fn drain_leaf_actions(&mut self) -> impl Iterator<Item = (LeafId, LeafAction)>; pub fn drain_leaf_actions(&mut self) -> impl Iterator<Item = (LeafId, LeafAction)>;
} }
@@ -325,21 +329,24 @@ connection closes or unregisters
## Known Gaps In The Current Branch ## Known Gaps In The Current Branch
- `LeafAction` values are queued by `LeafContext` but not yet applied by - `LeafAction::SendHookData` is reduced by `NodeRuntime`; other action variants
`NodeRuntime`. are still unsupported and must remain queued when encountered.
- Local outbound calls through the runtime are not implemented. - Local outbound calls through the runtime are not implemented.
- Hook fault actions through the runtime are not implemented.
- Connection actions through the runtime are not implemented.
- Disconnect does not yet clean hooks, sessions, route state, and queued effects. - Disconnect does not yet clean hooks, sessions, route state, and queued effects.
- Child ingress still allocates because the existing `Ingress::Child` owns a - Child ingress still allocates because the existing `Ingress::Child` owns a
`Vec<String>`. `Vec<String>`.
## Next Implementation Slice ## Next Implementation Slice
Implement one narrow end-to-end path: Implement the next narrow leaf-action path:
1. Apply queued `LeafAction::SendHookData` through endpoint packet state. 1. Apply queued `LeafAction::SendCall` through endpoint packet state.
2. Route the produced frame through `Transport`. 2. Preserve hook reservation and routing failure semantics without dropping
3. Add tests proving a leaf reply is framed and unprocessed actions.
sent through a registered connection. 3. Add tests proving a local leaf can initiate an outbound call and receive the
response through the existing dispatch path.
That slice forces the real architecture to work without overbuilding the rest of That slice should continue the one-variant-at-a-time reducer approach without
the migration. implementing hook faults or connection actions early.
@@ -133,10 +133,19 @@ impl ProtocolEndpoint {
Ok(EndpointOutcome::Dropped) Ok(EndpointOutcome::Dropped)
} }
pub(crate) fn decide_route(&self, dst_path: &[String]) -> RouteDecision { /// Returns the current route decision for an absolute destination path.
///
/// Runtime owners use this to validate transport availability before invoking
/// endpoint operations that also mutate hook state.
#[must_use]
pub fn route_decision(&self, dst_path: &[String]) -> RouteDecision {
self.routing.route(dst_path) self.routing.route(dst_path)
} }
pub(crate) fn decide_route(&self, dst_path: &[String]) -> RouteDecision {
self.route_decision(dst_path)
}
/// Returns whether one `src_path` is topologically valid for the ingress side that delivered /// Returns whether one `src_path` is topologically valid for the ingress side that delivered
/// the frame. /// the frame.
/// ///
+19
View File
@@ -68,6 +68,25 @@ impl EndpointState {
&mut self.endpoint &mut self.endpoint
} }
/// Returns the endpoint's current route decision for an absolute path.
#[must_use]
pub fn route_decision(&self, dst_path: &[alloc::string::String]) -> RouteDecision {
self.endpoint.route_decision(dst_path)
}
/// Builds and routes one hook-data packet through the wrapped endpoint state.
pub fn send_hook_data(
&mut self,
dst_path: alloc::vec::Vec<alloc::string::String>,
hook_id: u64,
procedure_id: alloc::string::String,
data: alloc::vec::Vec<u8>,
end_hook: bool,
) -> Result<EndpointOutcome, EndpointError> {
self.endpoint
.send_data(dst_path, hook_id, procedure_id, data, end_hook)
}
/// Consumes the wrapper and returns the underlying protocol endpoint. /// Consumes the wrapper and returns the underlying protocol endpoint.
#[must_use] #[must_use]
pub fn into_endpoint(self) -> ProtocolEndpoint { pub fn into_endpoint(self) -> ProtocolEndpoint {
+352 -25
View File
@@ -2,8 +2,8 @@
//! //!
//! This first slice owns transport and connection metadata, derives ingress from //! This first slice owns transport and connection metadata, derives ingress from
//! registered connections, delegates packet invariants to [`EndpointState`], and //! registered connections, delegates packet invariants to [`EndpointState`], and
//! queues concrete runtime effects. Leaf action application is intentionally not //! queues concrete runtime effects. Leaf action reduction is intentionally
//! implemented in this slice. //! narrow: this slice only turns hook-data replies into endpoint outcomes.
use crate::alloc::{string::String, vec::Vec}; use crate::alloc::{string::String, vec::Vec};
use crate::connections::{ use crate::connections::{
@@ -65,6 +65,13 @@ pub enum NodeRuntimeError<TransportError> {
Endpoint(EndpointError), Endpoint(EndpointError),
/// Transport send, receive, or flush failed. /// Transport send, receive, or flush failed.
Transport(TransportError), Transport(TransportError),
/// A queued leaf action is not implemented by this runtime slice.
UnsupportedLeafAction {
/// Leaf id that requested the action.
leaf_id: LeafId,
/// Stable action name for diagnostics.
action: &'static str,
},
} }
/// Error returned when a leaf callback rejects a local event. /// Error returned when a leaf callback rejects a local event.
@@ -107,6 +114,13 @@ where
Self::MissingRouteConnection => f.write_str("route has no registered connection"), Self::MissingRouteConnection => f.write_str("route has no registered connection"),
Self::Endpoint(error) => write!(f, "{error}"), Self::Endpoint(error) => write!(f, "{error}"),
Self::Transport(error) => write!(f, "{error}"), Self::Transport(error) => write!(f, "{error}"),
Self::UnsupportedLeafAction { leaf_id, action } => {
write!(
f,
"leaf {} requested unsupported action {action}",
leaf_id.as_str()
)
}
} }
} }
} }
@@ -343,9 +357,6 @@ impl<T, LeafError> NodeRuntime<T, LeafError> {
} }
/// Returns leaf actions queued by dispatched callbacks. /// Returns leaf actions queued by dispatched callbacks.
///
/// These actions are intentionally only retained here; reducing them into
/// endpoint packets or connection changes belongs to a later runtime slice.
#[must_use] #[must_use]
pub fn leaf_actions(&self) -> &[(LeafId, LeafAction)] { pub fn leaf_actions(&self) -> &[(LeafId, LeafAction)] {
&self.leaf_actions &self.leaf_actions
@@ -530,29 +541,76 @@ where
self.apply_outcome(outcome) self.apply_outcome(outcome)
} }
fn apply_outcome( /// Reduces queued leaf actions through endpoint packet state.
&mut self, ///
outcome: EndpointOutcome, /// Only [`LeafAction::SendHookData`] is implemented in this slice. Unsupported
) -> Result<(), NodeRuntimeError<T::Error>> { /// actions stop reduction and remain queued with all later actions so callers
match outcome { /// can retry after a future runtime gains support.
EndpointOutcome::Forward { route, frame } => self.queue_forward(route, frame), pub fn reduce_leaf_actions(&mut self) -> Result<usize, NodeRuntimeError<T::Error>> {
EndpointOutcome::Local(event) => { let mut reduced = 0usize;
self.effects.push(RuntimeEffect::Local(event)); let mut retained = Vec::new();
Ok(()) let mut pending = core::mem::take(&mut self.leaf_actions).into_iter();
while let Some((leaf_id, action)) = pending.next() {
match action {
LeafAction::SendHookData(data) => {
let original_action = LeafAction::SendHookData(data.clone());
let route = self.endpoint.route_decision(&data.dst_path);
if route_requires_connection(route)
&& self.connection_for_route(route).is_none()
{
retained.push((leaf_id, original_action));
retained.extend(pending);
self.leaf_actions = retained;
return Err(NodeRuntimeError::MissingRouteConnection);
} }
EndpointOutcome::Dropped => {
self.effects.push(RuntimeEffect::Dropped); let outcome = match self.endpoint.send_hook_data(
Ok(()) data.dst_path,
data.hook_id,
data.procedure_id,
data.payload,
data.end_hook,
) {
Ok(outcome) => outcome,
Err(error) => {
retained.push((leaf_id, original_action));
retained.extend(pending);
self.leaf_actions = retained;
return Err(NodeRuntimeError::Endpoint(error));
}
};
if let Err(error) = self.apply_outcome(outcome) {
retained.push((leaf_id, original_action));
retained.extend(pending);
self.leaf_actions = retained;
return Err(error);
}
reduced += 1;
}
unsupported => {
let action_name = leaf_action_name(&unsupported);
retained.push((leaf_id.clone(), unsupported));
retained.extend(pending);
self.leaf_actions = retained;
return Err(NodeRuntimeError::UnsupportedLeafAction {
leaf_id,
action: action_name,
});
} }
} }
} }
fn queue_forward( self.leaf_actions = retained;
&mut self, Ok(reduced)
}
fn connection_for_route(
&self,
route: RouteDecision, route: RouteDecision,
frame: FrameBytes, ) -> Option<(ConnectionId, ConnectionGeneration)> {
) -> Result<(), NodeRuntimeError<T::Error>> { match route {
let (connection, generation) = match route {
RouteDecision::Parent => self RouteDecision::Parent => self
.connections .connections
.registered_by_direction(ConnectionDirection::Parent) .registered_by_direction(ConnectionDirection::Parent)
@@ -582,6 +640,32 @@ where
}), }),
RouteDecision::Local | RouteDecision::Drop => None, RouteDecision::Local | RouteDecision::Drop => None,
} }
}
fn apply_outcome(
&mut self,
outcome: EndpointOutcome,
) -> Result<(), NodeRuntimeError<T::Error>> {
match outcome {
EndpointOutcome::Forward { route, frame } => self.queue_forward(route, frame),
EndpointOutcome::Local(event) => {
self.effects.push(RuntimeEffect::Local(event));
Ok(())
}
EndpointOutcome::Dropped => {
self.effects.push(RuntimeEffect::Dropped);
Ok(())
}
}
}
fn queue_forward(
&mut self,
route: RouteDecision,
frame: FrameBytes,
) -> Result<(), NodeRuntimeError<T::Error>> {
let (connection, generation) = self
.connection_for_route(route)
.ok_or(NodeRuntimeError::MissingRouteConnection)?; .ok_or(NodeRuntimeError::MissingRouteConnection)?;
self.effects.push(RuntimeEffect::SendFrame { self.effects.push(RuntimeEffect::SendFrame {
@@ -649,6 +733,19 @@ fn local_event_leaf_name(event: &LocalEvent) -> Option<&str> {
} }
} }
fn leaf_action_name(action: &LeafAction) -> &'static str {
match action {
LeafAction::SendCall(_) => "SendCall",
LeafAction::SendHookData(_) => "SendHookData",
LeafAction::FailHook { .. } => "FailHook",
LeafAction::Connection(_) => "Connection",
}
}
const fn route_requires_connection(route: RouteDecision) -> bool {
matches!(route, RouteDecision::Parent | RouteDecision::Child(_))
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use core::cell::RefCell; use core::cell::RefCell;
@@ -659,16 +756,20 @@ mod tests {
use crate::alloc::vec; use crate::alloc::vec;
use crate::alloc::vec::Vec; use crate::alloc::vec::Vec;
use crate::connections::{ use crate::connections::{
Connection, ConnectionDirection, ConnectionGeneration, ConnectionId, Connections, Connection, ConnectionDirection, ConnectionGeneration, ConnectionId, ConnectionState,
Connections,
}; };
use crate::context::{LeafAction, OutboundHookData}; use crate::context::{ConnectionAction, LeafAction, OutboundCall, OutboundHookData};
use crate::effects::RuntimeEffect; use crate::effects::RuntimeEffect;
use crate::leaf::{Leaf, LeafCapabilities, LeafPermissions}; use crate::leaf::{Leaf, LeafCapabilities, LeafPermissions};
use crate::transport::Transport; use crate::transport::Transport;
use unshell_protocol::tree::{ use unshell_protocol::tree::{
ChildRoute, EndpointError, IncomingCall, LeafSpec, LocalEvent, ProtocolEndpoint, ChildRoute, EndpointError, IncomingCall, LeafSpec, LocalEvent, ProtocolEndpoint,
}; };
use unshell_protocol::{CallMessage, FrameBytes, PacketHeader, PacketType, encode_packet}; use unshell_protocol::{
CallMessage, FrameBytes, HookTarget, PacketHeader, PacketType, ProtocolFault, decode_frame,
encode_packet,
};
use super::{EndpointState, NodeRuntime, NodeRuntimeError, TickBudget}; use super::{EndpointState, NodeRuntime, NodeRuntimeError, TickBudget};
@@ -1304,6 +1405,232 @@ mod tests {
assert!(runtime.transport().sent.is_empty()); assert!(runtime.transport().sent.is_empty());
} }
#[test]
fn leaf_hook_data_reduces_to_parent_transport_frame() {
let parent = ConnectionId::new(1);
let mut connections = Connections::new();
connections.push(Connection::registered(
parent,
ConnectionDirection::Parent,
vec![],
ConnectionGeneration::INITIAL,
));
let leaf_name = "org.example.v1.echo";
let endpoint = ProtocolEndpoint::new(
vec![String::from("agent")],
Some(vec![]),
vec![],
vec![LeafSpec {
name: String::from(leaf_name),
procedures: vec![String::from("org.example.v1.echo.invoke")],
}],
);
let frame = encode_packet(
&PacketHeader {
packet_type: PacketType::Call,
src_path: vec![],
dst_path: vec![String::from("agent")],
dst_leaf: Some(String::from(leaf_name)),
hook_id: None,
},
&CallMessage {
procedure_id: String::from("org.example.v1.echo.invoke"),
data: vec![9],
response_hook: Some(HookTarget {
hook_id: 7,
return_path: vec![],
}),
},
)
.expect("frame encodes");
let calls = Rc::new(RefCell::new(Vec::new()));
let mut runtime = NodeRuntime::new(
EndpointState::new(endpoint),
connections,
RecordingTransport::default(),
);
runtime.register_leaf(RecordingLeaf::new(leaf_name, Rc::clone(&calls)));
runtime
.receive_frame(parent, frame)
.expect("frame processes");
runtime.dispatch_local_effects().expect("dispatch succeeds");
let reduced = runtime.reduce_leaf_actions().expect("hook data reduces");
let outcome = runtime.tick(TickBudget::default()).expect("tick flushes");
assert_eq!(reduced, 1);
assert!(runtime.leaf_actions().is_empty());
assert_eq!(outcome.outbound_frames, 1);
assert_eq!(runtime.transport().sent.len(), 1);
assert_eq!(runtime.transport().sent[0].0, parent);
let parsed = decode_frame(&runtime.transport().sent[0].1).expect("sent data decodes");
let header = parsed.header();
assert_eq!(header.packet_type, PacketType::Data);
assert_eq!(header.src_path, [String::from("agent")]);
assert_eq!(header.dst_path, Vec::<String>::new());
assert_eq!(header.hook_id, Some(7));
let data = parsed.deserialize_data().expect("payload is data");
assert_eq!(data.procedure_id, "org.example.v1.echo.invoke");
assert_eq!(data.data, [1, 2, 3]);
assert!(data.end_hook);
}
#[test]
fn unsupported_leaf_action_is_reported_and_retained() {
let leaf_id = crate::leaf::LeafId::new(String::from("org.example.v1.echo"));
let mut runtime = NodeRuntime::new(
EndpointState::new(ProtocolEndpoint::new(
vec![String::from("agent")],
Some(vec![]),
vec![],
vec![],
)),
Connections::new(),
RecordingTransport::default(),
);
runtime.leaf_actions.push((
leaf_id.clone(),
LeafAction::SendCall(OutboundCall {
dst_path: vec![],
dst_leaf: None,
procedure_id: String::from("org.example.v1.echo.invoke"),
payload: vec![],
expects_response: false,
}),
));
runtime.leaf_actions.push((
leaf_id.clone(),
LeafAction::Connection(ConnectionAction::Unregister {
connection: ConnectionId::new(99),
}),
));
let error = runtime
.reduce_leaf_actions()
.expect_err("unsupported action is reported");
assert!(matches!(
error,
NodeRuntimeError::UnsupportedLeafAction { ref leaf_id, action }
if leaf_id.as_str() == "org.example.v1.echo" && action == "SendCall"
));
assert_eq!(runtime.leaf_actions().len(), 2);
assert!(matches!(
runtime.leaf_actions()[0].1,
LeafAction::SendCall(_)
));
assert!(matches!(
runtime.leaf_actions()[1].1,
LeafAction::Connection(_)
));
}
#[test]
fn failed_leaf_hook_data_routing_retains_failed_and_remaining_actions() {
let parent = ConnectionId::new(1);
let mut connections = Connections::new();
connections.push(Connection::registered(
parent,
ConnectionDirection::Parent,
vec![],
ConnectionGeneration::INITIAL,
));
let leaf_name = "org.example.v1.echo";
let endpoint = ProtocolEndpoint::new(
vec![String::from("agent")],
Some(vec![]),
vec![],
vec![LeafSpec {
name: String::from(leaf_name),
procedures: vec![String::from("org.example.v1.echo.invoke")],
}],
);
let frame = encode_packet(
&PacketHeader {
packet_type: PacketType::Call,
src_path: vec![],
dst_path: vec![String::from("agent")],
dst_leaf: Some(String::from(leaf_name)),
hook_id: None,
},
&CallMessage {
procedure_id: String::from("org.example.v1.echo.invoke"),
data: vec![],
response_hook: Some(HookTarget {
hook_id: 7,
return_path: vec![],
}),
},
)
.expect("frame encodes");
let calls = Rc::new(RefCell::new(Vec::new()));
let mut runtime = NodeRuntime::new(
EndpointState::new(endpoint),
connections,
RecordingTransport::default(),
);
runtime.register_leaf(RecordingLeaf::new(leaf_name, Rc::clone(&calls)));
runtime
.receive_frame(parent, frame)
.expect("frame processes and activates response hook");
runtime.dispatch_local_effects().expect("dispatch succeeds");
runtime.leaf_actions.push((
crate::leaf::LeafId::new(String::from(leaf_name)),
LeafAction::FailHook {
hook_id: 7,
fault: ProtocolFault::INTERNAL_ERROR,
},
));
runtime
.connections
.get_mut(parent)
.expect("parent connection exists")
.set_state(ConnectionState::Connected {
generation: ConnectionGeneration::INITIAL,
});
let error = runtime
.reduce_leaf_actions()
.expect_err("missing route connection is reported");
assert!(matches!(error, NodeRuntimeError::MissingRouteConnection));
assert_eq!(runtime.leaf_actions().len(), 2);
assert!(matches!(
runtime.leaf_actions()[0].1,
LeafAction::SendHookData(_)
));
assert!(matches!(
runtime.leaf_actions()[1].1,
LeafAction::FailHook { .. }
));
runtime
.register_parent_connection(parent, vec![], ConnectionGeneration::INITIAL)
.expect("parent route restored");
let retry_error = runtime
.reduce_leaf_actions()
.expect_err("later unsupported action is still reported");
assert!(matches!(
retry_error,
NodeRuntimeError::UnsupportedLeafAction {
action: "FailHook",
..
}
));
assert_eq!(runtime.leaf_actions().len(), 1);
assert!(matches!(
runtime.leaf_actions()[0].1,
LeafAction::FailHook { .. }
));
assert!(matches!(
runtime.effects()[0],
RuntimeEffect::SendFrame { connection, .. } if connection == parent
));
}
#[test] #[test]
fn unmatched_local_event_remains_queued() { fn unmatched_local_event_remains_queued() {
let mut runtime = NodeRuntime::new( let mut runtime = NodeRuntime::new(