Add local runtime effect draining

This commit is contained in:
Michael Mikovsky
2026-05-09 12:59:07 -06:00
parent 4e3f975b54
commit fcde5d66d2
3 changed files with 173 additions and 0 deletions
+3
View File
@@ -144,6 +144,8 @@ Primary operations:
impl<T: Transport> NodeRuntime<T> { impl<T: Transport> NodeRuntime<T> {
pub fn tick(&mut self, budget: TickBudget) -> Result<TickOutcome, NodeRuntimeError<T::Error>>; pub fn tick(&mut self, budget: TickBudget) -> Result<TickOutcome, NodeRuntimeError<T::Error>>;
pub fn drain_local_effects(&mut self) -> impl Iterator<Item = RuntimeEffect>;
pub fn receive_frame( pub fn receive_frame(
&mut self, &mut self,
connection: ConnectionId, connection: ConnectionId,
@@ -168,6 +170,7 @@ Rules:
- Callers never pass `Ingress` into `NodeRuntime`. - Callers never pass `Ingress` into `NodeRuntime`.
- Runtime counts per-tick progress, not retained backlog. - Runtime counts per-tick progress, not retained backlog.
- Local events should be dispatched to leaves, not retained forever. - Local events should be dispatched to leaves, not retained forever.
- Until leaf dispatch exists, callers may drain local/dropped effects; outbound sends remain runtime-owned.
- Send failures must not drop unrelated queued effects. - Send failures must not drop unrelated queued effects.
## Leaf API ## Leaf API
+59
View File
@@ -53,4 +53,63 @@ impl EffectQueue {
pub fn drain(&mut self) -> impl Iterator<Item = RuntimeEffect> + '_ { pub fn drain(&mut self) -> impl Iterator<Item = RuntimeEffect> + '_ {
self.entries.drain(..) self.entries.drain(..)
} }
/// Drains local-dispatch effects in FIFO order, leaving outbound sends queued.
pub fn drain_local(&mut self) -> impl Iterator<Item = RuntimeEffect> {
let mut drained = Vec::new();
let mut retained = Vec::with_capacity(self.entries.len());
for effect in self.entries.drain(..) {
match effect {
RuntimeEffect::Local(_) | RuntimeEffect::Dropped => drained.push(effect),
RuntimeEffect::SendFrame { .. } => retained.push(effect),
}
}
self.entries = retained;
drained.into_iter()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn drain_local_leaves_outbound_sends_queued() {
let first = ConnectionId::new(1);
let second = ConnectionId::new(2);
let mut queue = EffectQueue::new();
queue.push(RuntimeEffect::SendFrame {
connection: first,
generation: ConnectionGeneration::INITIAL,
frame: FrameBytes::new(),
});
queue.push(RuntimeEffect::Dropped);
queue.push(RuntimeEffect::SendFrame {
connection: second,
generation: ConnectionGeneration::INITIAL,
frame: FrameBytes::new(),
});
queue.push(RuntimeEffect::Dropped);
let drained: Vec<_> = queue.drain_local().collect();
assert_eq!(drained.len(), 2);
assert!(
drained
.iter()
.all(|effect| matches!(effect, RuntimeEffect::Dropped))
);
assert_eq!(queue.entries().len(), 2);
assert!(matches!(
queue.entries()[0],
RuntimeEffect::SendFrame { connection, .. } if connection == first
));
assert!(matches!(
queue.entries()[1],
RuntimeEffect::SendFrame { connection, .. } if connection == second
));
}
} }
+111
View File
@@ -141,6 +141,13 @@ impl<T> NodeRuntime<T> {
pub fn effects(&self) -> &[RuntimeEffect] { pub fn effects(&self) -> &[RuntimeEffect] {
self.effects.entries() self.effects.entries()
} }
/// Drains queued local-dispatch effects in FIFO order.
///
/// Outbound frame effects remain queued for runtime-owned transport flushing.
pub fn drain_local_effects(&mut self) -> impl Iterator<Item = RuntimeEffect> {
self.effects.drain_local()
}
} }
impl<T> NodeRuntime<T> impl<T> NodeRuntime<T>
@@ -649,6 +656,58 @@ mod tests {
assert!(matches!(runtime.effects()[0], RuntimeEffect::Local(_))); assert!(matches!(runtime.effects()[0], RuntimeEffect::Local(_)));
} }
#[test]
fn drained_local_event_is_not_peeked_or_recounted() {
let parent = ConnectionId::new(1);
let mut connections = Connections::new();
connections.push(Connection::registered(
parent,
ConnectionDirection::Parent,
vec![],
ConnectionGeneration::INITIAL,
));
let mut endpoint =
ProtocolEndpoint::new(vec![String::from("agent")], Some(vec![]), vec![], vec![]);
endpoint
.add_endpoint_procedure("org.example.v1.echo.invoke")
.expect("procedure registers");
let frame = encode_packet(
&PacketHeader {
packet_type: PacketType::Call,
src_path: vec![],
dst_path: vec![String::from("agent")],
dst_leaf: None,
hook_id: None,
},
&CallMessage {
procedure_id: String::from("org.example.v1.echo.invoke"),
data: vec![],
response_hook: None,
},
)
.expect("frame encodes");
let transport = RecordingTransport {
inbound: Some((parent, frame)),
sent: Vec::new(),
fail_send: false,
};
let mut runtime = NodeRuntime::new(EndpointState::new(endpoint), connections, transport);
let first = runtime.tick(TickBudget::default()).expect("tick succeeds");
assert_eq!(first.local_events, 1);
let drained: Vec<_> = runtime.drain_local_effects().collect();
assert_eq!(drained.len(), 1);
assert!(matches!(drained[0], RuntimeEffect::Local(_)));
assert!(runtime.effects().is_empty());
let second = runtime.tick(TickBudget::default()).expect("tick succeeds");
assert_eq!(second.local_events, 0);
assert!(runtime.effects().is_empty());
}
#[test] #[test]
fn tick_counts_only_new_dropped_frames() { fn tick_counts_only_new_dropped_frames() {
let child = ConnectionId::new(1); let child = ConnectionId::new(1);
@@ -696,4 +755,56 @@ mod tests {
assert_eq!(second.dropped_frames, 0); assert_eq!(second.dropped_frames, 0);
assert!(matches!(runtime.effects()[0], RuntimeEffect::Dropped)); assert!(matches!(runtime.effects()[0], RuntimeEffect::Dropped));
} }
#[test]
fn drained_dropped_effect_is_not_peeked_or_recounted() {
let child = ConnectionId::new(1);
let mut connections = Connections::new();
connections.push(Connection::registered(
child,
ConnectionDirection::Child,
vec![String::from("agent"), String::from("kid")],
ConnectionGeneration::INITIAL,
));
let mut endpoint =
ProtocolEndpoint::new(vec![String::from("agent")], Some(vec![]), vec![], vec![]);
endpoint
.add_endpoint_procedure("org.example.v1.echo.invoke")
.expect("procedure registers");
let frame = encode_packet(
&PacketHeader {
packet_type: PacketType::Call,
src_path: vec![String::from("agent"), String::from("kid")],
dst_path: vec![String::from("agent")],
dst_leaf: None,
hook_id: None,
},
&CallMessage {
procedure_id: String::from("org.example.v1.echo.invoke"),
data: vec![],
response_hook: None,
},
)
.expect("frame encodes");
let transport = RecordingTransport {
inbound: Some((child, frame)),
sent: Vec::new(),
fail_send: false,
};
let mut runtime = NodeRuntime::new(EndpointState::new(endpoint), connections, transport);
let first = runtime.tick(TickBudget::default()).expect("tick succeeds");
assert_eq!(first.dropped_frames, 1);
let drained: Vec<_> = runtime.drain_local_effects().collect();
assert_eq!(drained.len(), 1);
assert!(matches!(drained[0], RuntimeEffect::Dropped));
assert!(runtime.effects().is_empty());
let second = runtime.tick(TickBudget::default()).expect("tick succeeds");
assert_eq!(second.dropped_frames, 0);
assert!(runtime.effects().is_empty());
}
} }