From 7b5b148ef36b3c5ec21dfcb9bf6b060bb8c33300 Mon Sep 17 00:00:00 2001 From: Michael Mikovsky <77305074+Astatin3@users.noreply.github.com> Date: Sat, 25 Apr 2026 11:46:45 -0600 Subject: [PATCH] Reduce protocol packet-flow allocations Replace vector-backed endpoint outcomes with single-action results, skip payload deserialization on forwarded data and faults, and route local call and data emissions without encode/decode roundtrips. --- src/protocol/tests/tree.rs | 10 +- src/protocol/tree/endpoint/builders.rs | 183 ++++++++++++------ src/protocol/tree/endpoint/core.rs | 40 +++- src/protocol/tree/endpoint/hooks.rs | 69 ++----- src/protocol/tree/endpoint/introspection.rs | 23 +-- src/protocol/tree/endpoint/receive.rs | 123 +++++------- treetest/src/sim/actions/calls.rs | 6 +- treetest/src/sim/runtime/dispatch.rs | 10 +- .../src/sim/runtime/events/application.rs | 24 +-- treetest/src/sim/runtime/events/local.rs | 6 +- 10 files changed, 266 insertions(+), 228 deletions(-) diff --git a/src/protocol/tests/tree.rs b/src/protocol/tests/tree.rs index 4293c4d..3d1aa7b 100644 --- a/src/protocol/tests/tree.rs +++ b/src/protocol/tests/tree.rs @@ -76,13 +76,12 @@ fn protocol_endpoint_introspection_returns_leaf_summary() { .receive(&Ingress::Local, frame) .expect("endpoint should handle introspection"); - assert!(outcome.forwards.is_empty()); - assert_eq!(outcome.events.len(), 1); + assert!(outcome.forward.is_none()); let LocalEvent::Data { header, message: response, - } = &outcome.events[0] + } = outcome.event.as_ref().expect("expected local data event") else { panic!("expected local data event"); }; @@ -139,11 +138,10 @@ fn invalid_hook_peer_emits_local_fault_event() { .receive(&Ingress::Local, frame) .expect("invalid peer should be handled"); - assert!(outcome.forwards.is_empty()); - assert_eq!(outcome.events.len(), 1); + assert!(outcome.forward.is_none()); assert!(!outcome.dropped); - match &outcome.events[0] { + match outcome.event.as_ref().expect("expected event") { LocalEvent::Fault { header, message } => { assert_eq!(header.packet_type, PacketType::Fault); assert_eq!(header.hook_id, Some(hook_id)); diff --git a/src/protocol/tree/endpoint/builders.rs b/src/protocol/tree/endpoint/builders.rs index e265f05..036fc81 100644 --- a/src/protocol/tree/endpoint/builders.rs +++ b/src/protocol/tree/endpoint/builders.rs @@ -11,10 +11,95 @@ use crate::protocol::{ encode_packet, validate_call, validate_header, validate_procedure_id, }; -use super::core::{ChildRoute, EndpointError, ProtocolEndpoint}; +use super::super::RouteDecision; +use super::core::{ChildRoute, EndpointError, EndpointOutcome, ProtocolEndpoint}; use crate::protocol::tree::LeafSpec; impl ProtocolEndpoint { + fn prepare_call( + &self, + dst_path: Vec, + dst_leaf: Option, + procedure_id: impl Into, + response_hook_id: Option, + data: Vec, + ) -> Result<(PacketHeader, CallMessage), EndpointError> { + let procedure_id = procedure_id.into(); + validate_procedure_id(&procedure_id)?; + + let response_hook = response_hook_id.map(|hook_id| HookTarget { + hook_id, + return_path: self.path.clone(), + }); + let header = PacketHeader { + packet_type: PacketType::Call, + src_path: self.path.clone(), + dst_path, + dst_leaf, + hook_id: None, + }; + let call = CallMessage { + procedure_id, + data, + response_hook, + }; + + validate_header(&header)?; + validate_call(&header, &call)?; + Ok((header, call)) + } + + fn register_outbound_call_hook( + &mut self, + header: &PacketHeader, + call: &CallMessage, + ) -> Result<(), EndpointError> { + if let Some(hook) = &call.response_hook + && self + .hooks + .insert_active(ActiveHook { + return_path: hook.return_path.clone(), + hook_id: hook.hook_id, + peer_path: header.dst_path.clone(), + procedure_id: call.procedure_id.clone(), + dst_leaf: header.dst_leaf.clone(), + peer_finished: false, + }) + .is_err() + { + return Err(EndpointError::Validation(ValidationError::InvalidHookId)); + } + Ok(()) + } + + fn prepare_data( + &self, + dst_path: Vec, + hook_id: u64, + procedure_id: impl Into, + data: Vec, + end_hook: bool, + ) -> Result<(PacketHeader, DataMessage), EndpointError> { + let procedure_id = procedure_id.into(); + validate_procedure_id(&procedure_id)?; + + let header = PacketHeader { + packet_type: PacketType::Data, + src_path: self.path.clone(), + dst_path, + dst_leaf: None, + hook_id: Some(hook_id), + }; + let message = DataMessage { + procedure_id, + data, + end_hook, + }; + + validate_header(&header)?; + Ok((header, message)) + } + /// Creates a runtime endpoint with static tree topology and leaf metadata. /// /// ``` @@ -69,48 +154,31 @@ impl ProtocolEndpoint { response_hook_id: Option, data: Vec, ) -> Result { - let procedure_id = procedure_id.into(); - validate_procedure_id(&procedure_id)?; - - let response_hook = response_hook_id.map(|hook_id| HookTarget { - hook_id, - return_path: self.path.clone(), - }); - let header = PacketHeader { - packet_type: PacketType::Call, - src_path: self.path.clone(), - dst_path: dst_path.clone(), - dst_leaf: dst_leaf.clone(), - hook_id: None, - }; - let call = CallMessage { - procedure_id: procedure_id.clone(), - data, - response_hook, - }; - - validate_header(&header)?; - validate_call(&header, &call)?; - - if let Some(hook) = &call.response_hook - && self - .hooks - .insert_active(ActiveHook { - return_path: hook.return_path.clone(), - hook_id: hook.hook_id, - peer_path: dst_path, - procedure_id, - dst_leaf, - peer_finished: false, - }) - .is_err() - { - return Err(EndpointError::Validation(ValidationError::InvalidHookId)); - } - + let (header, call) = + self.prepare_call(dst_path, dst_leaf, procedure_id, response_hook_id, data)?; + self.register_outbound_call_hook(&header, &call)?; Ok(encode_packet(&header, &call)?) } + /// Routes one locally originated `Call` without an encode/decode roundtrip. + pub fn send_call( + &mut self, + dst_path: Vec, + dst_leaf: Option, + procedure_id: impl Into, + response_hook_id: Option, + data: Vec, + ) -> Result { + let (header, call) = + self.prepare_call(dst_path, dst_leaf, procedure_id, response_hook_id, data)?; + self.register_outbound_call_hook(&header, &call)?; + + match self.decide_route(&header.dst_path) { + RouteDecision::Local => self.handle_local_call(header, call), + route => Ok(EndpointOutcome::forward(route, encode_packet(&header, &call)?)), + } + } + /// Builds an outbound `Data` packet for an existing hook. pub fn make_data( &self, @@ -120,23 +188,24 @@ impl ProtocolEndpoint { data: Vec, end_hook: bool, ) -> Result { - let procedure_id = procedure_id.into(); - validate_procedure_id(&procedure_id)?; - - let header = PacketHeader { - packet_type: PacketType::Data, - src_path: self.path.clone(), - dst_path, - dst_leaf: None, - hook_id: Some(hook_id), - }; - let message = DataMessage { - procedure_id, - data, - end_hook, - }; - - validate_header(&header)?; + let (header, message) = self.prepare_data(dst_path, hook_id, procedure_id, data, end_hook)?; Ok(encode_packet(&header, &message)?) } + + /// Routes one locally originated `Data` packet without an encode/decode roundtrip. + pub fn send_data( + &mut self, + dst_path: Vec, + hook_id: u64, + procedure_id: impl Into, + data: Vec, + end_hook: bool, + ) -> Result { + let (header, message) = self.prepare_data(dst_path, hook_id, procedure_id, data, end_hook)?; + + match self.decide_route(&header.dst_path) { + RouteDecision::Local => self.handle_local_data(header, message), + route => Ok(EndpointOutcome::forward(route, encode_packet(&header, &message)?)), + } + } } diff --git a/src/protocol/tree/endpoint/core.rs b/src/protocol/tree/endpoint/core.rs index c02085c..c7bfd61 100644 --- a/src/protocol/tree/endpoint/core.rs +++ b/src/protocol/tree/endpoint/core.rs @@ -93,14 +93,46 @@ pub enum LocalEvent { /// Result of processing one framed packet. #[derive(Debug, Default)] pub struct EndpointOutcome { - /// Forwarding actions to perform after local processing. - pub forwards: Vec<(RouteDecision, FrameBytes)>, - /// Events delivered to local runtime consumers. - pub events: Vec, + /// Forwarding action to perform after local processing. + pub forward: Option<(RouteDecision, FrameBytes)>, + /// Event delivered to the local runtime consumer. + pub event: Option, /// Whether the packet was intentionally dropped with no other side effects. pub dropped: bool, } +impl EndpointOutcome { + /// Returns an outcome that only forwards one frame. + #[must_use] + pub fn forward(route: RouteDecision, frame: FrameBytes) -> Self { + Self { + forward: Some((route, frame)), + event: None, + dropped: false, + } + } + + /// Returns an outcome that only delivers one local event. + #[must_use] + pub fn event(event: LocalEvent) -> Self { + Self { + forward: None, + event: Some(event), + dropped: false, + } + } + + /// Returns an outcome that silently drops the packet. + #[must_use] + pub fn dropped() -> Self { + Self { + forward: None, + event: None, + dropped: true, + } + } +} + /// Errors returned while decoding or validating a packet. #[derive(Debug)] pub enum EndpointError { diff --git a/src/protocol/tree/endpoint/hooks.rs b/src/protocol/tree/endpoint/hooks.rs index 8f14b95..7db1387 100644 --- a/src/protocol/tree/endpoint/hooks.rs +++ b/src/protocol/tree/endpoint/hooks.rs @@ -3,7 +3,7 @@ //! These methods implement the hook lifecycle described in `PROTOCOL.md`: //! pending contexts, active contexts, peer validation, and fault emission. -use alloc::{string::String, vec}; +use alloc::string::String; use crate::protocol::{ DataMessage, FaultMessage, PacketHeader, PacketType, ProtocolFault, encode_packet, @@ -20,10 +20,7 @@ impl ProtocolEndpoint { fault: ProtocolFault, ) -> Result { let Some(key) = key else { - return Ok(EndpointOutcome { - dropped: true, - ..EndpointOutcome::default() - }); + return Ok(EndpointOutcome::dropped()); }; self.hooks.remove_pending(&key); @@ -40,16 +37,10 @@ impl ProtocolEndpoint { let route = self.decide_route(&key.return_path); match route { - RouteDecision::Local => Ok(EndpointOutcome { - events: vec![LocalEvent::Fault { header, message }], - ..EndpointOutcome::default() - }), + RouteDecision::Local => Ok(EndpointOutcome::event(LocalEvent::Fault { header, message })), _ => { let frame = encode_packet(&header, &message)?; - Ok(EndpointOutcome { - forwards: vec![(route, frame)], - ..EndpointOutcome::default() - }) + Ok(EndpointOutcome::forward(route, frame)) } } } @@ -85,47 +76,35 @@ impl ProtocolEndpoint { } let Some(active) = self.hooks.active(&key) else { - return Ok(EndpointOutcome { - dropped: true, - ..EndpointOutcome::default() - }); + return Ok(EndpointOutcome::dropped()); }; if active.peer_path != header.src_path { self.hooks.remove_active(&key); self.hooks.remove_pending(&key); - return Ok(EndpointOutcome { - events: vec![LocalEvent::Fault { - header: PacketHeader { - packet_type: PacketType::Fault, - src_path: header.src_path, - dst_path: self.path.clone(), - dst_leaf: None, - hook_id: Some(key.hook_id), - }, - message: FaultMessage { - fault: ProtocolFault::INVALID_HOOK_PEER, - }, - }], - ..EndpointOutcome::default() - }); + return Ok(EndpointOutcome::event(LocalEvent::Fault { + header: PacketHeader { + packet_type: PacketType::Fault, + src_path: header.src_path, + dst_path: self.path.clone(), + dst_leaf: None, + hook_id: Some(key.hook_id), + }, + message: FaultMessage { + fault: ProtocolFault::INVALID_HOOK_PEER, + }, + })); } if active.procedure_id != message.procedure_id { - return Ok(EndpointOutcome { - dropped: true, - ..EndpointOutcome::default() - }); + return Ok(EndpointOutcome::dropped()); } if message.end_hook { self.hooks.remove_active(&key); } - Ok(EndpointOutcome { - events: vec![LocalEvent::Data { header, message }], - ..EndpointOutcome::default() - }) + Ok(EndpointOutcome::event(LocalEvent::Data { header, message })) } /// Handles locally delivered hook `Fault` packets. @@ -145,19 +124,13 @@ impl ProtocolEndpoint { .is_some_and(|pending| pending.caller_src_path == header.src_path); if !matches { - return Ok(EndpointOutcome { - dropped: true, - ..EndpointOutcome::default() - }); + return Ok(EndpointOutcome::dropped()); } self.hooks.remove_active(&key); self.hooks.remove_pending(&key); - Ok(EndpointOutcome { - events: vec![LocalEvent::Fault { header, message }], - ..EndpointOutcome::default() - }) + Ok(EndpointOutcome::event(LocalEvent::Fault { header, message })) } /// Chooses the next hop using the protocol's longest-prefix routing rule. diff --git a/src/protocol/tree/endpoint/introspection.rs b/src/protocol/tree/endpoint/introspection.rs index 2aa1198..ed5e9a1 100644 --- a/src/protocol/tree/endpoint/introspection.rs +++ b/src/protocol/tree/endpoint/introspection.rs @@ -3,7 +3,7 @@ //! This code implements the reserved empty-procedure behavior from the //! introspection sections of `PROTOCOL.md`. -use alloc::{string::String, vec}; +use alloc::string::String; use rkyv::{rancor::Error as RkyvError, to_bytes}; use crate::protocol::{ @@ -22,14 +22,9 @@ impl ProtocolEndpoint { key: Option, ) -> Result { let Some(key) = key else { - return Ok(EndpointOutcome { - dropped: true, - ..EndpointOutcome::default() - }); + return Ok(EndpointOutcome::dropped()); }; - self.hooks.activate_pending(&key, header.src_path.clone()); - let payload = if let Some(leaf_name) = &header.dst_leaf { let Some(leaf) = self.leaves.get(leaf_name) else { return self.emit_fault_if_possible(Some(key), ProtocolFault::UNKNOWN_LEAF); @@ -77,19 +72,15 @@ impl ProtocolEndpoint { let route = self.decide_route(&key.return_path); match route { - super::super::RouteDecision::Local => Ok(EndpointOutcome { - events: vec![super::core::LocalEvent::Data { + super::super::RouteDecision::Local => Ok(EndpointOutcome::event( + super::core::LocalEvent::Data { header: response_header, message: response, - }], - ..EndpointOutcome::default() - }), + }, + )), _ => { let frame = encode_packet(&response_header, &response)?; - Ok(EndpointOutcome { - forwards: vec![(route, frame)], - ..EndpointOutcome::default() - }) + Ok(EndpointOutcome::forward(route, frame)) } } } diff --git a/src/protocol/tree/endpoint/receive.rs b/src/protocol/tree/endpoint/receive.rs index 84c2279..ccb94bf 100644 --- a/src/protocol/tree/endpoint/receive.rs +++ b/src/protocol/tree/endpoint/receive.rs @@ -3,14 +3,12 @@ //! This file implements the transport-facing packet entry point and maps it to //! the `Call`, `Data`, and `Fault` sections of `PROTOCOL.md`. -use alloc::vec; - use crate::protocol::{ CallMessage, PacketType, ProtocolFault, decode_frame, introspection::INTROSPECTION_PROCEDURE_ID, validate_call, validate_header, }; -use super::super::{HookKey, PendingHook, RouteDecision}; +use super::super::{ActiveHook, HookKey, RouteDecision}; use super::core::{ Endpoint, EndpointError, EndpointOutcome, Ingress, LocalEvent, ProtocolEndpoint, }; @@ -27,22 +25,6 @@ impl ProtocolEndpoint { .as_ref() .map(|hook| HookKey::new(hook.return_path.clone(), hook.hook_id)); - if let Some(hook) = &message.response_hook - && hook.return_path != self.path - && self - .hooks - .insert_pending(PendingHook { - caller_src_path: header.src_path.clone(), - return_path: hook.return_path.clone(), - hook_id: hook.hook_id, - procedure_id: message.procedure_id.clone(), - dst_leaf: header.dst_leaf.clone(), - }) - .is_err() - { - return self.emit_fault_if_possible(key, ProtocolFault::INTERNAL_ERROR); - } - if message.procedure_id == INTROSPECTION_PROCEDURE_ID { return self.handle_introspection(&header, key); } @@ -73,14 +55,24 @@ impl ProtocolEndpoint { return self.emit_fault_if_possible(key, fault); } - if let Some(key) = &key { - self.hooks.activate_pending(key, header.src_path.clone()); + if let Some(hook) = &message.response_hook + && hook.return_path != self.path + && self + .hooks + .insert_active(ActiveHook { + return_path: hook.return_path.clone(), + hook_id: hook.hook_id, + peer_path: header.src_path.clone(), + procedure_id: message.procedure_id.clone(), + dst_leaf: header.dst_leaf.clone(), + peer_finished: false, + }) + .is_err() + { + return self.emit_fault_if_possible(key, ProtocolFault::INTERNAL_ERROR); } - Ok(EndpointOutcome { - events: vec![LocalEvent::Call { header, message }], - ..EndpointOutcome::default() - }) + Ok(EndpointOutcome::event(LocalEvent::Call { header, message })) } } @@ -99,73 +91,56 @@ impl Endpoint for ProtocolEndpoint { validate_header(header)?; if !self.valid_source_for_ingress(ingress, &header.src_path) { - return Ok(EndpointOutcome { - dropped: true, - ..EndpointOutcome::default() - }); + return Ok(EndpointOutcome::dropped()); } match header.packet_type { PacketType::Call => { let message = parsed.deserialize_call()?; if !matches!(ingress, Ingress::Parent | Ingress::Local) { - return Ok(EndpointOutcome { - dropped: true, - ..EndpointOutcome::default() - }); + return Ok(EndpointOutcome::dropped()); } validate_call(header, &message)?; match self.decide_route(&header.dst_path) { - RouteDecision::Child(index) => Ok(EndpointOutcome { - forwards: vec![(RouteDecision::Child(index), frame)], - ..EndpointOutcome::default() - }), - RouteDecision::Parent => Ok(EndpointOutcome { - forwards: vec![(RouteDecision::Parent, frame)], - ..EndpointOutcome::default() - }), - RouteDecision::Drop => Ok(EndpointOutcome { - dropped: true, - ..EndpointOutcome::default() - }), - RouteDecision::Local => self.handle_local_call(header.clone(), message), + RouteDecision::Child(index) => { + Ok(EndpointOutcome::forward(RouteDecision::Child(index), frame)) + } + RouteDecision::Parent => { + Ok(EndpointOutcome::forward(RouteDecision::Parent, frame)) + } + RouteDecision::Drop => Ok(EndpointOutcome::dropped()), + RouteDecision::Local => self.handle_local_call(parsed.deserialize_header(), message), } } PacketType::Data => { - let message = parsed.deserialize_data()?; match self.decide_route(&header.dst_path) { - RouteDecision::Local => self.handle_local_data(header.clone(), message), - RouteDecision::Child(index) => Ok(EndpointOutcome { - forwards: vec![(RouteDecision::Child(index), frame)], - ..EndpointOutcome::default() - }), - RouteDecision::Parent => Ok(EndpointOutcome { - forwards: vec![(RouteDecision::Parent, frame)], - ..EndpointOutcome::default() - }), - RouteDecision::Drop => Ok(EndpointOutcome { - dropped: true, - ..EndpointOutcome::default() - }), + RouteDecision::Local => { + let message = parsed.deserialize_data()?; + self.handle_local_data(parsed.deserialize_header(), message) + } + RouteDecision::Child(index) => { + Ok(EndpointOutcome::forward(RouteDecision::Child(index), frame)) + } + RouteDecision::Parent => { + Ok(EndpointOutcome::forward(RouteDecision::Parent, frame)) + } + RouteDecision::Drop => Ok(EndpointOutcome::dropped()), } } PacketType::Fault => { - let message = parsed.deserialize_fault()?; match self.decide_route(&header.dst_path) { - RouteDecision::Local => self.handle_local_fault(header.clone(), message), - RouteDecision::Child(index) => Ok(EndpointOutcome { - forwards: vec![(RouteDecision::Child(index), frame)], - ..EndpointOutcome::default() - }), - RouteDecision::Parent => Ok(EndpointOutcome { - forwards: vec![(RouteDecision::Parent, frame)], - ..EndpointOutcome::default() - }), - RouteDecision::Drop => Ok(EndpointOutcome { - dropped: true, - ..EndpointOutcome::default() - }), + RouteDecision::Local => { + let message = parsed.deserialize_fault()?; + self.handle_local_fault(parsed.deserialize_header(), message) + } + RouteDecision::Child(index) => { + Ok(EndpointOutcome::forward(RouteDecision::Child(index), frame)) + } + RouteDecision::Parent => { + Ok(EndpointOutcome::forward(RouteDecision::Parent, frame)) + } + RouteDecision::Drop => Ok(EndpointOutcome::dropped()), } } } diff --git a/treetest/src/sim/actions/calls.rs b/treetest/src/sim/actions/calls.rs index 06da104..763f82f 100644 --- a/treetest/src/sim/actions/calls.rs +++ b/treetest/src/sim/actions/calls.rs @@ -190,9 +190,9 @@ impl Simulation { .cloned() .ok_or(SimError::UnknownHook(hook_id))?; - let frame = self.nodes[self.root_id.0] + let outcome = self.nodes[self.root_id.0] .endpoint - .make_data( + .send_data( snapshot.peer_path.clone(), hook_id, snapshot.procedure_id.clone(), @@ -208,7 +208,7 @@ impl Simulation { format_hook_ref(self.node(self.root_id).path.as_slice(), hook_id) ), ); - self.process_local_frame(self.root_id, frame)?; + self.process_outcome(self.root_id, outcome)?; Ok(ActionResult { label: format!("Send hook data {hook_id}"), hook_id: Some(hook_id), diff --git a/treetest/src/sim/runtime/dispatch.rs b/treetest/src/sim/runtime/dispatch.rs index 89ce720..d3bb2e5 100644 --- a/treetest/src/sim/runtime/dispatch.rs +++ b/treetest/src/sim/runtime/dispatch.rs @@ -23,9 +23,9 @@ impl Simulation { // Hook allocation happens on the root host because the root is the hook // owner for every user-driven action in the demo. let hook_id = self.nodes[self.root_id.0].endpoint.allocate_hook_id(); - let frame = self.nodes[self.root_id.0] + let outcome = self.nodes[self.root_id.0] .endpoint - .make_call( + .send_call( dst_path.clone(), dst_leaf.clone(), procedure_id.to_owned(), @@ -65,7 +65,7 @@ impl Simulation { .unwrap_or_default() ), ); - self.process_local_frame(self.root_id, frame) + self.process_outcome(self.root_id, outcome) } /// Delivers a frame into one endpoint as locally-originated traffic. @@ -91,7 +91,7 @@ impl Simulation { self.record_trace(node_id, "packet dropped".to_owned()); } - for (route, frame) in outcome.forwards { + if let Some((route, frame)) = outcome.forward { match route { RouteDecision::Child(index) => { let child_id = self.nodes[node_id.0] @@ -150,7 +150,7 @@ impl Simulation { } } - for event in outcome.events { + if let Some(event) = outcome.event { self.handle_local_event(node_id, event)?; } diff --git a/treetest/src/sim/runtime/events/application.rs b/treetest/src/sim/runtime/events/application.rs index 29713fc..45bfe45 100644 --- a/treetest/src/sim/runtime/events/application.rs +++ b/treetest/src/sim/runtime/events/application.rs @@ -28,7 +28,7 @@ impl Simulation { let leaf = self.require_leaf(node_id, leaf_name)?.clone(); match leaf.kind { LeafKind::Echo => { - let frame = self.make_endpoint_data_frame( + let outcome = self.send_endpoint_data( node_id, hook.return_path.clone(), hook.hook_id, @@ -37,7 +37,7 @@ impl Simulation { true, )?; self.record_trace(node_id, format!("leaf {leaf_name} echoed {} bytes", message.data.len())); - self.process_local_frame(node_id, frame)?; + self.process_outcome(node_id, outcome)?; } } return Ok(()); @@ -51,7 +51,7 @@ impl Simulation { match procedure.kind { EndpointProcedureKind::Ping => { let reply = format!("pong from {}", self.node(node_id).display_path()); - let frame = self.make_endpoint_data_frame( + let outcome = self.send_endpoint_data( node_id, hook.return_path.clone(), hook.hook_id, @@ -60,7 +60,7 @@ impl Simulation { true, )?; self.record_trace(node_id, format!("endpoint sent ping reply: {reply}")); - self.process_local_frame(node_id, frame)?; + self.process_outcome(node_id, outcome)?; } EndpointProcedureKind::ChunkedGreeting => { for (index, text) in [ @@ -71,7 +71,7 @@ impl Simulation { .iter() .enumerate() { - let frame = self.make_endpoint_data_frame( + let outcome = self.send_endpoint_data( node_id, hook.return_path.clone(), hook.hook_id, @@ -80,7 +80,7 @@ impl Simulation { index == 2, )?; self.record_trace(node_id, format!("endpoint sent chunk {}", index + 1)); - self.process_local_frame(node_id, frame)?; + self.process_outcome(node_id, outcome)?; } } EndpointProcedureKind::Chat => { @@ -95,7 +95,7 @@ impl Simulation { procedure_id: procedure.procedure_id.clone(), }, ); - let frame = self.make_endpoint_data_frame( + let outcome = self.send_endpoint_data( node_id, hook.return_path.clone(), hook.hook_id, @@ -104,15 +104,15 @@ impl Simulation { false, )?; self.record_trace(node_id, "chat handler opened session".to_owned()); - self.process_local_frame(node_id, frame)?; + self.process_outcome(node_id, outcome)?; } } Ok(()) } - /// Builds one endpoint-originated data frame after application logic decides + /// Routes one endpoint-originated data packet after application logic decides /// what to send back on an already-validated hook. - fn make_endpoint_data_frame( + fn send_endpoint_data( &mut self, node_id: NodeId, return_path: Vec, @@ -120,10 +120,10 @@ impl Simulation { procedure_id: String, data: Vec, end_hook: bool, - ) -> Result { + ) -> Result { self.nodes[node_id.0] .endpoint - .make_data(return_path, hook_id, procedure_id, data, end_hook) + .send_data(return_path, hook_id, procedure_id, data, end_hook) .map_err(|error| SimError::Protocol(error.to_string())) } diff --git a/treetest/src/sim/runtime/events/local.rs b/treetest/src/sim/runtime/events/local.rs index 5da32b7..81866f2 100644 --- a/treetest/src/sim/runtime/events/local.rs +++ b/treetest/src/sim/runtime/events/local.rs @@ -40,9 +40,9 @@ impl Simulation { let reply = chat_reply_for_text(&text); if let Some((reply, end_hook)) = reply { - let frame = self.nodes[session.node_id.0] + let outcome = self.nodes[session.node_id.0] .endpoint - .make_data( + .send_data( session.host_path.clone(), session.hook_id, session.procedure_id.clone(), @@ -51,7 +51,7 @@ impl Simulation { ) .map_err(|error| SimError::Protocol(error.to_string()))?; self.record_trace(session.node_id, format!("chat handler sent: {reply}")); - self.process_local_frame(session.node_id, frame)?; + self.process_outcome(session.node_id, outcome)?; if end_hook { self.chat_sessions.remove(&session.hook_id); }