From 31a0bd39b0aa60cdad3741093f46fd01e32273c0 Mon Sep 17 00:00:00 2001 From: Michael Mikovsky <77305074+Astatin3@users.noreply.github.com> Date: Sat, 25 Apr 2026 20:47:37 -0600 Subject: [PATCH] Simplify endpoint outcome state handling --- examples/protocol/bench/bench.rs | 12 ++-- .../protocol/bench/support/bench_common.rs | 24 +++---- examples/protocol/leaf_derive.rs | 9 +-- examples/protocol/remote_shell_receive.rs | 4 +- src/leaf/remote_shell/transport.rs | 12 ++-- src/protocol/tests/call.rs | 6 +- src/protocol/tests/procedure.rs | 28 +++++--- src/protocol/tests/tree.rs | 20 +++--- src/protocol/tree/call.rs | 69 +++++++++++++++---- src/protocol/tree/endpoint/builders.rs | 18 ++--- src/protocol/tree/endpoint/core.rs | 52 +++----------- src/protocol/tree/endpoint/hooks.rs | 24 +++---- src/protocol/tree/endpoint/introspection.rs | 12 ++-- src/protocol/tree/endpoint/mod.rs | 4 +- src/protocol/tree/endpoint/receive.rs | 50 ++++++++------ src/protocol/tree/mod.rs | 4 +- src/protocol/tree/procedure.rs | 56 ++++++++++----- treetest/src/sim/build.rs | 4 +- treetest/src/sim/runtime/dispatch.rs | 18 +++-- 19 files changed, 234 insertions(+), 192 deletions(-) diff --git a/examples/protocol/bench/bench.rs b/examples/protocol/bench/bench.rs index c072e00..122fc7c 100644 --- a/examples/protocol/bench/bench.rs +++ b/examples/protocol/bench/bench.rs @@ -4,7 +4,7 @@ use std::process::Command; use std::time::Instant; use unshell::protocol::tree::{ - ChildRoute, Endpoint, Ingress, LeafSpec, LocalEvent, ProtocolEndpoint, + ChildRoute, Endpoint, EndpointOutcome, Ingress, LeafSpec, LocalEvent, ProtocolEndpoint, }; use unshell::protocol::{CallMessage, PacketHeader, PacketType, decode_frame, encode_packet}; @@ -105,7 +105,7 @@ fn bench_forward_call_receive() -> BenchResult { let outcome = root .receive(&Ingress::Local, frame) .expect("forward receive should work"); - black_box(outcome.forward.is_some()); + black_box(matches!(outcome, EndpointOutcome::Forward { .. })); }, ) } @@ -118,8 +118,8 @@ fn bench_local_call_receive() -> BenchResult { let outcome = endpoint .receive(&Ingress::Parent, frame) .expect("local call should work"); - match black_box(outcome.event) { - Some(LocalEvent::Call { .. }) => {} + match black_box(outcome) { + EndpointOutcome::Local(LocalEvent::Call { .. }) => {} other => panic!("expected local call event, got {other:?}"), } }, @@ -134,8 +134,8 @@ fn bench_hook_data_receive() -> BenchResult { let outcome = host .receive(&Ingress::Child(path(&["worker"])), frame) .expect("hook data should work"); - match black_box(outcome.event) { - Some(LocalEvent::Data { .. }) => {} + match black_box(outcome) { + EndpointOutcome::Local(LocalEvent::Data { .. }) => {} other => panic!("expected local data event, got {other:?}"), } }, diff --git a/examples/protocol/bench/support/bench_common.rs b/examples/protocol/bench/support/bench_common.rs index 3f5d826..d581e42 100644 --- a/examples/protocol/bench/support/bench_common.rs +++ b/examples/protocol/bench/support/bench_common.rs @@ -3,7 +3,7 @@ use std::hint::black_box; use unshell::protocol::tree::{ - ChildRoute, Endpoint, Ingress, LeafSpec, LocalEvent, ProtocolEndpoint, + ChildRoute, Endpoint, EndpointOutcome, Ingress, LeafSpec, LocalEvent, ProtocolEndpoint, }; use unshell::protocol::{CallMessage, PacketHeader, PacketType, decode_frame, encode_packet}; @@ -94,14 +94,12 @@ pub fn run_forward_call_receive(iterations: usize) -> usize { let outcome = root .receive(&Ingress::Local, frame) .expect("forward receive should work"); - let forwarded = outcome - .forward - .as_ref() - .map(|(route, frame)| route_value(*route).wrapping_add(frame.len())) - .unwrap_or_default(); - checksum = checksum - .wrapping_add(forwarded) - .wrapping_add(outcome.dropped as usize); + let forwarded = match outcome { + EndpointOutcome::Forward { route, frame } => route_value(route).wrapping_add(frame.len()), + EndpointOutcome::Local(_) => 0, + EndpointOutcome::Dropped => usize::from(true), + }; + checksum = checksum.wrapping_add(forwarded); } black_box(checksum) } @@ -141,8 +139,8 @@ pub fn run_local_call_receive(iterations: usize) -> usize { let outcome = endpoint .receive(&Ingress::Parent, frame) .expect("local call should work"); - match outcome.event { - Some(LocalEvent::Call { header, message }) => { + match outcome { + EndpointOutcome::Local(LocalEvent::Call { header, message }) => { checksum = checksum .wrapping_add(header.dst_path.len()) .wrapping_add(header.src_path.len()) @@ -194,8 +192,8 @@ pub fn run_hook_data_receive(iterations: usize) -> usize { let outcome = host .receive(&Ingress::Child(path(&["worker"])), frame) .expect("hook data should work"); - match outcome.event { - Some(LocalEvent::Data { + match outcome { + EndpointOutcome::Local(LocalEvent::Data { header, message, .. }) => { checksum = checksum diff --git a/examples/protocol/leaf_derive.rs b/examples/protocol/leaf_derive.rs index 9ad5ccd..4c48e45 100644 --- a/examples/protocol/leaf_derive.rs +++ b/examples/protocol/leaf_derive.rs @@ -2,8 +2,9 @@ use std::error::Error; use std::{convert::Infallible, string::String}; use rkyv::{Archive, Deserialize, Serialize}; -use unshell::protocol::tree::{Call, CallLeaf, Ingress, LeafRuntime, ProtocolEndpoint}; -use unshell::protocol::tree::{ChildRoute, ConnectionState}; +use unshell::protocol::tree::{ + Call, CallLeaf, ChildRoute, EndpointOutcome, Ingress, LeafRuntime, ProtocolEndpoint, +}; use unshell::protocol::{PacketType, decode_frame}; use unshell::{Leaf, procedures}; @@ -60,7 +61,7 @@ fn main() -> Result<(), Box> { None, vec![ChildRoute { path: path(&["agent"]), - state: ConnectionState::Registered, + registered: true, }], Vec::new(), ); @@ -74,7 +75,7 @@ fn main() -> Result<(), Box> { text: String::from("hello leaf"), })?, )?; - let Some((_, frame)) = controller_outcome.forward else { + let EndpointOutcome::Forward { frame, .. } = controller_outcome else { return Err("expected controller to forward call".into()); }; diff --git a/examples/protocol/remote_shell_receive.rs b/examples/protocol/remote_shell_receive.rs index bbc8cbb..003899d 100644 --- a/examples/protocol/remote_shell_receive.rs +++ b/examples/protocol/remote_shell_receive.rs @@ -4,7 +4,7 @@ mod remote_shell; use std::error::Error; use std::net::TcpListener; -use unshell::protocol::tree::{Endpoint, Ingress, LocalEvent}; +use unshell::protocol::tree::{Endpoint, EndpointOutcome, Ingress, LocalEvent}; fn main() -> Result<(), Box> { let listener = TcpListener::bind(remote_shell::LISTEN_ADDR)?; @@ -46,7 +46,7 @@ fn main() -> Result<(), Box> { for result in frame_rx { let frame = result?; let outcome = endpoint.receive(&Ingress::Child(remote_shell::agent_path()), frame)?; - let Some(event) = outcome.event else { + let EndpointOutcome::Local(event) = outcome else { continue; }; diff --git a/src/leaf/remote_shell/transport.rs b/src/leaf/remote_shell/transport.rs index 1b0d80e..a6117bb 100644 --- a/src/leaf/remote_shell/transport.rs +++ b/src/leaf/remote_shell/transport.rs @@ -11,14 +11,10 @@ const MAX_FRAME_BYTES: usize = 1024 * 1024; #[allow(dead_code)] pub fn send_forward(stream: &mut TcpStream, outcome: EndpointOutcome) -> io::Result<()> { - write_frames( - stream, - &outcome - .forward - .into_iter() - .map(|(_, frame)| frame) - .collect::>(), - ) + match outcome { + EndpointOutcome::Forward { frame, .. } => write_frames(stream, &[frame]), + EndpointOutcome::Local(_) | EndpointOutcome::Dropped => write_frames(stream, &[]), + } } pub fn write_frames(stream: &mut TcpStream, frames: &[FrameBytes]) -> io::Result<()> { diff --git a/src/protocol/tests/call.rs b/src/protocol/tests/call.rs index 04f501e..7cb629c 100644 --- a/src/protocol/tests/call.rs +++ b/src/protocol/tests/call.rs @@ -4,7 +4,7 @@ use core::convert::Infallible; use rkyv::{Archive, Deserialize, Serialize}; use crate::protocol::tree::{ - Call, CallLeaf, ChildRoute, ConnectionState, Ingress, LeafRuntime, ProtocolEndpoint, + Call, CallLeaf, ChildRoute, EndpointOutcome, Ingress, LeafRuntime, ProtocolEndpoint, decode_call_input, encode_call_reply, }; use crate::protocol::{PacketType, decode_frame}; @@ -64,7 +64,7 @@ fn leaf_runtime_dispatches_generated_call_procedure() { None, vec![ChildRoute { path: path(&["agent"]), - state: ConnectionState::Registered, + registered: true, }], Vec::new(), ); @@ -81,7 +81,7 @@ fn leaf_runtime_dispatches_generated_call_procedure() { .expect("request should encode"), ) .expect("call should encode"); - let Some((_, frame)) = controller_outcome.forward else { + let EndpointOutcome::Forward { frame, .. } = controller_outcome else { panic!("controller should forward call to child"); }; diff --git a/src/protocol/tests/procedure.rs b/src/protocol/tests/procedure.rs index ee21fde..0d12f42 100644 --- a/src/protocol/tests/procedure.rs +++ b/src/protocol/tests/procedure.rs @@ -2,7 +2,7 @@ use alloc::{borrow::ToOwned, collections::BTreeMap, format, string::String, vec, use core::convert::Infallible; use crate::protocol::tree::{ - Call, ChildRoute, ConnectionState, Endpoint, HookKey, Ingress, OutgoingData, Procedure, + Call, ChildRoute, Endpoint, EndpointOutcome, HookKey, Ingress, OutgoingData, Procedure, ProcedureEffect, ProcedureRuntime, ProcedureStore, ProtocolEndpoint, encode_call_reply, }; use crate::protocol::{PacketType, decode_frame}; @@ -80,7 +80,7 @@ fn procedure_runtime_routes_data_to_stored_session() { None, vec![ChildRoute { path: path(&["agent"]), - state: ConnectionState::Registered, + registered: true, }], Vec::new(), ); @@ -94,7 +94,10 @@ fn procedure_runtime_routes_data_to_stored_session() { encode_call_reply(&String::from("prefix:")).expect("procedure input should encode"), ) .expect("open call should encode"); - let Some((_, open_frame)) = open.forward else { + let EndpointOutcome::Forward { + frame: open_frame, .. + } = open + else { panic!("controller should forward opening call"); }; runtime @@ -110,7 +113,10 @@ fn procedure_runtime_routes_data_to_stored_session() { true, ) .expect("data should encode"); - let Some((_, data_frame)) = data.forward else { + let EndpointOutcome::Forward { + frame: data_frame, .. + } = data + else { panic!("controller should forward data frame"); }; let outcome = runtime @@ -129,7 +135,7 @@ fn procedure_runtime_routes_data_to_stored_session() { let forwarded = controller .receive(&Ingress::Child(path(&["agent"])), response_frame.clone()) .expect("controller should receive session response"); - assert!(forwarded.event.is_some()); + assert!(matches!(forwarded, EndpointOutcome::Local(_))); assert!(runtime.leaf_mut().procedure_sessions().is_empty()); } @@ -204,7 +210,7 @@ fn procedure_runtime_keeps_session_after_local_end_until_explicit_close() { None, vec![ChildRoute { path: path(&["agent"]), - state: ConnectionState::Registered, + registered: true, }], Vec::new(), ); @@ -218,7 +224,10 @@ fn procedure_runtime_keeps_session_after_local_end_until_explicit_close() { encode_call_reply(&()).expect("unit call should encode"), ) .expect("open call should encode"); - let Some((_, open_frame)) = open.forward else { + let EndpointOutcome::Forward { + frame: open_frame, .. + } = open + else { panic!("controller should forward opening call"); }; runtime @@ -234,7 +243,10 @@ fn procedure_runtime_keeps_session_after_local_end_until_explicit_close() { false, ) .expect("local end trigger should encode"); - let Some((_, local_end_frame)) = local_end.forward else { + let EndpointOutcome::Forward { + frame: local_end_frame, .. + } = local_end + else { panic!("controller should forward local end trigger"); }; let outcome = runtime diff --git a/src/protocol/tests/tree.rs b/src/protocol/tests/tree.rs index 85a03fe..ced1ea9 100644 --- a/src/protocol/tests/tree.rs +++ b/src/protocol/tests/tree.rs @@ -1,8 +1,8 @@ use alloc::{borrow::ToOwned, string::String, vec, vec::Vec}; use crate::protocol::tree::{ - ChildRoute, DefaultRouteProvider, Endpoint, Ingress, LeafNode, LeafSpec, LocalEvent, - ProtocolEndpoint, RouteDecision, RouteProvider, TreeNode, + ChildRoute, DefaultRouteProvider, Endpoint, EndpointOutcome, Ingress, LeafNode, LeafSpec, + LocalEvent, ProtocolEndpoint, RouteDecision, RouteProvider, TreeNode, }; use crate::protocol::{ DataMessage, EndpointIntrospection, FaultMessage, PacketHeader, PacketType, ProtocolFault, @@ -76,13 +76,11 @@ fn protocol_endpoint_introspection_returns_leaf_summary() { .receive(&Ingress::Local, frame) .expect("endpoint should handle introspection"); - assert!(outcome.forward.is_none()); - - let LocalEvent::Data { + let EndpointOutcome::Local(LocalEvent::Data { header, message: response, .. - } = outcome.event.as_ref().expect("expected local data event") + }) = &outcome else { panic!("expected local data event"); }; @@ -167,10 +165,8 @@ fn invalid_hook_peer_emits_local_fault_event() { .receive(&Ingress::Child(path(&["intruder"])), frame) .expect("invalid peer should be handled"); - assert!(outcome.forward.is_none()); - assert!(!outcome.dropped); - - match outcome.event.as_ref().expect("expected local fault event") { + match &outcome { + EndpointOutcome::Local(event) => match event { LocalEvent::Fault { header, message, .. } => { @@ -184,6 +180,8 @@ fn invalid_hook_peer_emits_local_fault_event() { ); } other => panic!("expected fault event, got {other:?}"), + }, + other => panic!("expected local fault event, got {other:?}"), } } @@ -302,7 +300,7 @@ fn pending_hook_fault_is_delivered_before_activation() { ) .expect("introspection should handle pending hook"); - assert!(outcome.forward.is_some() || outcome.event.is_some()); + assert!(!matches!(outcome, EndpointOutcome::Dropped)); } #[test] diff --git a/src/protocol/tree/call.rs b/src/protocol/tree/call.rs index 033a905..802277a 100644 --- a/src/protocol/tree/call.rs +++ b/src/protocol/tree/call.rs @@ -16,65 +16,90 @@ use super::{ /// One typed incoming `Call` passed to a leaf procedure. #[derive(Debug, Clone, PartialEq, Eq)] pub struct Call { + /// Decoded application input payload. pub input: T, + /// Endpoint path of the caller that opened this call. pub caller_path: Vec, + /// Canonical procedure identifier chosen by the caller. pub procedure_id: String, + /// Optional destination leaf targeted by the call. pub dst_leaf: Option, + /// Hook key declared by the caller when it expects a response. pub response_hook: Option, } /// One incoming local call event that already passed protocol validation. #[derive(Debug, Clone, PartialEq, Eq)] pub struct IncomingCall { + /// Validated protocol header for the call. pub header: PacketHeader, + /// Application payload for the call. pub message: CallMessage, } /// One incoming local data event tied to an active hook. #[derive(Debug, Clone, PartialEq, Eq)] pub struct IncomingData { + /// Validated protocol header for the data packet. pub header: PacketHeader, + /// Hook-associated data payload. pub message: DataMessage, + /// Resolved hook key for the active session. pub hook_key: HookKey, } /// One incoming local fault event tied to a pending or active hook. #[derive(Debug, Clone, PartialEq, Eq)] pub struct IncomingFault { + /// Validated protocol header for the fault packet. pub header: PacketHeader, + /// Fault payload emitted by the peer. pub fault: crate::protocol::FaultMessage, + /// Hook key for the pending or active session that faulted. pub hook_key: HookKey, } /// Outcome of one generated initial call procedure. #[derive(Debug, Clone, PartialEq, Eq)] pub enum CallResult { + /// Return one reply payload to the caller. Reply(T), + /// Complete the call without any response data. NoReply, } /// One hook-associated `Data` packet emitted by leaf code. #[derive(Debug, Clone, PartialEq, Eq)] pub struct OutgoingData { + /// Destination endpoint path for the hook packet. pub dst_path: Vec, + /// Hook identifier scoped to the receiving endpoint. pub hook_id: u64, + /// Procedure identifier that owns this hook stream. pub procedure_id: String, + /// Serialized application data to send. pub data: Vec, + /// Whether this packet closes the local side of the hook. pub end_hook: bool, } /// One runtime-normalized reply produced by generated call dispatch. #[derive(Debug, Clone, PartialEq, Eq)] pub enum CallReply { + /// Serialized reply bytes that should be returned upstream. Reply(Vec), + /// Complete without emitting any reply packet. NoReply, } /// Error surfaced while decoding one incoming call or encoding one generated reply. #[derive(Debug)] pub enum DispatchError { + /// Failed to decode the typed call input. Decode(FrameError), + /// Failed to encode the typed call output. Encode(FrameError), + /// The leaf-specific call handler returned an error. Handler(E), } @@ -96,8 +121,11 @@ impl core::error::Error for DispatchError where E: core::error::Error + 's /// Error surfaced by the stateful leaf runtime. #[derive(Debug)] pub enum LeafRuntimeError { + /// Protocol endpoint routing or framing failed. Endpoint(EndpointError), + /// Typed call dispatch failed. Dispatch(DispatchError), + /// Leaf-local data or fault handling failed. Leaf(E), } @@ -124,6 +152,7 @@ impl From for LeafRuntimeError { /// High-level leaf behavior layered on top of validated protocol events. pub trait CallLeaf: ProtocolLeaf { + /// Leaf-specific error surfaced by call, data, or fault handling. type Error; /// Handles hook-associated inbound `Data` after protocol validation. @@ -152,30 +181,37 @@ pub struct LeafRuntime { /// Frames emitted by the runtime after one receive or poll step. #[derive(Debug, Default)] pub struct RuntimeOutcome { + /// Frames emitted while processing the step. pub frames: Vec, + /// Whether the endpoint dropped the incoming packet. pub dropped: bool, } impl LeafRuntime { + /// Builds a runtime from one endpoint and one leaf instance. #[must_use] pub fn new(endpoint: ProtocolEndpoint, leaf: L) -> Self { Self { endpoint, leaf } } + /// Returns the underlying protocol endpoint. #[must_use] pub fn endpoint(&self) -> &ProtocolEndpoint { &self.endpoint } + /// Returns a mutable reference to the underlying endpoint. pub fn endpoint_mut(&mut self) -> &mut ProtocolEndpoint { &mut self.endpoint } + /// Returns the hosted leaf instance. #[must_use] pub fn leaf(&self) -> &L { &self.leaf } + /// Returns a mutable reference to the hosted leaf instance. pub fn leaf_mut(&mut self) -> &mut L { &mut self.leaf } @@ -203,20 +239,23 @@ where &mut self, outcome: crate::protocol::tree::EndpointOutcome, ) -> Result::Error>> { - let mut runtime = RuntimeOutcome { - frames: Vec::new(), - dropped: outcome.dropped, - }; + match outcome { + crate::protocol::tree::EndpointOutcome::Forward { frame, .. } => { + let mut frames = Vec::with_capacity(1); + frames.push(frame); + Ok(RuntimeOutcome { + frames, + dropped: false, + }) + } + crate::protocol::tree::EndpointOutcome::Dropped => Ok(RuntimeOutcome { + frames: Vec::new(), + dropped: true, + }), + crate::protocol::tree::EndpointOutcome::Local(event) => { + let mut runtime = RuntimeOutcome::default(); - if let Some((_route, frame)) = outcome.forward { - runtime.frames.push(frame); - } - - let Some(event) = outcome.event else { - return Ok(runtime); - }; - - match event { + match event { LocalEvent::Call { header, message } => { let incoming = IncomingCall { header, @@ -272,7 +311,9 @@ where } } - Ok(runtime) + Ok(runtime) + } + } } fn emit_outgoing( diff --git a/src/protocol/tree/endpoint/builders.rs b/src/protocol/tree/endpoint/builders.rs index cad7f5d..cb5cca3 100644 --- a/src/protocol/tree/endpoint/builders.rs +++ b/src/protocol/tree/endpoint/builders.rs @@ -111,7 +111,7 @@ impl ProtocolEndpoint { ) -> Self { let registered_child_paths = children .iter() - .filter(|child| child.state == super::core::ConnectionState::Registered) + .filter(|child| child.registered) .map(|child| child.path.clone()) .collect::>(); @@ -180,12 +180,12 @@ impl ProtocolEndpoint { self.hooks .remove_pending(&HookKey::new(hook.return_path.clone(), hook.hook_id)); } - Ok(EndpointOutcome::dropped()) + Ok(EndpointOutcome::Dropped) } - route => Ok(EndpointOutcome::forward( + route => Ok(EndpointOutcome::Forward { route, - encode_packet(&header, &call)?, - )), + frame: encode_packet(&header, &call)?, + }), } } @@ -246,11 +246,11 @@ impl ProtocolEndpoint { match self.decide_route(&header.dst_path) { RouteDecision::Local => self.handle_local_data(header, message), - RouteDecision::Drop => Ok(EndpointOutcome::dropped()), - route => Ok(EndpointOutcome::forward( + RouteDecision::Drop => Ok(EndpointOutcome::Dropped), + route => Ok(EndpointOutcome::Forward { route, - encode_packet(&header, &message)?, - )), + frame: encode_packet(&header, &message)?, + }), } } } diff --git a/src/protocol/tree/endpoint/core.rs b/src/protocol/tree/endpoint/core.rs index 0b4482c..5be4e42 100644 --- a/src/protocol/tree/endpoint/core.rs +++ b/src/protocol/tree/endpoint/core.rs @@ -13,20 +13,13 @@ use crate::protocol::{ use super::super::{CompiledRoutes, HookKey, HookTable, RouteDecision}; -/// Registration state for a direct child endpoint. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum ConnectionState { - Unregistered, - Registered, -} - /// Routing metadata for one direct child endpoint. #[derive(Debug, Clone, PartialEq, Eq)] pub struct ChildRoute { /// Absolute path for the child endpoint inside the protocol tree. pub path: Vec, /// Whether this child currently participates in routing decisions. - pub state: ConnectionState, + pub registered: bool, } impl ChildRoute { @@ -34,7 +27,7 @@ impl ChildRoute { pub fn registered(path: Vec) -> Self { Self { path, - state: ConnectionState::Registered, + registered: true, } } } @@ -76,43 +69,14 @@ pub enum LocalEvent { } /// Result of processing a frame or building a locally-sent packet. -#[derive(Debug, Default)] -pub struct EndpointOutcome { +#[derive(Debug)] +pub enum EndpointOutcome { /// Frame to forward, together with the next routing decision. - pub forward: Option<(RouteDecision, FrameBytes)>, + Forward { route: RouteDecision, frame: FrameBytes }, /// Locally-delivered protocol event. - pub event: Option, - /// Whether the packet was intentionally discarded. - pub dropped: bool, -} - -impl EndpointOutcome { - #[must_use] - pub fn forward(route: RouteDecision, frame: FrameBytes) -> Self { - Self { - forward: Some((route, frame)), - event: None, - dropped: false, - } - } - - #[must_use] - pub fn event(event: LocalEvent) -> Self { - Self { - forward: None, - event: Some(event), - dropped: false, - } - } - - #[must_use] - pub fn dropped() -> Self { - Self { - forward: None, - event: None, - dropped: true, - } - } + Local(LocalEvent), + /// Packet intentionally discarded. + Dropped, } /// Error surfaced while validating or encoding protocol frames. diff --git a/src/protocol/tree/endpoint/hooks.rs b/src/protocol/tree/endpoint/hooks.rs index a39d277..892439d 100644 --- a/src/protocol/tree/endpoint/hooks.rs +++ b/src/protocol/tree/endpoint/hooks.rs @@ -16,7 +16,7 @@ impl ProtocolEndpoint { fault: ProtocolFault, ) -> Result { let Some(key) = key else { - return Ok(EndpointOutcome::dropped()); + return Ok(EndpointOutcome::Dropped); }; self.hooks.remove_pending(&key); @@ -32,15 +32,15 @@ impl ProtocolEndpoint { let message = FaultMessage { fault }; match self.decide_route(&key.return_path) { - RouteDecision::Local => Ok(EndpointOutcome::event(LocalEvent::Fault { + RouteDecision::Local => Ok(EndpointOutcome::Local(LocalEvent::Fault { header, message, hook_key: key, })), - route => Ok(EndpointOutcome::forward( + route => Ok(EndpointOutcome::Forward { route, - encode_packet(&header, &message)?, - )), + frame: encode_packet(&header, &message)?, + }), } } @@ -64,12 +64,12 @@ impl ProtocolEndpoint { self.hooks.activate_pending(&pending_key); pending_key } else { - return Ok(EndpointOutcome::dropped()); + return Ok(EndpointOutcome::Dropped); } }; let Some(active) = self.hooks.active(&key) else { - return Ok(EndpointOutcome::dropped()); + return Ok(EndpointOutcome::Dropped); }; if active.peer_path != header.src_path { @@ -81,14 +81,14 @@ impl ProtocolEndpoint { if active.procedure_id != message.procedure_id { // Data frames stay bound to the procedure chosen by the original call. - return Ok(EndpointOutcome::dropped()); + return Ok(EndpointOutcome::Dropped); } if message.end_hook && self.hooks.mark_peer_end(&key) { self.hooks.remove_active(&key); } - Ok(EndpointOutcome::event(LocalEvent::Data { + Ok(EndpointOutcome::Local(LocalEvent::Data { header, message, hook_key: key, @@ -106,7 +106,7 @@ impl ProtocolEndpoint { .resolve_active_key(&self.path, hook_id, &header.src_path) { self.hooks.remove_active(&key); - return Ok(EndpointOutcome::event(LocalEvent::Fault { + return Ok(EndpointOutcome::Local(LocalEvent::Fault { header, message, hook_key: key, @@ -120,14 +120,14 @@ impl ProtocolEndpoint { .is_some_and(|pending| pending.caller_src_path == header.src_path) { self.hooks.remove_pending(&pending_key); - return Ok(EndpointOutcome::event(LocalEvent::Fault { + return Ok(EndpointOutcome::Local(LocalEvent::Fault { header, message, hook_key: pending_key, })); } - Ok(EndpointOutcome::dropped()) + Ok(EndpointOutcome::Dropped) } pub(crate) fn decide_route(&self, dst_path: &[String]) -> RouteDecision { diff --git a/src/protocol/tree/endpoint/introspection.rs b/src/protocol/tree/endpoint/introspection.rs index 486ccea..ff7cb6c 100644 --- a/src/protocol/tree/endpoint/introspection.rs +++ b/src/protocol/tree/endpoint/introspection.rs @@ -18,7 +18,7 @@ impl ProtocolEndpoint { key: Option, ) -> Result { let Some(key) = key else { - return Ok(EndpointOutcome::dropped()); + return Ok(EndpointOutcome::Dropped); }; let response_payload = if let Some(leaf_name) = &header.dst_leaf { @@ -36,7 +36,7 @@ impl ProtocolEndpoint { sub_endpoints: self .children .iter() - .filter(|child| child.state == super::core::ConnectionState::Registered) + .filter(|child| child.registered) .filter_map(|child| child.path.get(self.path.len()).cloned()) .collect(), leaves: self @@ -72,16 +72,16 @@ impl ProtocolEndpoint { match self.decide_route(&key.return_path) { super::super::RouteDecision::Local => { - Ok(EndpointOutcome::event(super::core::LocalEvent::Data { + Ok(EndpointOutcome::Local(super::core::LocalEvent::Data { header: response_header, message: response, hook_key: key, })) } - route => Ok(EndpointOutcome::forward( + route => Ok(EndpointOutcome::Forward { route, - encode_packet(&response_header, &response)?, - )), + frame: encode_packet(&response_header, &response)?, + }), } } } diff --git a/src/protocol/tree/endpoint/mod.rs b/src/protocol/tree/endpoint/mod.rs index e75f143..0e3a03b 100644 --- a/src/protocol/tree/endpoint/mod.rs +++ b/src/protocol/tree/endpoint/mod.rs @@ -11,6 +11,6 @@ mod introspection; mod receive; pub use core::{ - ChildRoute, ConnectionState, Endpoint, EndpointError, EndpointOutcome, Ingress, LeafSpec, - LocalEvent, ProtocolEndpoint, + ChildRoute, Endpoint, EndpointError, EndpointOutcome, Ingress, LeafSpec, LocalEvent, + ProtocolEndpoint, }; diff --git a/src/protocol/tree/endpoint/receive.rs b/src/protocol/tree/endpoint/receive.rs index 3f07153..43c8f58 100644 --- a/src/protocol/tree/endpoint/receive.rs +++ b/src/protocol/tree/endpoint/receive.rs @@ -75,7 +75,7 @@ impl ProtocolEndpoint { return self.emit_fault_if_possible(key, ProtocolFault::INTERNAL_ERROR); } - Ok(EndpointOutcome::event(LocalEvent::Call { header, message })) + Ok(EndpointOutcome::Local(LocalEvent::Call { header, message })) } } @@ -94,7 +94,7 @@ impl Endpoint for ProtocolEndpoint { validate_header(header)?; if !self.valid_source_for_ingress(ingress, &header.src_path) { - return Ok(EndpointOutcome::dropped()); + return Ok(EndpointOutcome::Dropped); } match header.packet_type { @@ -103,17 +103,19 @@ impl Endpoint for ProtocolEndpoint { // itself. Children can return data/faults, but they do not initiate new // calls through this node. if !matches!(ingress, Ingress::Parent | Ingress::Local) { - return Ok(EndpointOutcome::dropped()); + return Ok(EndpointOutcome::Dropped); } match self.decide_route(&header.dst_path) { - RouteDecision::Child(index) => { - Ok(EndpointOutcome::forward(RouteDecision::Child(index), frame)) - } - RouteDecision::Parent => { - Ok(EndpointOutcome::forward(RouteDecision::Parent, frame)) - } - RouteDecision::Drop => Ok(EndpointOutcome::dropped()), + RouteDecision::Child(index) => Ok(EndpointOutcome::Forward { + route: RouteDecision::Child(index), + frame, + }), + RouteDecision::Parent => Ok(EndpointOutcome::Forward { + route: RouteDecision::Parent, + frame, + }), + RouteDecision::Drop => Ok(EndpointOutcome::Dropped), RouteDecision::Local => { let (header, payload) = parsed.into_parts(); let message = deserialize_archived_bytes::( @@ -133,11 +135,15 @@ impl Endpoint for ProtocolEndpoint { >(payload)?; self.handle_local_data(header, message) } - RouteDecision::Child(index) => { - Ok(EndpointOutcome::forward(RouteDecision::Child(index), frame)) - } - RouteDecision::Parent => Ok(EndpointOutcome::forward(RouteDecision::Parent, frame)), - RouteDecision::Drop => Ok(EndpointOutcome::dropped()), + RouteDecision::Child(index) => Ok(EndpointOutcome::Forward { + route: RouteDecision::Child(index), + frame, + }), + RouteDecision::Parent => Ok(EndpointOutcome::Forward { + route: RouteDecision::Parent, + frame, + }), + RouteDecision::Drop => Ok(EndpointOutcome::Dropped), }, PacketType::Fault => match self.decide_route(&header.dst_path) { RouteDecision::Local => { @@ -148,11 +154,15 @@ impl Endpoint for ProtocolEndpoint { >(payload)?; self.handle_local_fault(header, message) } - RouteDecision::Child(index) => { - Ok(EndpointOutcome::forward(RouteDecision::Child(index), frame)) - } - RouteDecision::Parent => Ok(EndpointOutcome::forward(RouteDecision::Parent, frame)), - RouteDecision::Drop => Ok(EndpointOutcome::dropped()), + RouteDecision::Child(index) => Ok(EndpointOutcome::Forward { + route: RouteDecision::Child(index), + frame, + }), + RouteDecision::Parent => Ok(EndpointOutcome::Forward { + route: RouteDecision::Parent, + frame, + }), + RouteDecision::Drop => Ok(EndpointOutcome::Dropped), }, } } diff --git a/src/protocol/tree/mod.rs b/src/protocol/tree/mod.rs index 9b9c93e..10d1bd7 100644 --- a/src/protocol/tree/mod.rs +++ b/src/protocol/tree/mod.rs @@ -18,8 +18,8 @@ pub use call::{ encode_call_reply, }; pub use endpoint::{ - ChildRoute, ConnectionState, Endpoint, EndpointError, EndpointOutcome, Ingress, LeafSpec, - LocalEvent, ProtocolEndpoint, + ChildRoute, Endpoint, EndpointError, EndpointOutcome, Ingress, LeafSpec, LocalEvent, + ProtocolEndpoint, }; pub use hook::{ActiveHook, HookConflict, HookKey, HookTable, PendingHook}; pub use leaf::{CallProcedures, ProtocolLeaf, derive_leaf_name}; diff --git a/src/protocol/tree/procedure.rs b/src/protocol/tree/procedure.rs index f59d018..86a9e0d 100644 --- a/src/protocol/tree/procedure.rs +++ b/src/protocol/tree/procedure.rs @@ -64,8 +64,8 @@ pub trait ProcedureStore

{ /// /// # Example /// ```rust -/// use alloc::collections::BTreeMap; -/// use alloc::string::String; +/// use std::collections::BTreeMap; +/// use std::string::String; /// use unshell::{Leaf, Procedure}; /// use unshell::protocol::tree::{Call, HookKey, Procedure, ProcedureEffect, ProcedureStore}; /// @@ -110,7 +110,9 @@ pub trait Procedure: StatefulProcedureMetadata + Sized where L: ProtocolLeaf, { + /// Leaf-specific error surfaced while opening or advancing the session. type Error; + /// Typed input payload decoded from the opening call. type Input; /// Creates one session from the opening `Call`. @@ -159,6 +161,7 @@ pub struct ProcedureEffect { } impl ProcedureEffect { + /// Builds an effect that keeps the session alive after emitting `outgoing`. #[must_use] pub fn outgoing(outgoing: Vec) -> Self { Self { @@ -167,6 +170,7 @@ impl ProcedureEffect { } } + /// Builds an effect that closes the session after emitting `outgoing`. #[must_use] pub fn close(outgoing: Vec) -> Self { Self { @@ -179,7 +183,9 @@ impl ProcedureEffect { /// Error surfaced by the procedure runtime. #[derive(Debug)] pub enum ProcedureRuntimeError { + /// Protocol endpoint routing or framing failed. Endpoint(EndpointError), + /// The opening call failed to decode or open cleanly. Decode(super::DispatchError), } @@ -206,7 +212,9 @@ impl From for ProcedureRuntimeError { /// Frames emitted while advancing one stateful procedure runtime. #[derive(Debug, Default)] pub struct ProcedureRuntimeOutcome { + /// Frames emitted while processing the current step. pub frames: Vec, + /// Whether the endpoint dropped the incoming packet. pub dropped: bool, } @@ -223,6 +231,7 @@ pub struct ProcedureRuntime { } impl ProcedureRuntime { + /// Builds a procedure runtime from one endpoint and one leaf instance. #[must_use] pub fn new(endpoint: ProtocolEndpoint, leaf: L) -> Self { Self { @@ -232,20 +241,24 @@ impl ProcedureRuntime { } } + /// Returns the underlying protocol endpoint. #[must_use] pub fn endpoint(&self) -> &ProtocolEndpoint { &self.endpoint } + /// Returns a mutable reference to the protocol endpoint. pub fn endpoint_mut(&mut self) -> &mut ProtocolEndpoint { &mut self.endpoint } + /// Returns the hosted leaf instance. #[must_use] pub fn leaf(&self) -> &L { &self.leaf } + /// Returns a mutable reference to the hosted leaf instance. pub fn leaf_mut(&mut self) -> &mut L { &mut self.leaf } @@ -327,20 +340,23 @@ where &mut self, outcome: super::EndpointOutcome, ) -> Result> { - let mut runtime = ProcedureRuntimeOutcome { - frames: Vec::new(), - dropped: outcome.dropped, - }; + match outcome { + super::EndpointOutcome::Forward { frame, .. } => { + let mut frames = Vec::with_capacity(1); + frames.push(frame); + Ok(ProcedureRuntimeOutcome { + frames, + dropped: false, + }) + } + super::EndpointOutcome::Dropped => Ok(ProcedureRuntimeOutcome { + frames: Vec::new(), + dropped: true, + }), + super::EndpointOutcome::Local(event) => { + let mut runtime = ProcedureRuntimeOutcome::default(); - if let Some((_route, frame)) = outcome.forward { - runtime.frames.push(frame); - } - - let Some(event) = outcome.event else { - return Ok(runtime); - }; - - match event { + match event { LocalEvent::Call { header, message } => { if message.procedure_id != P::procedure_id() { runtime @@ -446,7 +462,9 @@ where } } - Ok(runtime) + Ok(runtime) + } + } } fn open_session(&mut self, call: IncomingCall) -> Result> { @@ -523,6 +541,8 @@ where hook_key: &HookKey, mut effect: ProcedureEffect, ) -> ProcedureEffect { + // Once a session emits `end_hook`, later packets would violate the protocol, + // so the runtime keeps only the prefix through that terminal packet. if let Some(index) = effect.outgoing.iter().position(|packet| packet.end_hook) { effect.outgoing.truncate(index + 1); } @@ -535,6 +555,9 @@ where && !effect.outgoing.iter().any(|packet| packet.end_hook) && !local_end_already_sent { + // Closing a session without an explicit terminal packet would leave the + // protocol hook half-open, so emit an empty terminal frame on behalf of + // the procedure unless the local side already ended earlier. effect.outgoing.push(OutgoingData { dst_path: hook_key.return_path.clone(), hook_id: hook_key.hook_id, @@ -545,4 +568,5 @@ where } effect } + } diff --git a/treetest/src/sim/build.rs b/treetest/src/sim/build.rs index 3888e64..5e78882 100644 --- a/treetest/src/sim/build.rs +++ b/treetest/src/sim/build.rs @@ -6,7 +6,7 @@ use std::collections::{BTreeMap, VecDeque}; use crossbeam_channel::unbounded; -use unshell::protocol::tree::{ChildRoute, ConnectionState, ProtocolEndpoint}; +use unshell::protocol::tree::{ChildRoute, ProtocolEndpoint}; use crate::model::{DemoTree, NodeId, ScenarioDefinition, Selection}; @@ -42,7 +42,7 @@ impl Simulation { .iter() .map(|child_id| ChildRoute { path: tree.node(*child_id).path.clone(), - state: ConnectionState::Registered, + registered: true, }) .collect::>(); diff --git a/treetest/src/sim/runtime/dispatch.rs b/treetest/src/sim/runtime/dispatch.rs index d3bb2e5..d40e114 100644 --- a/treetest/src/sim/runtime/dispatch.rs +++ b/treetest/src/sim/runtime/dispatch.rs @@ -87,12 +87,11 @@ impl Simulation { node_id: NodeId, outcome: unshell::protocol::tree::EndpointOutcome, ) -> Result<(), SimError> { - if outcome.dropped { - self.record_trace(node_id, "packet dropped".to_owned()); - } - - if let Some((route, frame)) = outcome.forward { - match route { + match outcome { + unshell::protocol::tree::EndpointOutcome::Dropped => { + self.record_trace(node_id, "packet dropped".to_owned()); + } + unshell::protocol::tree::EndpointOutcome::Forward { route, frame } => match route { RouteDecision::Child(index) => { let child_id = self.nodes[node_id.0] .children @@ -147,13 +146,12 @@ impl Simulation { RouteDecision::Drop => { self.record_trace(node_id, "route decision dropped frame".to_owned()); } + }, + unshell::protocol::tree::EndpointOutcome::Local(event) => { + self.handle_local_event(node_id, event)?; } } - if let Some(event) = outcome.event { - self.handle_local_event(node_id, event)?; - } - Ok(()) }