From 97f3e305bb8101fa497078467e4c5f38adba4764 Mon Sep 17 00:00:00 2001 From: Michael Mikovsky <77305074+Astatin3@users.noreply.github.com> Date: Sat, 9 May 2026 13:14:34 -0600 Subject: [PATCH] Dispatch local runtime effects to leaves --- API.md | 42 ++- unshell-runtime/src/leaf.rs | 67 ++++ unshell-runtime/src/lib.rs | 5 +- unshell-runtime/src/node/mod.rs | 2 +- unshell-runtime/src/node/runtime.rs | 505 +++++++++++++++++++++++++++- 5 files changed, 601 insertions(+), 20 deletions(-) diff --git a/API.md b/API.md index e54ae89..f20b8cf 100644 --- a/API.md +++ b/API.md @@ -118,11 +118,13 @@ Rules: effects. ```rust -pub struct NodeRuntime { +pub struct NodeRuntime { endpoint: EndpointState, connections: Connections, transport: T, effects: EffectQueue, + leaves: Vec>, + leaf_actions: Vec<(LeafId, LeafAction)>, } pub struct TickBudget { @@ -144,8 +146,6 @@ Primary operations: impl NodeRuntime { pub fn tick(&mut self, budget: TickBudget) -> Result>; - pub fn drain_local_effects(&mut self) -> impl Iterator; - pub fn receive_frame( &mut self, connection: ConnectionId, @@ -153,6 +153,24 @@ impl NodeRuntime { ) -> Result<(), NodeRuntimeError>; } +impl NodeRuntime { + pub fn new_with_leaf_error( + endpoint: EndpointState, + connections: Connections, + transport: T, + ) -> Self; + + pub fn drain_local_effects(&mut self) -> impl Iterator; + + pub fn register_leaf(&mut self, leaf: L) -> LeafId + where + L: Leaf + 'static; + + pub fn dispatch_local_effects(&mut self) -> Result>; + + pub fn drain_leaf_actions(&mut self) -> impl Iterator; +} + impl NodeRuntime { pub fn register_parent_connection( &mut self, @@ -190,7 +208,12 @@ Rules: unroutable or a route without a registered connection. - Runtime counts per-tick progress, not retained backlog. - Local events should be dispatched to leaves, not retained forever. -- Until leaf dispatch exists, callers may drain local/dropped effects; outbound sends remain runtime-owned. +- `dispatch_local_effects` attempts queued `RuntimeEffect::Local` values in FIFO + order, calls the matching leaf callback, records queued `LeafAction` values for + later reducer work, and leaves unmatched locals queued for a future attempt. +- Dispatch does not consume `SendFrame` or `Dropped` effects. Outbound sends remain + runtime-owned, and drop notifications remain available to callers that drain + local/drop effects. - Send failures must not drop unrelated queued effects. ## Leaf API @@ -275,7 +298,7 @@ parent frame for local endpoint -> EndpointState validates and returns Local(Call) -> NodeRuntime dispatches to matching Leaf::on_call -> leaf queues LeafAction values - -> runtime validates and applies actions + -> runtime retains actions for a later reducer pass ``` ### Outbound Leaf Call @@ -302,7 +325,6 @@ connection closes or unregisters ## Known Gaps In The Current Branch -- `Leaf` is defined but not yet registered or dispatched by `NodeRuntime`. - `LeafAction` values are queued by `LeafContext` but not yet applied by `NodeRuntime`. - Local outbound calls through the runtime are not implemented. @@ -314,11 +336,9 @@ connection closes or unregisters Implement one narrow end-to-end path: -1. Add a leaf registry to `NodeRuntime`. -2. Dispatch `RuntimeEffect::Local(Call)` into `Leaf::on_call`. -3. Apply `LeafAction::SendHookData` through endpoint packet state. -4. Route the produced frame through `Transport`. -5. Add tests proving a local call reaches a leaf and the leaf reply is framed and +1. Apply queued `LeafAction::SendHookData` through endpoint packet state. +2. Route the produced frame through `Transport`. +3. Add tests proving a leaf reply is framed and sent through a registered connection. That slice forces the real architecture to work without overbuilding the rest of diff --git a/unshell-runtime/src/leaf.rs b/unshell-runtime/src/leaf.rs index c637b73..49f6dfa 100644 --- a/unshell-runtime/src/leaf.rs +++ b/unshell-runtime/src/leaf.rs @@ -1,5 +1,6 @@ //! Leaf-facing runtime types. +use crate::alloc::boxed::Box; use crate::alloc::string::String; use crate::alloc::vec::Vec; use crate::context::LeafContext; @@ -108,3 +109,69 @@ pub trait Leaf { Ok(()) } } + +/// One leaf handler registered with a runtime-local dispatch key. +/// +/// The id is the packet `dst_leaf` name used by [`unshell_protocol::tree::LocalEvent`] +/// call headers. The runtime keeps this intentionally small: it only finds the +/// target callback and records requested [`crate::context::LeafAction`] values. +pub struct RegisteredLeaf { + id: LeafId, + capabilities: LeafCapabilities, + handler: Box>, +} + +impl RegisteredLeaf { + /// Creates a registered leaf from an explicit dispatch id and handler. + #[must_use] + pub fn new(id: LeafId, handler: L) -> Self + where + L: Leaf + 'static, + { + let capabilities = handler.capabilities().clone(); + Self { + id, + capabilities, + handler: Box::new(handler), + } + } + + /// Returns the dispatch id used for local packet matching. + #[must_use] + pub const fn id(&self) -> &LeafId { + &self.id + } + + /// Returns the capabilities cached at registration time. + #[must_use] + pub const fn capabilities(&self) -> &LeafCapabilities { + &self.capabilities + } + + /// Returns immutable access to the hosted leaf. + #[must_use] + pub fn handler(&self) -> &dyn Leaf { + self.handler.as_ref() + } + + /// Returns mutable access to the hosted leaf. + #[must_use] + pub fn handler_mut(&mut self) -> &mut dyn Leaf { + self.handler.as_mut() + } + + /// Returns all fields needed to invoke a leaf without cloning metadata. + pub(crate) fn dispatch_parts_mut( + &mut self, + ) -> (&LeafId, &LeafCapabilities, &mut dyn Leaf) { + (&self.id, &self.capabilities, self.handler.as_mut()) + } +} + +impl core::fmt::Debug for RegisteredLeaf { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + f.debug_struct("RegisteredLeaf") + .field("id", &self.id) + .finish_non_exhaustive() + } +} diff --git a/unshell-runtime/src/lib.rs b/unshell-runtime/src/lib.rs index 94e6893..9c4e381 100644 --- a/unshell-runtime/src/lib.rs +++ b/unshell-runtime/src/lib.rs @@ -24,9 +24,10 @@ pub use context::{ RuntimeCapability, }; pub use effects::{EffectQueue, RuntimeEffect}; -pub use leaf::{Leaf, LeafCapabilities, LeafId, LeafPermissions}; +pub use leaf::{Leaf, LeafCapabilities, LeafId, LeafPermissions, RegisteredLeaf}; pub use node::{ - EndpointState, Node, NodeId, NodeRuntime, NodeRuntimeError, NodeState, TickBudget, TickOutcome, + EndpointState, LeafDispatchError, Node, NodeId, NodeRuntime, NodeRuntimeError, NodeState, + TickBudget, TickOutcome, }; pub use transport::Transport; diff --git a/unshell-runtime/src/node/mod.rs b/unshell-runtime/src/node/mod.rs index 4bfd5bf..b070851 100644 --- a/unshell-runtime/src/node/mod.rs +++ b/unshell-runtime/src/node/mod.rs @@ -8,7 +8,7 @@ pub mod runtime; pub mod state; pub use packet::{EndpointState, PacketProcessor}; -pub use runtime::{NodeRuntime, NodeRuntimeError, TickBudget, TickOutcome}; +pub use runtime::{LeafDispatchError, NodeRuntime, NodeRuntimeError, TickBudget, TickOutcome}; pub use state::NodeState; use crate::alloc::string::String; diff --git a/unshell-runtime/src/node/runtime.rs b/unshell-runtime/src/node/runtime.rs index 7c18d2a..86b2d2c 100644 --- a/unshell-runtime/src/node/runtime.rs +++ b/unshell-runtime/src/node/runtime.rs @@ -2,19 +2,24 @@ //! //! This first slice owns transport and connection metadata, derives ingress from //! registered connections, delegates packet invariants to [`EndpointState`], and -//! queues concrete runtime effects. Leaf dispatch and leaf-action application are -//! intentionally not implemented in this slice. +//! queues concrete runtime effects. Leaf action application is intentionally not +//! implemented in this slice. use crate::alloc::{string::String, vec::Vec}; use crate::connections::{ Connection, ConnectionDirection, ConnectionGeneration, ConnectionId, ConnectionState, Connections, RegisteredConnection, }; +use crate::context::{LeafAction, LeafContext}; use crate::effects::{EffectQueue, RuntimeEffect}; +use crate::leaf::{Leaf, LeafId, RegisteredLeaf}; use crate::transport::Transport; use unshell_protocol::FrameBytes; use unshell_protocol::tree::ChildRoute; -use unshell_protocol::tree::{EndpointError, EndpointOutcome, Ingress, RouteDecision}; +use unshell_protocol::tree::{ + Endpoint, EndpointError, EndpointOutcome, IncomingCall, IncomingData, IncomingFault, Ingress, + LocalEvent, RouteDecision, +}; use super::{EndpointState, PacketProcessor}; @@ -62,6 +67,34 @@ pub enum NodeRuntimeError { Transport(TransportError), } +/// Error returned when a leaf callback rejects a local event. +#[derive(Debug)] +pub struct LeafDispatchError { + /// Leaf id that received the event. + pub leaf_id: LeafId, + /// Callback-specific error returned by the leaf. + pub source: LeafError, +} + +impl core::fmt::Display for LeafDispatchError +where + LeafError: core::fmt::Display, +{ + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + write!( + f, + "leaf {} failed during dispatch: {}", + self.leaf_id.as_str(), + self.source + ) + } +} + +impl core::error::Error for LeafDispatchError where + LeafError: core::error::Error + 'static +{ +} + impl core::fmt::Display for NodeRuntimeError where TransportError: core::fmt::Display, @@ -85,11 +118,13 @@ impl core::error::Error for NodeRuntimeError whe /// Runtime owner for one endpoint, transport, and connection table. #[derive(Debug)] -pub struct NodeRuntime { +pub struct NodeRuntime { endpoint: EndpointState, connections: Connections, transport: T, effects: EffectQueue, + leaves: Vec>, + leaf_actions: Vec<(LeafId, LeafAction)>, } impl NodeRuntime { @@ -102,6 +137,27 @@ impl NodeRuntime { connections, transport, effects: EffectQueue::new(), + leaves: Vec::new(), + leaf_actions: Vec::new(), + } + } +} + +impl NodeRuntime { + /// Creates a runtime with an explicit leaf callback error type. + #[must_use] + pub const fn new_with_leaf_error( + endpoint: EndpointState, + connections: Connections, + transport: T, + ) -> Self { + Self { + endpoint, + connections, + transport, + effects: EffectQueue::new(), + leaves: Vec::new(), + leaf_actions: Vec::new(), } } @@ -251,9 +307,170 @@ impl NodeRuntime { pub fn drain_local_effects(&mut self) -> impl Iterator { self.effects.drain_local() } + + /// Registers a leaf under its declared `leaf_name` dispatch id. + /// + /// If the id already exists, the new handler replaces the previous one. This + /// keeps local dispatch deterministic without adding a broader registry API. + pub fn register_leaf(&mut self, leaf: L) -> LeafId + where + L: Leaf + 'static, + { + let id = LeafId::new(leaf.capabilities().leaf_name.clone()); + self.register_leaf_as(id.clone(), leaf); + id + } + + /// Registers a leaf under an explicit dispatch id. + /// + /// This is useful when tests or adapters already hold the exact `dst_leaf` + /// string from protocol metadata. Duplicate ids are replaced. + pub fn register_leaf_as(&mut self, id: LeafId, leaf: L) + where + L: Leaf + 'static, + { + if let Some(existing) = self.leaves.iter_mut().find(|entry| entry.id() == &id) { + *existing = RegisteredLeaf::new(id, leaf); + } else { + self.leaves.push(RegisteredLeaf::new(id, leaf)); + } + } + + /// Returns registered leaf handlers. + #[must_use] + pub fn leaves(&self) -> &[RegisteredLeaf] { + &self.leaves + } + + /// Returns leaf actions queued by dispatched callbacks. + /// + /// These actions are intentionally only retained here; reducing them into + /// endpoint packets or connection changes belongs to a later runtime slice. + #[must_use] + pub fn leaf_actions(&self) -> &[(LeafId, LeafAction)] { + &self.leaf_actions + } + + /// Drains leaf actions queued by dispatched callbacks. + pub fn drain_leaf_actions(&mut self) -> impl Iterator { + let actions = core::mem::take(&mut self.leaf_actions); + actions.into_iter() + } + + /// Dispatches currently queued local effects to matching leaf handlers. + /// + /// Local events are attempted in FIFO queue order. A matched event is removed + /// only after the leaf callback succeeds. Unmatched local events, outbound + /// sends, and drop notifications remain queued for future runtime work. + pub fn dispatch_local_effects(&mut self) -> Result> { + let mut retained = EffectQueue::new(); + let mut dispatched = 0usize; + let mut pending = core::mem::take(&mut self.effects); + let mut drained = pending.drain(); + + while let Some(effect) = drained.next() { + match effect { + RuntimeEffect::Local(event) => { + let Some(leaf_index) = self.leaf_index_for_event(&event) else { + retained.push(RuntimeEffect::Local(event)); + continue; + }; + + if let Err(error) = self.dispatch_event_to_leaf(leaf_index, &event) { + retained.push(RuntimeEffect::Local(event)); + for remaining in drained { + retained.push(remaining); + } + self.effects = retained; + return Err(error); + } + dispatched += 1; + } + other => retained.push(other), + } + } + + self.effects = retained; + Ok(dispatched) + } + + fn leaf_index_for_event(&self, event: &LocalEvent) -> Option { + let leaf_name = local_event_leaf_name(event)?; + self.leaves + .iter() + .position(|entry| entry.id().as_str() == leaf_name) + } + + fn dispatch_event_to_leaf( + &mut self, + leaf_index: usize, + event: &LocalEvent, + ) -> Result<(), LeafDispatchError> { + let local_path = self.endpoint.endpoint().path(); + let (leaf_id, actions) = { + let leaf = &mut self.leaves[leaf_index]; + let (leaf_id, capabilities, handler) = leaf.dispatch_parts_mut(); + let mut ctx = LeafContext::new(local_path, leaf_id, capabilities, &self.connections); + + match event { + LocalEvent::Call { header, message } => handler + .on_call( + &mut ctx, + IncomingCall { + header: header.clone(), + message: message.clone(), + }, + ) + .map_err(|source| LeafDispatchError { + leaf_id: leaf_id.clone(), + source, + })?, + LocalEvent::Data { + header, + message, + hook_key, + } => handler + .on_data( + &mut ctx, + IncomingData { + header: header.clone(), + message: message.clone(), + hook_key: hook_key.clone(), + }, + ) + .map_err(|source| LeafDispatchError { + leaf_id: leaf_id.clone(), + source, + })?, + LocalEvent::Fault { + header, + message, + hook_key, + } => handler + .on_fault( + &mut ctx, + IncomingFault { + header: header.clone(), + fault: message.clone(), + hook_key: hook_key.clone(), + }, + ) + .map_err(|source| LeafDispatchError { + leaf_id: leaf_id.clone(), + source, + })?, + } + + (leaf_id.clone(), ctx.into_actions()) + }; + + self.leaf_actions + .extend(actions.into_iter().map(|action| (leaf_id.clone(), action))); + Ok(()) + } } -impl NodeRuntime +impl NodeRuntime where T: Transport, { @@ -424,17 +641,33 @@ fn ingress_for(registered: &RegisteredConnection) -> Ingress { } } +fn local_event_leaf_name(event: &LocalEvent) -> Option<&str> { + match event { + LocalEvent::Call { header, .. } + | LocalEvent::Data { header, .. } + | LocalEvent::Fault { header, .. } => header.dst_leaf.as_deref(), + } +} + #[cfg(test)] mod tests { + use core::cell::RefCell; + use core::convert::Infallible; + + use crate::alloc::rc::Rc; use crate::alloc::string::String; use crate::alloc::vec; use crate::alloc::vec::Vec; use crate::connections::{ Connection, ConnectionDirection, ConnectionGeneration, ConnectionId, Connections, }; + use crate::context::{LeafAction, OutboundHookData}; use crate::effects::RuntimeEffect; + use crate::leaf::{Leaf, LeafCapabilities, LeafPermissions}; use crate::transport::Transport; - use unshell_protocol::tree::{ChildRoute, EndpointError, ProtocolEndpoint}; + use unshell_protocol::tree::{ + ChildRoute, EndpointError, IncomingCall, LeafSpec, LocalEvent, ProtocolEndpoint, + }; use unshell_protocol::{CallMessage, FrameBytes, PacketHeader, PacketType, encode_packet}; use super::{EndpointState, NodeRuntime, NodeRuntimeError, TickBudget}; @@ -469,6 +702,81 @@ mod tests { } } + struct RecordingLeaf { + capabilities: LeafCapabilities, + calls: Rc>>, + } + + impl RecordingLeaf { + fn new(leaf_name: &str, calls: Rc>>) -> Self { + Self { + capabilities: LeafCapabilities { + leaf_name: String::from(leaf_name), + procedures: vec![String::from("org.example.v1.echo.invoke")], + permissions: LeafPermissions::REPLY_ONLY, + }, + calls, + } + } + } + + impl Leaf for RecordingLeaf { + type Error = Infallible; + + fn capabilities(&self) -> &LeafCapabilities { + &self.capabilities + } + + fn on_call( + &mut self, + ctx: &mut crate::LeafContext<'_>, + call: IncomingCall, + ) -> Result<(), Self::Error> { + self.calls.borrow_mut().push(call.clone()); + ctx.hook_data(OutboundHookData { + dst_path: call.header.src_path, + hook_id: 7, + procedure_id: call.message.procedure_id, + payload: vec![1, 2, 3], + end_hook: true, + }) + .expect("reply-only leaf can queue hook data"); + Ok(()) + } + } + + struct FailingLeaf { + capabilities: LeafCapabilities, + } + + impl FailingLeaf { + fn new(leaf_name: &str) -> Self { + Self { + capabilities: LeafCapabilities { + leaf_name: String::from(leaf_name), + procedures: vec![String::from("org.example.v1.fail.invoke")], + permissions: LeafPermissions::REPLY_ONLY, + }, + } + } + } + + impl Leaf for FailingLeaf { + type Error = &'static str; + + fn capabilities(&self) -> &LeafCapabilities { + &self.capabilities + } + + fn on_call( + &mut self, + _ctx: &mut crate::LeafContext<'_>, + _call: IncomingCall, + ) -> Result<(), Self::Error> { + Err("leaf failed") + } + } + #[test] fn tick_derives_ingress_and_sends_forwarded_child_frame() { let parent = ConnectionId::new(1); @@ -929,6 +1237,191 @@ mod tests { assert!(matches!(runtime.effects()[0], RuntimeEffect::Local(_))); } + #[test] + fn dispatch_local_call_reaches_registered_leaf() { + let parent = ConnectionId::new(1); + let mut connections = Connections::new(); + connections.push(Connection::registered( + parent, + ConnectionDirection::Parent, + vec![], + ConnectionGeneration::INITIAL, + )); + + let leaf_name = "org.example.v1.echo"; + let endpoint = ProtocolEndpoint::new( + vec![String::from("agent")], + Some(vec![]), + vec![], + vec![LeafSpec { + name: String::from(leaf_name), + procedures: vec![String::from("org.example.v1.echo.invoke")], + }], + ); + let frame = encode_packet( + &PacketHeader { + packet_type: PacketType::Call, + src_path: vec![], + dst_path: vec![String::from("agent")], + dst_leaf: Some(String::from(leaf_name)), + hook_id: None, + }, + &CallMessage { + procedure_id: String::from("org.example.v1.echo.invoke"), + data: vec![9], + response_hook: None, + }, + ) + .expect("frame encodes"); + let calls = Rc::new(RefCell::new(Vec::new())); + let mut runtime = NodeRuntime::new( + EndpointState::new(endpoint), + connections, + RecordingTransport::default(), + ); + runtime.register_leaf(RecordingLeaf::new(leaf_name, Rc::clone(&calls))); + + runtime + .receive_frame(parent, frame) + .expect("frame processes"); + let dispatched = runtime.dispatch_local_effects().expect("dispatch succeeds"); + + assert_eq!(dispatched, 1); + assert!(runtime.effects().is_empty()); + assert_eq!(calls.borrow().len(), 1); + assert_eq!(calls.borrow()[0].message.data, [9]); + assert_eq!(runtime.leaf_actions().len(), 1); + let (action_leaf, action) = &runtime.leaf_actions()[0]; + assert_eq!(action_leaf.as_str(), leaf_name); + let LeafAction::SendHookData(data) = action else { + panic!("leaf action should be retained hook data"); + }; + assert_eq!(data.dst_path, Vec::::new()); + assert_eq!(data.hook_id, 7); + assert_eq!(data.procedure_id, "org.example.v1.echo.invoke"); + assert_eq!(data.payload, [1, 2, 3]); + assert!(data.end_hook); + assert!(runtime.transport().sent.is_empty()); + } + + #[test] + fn unmatched_local_event_remains_queued() { + let mut runtime = NodeRuntime::new( + EndpointState::new(ProtocolEndpoint::new( + vec![String::from("agent")], + Some(vec![]), + vec![], + vec![], + )), + Connections::new(), + RecordingTransport::default(), + ); + runtime.effects.push(RuntimeEffect::Local(LocalEvent::Call { + header: PacketHeader { + packet_type: PacketType::Call, + src_path: vec![], + dst_path: vec![String::from("agent")], + dst_leaf: Some(String::from("org.example.v1.missing")), + hook_id: None, + }, + message: CallMessage { + procedure_id: String::from("org.example.v1.missing.invoke"), + data: vec![], + response_hook: None, + }, + })); + + let dispatched = runtime.dispatch_local_effects().expect("dispatch succeeds"); + + assert_eq!(dispatched, 0); + assert_eq!(runtime.effects().len(), 1); + assert!(matches!(runtime.effects()[0], RuntimeEffect::Local(_))); + } + + #[test] + fn local_dispatch_preserves_send_frame_and_dropped_effects() { + let parent = ConnectionId::new(1); + let frame = FrameBytes::new(); + let mut runtime = NodeRuntime::new( + EndpointState::new(ProtocolEndpoint::new( + vec![String::from("agent")], + Some(vec![]), + vec![], + vec![], + )), + Connections::new(), + RecordingTransport::default(), + ); + runtime.effects.push(RuntimeEffect::SendFrame { + connection: parent, + generation: ConnectionGeneration::INITIAL, + frame, + }); + runtime.effects.push(RuntimeEffect::Dropped); + + let dispatched = runtime.dispatch_local_effects().expect("dispatch succeeds"); + + assert_eq!(dispatched, 0); + assert_eq!(runtime.effects().len(), 2); + assert!(matches!( + runtime.effects()[0], + RuntimeEffect::SendFrame { .. } + )); + assert!(matches!(runtime.effects()[1], RuntimeEffect::Dropped)); + } + + #[test] + fn failed_local_dispatch_preserves_failed_and_remaining_effects() { + let parent = ConnectionId::new(1); + let leaf_name = "org.example.v1.fail"; + let mut runtime = NodeRuntime::<_, &'static str>::new_with_leaf_error( + EndpointState::new(ProtocolEndpoint::new( + vec![String::from("agent")], + Some(vec![]), + vec![], + vec![], + )), + Connections::new(), + RecordingTransport::default(), + ); + runtime.register_leaf(FailingLeaf::new(leaf_name)); + runtime.effects.push(RuntimeEffect::Local(LocalEvent::Call { + header: PacketHeader { + packet_type: PacketType::Call, + src_path: vec![], + dst_path: vec![String::from("agent")], + dst_leaf: Some(String::from(leaf_name)), + hook_id: None, + }, + message: CallMessage { + procedure_id: String::from("org.example.v1.fail.invoke"), + data: vec![], + response_hook: None, + }, + })); + runtime.effects.push(RuntimeEffect::Dropped); + runtime.effects.push(RuntimeEffect::SendFrame { + connection: parent, + generation: ConnectionGeneration::INITIAL, + frame: FrameBytes::new(), + }); + + let error = runtime + .dispatch_local_effects() + .expect_err("leaf callback failure is returned"); + + assert_eq!(error.leaf_id.as_str(), leaf_name); + assert_eq!(error.source, "leaf failed"); + assert!(runtime.leaf_actions().is_empty()); + assert_eq!(runtime.effects().len(), 3); + assert!(matches!(runtime.effects()[0], RuntimeEffect::Local(_))); + assert!(matches!(runtime.effects()[1], RuntimeEffect::Dropped)); + assert!(matches!( + runtime.effects()[2], + RuntimeEffect::SendFrame { .. } + )); + } + #[test] fn failed_send_preserves_failed_and_unprocessed_effects() { let parent = ConnectionId::new(1);