From fcde5d66d27f20e8b23131799f410c7981a046fa Mon Sep 17 00:00:00 2001 From: Michael Mikovsky <77305074+Astatin3@users.noreply.github.com> Date: Sat, 9 May 2026 12:59:07 -0600 Subject: [PATCH] Add local runtime effect draining --- API.md | 3 + unshell-runtime/src/effects.rs | 59 +++++++++++++++ unshell-runtime/src/node/runtime.rs | 111 ++++++++++++++++++++++++++++ 3 files changed, 173 insertions(+) diff --git a/API.md b/API.md index e6af581..a30a8d1 100644 --- a/API.md +++ b/API.md @@ -144,6 +144,8 @@ Primary operations: impl NodeRuntime { pub fn tick(&mut self, budget: TickBudget) -> Result>; + pub fn drain_local_effects(&mut self) -> impl Iterator; + pub fn receive_frame( &mut self, connection: ConnectionId, @@ -168,6 +170,7 @@ Rules: - Callers never pass `Ingress` into `NodeRuntime`. - Runtime counts per-tick progress, not retained backlog. - Local events should be dispatched to leaves, not retained forever. +- Until leaf dispatch exists, callers may drain local/dropped effects; outbound sends remain runtime-owned. - Send failures must not drop unrelated queued effects. ## Leaf API diff --git a/unshell-runtime/src/effects.rs b/unshell-runtime/src/effects.rs index cd39455..cf1d5e0 100644 --- a/unshell-runtime/src/effects.rs +++ b/unshell-runtime/src/effects.rs @@ -53,4 +53,63 @@ impl EffectQueue { pub fn drain(&mut self) -> impl Iterator + '_ { self.entries.drain(..) } + + /// Drains local-dispatch effects in FIFO order, leaving outbound sends queued. + pub fn drain_local(&mut self) -> impl Iterator { + let mut drained = Vec::new(); + let mut retained = Vec::with_capacity(self.entries.len()); + + for effect in self.entries.drain(..) { + match effect { + RuntimeEffect::Local(_) | RuntimeEffect::Dropped => drained.push(effect), + RuntimeEffect::SendFrame { .. } => retained.push(effect), + } + } + + self.entries = retained; + drained.into_iter() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn drain_local_leaves_outbound_sends_queued() { + let first = ConnectionId::new(1); + let second = ConnectionId::new(2); + let mut queue = EffectQueue::new(); + + queue.push(RuntimeEffect::SendFrame { + connection: first, + generation: ConnectionGeneration::INITIAL, + frame: FrameBytes::new(), + }); + queue.push(RuntimeEffect::Dropped); + queue.push(RuntimeEffect::SendFrame { + connection: second, + generation: ConnectionGeneration::INITIAL, + frame: FrameBytes::new(), + }); + queue.push(RuntimeEffect::Dropped); + + let drained: Vec<_> = queue.drain_local().collect(); + + assert_eq!(drained.len(), 2); + assert!( + drained + .iter() + .all(|effect| matches!(effect, RuntimeEffect::Dropped)) + ); + assert_eq!(queue.entries().len(), 2); + assert!(matches!( + queue.entries()[0], + RuntimeEffect::SendFrame { connection, .. } if connection == first + )); + assert!(matches!( + queue.entries()[1], + RuntimeEffect::SendFrame { connection, .. } if connection == second + )); + } } diff --git a/unshell-runtime/src/node/runtime.rs b/unshell-runtime/src/node/runtime.rs index 585d708..903611f 100644 --- a/unshell-runtime/src/node/runtime.rs +++ b/unshell-runtime/src/node/runtime.rs @@ -141,6 +141,13 @@ impl NodeRuntime { pub fn effects(&self) -> &[RuntimeEffect] { self.effects.entries() } + + /// Drains queued local-dispatch effects in FIFO order. + /// + /// Outbound frame effects remain queued for runtime-owned transport flushing. + pub fn drain_local_effects(&mut self) -> impl Iterator { + self.effects.drain_local() + } } impl NodeRuntime @@ -649,6 +656,58 @@ mod tests { assert!(matches!(runtime.effects()[0], RuntimeEffect::Local(_))); } + #[test] + fn drained_local_event_is_not_peeked_or_recounted() { + let parent = ConnectionId::new(1); + let mut connections = Connections::new(); + connections.push(Connection::registered( + parent, + ConnectionDirection::Parent, + vec![], + ConnectionGeneration::INITIAL, + )); + + let mut endpoint = + ProtocolEndpoint::new(vec![String::from("agent")], Some(vec![]), vec![], vec![]); + endpoint + .add_endpoint_procedure("org.example.v1.echo.invoke") + .expect("procedure registers"); + let frame = encode_packet( + &PacketHeader { + packet_type: PacketType::Call, + src_path: vec![], + dst_path: vec![String::from("agent")], + dst_leaf: None, + hook_id: None, + }, + &CallMessage { + procedure_id: String::from("org.example.v1.echo.invoke"), + data: vec![], + response_hook: None, + }, + ) + .expect("frame encodes"); + + let transport = RecordingTransport { + inbound: Some((parent, frame)), + sent: Vec::new(), + fail_send: false, + }; + let mut runtime = NodeRuntime::new(EndpointState::new(endpoint), connections, transport); + + let first = runtime.tick(TickBudget::default()).expect("tick succeeds"); + assert_eq!(first.local_events, 1); + + let drained: Vec<_> = runtime.drain_local_effects().collect(); + assert_eq!(drained.len(), 1); + assert!(matches!(drained[0], RuntimeEffect::Local(_))); + assert!(runtime.effects().is_empty()); + + let second = runtime.tick(TickBudget::default()).expect("tick succeeds"); + assert_eq!(second.local_events, 0); + assert!(runtime.effects().is_empty()); + } + #[test] fn tick_counts_only_new_dropped_frames() { let child = ConnectionId::new(1); @@ -696,4 +755,56 @@ mod tests { assert_eq!(second.dropped_frames, 0); assert!(matches!(runtime.effects()[0], RuntimeEffect::Dropped)); } + + #[test] + fn drained_dropped_effect_is_not_peeked_or_recounted() { + let child = ConnectionId::new(1); + let mut connections = Connections::new(); + connections.push(Connection::registered( + child, + ConnectionDirection::Child, + vec![String::from("agent"), String::from("kid")], + ConnectionGeneration::INITIAL, + )); + + let mut endpoint = + ProtocolEndpoint::new(vec![String::from("agent")], Some(vec![]), vec![], vec![]); + endpoint + .add_endpoint_procedure("org.example.v1.echo.invoke") + .expect("procedure registers"); + let frame = encode_packet( + &PacketHeader { + packet_type: PacketType::Call, + src_path: vec![String::from("agent"), String::from("kid")], + dst_path: vec![String::from("agent")], + dst_leaf: None, + hook_id: None, + }, + &CallMessage { + procedure_id: String::from("org.example.v1.echo.invoke"), + data: vec![], + response_hook: None, + }, + ) + .expect("frame encodes"); + + let transport = RecordingTransport { + inbound: Some((child, frame)), + sent: Vec::new(), + fail_send: false, + }; + let mut runtime = NodeRuntime::new(EndpointState::new(endpoint), connections, transport); + + let first = runtime.tick(TickBudget::default()).expect("tick succeeds"); + assert_eq!(first.dropped_frames, 1); + + let drained: Vec<_> = runtime.drain_local_effects().collect(); + assert_eq!(drained.len(), 1); + assert!(matches!(drained[0], RuntimeEffect::Dropped)); + assert!(runtime.effects().is_empty()); + + let second = runtime.tick(TickBudget::default()).expect("tick succeeds"); + assert_eq!(second.dropped_frames, 0); + assert!(runtime.effects().is_empty()); + } }