Reduce leaf send call actions

This commit is contained in:
Michael Mikovsky
2026-05-09 13:40:21 -06:00
parent a68e86ef6d
commit 71d1aee235
5 changed files with 410 additions and 26 deletions
+9 -9
View File
@@ -329,9 +329,9 @@ connection closes or unregisters
## Known Gaps In The Current Branch ## Known Gaps In The Current Branch
- `LeafAction::SendHookData` is reduced by `NodeRuntime`; other action variants - `LeafAction::SendCall` and `LeafAction::SendHookData` are reduced by
are still unsupported and must remain queued when encountered. `NodeRuntime`; hook fault and connection action variants are still unsupported
- Local outbound calls through the runtime are not implemented. and must remain queued when encountered.
- Hook fault actions through the runtime are not implemented. - Hook fault actions through the runtime are not implemented.
- Connection actions through the runtime are not implemented. - Connection actions through the runtime are not implemented.
- Disconnect does not yet clean hooks, sessions, route state, and queued effects. - Disconnect does not yet clean hooks, sessions, route state, and queued effects.
@@ -342,11 +342,11 @@ connection closes or unregisters
Implement the next narrow leaf-action path: Implement the next narrow leaf-action path:
1. Apply queued `LeafAction::SendCall` through endpoint packet state. 1. Apply queued `LeafAction::FailHook` through endpoint packet state.
2. Preserve hook reservation and routing failure semantics without dropping 2. Preserve pending/active hook cleanup semantics without dropping unprocessed
unprocessed actions. actions.
3. Add tests proving a local leaf can initiate an outbound call and receive the 3. Keep connection registration actions queued until runtime-owned disconnect
response through the existing dispatch path. cleanup can update connections, routes, hooks, and queued effects atomically.
That slice should continue the one-variant-at-a-time reducer approach without That slice should continue the one-variant-at-a-time reducer approach without
implementing hook faults or connection actions early. implementing connection actions early.
@@ -311,7 +311,7 @@ pub trait Endpoint {
/// let endpoint = ProtocolEndpoint::new(vec!["worker".into()], Some(Vec::new()), Vec::new(), Vec::new()); /// let endpoint = ProtocolEndpoint::new(vec!["worker".into()], Some(Vec::new()), Vec::new(), Vec::new());
/// let _ = endpoint; /// let _ = endpoint;
/// ``` /// ```
#[derive(Debug, Default)] #[derive(Debug, Clone, Default)]
pub struct ProtocolEndpoint { pub struct ProtocolEndpoint {
pub(crate) local_id: Option<String>, pub(crate) local_id: Option<String>,
pub(crate) path: Vec<String>, pub(crate) path: Vec<String>,
+1 -1
View File
@@ -130,7 +130,7 @@ pub struct HookConflict;
/// }).unwrap(); /// }).unwrap();
/// assert_eq!(hooks.pending_len(), 1); /// assert_eq!(hooks.pending_len(), 1);
/// ``` /// ```
#[derive(Debug, Default)] #[derive(Debug, Clone, Default)]
pub struct HookTable { pub struct HookTable {
pending: BTreeMap<HookKey, PendingHook>, pending: BTreeMap<HookKey, PendingHook>,
active: BTreeMap<HookKey, ActiveHook>, active: BTreeMap<HookKey, ActiveHook>,
+56 -2
View File
@@ -5,7 +5,10 @@
//! into packet-only and runtime-owned layers. The wrapper does not own transport //! into packet-only and runtime-owned layers. The wrapper does not own transport
//! handles, does not dispatch leaves, and does not make admission decisions. //! handles, does not dispatch leaves, and does not make admission decisions.
use unshell_protocol::{FrameBytes, tree::Endpoint as ProtocolEndpointTrait}; use unshell_protocol::{
CallMessage, FrameBytes, PacketHeader, PacketType, tree::Endpoint as ProtocolEndpointTrait,
validate_call, validate_header, validate_procedure_id,
};
pub use unshell_protocol::tree::{ pub use unshell_protocol::tree::{
ChildRoute, EndpointError, EndpointOutcome, HookKey, Ingress, LeafSpec, LocalEvent, ChildRoute, EndpointError, EndpointOutcome, HookKey, Ingress, LeafSpec, LocalEvent,
@@ -32,7 +35,7 @@ pub trait PacketProcessor {
/// This is a compatibility shell around [`ProtocolEndpoint`]. It exists so new /// This is a compatibility shell around [`ProtocolEndpoint`]. It exists so new
/// runtime code can depend on `unshell_runtime::node::EndpointState` while the /// runtime code can depend on `unshell_runtime::node::EndpointState` while the
/// old protocol-tree endpoint remains the source of truth for packet invariants. /// old protocol-tree endpoint remains the source of truth for packet invariants.
#[derive(Debug, Default)] #[derive(Clone, Debug, Default)]
pub struct EndpointState { pub struct EndpointState {
endpoint: ProtocolEndpoint, endpoint: ProtocolEndpoint,
} }
@@ -87,6 +90,57 @@ impl EndpointState {
.send_data(dst_path, hook_id, procedure_id, data, end_hook) .send_data(dst_path, hook_id, procedure_id, data, end_hook)
} }
/// Builds and routes one call packet through the wrapped endpoint state.
pub fn send_call(
&mut self,
dst_path: alloc::vec::Vec<alloc::string::String>,
dst_leaf: Option<alloc::string::String>,
procedure_id: alloc::string::String,
response_hook_id: Option<u64>,
data: alloc::vec::Vec<u8>,
) -> Result<EndpointOutcome, EndpointError> {
self.endpoint
.send_call(dst_path, dst_leaf, procedure_id, response_hook_id, data)
}
/// Validates an outbound call request before allocating response hook state.
pub fn validate_call_request(
&self,
dst_path: &[alloc::string::String],
dst_leaf: Option<&alloc::string::String>,
procedure_id: &str,
data: &[u8],
expects_response: bool,
) -> Result<(), EndpointError> {
validate_procedure_id(procedure_id)?;
let header = PacketHeader {
packet_type: PacketType::Call,
src_path: self.endpoint.path().to_vec(),
dst_path: dst_path.to_vec(),
dst_leaf: dst_leaf.cloned(),
hook_id: None,
};
let call = CallMessage {
procedure_id: procedure_id.into(),
data: data.to_vec(),
response_hook: expects_response.then(|| unshell_protocol::HookTarget {
hook_id: 1,
return_path: self.endpoint.path().to_vec(),
}),
};
validate_header(&header)?;
validate_call(&header, &call)?;
Ok(())
}
/// Allocates a response hook id scoped to this endpoint path.
#[must_use]
pub fn allocate_hook_id(&mut self) -> u64 {
self.endpoint.allocate_hook_id()
}
/// Consumes the wrapper and returns the underlying protocol endpoint. /// Consumes the wrapper and returns the underlying protocol endpoint.
#[must_use] #[must_use]
pub fn into_endpoint(self) -> ProtocolEndpoint { pub fn into_endpoint(self) -> ProtocolEndpoint {
+343 -13
View File
@@ -3,7 +3,8 @@
//! This first slice owns transport and connection metadata, derives ingress from //! This first slice owns transport and connection metadata, derives ingress from
//! registered connections, delegates packet invariants to [`EndpointState`], and //! registered connections, delegates packet invariants to [`EndpointState`], and
//! queues concrete runtime effects. Leaf action reduction is intentionally //! queues concrete runtime effects. Leaf action reduction is intentionally
//! narrow: this slice only turns hook-data replies into endpoint outcomes. //! narrow: this slice only turns outbound calls and hook-data replies into
//! endpoint outcomes.
use crate::alloc::{string::String, vec::Vec}; use crate::alloc::{string::String, vec::Vec};
use crate::connections::{ use crate::connections::{
@@ -543,9 +544,10 @@ where
/// Reduces queued leaf actions through endpoint packet state. /// Reduces queued leaf actions through endpoint packet state.
/// ///
/// Only [`LeafAction::SendHookData`] is implemented in this slice. Unsupported /// [`LeafAction::SendCall`] and [`LeafAction::SendHookData`] are implemented
/// actions stop reduction and remain queued with all later actions so callers /// in this slice. Unsupported actions stop reduction and remain queued with
/// can retry after a future runtime gains support. /// all later actions so callers can retry after a future runtime gains
/// support.
pub fn reduce_leaf_actions(&mut self) -> Result<usize, NodeRuntimeError<T::Error>> { pub fn reduce_leaf_actions(&mut self) -> Result<usize, NodeRuntimeError<T::Error>> {
let mut reduced = 0usize; let mut reduced = 0usize;
let mut retained = Vec::new(); let mut retained = Vec::new();
@@ -553,6 +555,64 @@ where
while let Some((leaf_id, action)) = pending.next() { while let Some((leaf_id, action)) = pending.next() {
match action { match action {
LeafAction::SendCall(call) => {
let original_action = LeafAction::SendCall(call.clone());
let route = self.endpoint.route_decision(&call.dst_path);
if route_requires_connection(route)
&& self.connection_for_route(route).is_none()
{
retained.push((leaf_id, original_action));
retained.extend(pending);
self.leaf_actions = retained;
return Err(NodeRuntimeError::MissingRouteConnection);
}
if let Err(error) = self.endpoint.validate_call_request(
&call.dst_path,
call.dst_leaf.as_ref(),
&call.procedure_id,
&call.payload,
call.expects_response,
) {
retained.push((leaf_id, original_action));
retained.extend(pending);
self.leaf_actions = retained;
return Err(NodeRuntimeError::Endpoint(error));
}
// Allocate only after transport availability is known. A
// failed preflight must leave the queued call retryable
// without consuming a hook id or reserving pending hook state.
let endpoint_checkpoint = self.endpoint.clone();
let response_hook_id = call
.expects_response
.then(|| self.endpoint.allocate_hook_id());
let outcome = match self.endpoint.send_call(
call.dst_path,
call.dst_leaf,
call.procedure_id,
response_hook_id,
call.payload,
) {
Ok(outcome) => outcome,
Err(error) => {
self.endpoint = endpoint_checkpoint;
retained.push((leaf_id, original_action));
retained.extend(pending);
self.leaf_actions = retained;
return Err(NodeRuntimeError::Endpoint(error));
}
};
if let Err(error) = self.apply_outcome(outcome) {
self.endpoint = endpoint_checkpoint;
retained.push((leaf_id, original_action));
retained.extend(pending);
self.leaf_actions = retained;
return Err(error);
}
reduced += 1;
}
LeafAction::SendHookData(data) => { LeafAction::SendHookData(data) => {
let original_action = LeafAction::SendHookData(data.clone()); let original_action = LeafAction::SendHookData(data.clone());
let route = self.endpoint.route_decision(&data.dst_path); let route = self.endpoint.route_decision(&data.dst_path);
@@ -1476,6 +1536,279 @@ mod tests {
assert!(data.end_hook); assert!(data.end_hook);
} }
#[test]
fn leaf_send_call_reduces_to_child_transport_frame() {
let child = ConnectionId::new(1);
let mut connections = Connections::new();
connections.push(Connection::registered(
child,
ConnectionDirection::Child,
vec![String::from("agent"), String::from("worker")],
ConnectionGeneration::INITIAL,
));
let leaf_id = crate::leaf::LeafId::new(String::from("org.example.v1.client"));
let endpoint = ProtocolEndpoint::new(
vec![String::from("agent")],
None,
vec![ChildRoute::registered(vec![
String::from("agent"),
String::from("worker"),
])],
Vec::new(),
);
let mut runtime = NodeRuntime::new(
EndpointState::new(endpoint),
connections,
RecordingTransport::default(),
);
runtime.leaf_actions.push((
leaf_id,
LeafAction::SendCall(OutboundCall {
dst_path: vec![String::from("agent"), String::from("worker")],
dst_leaf: Some(String::from("org.example.v1.echo")),
procedure_id: String::from("org.example.v1.echo.invoke"),
payload: vec![4, 5, 6],
expects_response: false,
}),
));
let reduced = runtime.reduce_leaf_actions().expect("call reduces");
let outcome = runtime.tick(TickBudget::default()).expect("tick flushes");
assert_eq!(reduced, 1);
assert!(runtime.leaf_actions().is_empty());
assert_eq!(outcome.outbound_frames, 1);
assert_eq!(runtime.transport().sent.len(), 1);
assert_eq!(runtime.transport().sent[0].0, child);
let parsed = decode_frame(&runtime.transport().sent[0].1).expect("sent call decodes");
let header = parsed.header();
assert_eq!(header.packet_type, PacketType::Call);
assert_eq!(header.src_path, [String::from("agent")]);
assert_eq!(
header.dst_path,
[String::from("agent"), String::from("worker")]
);
assert_eq!(header.dst_leaf.as_deref(), Some("org.example.v1.echo"));
let call = parsed.deserialize_call().expect("payload is call");
assert_eq!(call.procedure_id, "org.example.v1.echo.invoke");
assert_eq!(call.data, [4, 5, 6]);
assert!(call.response_hook.is_none());
}
#[test]
fn expected_response_send_call_preflights_route_and_uses_retry_hook() {
let child = ConnectionId::new(1);
let mut connections = Connections::new();
connections.push(Connection::connected(child, ConnectionGeneration::INITIAL));
let leaf_id = crate::leaf::LeafId::new(String::from("org.example.v1.client"));
let endpoint = ProtocolEndpoint::new(
vec![String::from("agent")],
None,
vec![ChildRoute::registered(vec![
String::from("agent"),
String::from("worker"),
])],
Vec::new(),
);
let mut runtime = NodeRuntime::new(
EndpointState::new(endpoint),
connections,
RecordingTransport::default(),
);
runtime.leaf_actions.push((
leaf_id,
LeafAction::SendCall(OutboundCall {
dst_path: vec![String::from("agent"), String::from("worker")],
dst_leaf: Some(String::from("org.example.v1.echo")),
procedure_id: String::from("org.example.v1.echo.invoke"),
payload: vec![],
expects_response: true,
}),
));
let error = runtime
.reduce_leaf_actions()
.expect_err("missing child connection is reported");
assert!(matches!(error, NodeRuntimeError::MissingRouteConnection));
assert_eq!(runtime.leaf_actions().len(), 1);
assert!(runtime.effects().is_empty());
runtime
.register_child_connection(
child,
vec![String::from("agent"), String::from("worker")],
ConnectionGeneration::INITIAL,
)
.expect("child route restored");
let reduced = runtime
.reduce_leaf_actions()
.expect("retry reduces after route exists");
let outcome = runtime.tick(TickBudget::default()).expect("tick flushes");
assert_eq!(reduced, 1);
assert_eq!(outcome.outbound_frames, 1);
let parsed = decode_frame(&runtime.transport().sent[0].1).expect("sent call decodes");
let call = parsed.deserialize_call().expect("payload is call");
assert_eq!(
call.response_hook,
Some(HookTarget {
hook_id: 1,
return_path: vec![String::from("agent")],
})
);
let response = encode_packet(
&PacketHeader {
packet_type: PacketType::Data,
src_path: vec![String::from("agent"), String::from("worker")],
dst_path: vec![String::from("agent")],
dst_leaf: None,
hook_id: Some(1),
},
&unshell_protocol::DataMessage {
procedure_id: String::from("org.example.v1.echo.invoke"),
data: vec![9],
end_hook: true,
},
)
.expect("response encodes");
runtime
.receive_frame(child, response)
.expect("response hook is accepted");
assert!(
matches!(runtime.effects()[0], RuntimeEffect::Local(LocalEvent::Data { ref hook_key, .. }) if hook_key.hook_id == 1)
);
}
#[test]
fn invalid_send_call_does_not_affect_next_response_hook_id() {
let child = ConnectionId::new(1);
let mut connections = Connections::new();
connections.push(Connection::registered(
child,
ConnectionDirection::Child,
vec![String::from("agent"), String::from("worker")],
ConnectionGeneration::INITIAL,
));
let leaf_id = crate::leaf::LeafId::new(String::from("org.example.v1.client"));
let endpoint = ProtocolEndpoint::new(
vec![String::from("agent")],
None,
vec![ChildRoute::registered(vec![
String::from("agent"),
String::from("worker"),
])],
Vec::new(),
);
let mut runtime = NodeRuntime::new(
EndpointState::new(endpoint),
connections,
RecordingTransport::default(),
);
runtime.leaf_actions.push((
leaf_id.clone(),
LeafAction::SendCall(OutboundCall {
dst_path: vec![String::from("agent"), String::from("worker")],
dst_leaf: Some(String::from("org.example.v1.echo")),
procedure_id: String::new(),
payload: vec![],
expects_response: false,
}),
));
let error = runtime
.reduce_leaf_actions()
.expect_err("invalid procedure is rejected");
assert!(matches!(error, NodeRuntimeError::Endpoint(_)));
assert_eq!(runtime.leaf_actions().len(), 1);
runtime.leaf_actions.clear();
runtime.leaf_actions.push((
leaf_id,
LeafAction::SendCall(OutboundCall {
dst_path: vec![String::from("agent"), String::from("worker")],
dst_leaf: Some(String::from("org.example.v1.echo")),
procedure_id: String::from("org.example.v1.echo.invoke"),
payload: vec![],
expects_response: true,
}),
));
runtime.reduce_leaf_actions().expect("valid retry reduces");
runtime.tick(TickBudget::default()).expect("tick flushes");
let parsed = decode_frame(&runtime.transport().sent[0].1).expect("sent call decodes");
let call = parsed.deserialize_call().expect("payload is call");
assert_eq!(
call.response_hook,
Some(HookTarget {
hook_id: 1,
return_path: vec![String::from("agent")],
})
);
}
#[test]
fn failed_leaf_send_call_routing_retains_failed_and_remaining_actions() {
let child = ConnectionId::new(1);
let mut connections = Connections::new();
connections.push(Connection::connected(child, ConnectionGeneration::INITIAL));
let leaf_id = crate::leaf::LeafId::new(String::from("org.example.v1.client"));
let endpoint = ProtocolEndpoint::new(
vec![String::from("agent")],
None,
vec![ChildRoute::registered(vec![
String::from("agent"),
String::from("worker"),
])],
Vec::new(),
);
let mut runtime = NodeRuntime::new(
EndpointState::new(endpoint),
connections,
RecordingTransport::default(),
);
runtime.leaf_actions.push((
leaf_id.clone(),
LeafAction::SendCall(OutboundCall {
dst_path: vec![String::from("agent"), String::from("worker")],
dst_leaf: Some(String::from("org.example.v1.echo")),
procedure_id: String::from("org.example.v1.echo.invoke"),
payload: vec![],
expects_response: true,
}),
));
runtime.leaf_actions.push((
leaf_id,
LeafAction::FailHook {
hook_id: 7,
fault: ProtocolFault::INTERNAL_ERROR,
},
));
let error = runtime
.reduce_leaf_actions()
.expect_err("missing child connection is reported");
assert!(matches!(error, NodeRuntimeError::MissingRouteConnection));
assert_eq!(runtime.leaf_actions().len(), 2);
assert!(matches!(
runtime.leaf_actions()[0].1,
LeafAction::SendCall(_)
));
assert!(matches!(
runtime.leaf_actions()[1].1,
LeafAction::FailHook { .. }
));
assert!(runtime.effects().is_empty());
}
#[test] #[test]
fn unsupported_leaf_action_is_reported_and_retained() { fn unsupported_leaf_action_is_reported_and_retained() {
let leaf_id = crate::leaf::LeafId::new(String::from("org.example.v1.echo")); let leaf_id = crate::leaf::LeafId::new(String::from("org.example.v1.echo"));
@@ -1491,13 +1824,10 @@ mod tests {
); );
runtime.leaf_actions.push(( runtime.leaf_actions.push((
leaf_id.clone(), leaf_id.clone(),
LeafAction::SendCall(OutboundCall { LeafAction::FailHook {
dst_path: vec![], hook_id: 7,
dst_leaf: None, fault: ProtocolFault::INTERNAL_ERROR,
procedure_id: String::from("org.example.v1.echo.invoke"), },
payload: vec![],
expects_response: false,
}),
)); ));
runtime.leaf_actions.push(( runtime.leaf_actions.push((
leaf_id.clone(), leaf_id.clone(),
@@ -1513,12 +1843,12 @@ mod tests {
assert!(matches!( assert!(matches!(
error, error,
NodeRuntimeError::UnsupportedLeafAction { ref leaf_id, action } NodeRuntimeError::UnsupportedLeafAction { ref leaf_id, action }
if leaf_id.as_str() == "org.example.v1.echo" && action == "SendCall" if leaf_id.as_str() == "org.example.v1.echo" && action == "FailHook"
)); ));
assert_eq!(runtime.leaf_actions().len(), 2); assert_eq!(runtime.leaf_actions().len(), 2);
assert!(matches!( assert!(matches!(
runtime.leaf_actions()[0].1, runtime.leaf_actions()[0].1,
LeafAction::SendCall(_) LeafAction::FailHook { .. }
)); ));
assert!(matches!( assert!(matches!(
runtime.leaf_actions()[1].1, runtime.leaf_actions()[1].1,