From 17be0f9daaeb469f06ad21a372fecffcd0802d5c Mon Sep 17 00:00:00 2001 From: Michael Mikovsky <77305074+Astatin3@users.noreply.github.com> Date: Sun, 26 Apr 2026 01:53:37 -0600 Subject: [PATCH] Improve protocol documentation and runtime structure --- CLAUDE.md | 2 +- Cargo.toml | 4 - examples/protocol/bench/bench.rs | 2 +- src/leaf/remote_shell/session.rs | 84 +++-- src/protocol/codec.rs | 47 ++- src/protocol/introspection.rs | 13 + src/protocol/mod.rs | 13 +- src/protocol/tests/procedure.rs | 3 +- src/protocol/tree/call.rs | 163 ++++++---- src/protocol/tree/endpoint/builders.rs | 41 ++- src/protocol/tree/endpoint/core.rs | 22 +- src/protocol/tree/endpoint/hooks.rs | 9 + src/protocol/tree/endpoint/introspection.rs | 46 ++- src/protocol/tree/endpoint/receive.rs | 135 ++++---- src/protocol/tree/hook.rs | 10 + src/protocol/tree/leaf.rs | 28 ++ src/protocol/tree/mod.rs | 2 + src/protocol/tree/procedure.rs | 333 +++++++++++--------- src/protocol/tree/routing.rs | 23 +- src/protocol/types.rs | 32 +- src/protocol/validation.rs | 17 +- 21 files changed, 676 insertions(+), 353 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index f56aaee..38a0003 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -23,7 +23,7 @@ Key routing rules: - Use exact, current dependency versions. Never guess package versions. Check the latest available release before adding or updating any dependency. If a version conflict appears, first try to resolve it while keeping dependencies on their latest compatible releases. Only fall back to an older version when the conflict is truly unavoidable, and explain why. - Leave the project warning-free. Fix all compiler, linter, and tooling warnings before finishing. If a warning cannot be eliminated cleanly, silence it in the narrowest possible scope and add a short rationale. -- Document code thoroughly. Add rustdoc, module docs, examples, and inline comments where they improve comprehension. Public APIs should be documented. Non-obvious internal logic should also be documented. Comments should explain intent, invariants, and behavior, not restate syntax. +- Document code thoroughly. Add rustdoc, module docs, examples, and inline comments where they improve comprehension. Public APIs should be documented with clear meaning and examples. Non-obvious internal logic should also be documented. Comments should explain intent, invariants, and behavior, not restate syntax. - Maintain clear architecture. Do not allow files or functions to grow without bound. When code becomes too large or mixes concerns, split it into smaller modules, helper files, or folders with clear names. Prefer structure that improves readability, navigation, and maintenance. - Research library behavior when needed. Do not assume library APIs, feature flags, version compatibility, or known issues. Verify them, including online research when appropriate, before making decisions. - Commit at every real milestone. Create a local git commit each time a meaningful milestone is reached. Commit messages must be accurate, specific, and reflect the actual change. diff --git a/Cargo.toml b/Cargo.toml index 0723760..53d3a36 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,10 +56,6 @@ portable-pty = { workspace = true } name = "leaf_derive" path = "examples/protocol/leaf_derive.rs" -[[example]] -name = "protocol_leaf_derive" -path = "examples/protocol_leaf_derive.rs" - [[example]] name = "remote_shell_endpoint" path = "examples/protocol/remote_shell_endpoint.rs" diff --git a/examples/protocol/bench/bench.rs b/examples/protocol/bench/bench.rs index 122fc7c..b99071d 100644 --- a/examples/protocol/bench/bench.rs +++ b/examples/protocol/bench/bench.rs @@ -9,7 +9,7 @@ use unshell::protocol::tree::{ use unshell::protocol::{CallMessage, PacketHeader, PacketType, decode_frame, encode_packet}; const SAMPLES: usize = 500; -const ITERS: usize = 1_000; +const ITERS: usize = 10_000; const TOOL_ITERS: usize = 10_000; fn main() { diff --git a/src/leaf/remote_shell/session.rs b/src/leaf/remote_shell/session.rs index fe5e0f6..35054af 100644 --- a/src/leaf/remote_shell/session.rs +++ b/src/leaf/remote_shell/session.rs @@ -1,3 +1,14 @@ +//! Per-hook remote shell session lifecycle. +//! +//! A session opens one PTY-backed shell process and then translates protocol hook +//! traffic into stdin writes and stdout/stderr chunks. The close model is +//! intentionally two-sided: +//! - peer end: the caller sets `end_hook`, so no more stdin is accepted +//! - local end: the shell process exits and the PTY reader drains completely +//! +//! Only after both conditions are observed does the session emit its final empty +//! `end_hook` packet back through the protocol runtime. + use std::io::{self, Read, Write}; use std::process::Command; use std::sync::mpsc::{self, Receiver, SyncSender, TryRecvError}; @@ -17,18 +28,29 @@ use super::errors::ShellLeafError; #[derive(Procedure)] #[procedure(leaf = RemoteShellLeaf, name = "open")] pub struct ProcedureOpen { + /// Spawned PTY child process. pub(super) child: Box, + /// Process-group leader used for Unix hangup and kill signaling. process_group_leader: Option, + /// Buffered stdin bridge into the shell process. stdin_tx: Option>>, + /// Buffered output stream read from the PTY. output_rx: Receiver, + /// Hook return path for packets emitted by this session. return_path: Vec, + /// Hook identifier allocated by the caller. hook_id: u64, + /// Procedure id bound to this shell hook. procedure_id: String, + /// Whether the PTY reader has closed and drained. output_closed: bool, + /// Observed child exit status, once known. pub(super) exit_status: Option, + /// Whether this session already emitted its terminal local packet. pub(super) local_end_sent: bool, } +/// One event forwarded from the PTY reader thread. enum OutputEvent { Chunk(Vec), ReaderClosed, @@ -42,6 +64,7 @@ impl ProcedureOpen { hook_id: u64, procedure_id: String, ) -> Result { + let command = build_shell_command(); let pty_system = native_pty_system(); let pair = pty_system .openpty(PtySize { @@ -52,16 +75,6 @@ impl ProcedureOpen { }) .map_err(|error| io::Error::other(error.to_string()))?; - let command = if cfg!(windows) { - let mut command = CommandBuilder::new("cmd.exe"); - command.arg("/Q"); - command - } else { - let mut command = CommandBuilder::new("/bin/sh"); - command.arg("-i"); - command - }; - let child = pair .slave .spawn_command(command) @@ -76,10 +89,7 @@ impl ProcedureOpen { .try_clone_reader() .map_err(|error| io::Error::other(error.to_string()))?; - let (stdin_tx, stdin_rx) = mpsc::sync_channel(64); - let (tx, rx) = mpsc::sync_channel(64); - spawn_pipe_writer(stdin, stdin_rx); - spawn_pipe_reader(stdout, tx); + let (stdin_tx, rx) = spawn_io_threads(stdin, stdout); Ok(Self { child, @@ -95,6 +105,7 @@ impl ProcedureOpen { }) } + /// Builds one outgoing hook packet owned by this session. pub(super) fn packet(&self, data: Vec, end_hook: bool) -> OutgoingData { OutgoingData { dst_path: self.return_path.clone(), @@ -105,6 +116,7 @@ impl ProcedureOpen { } } + /// Forces the underlying shell process to stop and records its exit status. pub(super) fn terminate(&mut self) -> Result<(), ShellLeafError> { self.stdin_tx.take(); match self.child.try_wait()? { @@ -113,7 +125,7 @@ impl ProcedureOpen { Ok(()) } None => { - self.kill_process_group(); + self.signal_process_group("-KILL"); self.child .kill() .map_err(|error| io::Error::other(error.to_string()))?; @@ -127,6 +139,7 @@ impl ProcedureOpen { } } + /// Drains any currently buffered PTY output into protocol packets. pub(super) fn drain_output(&mut self, outgoing: &mut Vec) { loop { match self.output_rx.try_recv() { @@ -164,7 +177,7 @@ impl ProcedureOpen { // we also send SIGHUP so an interactive shell treats this like terminal // hangup instead of waiting forever on the still-open PTY master. self.stdin_tx.take(); - self.signal_peer_end(); + self.signal_process_group("-HUP"); Ok(ProcedureEffect::default()) } @@ -185,7 +198,7 @@ impl ProcedureOpen { } if self.exit_status.is_some() && !self.output_closed { - self.kill_process_group(); + self.signal_process_group("-KILL"); } if self.exit_status.is_some() && self.output_closed { @@ -197,21 +210,11 @@ impl ProcedureOpen { Ok(ProcedureEffect::outgoing(outgoing)) } - fn kill_process_group(&self) { + fn signal_process_group(&self, signal: &str) { #[cfg(unix)] if let Some(process_group_leader) = self.process_group_leader { let _ = Command::new("kill") - .arg("-KILL") - .arg(format!("-{}", process_group_leader)) - .status(); - } - } - - fn signal_peer_end(&self) { - #[cfg(unix)] - if let Some(process_group_leader) = self.process_group_leader { - let _ = Command::new("kill") - .arg("-HUP") + .arg(signal) .arg(format!("-{}", process_group_leader)) .status(); } @@ -237,6 +240,29 @@ fn spawn_pipe_writer(mut stdin: Box, rx: Receiver>) { }); } +fn build_shell_command() -> CommandBuilder { + if cfg!(windows) { + let mut command = CommandBuilder::new("cmd.exe"); + command.arg("/Q"); + command + } else { + let mut command = CommandBuilder::new("/bin/sh"); + command.arg("-i"); + command + } +} + +fn spawn_io_threads( + stdin: Box, + stdout: Box, +) -> (SyncSender>, Receiver) { + let (stdin_tx, stdin_rx) = mpsc::sync_channel(64); + let (tx, rx) = mpsc::sync_channel(64); + spawn_pipe_writer(stdin, stdin_rx); + spawn_pipe_reader(stdout, tx); + (stdin_tx, rx) +} + fn spawn_pipe_reader(mut reader: R, tx: mpsc::SyncSender) where R: Read + Send + 'static, diff --git a/src/protocol/codec.rs b/src/protocol/codec.rs index 3747367..b5f7f28 100644 --- a/src/protocol/codec.rs +++ b/src/protocol/codec.rs @@ -18,10 +18,15 @@ pub type FrameBytes = AlignedVec; /// Framing or archive failure. #[derive(Debug)] pub enum FrameError { + /// The byte slice ended before a full frame could be decoded. Truncated, + /// The archived header bytes failed validation or deserialization. InvalidHeader(Error), + /// The archived payload bytes failed validation or deserialization. InvalidPayload(Error), + /// Serializing one header or payload section failed. Serialize(Error), + /// One archived section grew beyond the `u32` length prefix supported by the format. LengthOverflow, } @@ -40,6 +45,9 @@ impl fmt::Display for FrameError { impl core::error::Error for FrameError {} /// Parsed frame with one owned header and a borrowed payload section. +/// +/// The frame decoder eagerly materializes the routing header into owned Rust values, but keeps +/// the payload section borrowed so callers can choose which concrete payload type to decode. pub struct ParsedFrame<'a> { header: PacketHeader, payload_bytes: &'a [u8], @@ -47,39 +55,60 @@ pub struct ParsedFrame<'a> { impl<'a> ParsedFrame<'a> { #[must_use] + /// Returns the decoded packet header. pub fn header(&self) -> &PacketHeader { &self.header } #[must_use] + /// Returns the packet class from the decoded header. pub fn packet_type(&self) -> PacketType { self.header.packet_type } #[must_use] + /// Returns the borrowed payload section bytes. pub fn payload_bytes(&self) -> &'a [u8] { self.payload_bytes } #[must_use] + /// Splits the parsed frame into its owned header and borrowed payload bytes. pub fn into_parts(self) -> (PacketHeader, &'a [u8]) { (self.header, self.payload_bytes) } + /// Deserializes the payload section as a [`CallMessage`]. pub fn deserialize_call(&self) -> Result { - deserialize_archived_bytes::(self.payload_bytes) + self.deserialize_payload::() } + /// Deserializes the payload section as a [`DataMessage`]. pub fn deserialize_data(&self) -> Result { - deserialize_archived_bytes::(self.payload_bytes) + self.deserialize_payload::() } + /// Deserializes the payload section as a [`FaultMessage`]. pub fn deserialize_fault(&self) -> Result { - deserialize_archived_bytes::(self.payload_bytes) + self.deserialize_payload::() + } + + fn deserialize_payload(&self) -> Result + where + A: rkyv::Portable + + for<'b> rkyv::bytecheck::CheckBytes>, + T: rkyv::Archive, + A: rkyv::Deserialize>, + { + deserialize_archived_bytes::(self.payload_bytes) } } /// Encodes a packet header and payload using the aligned two-section frame format. +/// +/// The frame starts with two big-endian `u32` lengths, followed by an aligned archived header +/// section and an aligned archived payload section. Both sections use [`SECTION_ALIGN`] so the +/// archived bytes can usually be accessed without a fallback copy on decode. pub fn encode_packet

(header: &PacketHeader, payload: &P) -> Result where P: for<'a> Serialize< @@ -107,6 +136,9 @@ where } /// Decodes one aligned two-section frame. +/// +/// This rejects trailing bytes instead of silently ignoring them, so callers can treat one byte +/// slice as exactly one protocol frame. pub fn decode_frame(bytes: &[u8]) -> Result, FrameError> { let (header_bytes, payload_bytes) = split_frame_sections(bytes)?; let header = deserialize_section::( @@ -121,6 +153,10 @@ pub fn decode_frame(bytes: &[u8]) -> Result, FrameError> { } /// Deserializes one archived byte section. +/// +/// Payload bytes normally come from [`decode_frame`] or one of [`ParsedFrame`]`'s` +/// `deserialize_*` helpers. This function remains public for callers that archive nested +/// application payloads inside protocol `data` fields. pub fn deserialize_archived_bytes(bytes: &[u8]) -> Result where A: rkyv::Portable @@ -158,6 +194,8 @@ fn split_frame_sections(bytes: &[u8]) -> Result<(&[u8], &[u8]), FrameError> { let payload_start = align_up(header_end, SECTION_ALIGN); let payload_end = payload_start + payload_len; if payload_end != bytes.len() { + // Framed packets do not permit trailing bytes. Treating the slice as exactly one frame + // keeps stream framing bugs visible instead of silently accepting concatenated payloads. return Err(FrameError::Truncated); } @@ -191,6 +229,9 @@ where return deserialize::(archived).map_err(invalid); } + // Archived types may require stronger alignment than a borrowed byte slice can guarantee. + // Copy into an aligned buffer so callers can still decode valid frames from arbitrary input + // sources instead of rejecting them purely for allocation layout reasons. let mut aligned: FrameBytes = FrameBytes::with_capacity(bytes.len()); aligned.extend_from_slice(bytes); let archived = access::(&aligned).map_err(invalid)?; diff --git a/src/protocol/introspection.rs b/src/protocol/introspection.rs index 12af269..d61363a 100644 --- a/src/protocol/introspection.rs +++ b/src/protocol/introspection.rs @@ -4,25 +4,38 @@ use alloc::{string::String, vec::Vec}; use rkyv::{Archive, Deserialize, Serialize}; /// Reserved procedure id for protocol introspection. +/// +/// The protocol uses the empty string here so discovery traffic stays outside the normal +/// application procedure namespace. [`crate::protocol::validate_procedure_id`] reserves that +/// value exclusively for introspection. pub const INTROSPECTION_PROCEDURE_ID: &str = ""; /// Endpoint-wide introspection payload. #[derive(Archive, Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] pub struct EndpointIntrospection { + /// Direct child endpoint segment names hosted immediately below this endpoint. pub sub_endpoints: Vec, + /// Leaf summaries hosted directly at this endpoint. pub leaves: Vec, } /// Shared per-leaf discovery record. #[derive(Archive, Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] pub struct LeafIntrospectionSummary { + /// Canonical dotted leaf identifier. pub leaf_name: String, + /// Exhaustive canonical procedure ids currently exposed by the leaf. pub procedures: Vec, } /// Leaf-specific introspection payload. +/// +/// This duplicates [`LeafIntrospectionSummary`] intentionally because the leaf-only response is +/// a distinct wire payload from the endpoint-wide discovery response. #[derive(Archive, Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] pub struct LeafIntrospection { + /// Canonical dotted leaf identifier. pub leaf_name: String, + /// Exhaustive canonical procedure ids currently exposed by the leaf. pub procedures: Vec, } diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index 964a2ba..12a7ed9 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -1,4 +1,15 @@ -//! Canonical UnShell protocol modules. +//! Canonical UnShell protocol surface. +//! +//! This module is the stable facade for wire-level protocol types, framing, and +//! stateless validation helpers. Callers normally: +//! - build one [`PacketHeader`] plus payload type from this module, +//! - encode it with [`encode_packet`], +//! - decode inbound bytes with [`decode_frame`], and +//! - validate message/header shape with [`validate_header`], [`validate_call`], and +//! [`validate_procedure_id`]. +//! +//! The concrete wire structs live in the private `types` module and are re-exported here so the +//! public API stays flat while internal archived-type details remain hidden. pub mod codec; pub mod introspection; diff --git a/src/protocol/tests/procedure.rs b/src/protocol/tests/procedure.rs index 0d12f42..2ce3372 100644 --- a/src/protocol/tests/procedure.rs +++ b/src/protocol/tests/procedure.rs @@ -244,7 +244,8 @@ fn procedure_runtime_keeps_session_after_local_end_until_explicit_close() { ) .expect("local end trigger should encode"); let EndpointOutcome::Forward { - frame: local_end_frame, .. + frame: local_end_frame, + .. } = local_end else { panic!("controller should forward local end trigger"); diff --git a/src/protocol/tree/call.rs b/src/protocol/tree/call.rs index 1e8704e..31504ac 100644 --- a/src/protocol/tree/call.rs +++ b/src/protocol/tree/call.rs @@ -221,6 +221,7 @@ impl LeafRuntime where L: CallLeaf + super::CallProcedures::Error>, { + /// Delivers one inbound frame into the stateful leaf runtime. pub fn receive( &mut self, ingress: &Ingress, @@ -230,6 +231,7 @@ where self.process_endpoint_outcome(outcome) } + /// Polls the leaf for locally-generated hook traffic and routes any emitted frames. pub fn poll(&mut self) -> Result::Error>> { let outgoing = self.leaf.poll().map_err(LeafRuntimeError::Leaf)?; self.emit_outgoing(outgoing) @@ -248,80 +250,105 @@ where frames: Vec::new(), dropped: true, }), - crate::protocol::tree::EndpointOutcome::Local(event) => { - let mut runtime = RuntimeOutcome::default(); + crate::protocol::tree::EndpointOutcome::Local(event) => self.process_local_event(event), + } + } - match event { - LocalEvent::Call { header, message } => { - let CallMessage { - procedure_id, - data, - response_hook, - } = message; - let fault_hook = response_hook.as_ref(); - let incoming = IncomingCall { - header, - message: CallMessage { - procedure_id: procedure_id.clone(), - data, - response_hook: response_hook.clone(), - }, - }; - match self.leaf.dispatch_call(incoming) { - Ok(CallReply::Reply(bytes)) => { - if let Some(hook) = response_hook { - runtime.frames.extend(self.send_reply_data( - hook, - procedure_id, - bytes, - true, - )?); - } - } - Ok(CallReply::NoReply) => {} - Err(error) => { - runtime - .frames - .extend(self.emit_internal_fault_if_possible(fault_hook)?); - return Err(LeafRuntimeError::Dispatch(error)); - } - } - } - LocalEvent::Data { - header, - message, - hook_key, - } => { - let outgoing = self - .leaf - .on_data(IncomingData { - header, - message, - hook_key, - }) - .map_err(LeafRuntimeError::Leaf)?; - runtime.frames.extend(self.emit_outgoing(outgoing)?.frames); - } - LocalEvent::Fault { - header, - message, - hook_key, - } => { - self.leaf - .on_fault(IncomingFault { - header, - fault: message, - hook_key, - }) - .map_err(LeafRuntimeError::Leaf)?; - } - } + fn process_local_event( + &mut self, + event: LocalEvent, + ) -> Result::Error>> { + match event { + LocalEvent::Call { header, message } => self.process_local_call(header, message), + LocalEvent::Data { + header, + message, + hook_key, + } => self.process_local_data(header, message, hook_key), + LocalEvent::Fault { + header, + message, + hook_key, + } => self.process_local_fault(header, message, hook_key), + } + } - Ok(runtime) + fn process_local_call( + &mut self, + header: PacketHeader, + message: CallMessage, + ) -> Result::Error>> { + let CallMessage { + procedure_id, + data, + response_hook, + } = message; + let fault_hook = response_hook.as_ref(); + let incoming = IncomingCall { + header, + // Split the payload apart so the reply path can reuse the owned procedure id and + // response hook without re-decoding the incoming bytes. + message: CallMessage { + procedure_id: procedure_id.clone(), + data, + response_hook: response_hook.clone(), + }, + }; + + match self.leaf.dispatch_call(incoming) { + Ok(CallReply::Reply(bytes)) => { + let frames = if let Some(hook) = response_hook { + self.send_reply_data(hook, procedure_id, bytes, true)? + } else { + Vec::new() + }; + Ok(RuntimeOutcome { + frames, + dropped: false, + }) + } + Ok(CallReply::NoReply) => Ok(RuntimeOutcome::default()), + Err(error) => { + let frames = self.emit_internal_fault_if_possible(fault_hook)?; + let _ = frames; + Err(LeafRuntimeError::Dispatch(error)) } } } + fn process_local_data( + &mut self, + header: PacketHeader, + message: DataMessage, + hook_key: HookKey, + ) -> Result::Error>> { + let outgoing = self + .leaf + .on_data(IncomingData { + header, + message, + hook_key, + }) + .map_err(LeafRuntimeError::Leaf)?; + self.emit_outgoing(outgoing) + } + + fn process_local_fault( + &mut self, + header: PacketHeader, + message: crate::protocol::FaultMessage, + hook_key: HookKey, + ) -> Result::Error>> { + self.leaf + .on_fault(IncomingFault { + header, + fault: message, + hook_key, + }) + .map_err(LeafRuntimeError::Leaf)?; + Ok(RuntimeOutcome::default()) + } + fn emit_outgoing( &mut self, outgoing: Vec, diff --git a/src/protocol/tree/endpoint/builders.rs b/src/protocol/tree/endpoint/builders.rs index 030da32..85bdfb3 100644 --- a/src/protocol/tree/endpoint/builders.rs +++ b/src/protocol/tree/endpoint/builders.rs @@ -104,6 +104,9 @@ impl ProtocolEndpoint { #[must_use] /// Creates an endpoint with compiled routing tables for its current topology. + /// + /// `parent_path` is currently used only as a presence flag. The endpoint stores its own + /// absolute `path`, and routing only needs to know whether an upward route exists. pub fn new( path: Vec, parent_path: Option>, @@ -177,10 +180,7 @@ impl ProtocolEndpoint { match self.decide_route(&header.dst_path) { RouteDecision::Local => self.handle_local_call(header, call), RouteDecision::Drop => { - if let Some(hook) = &call.response_hook { - self.hooks - .remove_pending(&HookKey::new(hook.return_path.clone(), hook.hook_id)); - } + self.rollback_pending_call_hook(&call); Ok(EndpointOutcome::Dropped) } route => Ok(EndpointOutcome::Forward { @@ -232,17 +232,7 @@ impl ProtocolEndpoint { self.prepare_data(dst_path, hook_id, procedure_id, data, end_hook)?; if end_hook { - // Locally-originated streams may not have been resolved against a peer yet, - // so fall back to the endpoint's own hook key shape when closing them. - let local_hook_key = self - .hooks - .resolve_active_key(&local_end_dst_path, hook_id, &self.path) - .unwrap_or_else(|| host_key.clone()); - if self.hooks.pending(&host_key).is_some() { - self.hooks.mark_pending_local_end(&host_key); - } else if self.hooks.mark_local_end(&local_hook_key) { - self.hooks.remove_active(&local_hook_key); - } + self.mark_local_stream_end(&local_end_dst_path, hook_id, &host_key); } match self.decide_route(&header.dst_path) { @@ -254,4 +244,25 @@ impl ProtocolEndpoint { }), } } + + fn rollback_pending_call_hook(&mut self, call: &CallMessage) { + if let Some(hook) = &call.response_hook { + self.hooks + .remove_pending(&HookKey::new(hook.return_path.clone(), hook.hook_id)); + } + } + + fn mark_local_stream_end(&mut self, dst_path: &[String], hook_id: u64, host_key: &HookKey) { + // Locally-originated streams may not have been resolved against a peer yet, so fall + // back to the endpoint's own hook key shape when closing them. + let local_hook_key = self + .hooks + .resolve_active_key(dst_path, hook_id, &self.path) + .unwrap_or_else(|| host_key.clone()); + if self.hooks.pending(host_key).is_some() { + self.hooks.mark_pending_local_end(host_key); + } else if self.hooks.mark_local_end(&local_hook_key) { + self.hooks.remove_active(&local_hook_key); + } + } } diff --git a/src/protocol/tree/endpoint/core.rs b/src/protocol/tree/endpoint/core.rs index 5be4e42..3f1bee0 100644 --- a/src/protocol/tree/endpoint/core.rs +++ b/src/protocol/tree/endpoint/core.rs @@ -24,6 +24,7 @@ pub struct ChildRoute { impl ChildRoute { #[must_use] + /// Builds one child route that is immediately eligible for routing decisions. pub fn registered(path: Vec) -> Self { Self { path, @@ -44,26 +45,40 @@ pub struct LeafSpec { /// Where an inbound frame entered this endpoint. #[derive(Debug, Clone, PartialEq, Eq)] pub enum Ingress { + /// The frame arrived from the parent side of the tree. Parent, + /// The frame arrived from one direct child, identified by that child's absolute path. Child(Vec), + /// The frame originated locally at this endpoint. Local, } /// Event produced when the endpoint handles a packet locally. #[derive(Debug, Clone, PartialEq, Eq)] pub enum LocalEvent { + /// One opening `Call` packet validated and delivered to local code. Call { + /// Validated protocol header for the packet. header: PacketHeader, + /// Deserialized call payload. message: CallMessage, }, + /// One hook-associated `Data` packet validated and delivered locally. Data { + /// Validated protocol header for the packet. header: PacketHeader, + /// Deserialized data payload. message: DataMessage, + /// Canonical host-scoped hook key resolved for this hook stream. hook_key: HookKey, }, + /// One hook-associated `Fault` packet validated and delivered locally. Fault { + /// Validated protocol header for the packet. header: PacketHeader, + /// Deserialized fault payload. message: FaultMessage, + /// Canonical host-scoped hook key resolved for this hook stream. hook_key: HookKey, }, } @@ -72,7 +87,10 @@ pub enum LocalEvent { #[derive(Debug)] pub enum EndpointOutcome { /// Frame to forward, together with the next routing decision. - Forward { route: RouteDecision, frame: FrameBytes }, + Forward { + route: RouteDecision, + frame: FrameBytes, + }, /// Locally-delivered protocol event. Local(LocalEvent), /// Packet intentionally discarded. @@ -82,7 +100,9 @@ pub enum EndpointOutcome { /// Error surfaced while validating or encoding protocol frames. #[derive(Debug)] pub enum EndpointError { + /// Framing, archive decode, or archive encode failed. Frame(FrameError), + /// One protocol invariant failed validation. Validation(ValidationError), } diff --git a/src/protocol/tree/endpoint/hooks.rs b/src/protocol/tree/endpoint/hooks.rs index 892439d..b226428 100644 --- a/src/protocol/tree/endpoint/hooks.rs +++ b/src/protocol/tree/endpoint/hooks.rs @@ -81,6 +81,9 @@ impl ProtocolEndpoint { if active.procedure_id != message.procedure_id { // Data frames stay bound to the procedure chosen by the original call. + // A procedure mismatch is dropped rather than faulted because the wrong peer may be + // replaying stale traffic, and converting that into a terminal hook fault would let a + // stray packet tear down an otherwise valid stream. return Ok(EndpointOutcome::Dropped); } @@ -134,6 +137,12 @@ impl ProtocolEndpoint { self.routing.route(dst_path) } + /// Returns whether one `src_path` is topologically valid for the ingress side that delivered + /// the frame. + /// + /// Parent ingress may carry packets from ancestors, siblings, or the endpoint itself, but not + /// from descendants pretending to be upstream. Child ingress may only carry packets from that + /// child subtree, and local ingress must exactly match the endpoint path. pub(crate) fn valid_source_for_ingress(&self, ingress: &Ingress, src_path: &[String]) -> bool { match ingress { Ingress::Parent => { diff --git a/src/protocol/tree/endpoint/introspection.rs b/src/protocol/tree/endpoint/introspection.rs index ff7cb6c..1554bcf 100644 --- a/src/protocol/tree/endpoint/introspection.rs +++ b/src/protocol/tree/endpoint/introspection.rs @@ -1,6 +1,6 @@ //! Introspection response generation. -use alloc::string::String; +use alloc::{string::String, vec::Vec}; use rkyv::{rancor::Error as RkyvError, to_bytes}; use crate::protocol::{ @@ -25,20 +25,13 @@ impl ProtocolEndpoint { let Some(leaf) = self.leaves.get(leaf_name) else { return self.emit_fault_if_possible(Some(key), ProtocolFault::UNKNOWN_LEAF); }; - to_bytes::(&LeafIntrospection { + self.serialize_introspection(&LeafIntrospection { leaf_name: leaf_name.clone(), procedures: leaf.procedures.clone(), - }) - .map_err(|error| EndpointError::Frame(FrameError::Serialize(error)))? - .to_vec() + })? } else { - to_bytes::(&EndpointIntrospection { - sub_endpoints: self - .children - .iter() - .filter(|child| child.registered) - .filter_map(|child| child.path.get(self.path.len()).cloned()) - .collect(), + self.serialize_introspection(&EndpointIntrospection { + sub_endpoints: self.direct_registered_child_names(), leaves: self .leaves .values() @@ -47,9 +40,7 @@ impl ProtocolEndpoint { procedures: leaf.procedures.clone(), }) .collect(), - }) - .map_err(|error| EndpointError::Frame(FrameError::Serialize(error)))? - .to_vec() + })? }; let response_header = PacketHeader { @@ -84,4 +75,29 @@ impl ProtocolEndpoint { }), } } + + fn direct_registered_child_names(&self) -> Vec { + self.children + .iter() + .filter(|child| child.registered) + // Child routes store absolute endpoint paths. Index the first segment below the + // current endpoint so discovery only reports direct descendants. + .filter_map(|child| child.path.get(self.path.len()).cloned()) + .collect() + } + + fn serialize_introspection(&self, value: &T) -> Result, EndpointError> + where + T: for<'a> rkyv::Serialize< + rkyv::api::high::HighSerializer< + rkyv::util::AlignedVec, + rkyv::ser::allocator::ArenaHandle<'a>, + RkyvError, + >, + >, + { + to_bytes::(value) + .map_err(|error| EndpointError::Frame(FrameError::Serialize(error))) + .map(|bytes| bytes.to_vec()) + } } diff --git a/src/protocol/tree/endpoint/receive.rs b/src/protocol/tree/endpoint/receive.rs index af49af0..3d12275 100644 --- a/src/protocol/tree/endpoint/receive.rs +++ b/src/protocol/tree/endpoint/receive.rs @@ -54,6 +54,8 @@ impl ProtocolEndpoint { if let Some(hook) = &message.response_hook && hook.return_path != self.path { + // Calls targeting this endpoint may still ask another endpoint to host the response + // hook. Only register a local active hook when the response path escapes this node. let Some(key) = key.clone() else { unreachable!("response_hook checked above"); }; @@ -76,6 +78,65 @@ impl ProtocolEndpoint { Ok(EndpointOutcome::Local(LocalEvent::Call { header, message })) } + + fn receive_call( + &mut self, + ingress: &Ingress, + parsed: crate::protocol::ParsedFrame<'_>, + ) -> Result { + // Calls only enter from the parent side of the tree or from the endpoint itself. + // Children can return data/faults, but they do not initiate new calls through this node. + if !matches!(ingress, Ingress::Parent | Ingress::Local) { + return Ok(EndpointOutcome::Dropped); + } + + let (header, payload) = parsed.into_parts(); + let message = deserialize_archived_bytes::(payload)?; + validate_call(&header, &message)?; + self.handle_local_call(header, message) + } + + fn receive_data( + &mut self, + parsed: crate::protocol::ParsedFrame<'_>, + ) -> Result { + let (header, payload) = parsed.into_parts(); + let message = deserialize_archived_bytes::< + ArchivedDataMessage, + crate::protocol::DataMessage, + >(payload)?; + self.handle_local_data(header, message) + } + + fn receive_fault( + &mut self, + parsed: crate::protocol::ParsedFrame<'_>, + ) -> Result { + let (header, payload) = parsed.into_parts(); + let message = deserialize_archived_bytes::< + ArchivedFaultMessage, + crate::protocol::FaultMessage, + >(payload)?; + self.handle_local_fault(header, message) + } + + fn forward_or_drop( + route: RouteDecision, + frame: crate::protocol::FrameBytes, + ) -> EndpointOutcome { + match route { + RouteDecision::Child(index) => EndpointOutcome::Forward { + route: RouteDecision::Child(index), + frame, + }, + RouteDecision::Parent => EndpointOutcome::Forward { + route: RouteDecision::Parent, + frame, + }, + RouteDecision::Drop => EndpointOutcome::Dropped, + RouteDecision::Local => unreachable!("local routes are handled before forwarding"), + } + } } impl Endpoint for ProtocolEndpoint { @@ -96,73 +157,15 @@ impl Endpoint for ProtocolEndpoint { return Ok(EndpointOutcome::Dropped); } - match header.packet_type { - crate::protocol::PacketType::Call => { - // Calls only enter from the parent side of the tree or from the endpoint - // itself. Children can return data/faults, but they do not initiate new - // calls through this node. - if !matches!(ingress, Ingress::Parent | Ingress::Local) { - return Ok(EndpointOutcome::Dropped); - } + let route = self.decide_route(&header.dst_path); + if route != RouteDecision::Local { + return Ok(Self::forward_or_drop(route, frame)); + } - match self.decide_route(&header.dst_path) { - RouteDecision::Child(index) => Ok(EndpointOutcome::Forward { - route: RouteDecision::Child(index), - frame, - }), - RouteDecision::Parent => Ok(EndpointOutcome::Forward { - route: RouteDecision::Parent, - frame, - }), - RouteDecision::Drop => Ok(EndpointOutcome::Dropped), - RouteDecision::Local => { - let (header, payload) = parsed.into_parts(); - let message = deserialize_archived_bytes::( - payload, - )?; - validate_call(&header, &message)?; - self.handle_local_call(header, message) - } - } - } - crate::protocol::PacketType::Data => match self.decide_route(&header.dst_path) { - RouteDecision::Local => { - let (header, payload) = parsed.into_parts(); - let message = deserialize_archived_bytes::< - ArchivedDataMessage, - crate::protocol::DataMessage, - >(payload)?; - self.handle_local_data(header, message) - } - RouteDecision::Child(index) => Ok(EndpointOutcome::Forward { - route: RouteDecision::Child(index), - frame, - }), - RouteDecision::Parent => Ok(EndpointOutcome::Forward { - route: RouteDecision::Parent, - frame, - }), - RouteDecision::Drop => Ok(EndpointOutcome::Dropped), - }, - crate::protocol::PacketType::Fault => match self.decide_route(&header.dst_path) { - RouteDecision::Local => { - let (header, payload) = parsed.into_parts(); - let message = deserialize_archived_bytes::< - ArchivedFaultMessage, - crate::protocol::FaultMessage, - >(payload)?; - self.handle_local_fault(header, message) - } - RouteDecision::Child(index) => Ok(EndpointOutcome::Forward { - route: RouteDecision::Child(index), - frame, - }), - RouteDecision::Parent => Ok(EndpointOutcome::Forward { - route: RouteDecision::Parent, - frame, - }), - RouteDecision::Drop => Ok(EndpointOutcome::Dropped), - }, + match header.packet_type { + crate::protocol::PacketType::Call => self.receive_call(ingress, parsed), + crate::protocol::PacketType::Data => self.receive_data(parsed), + crate::protocol::PacketType::Fault => self.receive_fault(parsed), } } } diff --git a/src/protocol/tree/hook.rs b/src/protocol/tree/hook.rs index b89edbb..a773aa7 100644 --- a/src/protocol/tree/hook.rs +++ b/src/protocol/tree/hook.rs @@ -73,6 +73,10 @@ impl HookTable { /// /// Hook ids are scoped by host path, so this only needs to guarantee uniqueness within the /// local table. The wrapped increment keeps allocation infallible for long-lived runtimes. + /// + /// The table currently uses one counter shared across all host paths. The `return_path` + /// parameter remains in the API because hook ids are still interpreted as host-scoped by the + /// rest of the protocol surface. #[must_use] pub fn allocate_hook_id(&mut self, _return_path: &[String]) -> u64 { let id = self.next_id.max(1); @@ -197,6 +201,9 @@ impl HookTable { } /// Marks the local side finished and returns `true` once both sides are finished. + /// + /// This does not remove the hook. Callers use the boolean to decide whether cleanup should + /// happen immediately or whether the peer side is still expected to send more traffic. pub fn mark_local_end(&mut self, key: &HookKey) -> bool { let Some(active) = self.active_mut(key) else { return false; @@ -206,6 +213,9 @@ impl HookTable { } /// Marks the peer side finished and returns `true` once both sides are finished. + /// + /// This mirrors [`mark_local_end`](Self::mark_local_end): it only reports completion, leaving + /// final removal to the caller so higher layers can decide when to tear down hook state. pub fn mark_peer_end(&mut self, key: &HookKey) -> bool { let Some(active) = self.active_mut(key) else { return false; diff --git a/src/protocol/tree/leaf.rs b/src/protocol/tree/leaf.rs index b67cd94..3cc57d6 100644 --- a/src/protocol/tree/leaf.rs +++ b/src/protocol/tree/leaf.rs @@ -16,6 +16,7 @@ pub trait ProtocolLeaf { /// Generated call metadata and initial `Call` dispatch for one leaf. pub trait CallProcedures: ProtocolLeaf { + /// Leaf-specific error surfaced when generated call dispatch fails. type Error; /// Returns the local procedure suffixes supported by this leaf. @@ -50,6 +51,10 @@ pub trait CallProcedures: ProtocolLeaf { } /// Dispatches one initial `Call` that targeted this leaf. + /// + /// Implementations may assume the endpoint already proved the call targets this leaf. + /// They are still responsible for decoding the typed input payload and deciding which local + /// procedure suffix should run. fn dispatch_call( &mut self, call: crate::protocol::tree::IncomingCall, @@ -66,6 +71,26 @@ pub trait CallProcedures: ProtocolLeaf { /// casing into protocol-visible names. Deterministic is not the same as stable /// across refactors, so shipped protocol surfaces should prefer explicit `id` /// overrides. +/// +/// # Example +/// ```rust +/// use unshell::protocol::tree::derive_leaf_name; +/// +/// let leaf = derive_leaf_name( +/// "unshell-core", +/// "0", +/// "1", +/// "0", +/// "unshell_core::examples::demo_shell", +/// "ShellLeaf", +/// None, +/// None, +/// None, +/// None, +/// None, +/// ); +/// assert_eq!(leaf, "unshell_core.unshell_core.v0_1_0.examples.demo_shell.shell_leaf"); +/// ``` #[allow(clippy::too_many_arguments)] // This helper mirrors derive-macro inputs directly so callers do not have to allocate an // intermediate metadata struct just to compute one deterministic protocol identifier. @@ -138,6 +163,7 @@ fn normalize_leaf_segment(value: &str) -> String { for character in value.chars() { if character.is_ascii_uppercase() { + // Preserve CamelCase word boundaries in a snake_case protocol identifier. if !normalized.is_empty() && !previous_was_separator { normalized.push('_'); } @@ -163,6 +189,8 @@ fn normalize_leaf_segment(value: &str) -> String { } if normalized.is_empty() { + // Protocol identifiers still need a stable non-empty placeholder when user input is all + // punctuation or whitespace. String::from("leaf") } else { normalized diff --git a/src/protocol/tree/mod.rs b/src/protocol/tree/mod.rs index 10d1bd7..debb7f0 100644 --- a/src/protocol/tree/mod.rs +++ b/src/protocol/tree/mod.rs @@ -4,6 +4,8 @@ //! - `routing` contains static path declarations and longest-prefix routing helpers. //! - `hook` contains the pending/active hook lifecycle tables used by endpoint runtime code. //! - `endpoint` ties those pieces together into the runtime-facing protocol endpoint API. +//! - `leaf` defines application-facing metadata and generated call-dispatch traits. +//! - `call` and `procedure` layer higher-level runtimes on top of validated endpoint events. mod call; mod endpoint; diff --git a/src/protocol/tree/procedure.rs b/src/protocol/tree/procedure.rs index aab1ced..14e186b 100644 --- a/src/protocol/tree/procedure.rs +++ b/src/protocol/tree/procedure.rs @@ -185,7 +185,10 @@ impl ProcedureEffect { pub enum ProcedureRuntimeError { /// Protocol endpoint routing or framing failed. Endpoint(EndpointError), - /// The opening call failed to decode or open cleanly. + /// The opening call failed to decode or open cleanly before a session existed. + /// + /// Once a session is already live, runtime failures prefer emitting protocol faults and + /// tearing down that session rather than surfacing leaf errors directly. Decode(super::DispatchError), } @@ -298,35 +301,12 @@ where .collect::>(); for key in keys { - let Some(mut session) = self.leaf.procedure_sessions().remove(&key) else { + let Some(session) = self.leaf.procedure_sessions().remove(&key) else { continue; }; - let effect = match P::poll(&mut self.leaf, &mut session) { - Ok(effect) => self.ensure_terminal_packet(&key, effect), - Err(error) => { - let _ = P::close(&mut self.leaf, session); - frames.extend(self.emit_internal_fault(Some(key.clone()))?); - let _ = error; - continue; - } - }; - - match self.emit_outgoing(effect.outgoing) { - Ok(outgoing) => frames.extend(outgoing.frames), - Err(error) => { - if !effect.close_session { - self.leaf.procedure_sessions().insert(key, session); - } else { - let _ = P::close(&mut self.leaf, session); - } - return Err(error); - } - } - - if !effect.close_session { - self.leaf.procedure_sessions().insert(key, session); - } else { - let _ = P::close(&mut self.leaf, session); + match self.poll_session(key, session)? { + Some(session_frames) => frames.extend(session_frames), + None => continue, } } @@ -349,124 +329,187 @@ where frames: Vec::new(), dropped: true, }), - super::EndpointOutcome::Local(event) => { - let mut runtime = ProcedureRuntimeOutcome::default(); - - match event { - LocalEvent::Call { header, message } => { - if message.procedure_id != P::procedure_id() { - runtime.frames.extend( - self.emit_internal_fault_if_possible( - message.response_hook.as_ref(), - )?, - ); - return Ok(runtime); - } - let Some(hook) = message.response_hook.as_ref() else { - return Ok(runtime); - }; - let hook_key = HookKey::new(hook.return_path.clone(), hook.hook_id); - - let session = match self.open_session(header, message) { - Ok(session) => session, - Err(error) => { - runtime - .frames - .extend(self.emit_internal_fault(Some(hook_key.clone()))?); - let _ = error; - return Ok(runtime); - } - }; - - self.leaf.procedure_sessions().insert(hook_key, session); - } - LocalEvent::Data { - header, - message, - hook_key, - } => { - let Some(mut session) = self.leaf.procedure_sessions().remove(&hook_key) - else { - return Ok(runtime); - }; - let effect = match P::on_data( - &mut self.leaf, - &mut session, - IncomingData { - header, - message, - hook_key: hook_key.clone(), - }, - ) { - Ok(effect) => self.ensure_terminal_packet(&hook_key, effect), - Err(error) => { - let _ = P::close(&mut self.leaf, session); - runtime - .frames - .extend(self.emit_internal_fault(Some(hook_key.clone()))?); - let _ = error; - return Ok(runtime); - } - }; - match self.emit_outgoing(effect.outgoing) { - Ok(outgoing) => runtime.frames.extend(outgoing.frames), - Err(error) => { - if !effect.close_session { - self.leaf.procedure_sessions().insert(hook_key, session); - } else { - let _ = P::close(&mut self.leaf, session); - } - return Err(error); - } - } - if !effect.close_session { - self.leaf.procedure_sessions().insert(hook_key, session); - } else { - let _ = P::close(&mut self.leaf, session); - } - } - LocalEvent::Fault { - header, - message, - hook_key, - } => { - let Some(mut session) = self.leaf.procedure_sessions().remove(&hook_key) - else { - return Ok(runtime); - }; - let on_fault_result = P::on_fault( - &mut self.leaf, - &mut session, - IncomingFault { - header, - fault: message, - hook_key: hook_key.clone(), - }, - ); - let close_result = P::close(&mut self.leaf, session); - if let Err(error) = on_fault_result { - let _ = close_result; - runtime - .frames - .extend(self.emit_internal_fault(Some(hook_key.clone()))?); - let _ = error; - return Ok(runtime); - } - if let Err(error) = close_result { - runtime - .frames - .extend(self.emit_internal_fault(Some(hook_key))?); - let _ = error; - return Ok(runtime); - } - } - } - - Ok(runtime) - } + super::EndpointOutcome::Local(event) => self.process_local_event(event), } } + fn poll_session( + &mut self, + key: HookKey, + mut session: P, + ) -> Result>, ProcedureRuntimeError> { + let effect = match P::poll(&mut self.leaf, &mut session) { + Ok(effect) => self.ensure_terminal_packet(&key, effect), + Err(error) => { + let _ = P::close(&mut self.leaf, session); + let frames = self.emit_internal_fault(Some(key.clone()))?; + let _ = error; + return Ok(Some(frames)); + } + }; + + let outgoing = match self.emit_outgoing(effect.outgoing) { + Ok(outgoing) => outgoing.frames, + Err(error) => { + if !effect.close_session { + self.leaf.procedure_sessions().insert(key, session); + } else { + let _ = P::close(&mut self.leaf, session); + } + return Err(error); + } + }; + + if !effect.close_session { + self.leaf.procedure_sessions().insert(key, session); + } else { + let _ = P::close(&mut self.leaf, session); + } + + Ok(Some(outgoing)) + } + + fn process_local_event( + &mut self, + event: LocalEvent, + ) -> Result> { + match event { + LocalEvent::Call { header, message } => self.process_local_call(header, message), + LocalEvent::Data { + header, + message, + hook_key, + } => self.process_local_data(header, message, hook_key), + LocalEvent::Fault { + header, + message, + hook_key, + } => self.process_local_fault(header, message, hook_key), + } + } + + fn process_local_call( + &mut self, + header: crate::protocol::PacketHeader, + message: CallMessage, + ) -> Result> { + let mut runtime = ProcedureRuntimeOutcome::default(); + if message.procedure_id != P::procedure_id() { + runtime + .frames + .extend(self.emit_internal_fault_if_possible(message.response_hook.as_ref())?); + return Ok(runtime); + } + let Some(hook) = message.response_hook.as_ref() else { + return Ok(runtime); + }; + let hook_key = HookKey::new(hook.return_path.clone(), hook.hook_id); + + let session = match self.open_session(header, message) { + Ok(session) => session, + Err(error) => { + runtime + .frames + .extend(self.emit_internal_fault(Some(hook_key.clone()))?); + let _ = error; + return Ok(runtime); + } + }; + + self.leaf.procedure_sessions().insert(hook_key, session); + Ok(runtime) + } + + fn process_local_data( + &mut self, + header: crate::protocol::PacketHeader, + message: crate::protocol::DataMessage, + hook_key: HookKey, + ) -> Result> { + let Some(mut session) = self.leaf.procedure_sessions().remove(&hook_key) else { + return Ok(ProcedureRuntimeOutcome::default()); + }; + let effect = match P::on_data( + &mut self.leaf, + &mut session, + IncomingData { + header, + message, + hook_key: hook_key.clone(), + }, + ) { + Ok(effect) => self.ensure_terminal_packet(&hook_key, effect), + Err(error) => { + let _ = P::close(&mut self.leaf, session); + let frames = self.emit_internal_fault(Some(hook_key.clone()))?; + let _ = error; + return Ok(ProcedureRuntimeOutcome { + frames, + dropped: false, + }); + } + }; + let outgoing = match self.emit_outgoing(effect.outgoing) { + Ok(outgoing) => outgoing.frames, + Err(error) => { + if !effect.close_session { + self.leaf.procedure_sessions().insert(hook_key, session); + } else { + let _ = P::close(&mut self.leaf, session); + } + return Err(error); + } + }; + if !effect.close_session { + self.leaf.procedure_sessions().insert(hook_key, session); + } else { + let _ = P::close(&mut self.leaf, session); + } + Ok(ProcedureRuntimeOutcome { + frames: outgoing, + dropped: false, + }) + } + + fn process_local_fault( + &mut self, + header: crate::protocol::PacketHeader, + message: crate::protocol::FaultMessage, + hook_key: HookKey, + ) -> Result> { + let Some(mut session) = self.leaf.procedure_sessions().remove(&hook_key) else { + return Ok(ProcedureRuntimeOutcome::default()); + }; + let on_fault_result = P::on_fault( + &mut self.leaf, + &mut session, + IncomingFault { + header, + fault: message, + hook_key: hook_key.clone(), + }, + ); + let close_result = P::close(&mut self.leaf, session); + if let Err(error) = on_fault_result { + let _ = close_result; + let frames = self.emit_internal_fault(Some(hook_key.clone()))?; + let _ = error; + return Ok(ProcedureRuntimeOutcome { + frames, + dropped: false, + }); + } + if let Err(error) = close_result { + let frames = self.emit_internal_fault(Some(hook_key))?; + let _ = error; + return Ok(ProcedureRuntimeOutcome { + frames, + dropped: false, + }); + } + Ok(ProcedureRuntimeOutcome::default()) + } + fn open_session( &mut self, header: crate::protocol::PacketHeader, @@ -543,6 +586,10 @@ where Ok(self.process_endpoint_outcome(outcome)?.frames) } + /// Ensures a closing session leaves the protocol hook in a fully terminated state. + /// + /// If leaf code requests `close_session` without emitting an explicit terminal packet, the + /// runtime synthesizes an empty final `Data` frame so the hook closes cleanly on the wire. fn ensure_terminal_packet( &self, hook_key: &HookKey, diff --git a/src/protocol/tree/routing.rs b/src/protocol/tree/routing.rs index 53cafc3..f4f1a71 100644 --- a/src/protocol/tree/routing.rs +++ b/src/protocol/tree/routing.rs @@ -5,15 +5,21 @@ use alloc::{collections::BTreeMap, string::String, vec, vec::Vec}; -/// Explicit test tree declaration used for configuration. +/// Explicit tree declaration used for configuration and tests. #[derive(Debug, Clone, PartialEq, Eq)] pub enum TreeNode { /// The protocol root. Its path is always empty. - Root { children: Vec }, + Root { + /// Direct child endpoints hosted below the root. + children: Vec, + }, /// An addressable endpoint segment in the tree. Endpoint { + /// Path segment contributed by this endpoint. segment: String, + /// Leaves hosted directly at this endpoint. leaves: Vec, + /// Direct child endpoints hosted below this endpoint. children: Vec, }, } @@ -29,6 +35,8 @@ pub struct LeafNode { impl TreeNode { /// Flattens the explicit tree into the set of endpoint paths it declares. + /// + /// The returned list always includes the protocol root as `[]`. pub fn paths(&self) -> Vec> { let mut paths = Vec::new(); self.collect_paths(&[], &mut paths); @@ -88,6 +96,9 @@ struct RouteTrieNode { impl CompiledRoutes { /// Compiles child endpoint paths into a trie rooted at `local_path`. + /// + /// Only strict descendants of `local_path` participate in the compiled trie. Paths outside + /// the local subtree, or equal to `local_path` itself, are ignored. #[must_use] pub fn new(local_path: &[String], child_paths: &[Vec], has_parent: bool) -> Self { let mut routes = Self { @@ -169,8 +180,11 @@ pub fn is_prefix(prefix: &[String], path: &[String]) -> bool { .zip(path.iter()) .all(|(left, right)| left == right) } - /// Trait for resolving a destination path to a routing decision. +/// +/// The default policy is longest-prefix routing: exact matches stay local, the deepest matching +/// descendant wins for child forwarding, destinations outside the local subtree go to the parent +/// when one exists, and everything else drops. pub trait RouteProvider { /// Returns the route decision for `dst_path` from the perspective of `local_path`. fn route_destination( @@ -209,6 +223,9 @@ impl RouteProvider for DefaultRouteProvider { } /// Resolves `dst_path` with the default longest-prefix route provider. +/// +/// Exact matches return [`RouteDecision::Local`]. Destinations outside the local subtree return +/// [`RouteDecision::Parent`] when `has_parent` is `true`, otherwise [`RouteDecision::Drop`]. pub fn route_destination( local_path: &[String], child_paths: I, diff --git a/src/protocol/types.rs b/src/protocol/types.rs index 5eaf57a..a85bc40 100644 --- a/src/protocol/types.rs +++ b/src/protocol/types.rs @@ -18,50 +18,80 @@ pub enum PacketType { /// Header fields used for routing and hook attribution. #[derive(Archive, Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] pub struct PacketHeader { + /// Wire-level packet class, which determines which payload type follows. pub packet_type: PacketType, + /// Absolute endpoint path that sent the packet. pub src_path: Vec, + /// Absolute endpoint path the packet is trying to reach. pub dst_path: Vec, + /// Optional leaf name inside `dst_path` that should receive a `Call` packet. + /// + /// `Data` and `Fault` packets must leave this unset. pub dst_leaf: Option, + /// Hook identifier scoped to the receiving endpoint. + /// + /// `Call` packets must leave this unset. `Data` and `Fault` packets must fill it in. pub hook_id: Option, } /// Hook declaration embedded inside a call. #[derive(Archive, Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] pub struct HookTarget { + /// Hook identifier reserved by the caller for returned `Data` or `Fault` traffic. pub hook_id: u64, + /// Absolute endpoint path that should receive the response stream. + /// + /// Protocol validation requires this to exactly match the enclosing call header's + /// `src_path`. pub return_path: Vec, } /// Downwards call payload. #[derive(Archive, Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] pub struct CallMessage { + /// Canonical procedure identifier chosen by the caller. pub procedure_id: String, + /// Opaque application payload for the target procedure. pub data: Vec, + /// Optional response hook reservation for returned hook traffic. pub response_hook: Option, } /// Hook data payload. #[derive(Archive, Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] pub struct DataMessage { + /// Canonical procedure identifier that owns the hook stream. pub procedure_id: String, + /// Opaque application payload for the hook message. pub data: Vec, + /// Whether this packet closes the peer side of the hook stream. pub end_hook: bool, } /// Protocol fault payload. #[derive(Archive, Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] pub struct FaultMessage { + /// Stable protocol-level reason code for the failure. pub fault: ProtocolFault, } -/// Stable protocol fault set. +/// Stable protocol fault code. +/// +/// The raw numeric value is public so callers can persist, compare, or forward fault codes +/// without knowing every symbolic constant in advance. Unknown values are allowed so newer +/// peers can extend the set without breaking older runtimes. #[derive(Archive, Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)] pub struct ProtocolFault(pub u8); impl ProtocolFault { + /// The addressed leaf name does not exist at the destination endpoint. pub const UNKNOWN_LEAF: Self = Self(0x01); + /// The destination exists, but it does not expose the requested procedure id. pub const UNKNOWN_PROCEDURE: Self = Self(0x02); + /// The packet source path is not valid for the ingress side where it arrived. pub const INVALID_SOURCE_PATH: Self = Self(0x03); + /// Hook traffic arrived from a peer that does not own the active hook relationship. pub const INVALID_HOOK_PEER: Self = Self(0x04); + /// The runtime hit an internal protocol failure and could only surface a generic fault. pub const INTERNAL_ERROR: Self = Self(0x05); } diff --git a/src/protocol/validation.rs b/src/protocol/validation.rs index 13c3f51..a683149 100644 --- a/src/protocol/validation.rs +++ b/src/protocol/validation.rs @@ -8,10 +8,15 @@ use core::fmt; /// Validation failures for protocol structures. #[derive(Debug, Clone, PartialEq, Eq)] pub enum ValidationError { + /// One header field combination is invalid for the chosen packet type. HeaderInvariant(&'static str), + /// The procedure identifier violates the protocol's minimal reserved-id rules. ProcedureId(&'static str), + /// The call payload contradicts the surrounding packet header. CallInvariant(&'static str), + /// A hook lifecycle transition would break protocol state invariants. HookInvariant(&'static str), + /// A hook id collided with existing endpoint-local state. InvalidHookId, } @@ -29,7 +34,10 @@ impl fmt::Display for ValidationError { impl core::error::Error for ValidationError {} -/// Validates packet header invariants from the protocol. +/// Validates stateless packet-header invariants. +/// +/// This checks wire-shape rules only. It does not verify route existence, leaf existence, +/// hook ownership, or whether the destination actually supports the requested procedure. pub fn validate_header(header: &PacketHeader) -> Result<(), ValidationError> { match header.packet_type { PacketType::Call => { @@ -56,6 +64,9 @@ pub fn validate_header(header: &PacketHeader) -> Result<(), ValidationError> { } /// Validates the protocol-level `procedure_id` invariant. +/// +/// This is intentionally permissive. The protocol reserves only the empty string for +/// introspection; every other non-empty identifier is treated as opaque application data. pub fn validate_procedure_id(procedure_id: &str) -> Result<(), ValidationError> { if procedure_id == INTROSPECTION_PROCEDURE_ID { return Ok(()); @@ -69,6 +80,9 @@ pub fn validate_procedure_id(procedure_id: &str) -> Result<(), ValidationError> } /// Validates call-specific invariants that depend on both header and payload. +/// +/// This complements [`validate_header`]. It does not verify destination reachability or leaf +/// support, only consistency between the opening `Call` header and payload. pub fn validate_call(header: &PacketHeader, call: &CallMessage) -> Result<(), ValidationError> { validate_procedure_id(&call.procedure_id)?; @@ -81,6 +95,7 @@ pub fn validate_call(header: &PacketHeader, call: &CallMessage) -> Result<(), Va } if call.procedure_id == INTROSPECTION_PROCEDURE_ID && call.response_hook.is_none() { + // Introspection is defined as a request/response exchange, never a fire-and-forget call. return Err(ValidationError::CallInvariant( "introspection requires a response hook", ));