diff --git a/src/protocol/tests/protocol.rs b/src/protocol/tests/protocol.rs index 0664c33..1374fe1 100644 --- a/src/protocol/tests/protocol.rs +++ b/src/protocol/tests/protocol.rs @@ -81,16 +81,11 @@ fn header_and_call_validation_reject_invalid_combinations() { #[test] fn procedure_validation_accepts_introspection_and_rejects_bad_shapes() { assert_eq!(validate_procedure_id(""), Ok(())); + assert_eq!(validate_procedure_id("unshell.echo.v01.alpha.invoke"), Ok(())); assert_eq!( - validate_procedure_id("unshell.echo.v01.alpha.invoke"), + validate_procedure_id("contains spaces"), Err(ValidationError::ProcedureId( - "version segment must be v followed by a positive decimal integer" - )) - ); - assert_eq!( - validate_procedure_id("too.short.v1"), - Err(ValidationError::ProcedureId( - "must contain exactly 5 segments" + "procedure identifier should use alphanumeric characters, dots, and underscores" )) ); } @@ -105,7 +100,7 @@ fn truncated_frames_are_rejected() { hook_id: Some(9), }; let message = FaultMessage { - fault: ProtocolFault::InternalError, + fault: ProtocolFault::INTERNAL_ERROR, }; let frame = encode_packet(&header, &message).expect("frame should encode"); diff --git a/src/protocol/tests/tree.rs b/src/protocol/tests/tree.rs index d97b07c..1d5c8cd 100644 --- a/src/protocol/tests/tree.rs +++ b/src/protocol/tests/tree.rs @@ -6,7 +6,7 @@ use crate::protocol::tree::{ }; use crate::protocol::{ DataMessage, EndpointIntrospection, FaultMessage, PacketHeader, PacketType, ProtocolFault, - decode_frame, deserialize_archived_bytes, encode_packet, + deserialize_archived_bytes, encode_packet, }; fn path(parts: &[&str]) -> Vec { @@ -77,16 +77,16 @@ fn protocol_endpoint_introspection_returns_leaf_summary() { .receive(&Ingress::Local, frame) .expect("endpoint should handle introspection"); - assert!(outcome.events.is_empty()); - assert_eq!(outcome.forwards.len(), 1); - assert_eq!(outcome.forwards[0].0, RouteDecision::Parent); + assert!(outcome.forwards.is_empty()); + assert_eq!(outcome.events.len(), 1); - let parsed = decode_frame(&outcome.forwards[0].1).expect("response should decode"); - let response = parsed - .deserialize_data() - .expect("response data should deserialize"); + let LocalEvent::Data { header, message: response } = &outcome.events[0] else { + panic!("expected local data event"); + }; + assert_eq!(header.packet_type, PacketType::Data); + assert_eq!(header.dst_path, path(&["root"])); let introspection = deserialize_archived_bytes::< - rkyv::Archived, + crate::protocol::introspection::ArchivedEndpointIntrospection, EndpointIntrospection, >(&response.data) .expect("introspection payload should deserialize"); @@ -146,7 +146,7 @@ fn invalid_hook_peer_emits_local_fault_event() { assert_eq!( message, &FaultMessage { - fault: ProtocolFault::InvalidHookPeer, + fault: ProtocolFault::INVALID_HOOK_PEER, } ); } diff --git a/src/protocol/traits.rs b/src/protocol/traits.rs index c178bdc..86c91e2 100644 --- a/src/protocol/traits.rs +++ b/src/protocol/traits.rs @@ -8,7 +8,7 @@ use alloc::{string::String, vec::Vec}; use super::{ FrameBytes, FrameCodec, LeafIntrospection, LeafIntrospectionSummary, tree::{ - ActiveHook, Endpoint, EndpointError, EndpointOutcome, HookKey, HookTable, Ingress, + ActiveHook, Endpoint, EndpointError, EndpointOutcome, HookConflict, HookKey, HookTable, Ingress, LeafNode, LeafSpec, PendingHook, RouteProvider, }, }; @@ -26,8 +26,8 @@ impl RouteResolution for T where T: RouteProvider + ?Sized {} /// Hook storage contract for pending and active protocol flows. pub trait HookStore { fn allocate_hook_id(&mut self, return_path: &[String]) -> u64; - fn insert_pending(&mut self, pending: PendingHook) -> Result<(), ()>; - fn insert_active(&mut self, active: ActiveHook) -> Result<(), ()>; + fn insert_pending(&mut self, pending: PendingHook) -> Result<(), HookConflict>; + fn insert_active(&mut self, active: ActiveHook) -> Result<(), HookConflict>; fn activate_pending(&mut self, key: &HookKey, peer_path: Vec) -> Option<()>; fn remove_pending(&mut self, key: &HookKey) -> Option; fn remove_active(&mut self, key: &HookKey) -> Option; @@ -41,11 +41,11 @@ impl HookStore for HookTable { HookTable::allocate_hook_id(self, return_path) } - fn insert_pending(&mut self, pending: PendingHook) -> Result<(), ()> { + fn insert_pending(&mut self, pending: PendingHook) -> Result<(), HookConflict> { HookTable::insert_pending(self, pending) } - fn insert_active(&mut self, active: ActiveHook) -> Result<(), ()> { + fn insert_active(&mut self, active: ActiveHook) -> Result<(), HookConflict> { HookTable::insert_active(self, active) } diff --git a/src/protocol/tree/endpoint.rs b/src/protocol/tree/endpoint.rs deleted file mode 100644 index de678ff..0000000 --- a/src/protocol/tree/endpoint.rs +++ /dev/null @@ -1,630 +0,0 @@ -//! Endpoint runtime and traits. - -use alloc::{ - collections::{BTreeMap, BTreeSet}, - string::String, - vec, - vec::Vec, -}; -use core::fmt; -use rkyv::{rancor::Error as RkyvError, to_bytes}; - -use crate::protocol::{ - CallMessage, DataMessage, EndpointIntrospection, FaultMessage, FrameBytes, FrameError, - HookTarget, LeafIntrospection, LeafIntrospectionSummary, PacketHeader, PacketType, - ProtocolFault, ValidationError, decode_frame, encode_packet, - introspection::INTROSPECTION_PROCEDURE_ID, validate_call, validate_header, - validate_procedure_id, -}; - -use super::{ActiveHook, HookKey, HookTable, PendingHook, RouteDecision, route_destination}; - -/// Local connection state. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum ConnectionState { - Unregistered, - Registered, -} - -/// Registered child route. -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct ChildRoute { - pub path: Vec, - pub state: ConnectionState, -} - -impl ChildRoute { - pub fn registered(path: Vec) -> Self { - Self { - path, - state: ConnectionState::Registered, - } - } -} - -/// Leaf behavior for test runtime. -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum LeafBehavior { - Echo, -} - -/// Static leaf description. -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct LeafSpec { - pub name: String, - pub procedures: Vec, - pub behavior: LeafBehavior, -} - -/// Arrival side. -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum Ingress { - Parent, - Child(Vec), - Local, -} - -/// Local events. -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum LocalEvent { - Call { - header: PacketHeader, - message: CallMessage, - }, - Data { - header: PacketHeader, - message: DataMessage, - }, - Fault { - header: PacketHeader, - message: FaultMessage, - }, -} - -/// Processing outcome. -#[derive(Debug, Default)] -pub struct EndpointOutcome { - pub forwards: Vec<(RouteDecision, FrameBytes)>, - pub events: Vec, - pub dropped: bool, -} - -/// Processing error. -#[derive(Debug)] -pub enum EndpointError { - Frame(FrameError), - Validation(ValidationError), -} - -impl fmt::Display for EndpointError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Self::Frame(error) => write!(f, "{error}"), - Self::Validation(error) => write!(f, "{error}"), - } - } -} - -impl core::error::Error for EndpointError {} - -impl From for EndpointError { - fn from(value: FrameError) -> Self { - Self::Frame(value) - } -} - -impl From for EndpointError { - fn from(value: ValidationError) -> Self { - Self::Validation(value) - } -} - -/// Core trait for a protocol endpoint. -pub trait Endpoint { - fn path(&self) -> &[String]; - fn receive( - &mut self, - ingress: &Ingress, - frame: FrameBytes, - ) -> Result; -} - -/// Default endpoint implementation. -#[derive(Debug, Default)] -pub struct ProtocolEndpoint { - path: Vec, - parent_path: Option>, - children: Vec, - leaves: BTreeMap, - endpoint_procedures: BTreeSet, - hooks: HookTable, -} - -impl ProtocolEndpoint { - pub fn new( - path: Vec, - parent_path: Option>, - children: Vec, - leaves: Vec, - ) -> Self { - Self { - path, - parent_path, - children, - leaves: leaves - .into_iter() - .map(|leaf| (leaf.name.clone(), leaf)) - .collect(), - endpoint_procedures: BTreeSet::new(), - hooks: HookTable::default(), - } - } - - pub fn add_endpoint_procedure( - &mut self, - procedure_id: impl Into, - ) -> Result<(), EndpointError> { - let procedure_id = procedure_id.into(); - validate_procedure_id(&procedure_id)?; - self.endpoint_procedures.insert(procedure_id); - Ok(()) - } - - pub fn allocate_hook_id(&mut self) -> u64 { - self.hooks.allocate_hook_id(&self.path) - } - - pub fn make_call( - &mut self, - dst_path: Vec, - dst_leaf: Option, - procedure_id: impl Into, - response_hook_id: Option, - data: Vec, - ) -> Result { - let procedure_id = procedure_id.into(); - validate_procedure_id(&procedure_id)?; - let response_hook = response_hook_id.map(|hook_id| HookTarget { - hook_id, - return_path: self.path.clone(), - }); - let header = PacketHeader { - packet_type: PacketType::Call, - src_path: self.path.clone(), - dst_path: dst_path.clone(), - dst_leaf: dst_leaf.clone(), - hook_id: None, - }; - let call = CallMessage { - procedure_id: procedure_id.clone(), - data, - response_hook, - }; - validate_header(&header)?; - validate_call(&header, &call)?; - - if let Some(hook) = &call.response_hook { - if self.hooks.insert_active(ActiveHook { - return_path: hook.return_path.clone(), - hook_id: hook.hook_id, - peer_path: dst_path, - procedure_id, - dst_leaf, - peer_finished: false, - }).is_err() { - return Err(EndpointError::Validation(ValidationError::InvalidHookId)); - } - } - - Ok(encode_packet(&header, &call)?) - } - - pub fn make_data( - &self, - dst_path: Vec, - hook_id: u64, - procedure_id: impl Into, - data: Vec, - end_hook: bool, - ) -> Result { - let procedure_id = procedure_id.into(); - validate_procedure_id(&procedure_id)?; - let header = PacketHeader { - packet_type: PacketType::Data, - src_path: self.path.clone(), - dst_path, - dst_leaf: None, - hook_id: Some(hook_id), - }; - let message = DataMessage { - procedure_id, - data, - end_hook, - }; - validate_header(&header)?; - Ok(encode_packet(&header, &message)?) - } - - fn handle_local_call( - &mut self, - header: PacketHeader, - message: CallMessage, - ) -> Result { - let key = message - .response_hook - .as_ref() - .map(|hook| HookKey::new(hook.return_path.clone(), hook.hook_id)); - - if let Some(hook) = &message.response_hook { - if self.hooks.insert_pending(PendingHook { - caller_src_path: header.src_path.clone(), - return_path: hook.return_path.clone(), - hook_id: hook.hook_id, - procedure_id: message.procedure_id.clone(), - dst_leaf: header.dst_leaf.clone(), - }).is_err() { - return self.emit_fault_if_possible(key, ProtocolFault::INTERNAL_ERROR); - } - } - - if message.procedure_id == INTROSPECTION_PROCEDURE_ID { - return self.handle_introspection(&header, key); - } - - let supported = match &header.dst_leaf { - Some(leaf_name) => self - .leaves - .get(leaf_name) - .map(|leaf| leaf.procedures.iter().any(|p| p == &message.procedure_id)) - .unwrap_or(false), - None => self.endpoint_procedures.contains(&message.procedure_id), - }; - - if !supported { - let fault = if header - .dst_leaf - .as_ref() - .is_some_and(|name| !self.leaves.contains_key(name)) - { - ProtocolFault::UNKNOWN_LEAF - } else { - ProtocolFault::UNKNOWN_PROCEDURE - }; - return self.emit_fault_if_possible(key, fault); - } - - if let Some(key) = &key { - self.hooks.activate_pending(key, header.src_path.clone()); - } - - match header - .dst_leaf - .as_ref() - .and_then(|name| self.leaves.get(name)) - { - Some(leaf) if leaf.behavior == LeafBehavior::Echo && key.is_some() => { - let hook = message.response_hook.expect("synchronized"); - let response = DataMessage { - procedure_id: message.procedure_id.clone(), - data: message.data, - end_hook: true, - }; - let response_header = PacketHeader { - packet_type: PacketType::Data, - src_path: self.path.clone(), - dst_path: hook.return_path.clone(), - dst_leaf: None, - hook_id: Some(hook.hook_id), - }; - let frame = encode_packet(&response_header, &response)?; - let route = self.decide_route(&hook.return_path); - self.hooks - .remove_active(&HookKey::new(hook.return_path, hook.hook_id)); - Ok(EndpointOutcome { - forwards: vec![(route, frame)], - ..EndpointOutcome::default() - }) - } - _ => Ok(EndpointOutcome { - events: vec![LocalEvent::Call { header, message }], - ..EndpointOutcome::default() - }), - } - } - - fn handle_introspection( - &mut self, - header: &PacketHeader, - key: Option, - ) -> Result { - let Some(key) = key else { - return Ok(EndpointOutcome { - dropped: true, - ..EndpointOutcome::default() - }); - }; - self.hooks.activate_pending(&key, header.src_path.clone()); - - let payload = if let Some(leaf_name) = &header.dst_leaf { - let Some(leaf) = self.leaves.get(leaf_name) else { - return self.emit_fault_if_possible(Some(key), ProtocolFault::UNKNOWN_LEAF); - }; - to_bytes::(&LeafIntrospection { - leaf_name: leaf_name.clone(), - procedures: leaf.procedures.clone(), - }) - .map_err(|e| EndpointError::Frame(FrameError::Serialize(e)))? - .to_vec() - } else { - to_bytes::(&EndpointIntrospection { - leaves: self - .leaves - .values() - .map(|leaf| LeafIntrospectionSummary { - leaf_name: leaf.name.clone(), - procedures: leaf.procedures.clone(), - }) - .collect(), - }) - .map_err(|e| EndpointError::Frame(FrameError::Serialize(e)))? - .to_vec() - };; - - let response_header = PacketHeader { - packet_type: PacketType::Data, - src_path: self.path.clone(), - dst_path: key.return_path.clone(), - dst_leaf: None, - hook_id: Some(key.hook_id), - }; - let route = self.decide_route(&key.return_path); - let response = DataMessage { - procedure_id: String::new(), - data: payload, - end_hook: true, - }; - let frame = encode_packet(&response_header, &response)?; - self.hooks.remove_active(&key); - Ok(EndpointOutcome { - forwards: vec![(route, frame)], - ..EndpointOutcome::default() - }) - } - - fn handle_local_data( - &mut self, - header: PacketHeader, - message: DataMessage, - ) -> Result { - let key = HookKey::new(self.path.clone(), header.hook_id.expect("validated")); - - if self.hooks.active(&key).is_none() { - let matches = self.hooks.pending(&key).is_some_and(|p| { - p.caller_src_path == header.src_path && p.procedure_id == message.procedure_id - }); - if matches { - self.hooks.activate_pending(&key, header.src_path.clone()); - } - } - - let Some(active) = self.hooks.active(&key).cloned() else { - return Ok(EndpointOutcome { - dropped: true, - ..EndpointOutcome::default() - }); - }; - - if active.peer_path != header.src_path { - self.hooks.remove_active(&key); - self.hooks.remove_pending(&key); - return Ok(EndpointOutcome { - events: vec![LocalEvent::Fault { - header: PacketHeader { - packet_type: PacketType::Fault, - src_path: header.src_path, - dst_path: self.path.clone(), - dst_leaf: None, - hook_id: Some(key.hook_id), - }, - message: FaultMessage { - fault: ProtocolFault::INVALID_HOOK_PEER, - }, - }], - ..EndpointOutcome::default() - }); - } - - if active.procedure_id != message.procedure_id { - return Ok(EndpointOutcome { - dropped: true, - ..EndpointOutcome::default() - }); - } - - if message.end_hook { - self.hooks.remove_active(&key); - } - Ok(EndpointOutcome { - events: vec![LocalEvent::Data { header, message }], - ..EndpointOutcome::default() - }) - } - - fn handle_local_fault( - &mut self, - header: PacketHeader, - message: FaultMessage, - ) -> Result { - let key = HookKey::new(self.path.clone(), header.hook_id.expect("validated")); - let matches = self - .hooks - .active(&key) - .is_some_and(|a| a.peer_path == header.src_path) - || self - .hooks - .pending(&key) - .is_some_and(|p| p.caller_src_path == header.src_path); - if !matches { - return Ok(EndpointOutcome { - dropped: true, - ..EndpointOutcome::default() - }); - } - self.hooks.remove_active(&key); - self.hooks.remove_pending(&key); - Ok(EndpointOutcome { - events: vec![LocalEvent::Fault { header, message }], - ..EndpointOutcome::default() - }) - } - - fn emit_fault_if_possible( - &mut self, - key: Option, - fault: ProtocolFault, - ) -> Result { - let Some(key) = key else { - return Ok(EndpointOutcome { - dropped: true, - ..EndpointOutcome::default() - }); - }; - self.hooks.remove_pending(&key); - self.hooks.remove_active(&key); - let route = self.decide_route(&key.return_path); - let header = PacketHeader { - packet_type: PacketType::Fault, - src_path: self.path.clone(), - dst_path: key.return_path.clone(), - dst_leaf: None, - hook_id: Some(key.hook_id), - }; - let frame = encode_packet(&header, &FaultMessage { fault })?; - Ok(EndpointOutcome { - forwards: vec![(route, frame)], - ..EndpointOutcome::default() - }) - } - - fn decide_route(&self, dst_path: &[String]) -> RouteDecision { - let child_paths = self - .children - .iter() - .filter(|c| c.state == ConnectionState::Registered) - .map(|c| &c.path); - route_destination( - &self.path, - child_paths, - self.parent_path.is_some(), - dst_path, - ) - } - - fn valid_source_for_ingress(&self, ingress: &Ingress, src_path: &[String]) -> bool { - match ingress { - Ingress::Parent => { - // Valid if src_path is an ancestor, sibling, or the current node itself. - // Invalid if it's a descendant of the current node. - if src_path.len() < self.path.len() { - return true; // Ancestor or sibling in a different branch - } - if src_path.len() == self.path.len() { - return src_path == self.path; // Current node - } - // Check if it's a descendant - !src_path.starts_with(&self.path) - } - Ingress::Child(child_path) => { - // Valid if src_path is the child itself or any descendant of the child. - src_path.starts_with(child_path) - } - Ingress::Local => src_path == self.path, - } - } -} - -impl Endpoint for ProtocolEndpoint { - fn path(&self) -> &[String] { - &self.path - } - - fn receive( - &mut self, - ingress: &Ingress, - frame: FrameBytes, - ) -> Result { - let parsed = decode_frame(&frame)?; - let header = parsed.header(); - validate_header(header)?; - if !self.valid_source_for_ingress(ingress, &header.src_path) { - return Ok(EndpointOutcome { - dropped: true, - ..EndpointOutcome::default() - }); - } - - match header.packet_type { - PacketType::Call => { - let message = parsed.deserialize_call()?; - if !matches!(ingress, Ingress::Parent | Ingress::Local) { - return Ok(EndpointOutcome { - dropped: true, - ..EndpointOutcome::default() - }); - } - validate_call(header, &message)?; - match self.decide_route(&header.dst_path) { - RouteDecision::Child(idx) => Ok(EndpointOutcome { - forwards: vec![(RouteDecision::Child(idx), frame)], - ..EndpointOutcome::default() - }), - RouteDecision::Parent => Ok(EndpointOutcome { - forwards: vec![(RouteDecision::Parent, frame)], - ..EndpointOutcome::default() - }), - RouteDecision::Drop => Ok(EndpointOutcome { - dropped: true, - ..EndpointOutcome::default() - }), - RouteDecision::Local => self.handle_local_call(header.clone(), message), - } - } - PacketType::Data => { - let message = parsed.deserialize_data()?; - match self.decide_route(&header.dst_path) { - RouteDecision::Local => self.handle_local_data(header.clone(), message), - RouteDecision::Child(idx) => Ok(EndpointOutcome { - forwards: vec![(RouteDecision::Child(idx), frame)], - ..EndpointOutcome::default() - }), - RouteDecision::Parent => Ok(EndpointOutcome { - forwards: vec![(RouteDecision::Parent, frame)], - ..EndpointOutcome::default() - }), - RouteDecision::Drop => Ok(EndpointOutcome { - dropped: true, - ..EndpointOutcome::default() - }), - } - } - PacketType::Fault => { - let message = parsed.deserialize_fault()?; - match self.decide_route(&header.dst_path) { - RouteDecision::Local => self.handle_local_fault(header.clone(), message), - RouteDecision::Child(idx) => Ok(EndpointOutcome { - forwards: vec![(RouteDecision::Child(idx), frame)], - ..EndpointOutcome::default() - }), - RouteDecision::Parent => Ok(EndpointOutcome { - forwards: vec![(RouteDecision::Parent, frame)], - ..EndpointOutcome::default() - }), - RouteDecision::Drop => Ok(EndpointOutcome { - dropped: true, - ..EndpointOutcome::default() - }), - } - } - } - } -} diff --git a/src/protocol/tree/endpoint/builders.rs b/src/protocol/tree/endpoint/builders.rs new file mode 100644 index 0000000..6e40f3c --- /dev/null +++ b/src/protocol/tree/endpoint/builders.rs @@ -0,0 +1,140 @@ +//! Packet builders and basic endpoint configuration. +//! +//! These helpers map to `PROTOCOL.md` sections covering packet construction, +//! call headers, and hook declaration fields. + +use alloc::{collections::BTreeSet, string::String, vec::Vec}; + +use crate::protocol::{ + validate_call, validate_header, validate_procedure_id, CallMessage, DataMessage, FrameBytes, + HookTarget, PacketHeader, PacketType, ValidationError, encode_packet, +}; +use crate::protocol::tree::ActiveHook; + +use super::core::{ChildRoute, EndpointError, ProtocolEndpoint}; +use crate::protocol::tree::LeafSpec; + +impl ProtocolEndpoint { + /// Creates a runtime endpoint with static tree topology and leaf metadata. + /// + /// ``` + /// use unshell::protocol::tree::{Endpoint, ProtocolEndpoint}; + /// + /// let endpoint = ProtocolEndpoint::new(Vec::new(), None, Vec::new(), Vec::new()); + /// assert!(endpoint.path().is_empty()); + /// ``` + pub fn new( + path: Vec, + parent_path: Option>, + children: Vec, + leaves: Vec, + ) -> Self { + Self { + path, + parent_path, + children, + leaves: leaves + .into_iter() + .map(|leaf| (leaf.name.clone(), leaf)) + .collect(), + endpoint_procedures: BTreeSet::new(), + hooks: Default::default(), + } + } + + /// Registers an endpoint-local procedure identifier. + pub fn add_endpoint_procedure( + &mut self, + procedure_id: impl Into, + ) -> Result<(), EndpointError> { + let procedure_id = procedure_id.into(); + validate_procedure_id(&procedure_id)?; + self.endpoint_procedures.insert(procedure_id); + Ok(()) + } + + /// Allocates a locally unique hook id. + pub fn allocate_hook_id(&mut self) -> u64 { + self.hooks.allocate_hook_id(&self.path) + } + + /// Builds an outbound `Call` packet and pre-registers active hook state when requested. + pub fn make_call( + &mut self, + dst_path: Vec, + dst_leaf: Option, + procedure_id: impl Into, + response_hook_id: Option, + data: Vec, + ) -> Result { + let procedure_id = procedure_id.into(); + validate_procedure_id(&procedure_id)?; + + let response_hook = response_hook_id.map(|hook_id| HookTarget { + hook_id, + return_path: self.path.clone(), + }); + let header = PacketHeader { + packet_type: PacketType::Call, + src_path: self.path.clone(), + dst_path: dst_path.clone(), + dst_leaf: dst_leaf.clone(), + hook_id: None, + }; + let call = CallMessage { + procedure_id: procedure_id.clone(), + data, + response_hook, + }; + + validate_header(&header)?; + validate_call(&header, &call)?; + + if let Some(hook) = &call.response_hook + && self + .hooks + .insert_active(ActiveHook { + return_path: hook.return_path.clone(), + hook_id: hook.hook_id, + peer_path: dst_path, + procedure_id, + dst_leaf, + peer_finished: false, + }) + .is_err() + { + return Err(EndpointError::Validation(ValidationError::InvalidHookId)); + } + + Ok(encode_packet(&header, &call)?) + } + + /// Builds an outbound `Data` packet for an existing hook. + pub fn make_data( + &self, + dst_path: Vec, + hook_id: u64, + procedure_id: impl Into, + data: Vec, + end_hook: bool, + ) -> Result { + let procedure_id = procedure_id.into(); + validate_procedure_id(&procedure_id)?; + + let header = PacketHeader { + packet_type: PacketType::Data, + src_path: self.path.clone(), + dst_path, + dst_leaf: None, + hook_id: Some(hook_id), + }; + let message = DataMessage { + procedure_id, + data, + end_hook, + }; + + validate_header(&header)?; + Ok(encode_packet(&header, &message)?) + } +} diff --git a/src/protocol/tree/endpoint/core.rs b/src/protocol/tree/endpoint/core.rs new file mode 100644 index 0000000..03d273f --- /dev/null +++ b/src/protocol/tree/endpoint/core.rs @@ -0,0 +1,143 @@ +//! Core endpoint state and externally visible types. +//! +//! This file maps to the protocol concepts described in `PROTOCOL.md`: +//! - Packet processing entry points and local delivery state: "Packet Types" +//! - Child registration state used during route selection: "Routing" +//! - Hook-hosting endpoint state: "Hooks" + +use alloc::{ + collections::{BTreeMap, BTreeSet}, + string::String, + vec::Vec, +}; +use core::fmt; + +use crate::protocol::{CallMessage, DataMessage, FaultMessage, FrameBytes, FrameError, PacketHeader, + ValidationError}; + +use super::super::{HookTable, RouteDecision}; + +/// Local connection state used for child route eligibility. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ConnectionState { + Unregistered, + Registered, +} + +/// Child path plus current registration state. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ChildRoute { + pub path: Vec, + pub state: ConnectionState, +} + +impl ChildRoute { + /// Convenience constructor for the common registered-child case. + pub fn registered(path: Vec) -> Self { + Self { + path, + state: ConnectionState::Registered, + } + } +} + +/// Test leaf behavior implemented by the endpoint runtime. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum LeafBehavior { + Echo, +} + +/// Static leaf metadata used for procedure dispatch and introspection. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct LeafSpec { + pub name: String, + pub procedures: Vec, + pub behavior: LeafBehavior, +} + +/// Where a frame entered the local endpoint. +/// +/// This corresponds to the authority and ingress checks described in the +/// `PROTOCOL.md` routing and call sections. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum Ingress { + Parent, + Child(Vec), + Local, +} + +/// Locally delivered protocol events. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum LocalEvent { + Call { + header: PacketHeader, + message: CallMessage, + }, + Data { + header: PacketHeader, + message: DataMessage, + }, + Fault { + header: PacketHeader, + message: FaultMessage, + }, +} + +/// Result of processing one framed packet. +#[derive(Debug, Default)] +pub struct EndpointOutcome { + pub forwards: Vec<(RouteDecision, FrameBytes)>, + pub events: Vec, + pub dropped: bool, +} + +/// Errors returned while decoding or validating a packet. +#[derive(Debug)] +pub enum EndpointError { + Frame(FrameError), + Validation(ValidationError), +} + +impl fmt::Display for EndpointError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Frame(error) => write!(f, "{error}"), + Self::Validation(error) => write!(f, "{error}"), + } + } +} + +impl core::error::Error for EndpointError {} + +impl From for EndpointError { + fn from(value: FrameError) -> Self { + Self::Frame(value) + } +} + +impl From for EndpointError { + fn from(value: ValidationError) -> Self { + Self::Validation(value) + } +} + +/// Public packet-processing trait exposed by the tree runtime. +pub trait Endpoint { + fn path(&self) -> &[String]; + fn receive( + &mut self, + ingress: &Ingress, + frame: FrameBytes, + ) -> Result; +} + +/// Stateful endpoint runtime implementing routing, hooks, and local dispatch. +#[derive(Debug, Default)] +pub struct ProtocolEndpoint { + pub(crate) path: Vec, + pub(crate) parent_path: Option>, + pub(crate) children: Vec, + pub(crate) leaves: BTreeMap, + pub(crate) endpoint_procedures: BTreeSet, + pub(crate) hooks: HookTable, +} diff --git a/src/protocol/tree/endpoint/hooks.rs b/src/protocol/tree/endpoint/hooks.rs new file mode 100644 index 0000000..bf5742c --- /dev/null +++ b/src/protocol/tree/endpoint/hooks.rs @@ -0,0 +1,189 @@ +//! Hook-state transitions and route helpers. +//! +//! These methods implement the hook lifecycle described in `PROTOCOL.md`: +//! pending contexts, active contexts, peer validation, and fault emission. + +use alloc::{string::String, vec}; + +use crate::protocol::{DataMessage, FaultMessage, PacketHeader, PacketType, ProtocolFault, encode_packet}; + +use super::core::{EndpointError, EndpointOutcome, Ingress, LocalEvent, ProtocolEndpoint}; +use super::super::{HookKey, RouteDecision, route_destination}; + +impl ProtocolEndpoint { + /// Emits a protocol fault only when the original call declared a response hook. + pub(crate) fn emit_fault_if_possible( + &mut self, + key: Option, + fault: ProtocolFault, + ) -> Result { + let Some(key) = key else { + return Ok(EndpointOutcome { + dropped: true, + ..EndpointOutcome::default() + }); + }; + + self.hooks.remove_pending(&key); + self.hooks.remove_active(&key); + + let header = PacketHeader { + packet_type: PacketType::Fault, + src_path: self.path.clone(), + dst_path: key.return_path.clone(), + dst_leaf: None, + hook_id: Some(key.hook_id), + }; + let message = FaultMessage { fault }; + let route = self.decide_route(&key.return_path); + + match route { + RouteDecision::Local => Ok(EndpointOutcome { + events: vec![LocalEvent::Fault { header, message }], + ..EndpointOutcome::default() + }), + _ => { + let frame = encode_packet(&header, &message)?; + Ok(EndpointOutcome { + forwards: vec![(route, frame)], + ..EndpointOutcome::default() + }) + } + } + } + + /// Handles locally delivered hook `Data` packets. + pub(crate) fn handle_local_data( + &mut self, + header: PacketHeader, + message: DataMessage, + ) -> Result { + let key = HookKey::new(self.path.clone(), header.hook_id.expect("validated")); + + if self.hooks.active(&key).is_none() { + let matches = self.hooks.pending(&key).is_some_and(|pending| { + pending.caller_src_path == header.src_path + && pending.procedure_id == message.procedure_id + }); + if matches { + self.hooks.activate_pending(&key, header.src_path.clone()); + } + } + + let Some(active) = self.hooks.active(&key) else { + return Ok(EndpointOutcome { + dropped: true, + ..EndpointOutcome::default() + }); + }; + + if active.peer_path != header.src_path { + self.hooks.remove_active(&key); + self.hooks.remove_pending(&key); + return Ok(EndpointOutcome { + events: vec![LocalEvent::Fault { + header: PacketHeader { + packet_type: PacketType::Fault, + src_path: header.src_path, + dst_path: self.path.clone(), + dst_leaf: None, + hook_id: Some(key.hook_id), + }, + message: FaultMessage { + fault: ProtocolFault::INVALID_HOOK_PEER, + }, + }], + ..EndpointOutcome::default() + }); + } + + if active.procedure_id != message.procedure_id { + return Ok(EndpointOutcome { + dropped: true, + ..EndpointOutcome::default() + }); + } + + if message.end_hook { + self.hooks.remove_active(&key); + } + + Ok(EndpointOutcome { + events: vec![LocalEvent::Data { header, message }], + ..EndpointOutcome::default() + }) + } + + /// Handles locally delivered hook `Fault` packets. + pub(crate) fn handle_local_fault( + &mut self, + header: PacketHeader, + message: FaultMessage, + ) -> Result { + let key = HookKey::new(self.path.clone(), header.hook_id.expect("validated")); + let matches = self + .hooks + .active(&key) + .is_some_and(|active| active.peer_path == header.src_path) + || self + .hooks + .pending(&key) + .is_some_and(|pending| pending.caller_src_path == header.src_path); + + if !matches { + return Ok(EndpointOutcome { + dropped: true, + ..EndpointOutcome::default() + }); + } + + self.hooks.remove_active(&key); + self.hooks.remove_pending(&key); + + Ok(EndpointOutcome { + events: vec![LocalEvent::Fault { header, message }], + ..EndpointOutcome::default() + }) + } + + /// Chooses the next hop using the protocol's longest-prefix routing rule. + pub(crate) fn decide_route(&self, dst_path: &[String]) -> RouteDecision { + let child_paths = self + .children + .iter() + .filter(|child| child.state == super::core::ConnectionState::Registered) + .map(|child| &child.path); + + route_destination( + &self.path, + child_paths, + self.parent_path.is_some(), + dst_path, + ) + } + + /// Validates whether a source path is attributable to the ingress side. + /// + /// Rationale: this looks backwards at first because parent ingress accepts + /// non-local source paths. That is required for multi-hop routing, where a + /// parent forwards traffic originating from ancestors or siblings. + pub(crate) fn valid_source_for_ingress( + &self, + ingress: &Ingress, + src_path: &[String], + ) -> bool { + match ingress { + Ingress::Parent => { + if src_path.len() < self.path.len() { + return true; + } + if src_path.len() == self.path.len() { + return src_path == self.path; + } + !src_path.starts_with(&self.path) + } + Ingress::Child(child_path) => src_path.starts_with(child_path), + Ingress::Local => src_path == self.path, + } + } +} diff --git a/src/protocol/tree/endpoint/introspection.rs b/src/protocol/tree/endpoint/introspection.rs new file mode 100644 index 0000000..4bc7e72 --- /dev/null +++ b/src/protocol/tree/endpoint/introspection.rs @@ -0,0 +1,90 @@ +//! Introspection response generation. +//! +//! This code implements the reserved empty-procedure behavior from the +//! introspection sections of `PROTOCOL.md`. + +use alloc::{string::String, vec}; +use rkyv::{rancor::Error as RkyvError, to_bytes}; + +use crate::protocol::{ + DataMessage, EndpointIntrospection, FrameError, LeafIntrospection, LeafIntrospectionSummary, + PacketHeader, PacketType, ProtocolFault, encode_packet, +}; + +use super::core::{EndpointError, EndpointOutcome, ProtocolEndpoint}; +use super::super::HookKey; + +impl ProtocolEndpoint { + /// Handles the reserved introspection procedure. + pub(crate) fn handle_introspection( + &mut self, + header: &PacketHeader, + key: Option, + ) -> Result { + let Some(key) = key else { + return Ok(EndpointOutcome { + dropped: true, + ..EndpointOutcome::default() + }); + }; + + self.hooks.activate_pending(&key, header.src_path.clone()); + + let payload = if let Some(leaf_name) = &header.dst_leaf { + let Some(leaf) = self.leaves.get(leaf_name) else { + return self.emit_fault_if_possible(Some(key), ProtocolFault::UNKNOWN_LEAF); + }; + to_bytes::(&LeafIntrospection { + leaf_name: leaf_name.clone(), + procedures: leaf.procedures.clone(), + }) + .map_err(|error| EndpointError::Frame(FrameError::Serialize(error)))? + .to_vec() + } else { + to_bytes::(&EndpointIntrospection { + leaves: self + .leaves + .values() + .map(|leaf| LeafIntrospectionSummary { + leaf_name: leaf.name.clone(), + procedures: leaf.procedures.clone(), + }) + .collect(), + }) + .map_err(|error| EndpointError::Frame(FrameError::Serialize(error)))? + .to_vec() + }; + + let response_header = PacketHeader { + packet_type: PacketType::Data, + src_path: self.path.clone(), + dst_path: key.return_path.clone(), + dst_leaf: None, + hook_id: Some(key.hook_id), + }; + let response = DataMessage { + procedure_id: String::new(), + data: payload, + end_hook: true, + }; + self.hooks.remove_active(&key); + let route = self.decide_route(&key.return_path); + + match route { + super::super::RouteDecision::Local => Ok(EndpointOutcome { + events: vec![super::core::LocalEvent::Data { + header: response_header, + message: response, + }], + ..EndpointOutcome::default() + }), + _ => { + let frame = encode_packet(&response_header, &response)?; + Ok(EndpointOutcome { + forwards: vec![(route, frame)], + ..EndpointOutcome::default() + }) + } + } + } +} diff --git a/src/protocol/tree/endpoint/mod.rs b/src/protocol/tree/endpoint/mod.rs new file mode 100644 index 0000000..343589a --- /dev/null +++ b/src/protocol/tree/endpoint/mod.rs @@ -0,0 +1,22 @@ +//! Endpoint runtime and traits. +//! +//! This module provides the core logic for a protocol endpoint, including +//! packet ingress, routing decisions, and hook lifecycle management. +//! +//! Protocol section mapping: +//! - `builders`: packet construction and outbound hook declaration +//! - `receive`: framed ingress, authority checks, and route selection +//! - `hooks`: hook lifecycle, peer validation, and fault emission +//! - `introspection`: reserved empty-procedure discovery responses +//! - `core`: externally visible endpoint state and result types + +mod builders; +mod core; +mod hooks; +mod introspection; +mod receive; + +pub use core::{ + ChildRoute, ConnectionState, Endpoint, EndpointError, EndpointOutcome, Ingress, + LeafBehavior, LeafSpec, LocalEvent, ProtocolEndpoint, +}; diff --git a/src/protocol/tree/endpoint/receive.rs b/src/protocol/tree/endpoint/receive.rs new file mode 100644 index 0000000..5ace710 --- /dev/null +++ b/src/protocol/tree/endpoint/receive.rs @@ -0,0 +1,204 @@ +//! Packet ingress and local call dispatch. +//! +//! This file implements the transport-facing packet entry point and maps it to +//! the `Call`, `Data`, and `Fault` sections of `PROTOCOL.md`. + +use alloc::vec; + +use crate::protocol::{ + CallMessage, DataMessage, PacketType, ProtocolFault, decode_frame, + introspection::INTROSPECTION_PROCEDURE_ID, validate_call, validate_header, +}; + +use super::core::{Endpoint, EndpointError, EndpointOutcome, Ingress, LocalEvent, ProtocolEndpoint}; +use super::super::{HookKey, PendingHook, RouteDecision}; + +impl ProtocolEndpoint { + /// Handles a locally delivered `Call` packet after routing selected `Local`. + pub(crate) fn handle_local_call( + &mut self, + header: crate::protocol::PacketHeader, + message: CallMessage, + ) -> Result { + let key = message + .response_hook + .as_ref() + .map(|hook| HookKey::new(hook.return_path.clone(), hook.hook_id)); + + if let Some(hook) = &message.response_hook + && hook.return_path != self.path + && self + .hooks + .insert_pending(PendingHook { + caller_src_path: header.src_path.clone(), + return_path: hook.return_path.clone(), + hook_id: hook.hook_id, + procedure_id: message.procedure_id.clone(), + dst_leaf: header.dst_leaf.clone(), + }) + .is_err() + { + return self.emit_fault_if_possible(key, ProtocolFault::INTERNAL_ERROR); + } + + if message.procedure_id == INTROSPECTION_PROCEDURE_ID { + return self.handle_introspection(&header, key); + } + + let supported = match &header.dst_leaf { + Some(leaf_name) => self + .leaves + .get(leaf_name) + .map(|leaf| leaf.procedures.iter().any(|procedure| procedure == &message.procedure_id)) + .unwrap_or(false), + None => self.endpoint_procedures.contains(&message.procedure_id), + }; + + if !supported { + let fault = if header + .dst_leaf + .as_ref() + .is_some_and(|name| !self.leaves.contains_key(name)) + { + ProtocolFault::UNKNOWN_LEAF + } else { + ProtocolFault::UNKNOWN_PROCEDURE + }; + return self.emit_fault_if_possible(key, fault); + } + + if let Some(key) = &key { + self.hooks.activate_pending(key, header.src_path.clone()); + } + + match header.dst_leaf.as_ref().and_then(|name| self.leaves.get(name)) { + Some(leaf) if leaf.behavior == super::core::LeafBehavior::Echo && key.is_some() => { + let hook = message.response_hook.expect("synchronized"); + let response = DataMessage { + procedure_id: message.procedure_id.clone(), + data: message.data, + end_hook: true, + }; + let response_header = crate::protocol::PacketHeader { + packet_type: PacketType::Data, + src_path: self.path.clone(), + dst_path: hook.return_path.clone(), + dst_leaf: None, + hook_id: Some(hook.hook_id), + }; + let route = self.decide_route(&hook.return_path); + self.hooks + .remove_active(&HookKey::new(hook.return_path.clone(), hook.hook_id)); + + match route { + RouteDecision::Local => Ok(EndpointOutcome { + events: vec![LocalEvent::Data { + header: response_header, + message: response, + }], + ..EndpointOutcome::default() + }), + _ => { + let frame = crate::protocol::encode_packet(&response_header, &response)?; + Ok(EndpointOutcome { + forwards: vec![(route, frame)], + ..EndpointOutcome::default() + }) + } + } + } + _ => Ok(EndpointOutcome { + events: vec![LocalEvent::Call { header, message }], + ..EndpointOutcome::default() + }), + } + } +} + +impl Endpoint for ProtocolEndpoint { + fn path(&self) -> &[alloc::string::String] { + &self.path + } + + fn receive( + &mut self, + ingress: &Ingress, + frame: crate::protocol::FrameBytes, + ) -> Result { + let parsed = decode_frame(&frame)?; + let header = parsed.header(); + validate_header(header)?; + + if !self.valid_source_for_ingress(ingress, &header.src_path) { + return Ok(EndpointOutcome { + dropped: true, + ..EndpointOutcome::default() + }); + } + + match header.packet_type { + PacketType::Call => { + let message = parsed.deserialize_call()?; + if !matches!(ingress, Ingress::Parent | Ingress::Local) { + return Ok(EndpointOutcome { + dropped: true, + ..EndpointOutcome::default() + }); + } + + validate_call(header, &message)?; + match self.decide_route(&header.dst_path) { + RouteDecision::Child(index) => Ok(EndpointOutcome { + forwards: vec![(RouteDecision::Child(index), frame)], + ..EndpointOutcome::default() + }), + RouteDecision::Parent => Ok(EndpointOutcome { + forwards: vec![(RouteDecision::Parent, frame)], + ..EndpointOutcome::default() + }), + RouteDecision::Drop => Ok(EndpointOutcome { + dropped: true, + ..EndpointOutcome::default() + }), + RouteDecision::Local => self.handle_local_call(header.clone(), message), + } + } + PacketType::Data => { + let message = parsed.deserialize_data()?; + match self.decide_route(&header.dst_path) { + RouteDecision::Local => self.handle_local_data(header.clone(), message), + RouteDecision::Child(index) => Ok(EndpointOutcome { + forwards: vec![(RouteDecision::Child(index), frame)], + ..EndpointOutcome::default() + }), + RouteDecision::Parent => Ok(EndpointOutcome { + forwards: vec![(RouteDecision::Parent, frame)], + ..EndpointOutcome::default() + }), + RouteDecision::Drop => Ok(EndpointOutcome { + dropped: true, + ..EndpointOutcome::default() + }), + } + } + PacketType::Fault => { + let message = parsed.deserialize_fault()?; + match self.decide_route(&header.dst_path) { + RouteDecision::Local => self.handle_local_fault(header.clone(), message), + RouteDecision::Child(index) => Ok(EndpointOutcome { + forwards: vec![(RouteDecision::Child(index), frame)], + ..EndpointOutcome::default() + }), + RouteDecision::Parent => Ok(EndpointOutcome { + forwards: vec![(RouteDecision::Parent, frame)], + ..EndpointOutcome::default() + }), + RouteDecision::Drop => Ok(EndpointOutcome { + dropped: true, + ..EndpointOutcome::default() + }), + } + } + } + } +} diff --git a/src/protocol/tree/hook.rs b/src/protocol/tree/hook.rs index 89715f5..aa88e50 100644 --- a/src/protocol/tree/hook.rs +++ b/src/protocol/tree/hook.rs @@ -41,7 +41,10 @@ pub struct ActiveHook { pub peer_finished: bool, } -/// Durable hook state tables. +/// Duplicate hook insertion error. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct HookConflict; + /// Durable hook state tables. #[derive(Debug)] pub struct HookTable { @@ -67,19 +70,19 @@ impl HookTable { id } - pub fn insert_pending(&mut self, pending: PendingHook) -> Result<(), ()> { + pub fn insert_pending(&mut self, pending: PendingHook) -> Result<(), HookConflict> { let key = HookKey::new(pending.return_path.clone(), pending.hook_id); if self.pending.contains_key(&key) || self.active.contains_key(&key) { - return Err(()); + return Err(HookConflict); } self.pending.insert(key, pending); Ok(()) } - pub fn insert_active(&mut self, active: ActiveHook) -> Result<(), ()> { + pub fn insert_active(&mut self, active: ActiveHook) -> Result<(), HookConflict> { let key = HookKey::new(active.return_path.clone(), active.hook_id); if self.pending.contains_key(&key) || self.active.contains_key(&key) { - return Err(()); + return Err(HookConflict); } self.active.insert(key, active); Ok(()) diff --git a/src/protocol/tree/mod.rs b/src/protocol/tree/mod.rs index a436512..f9cbf18 100644 --- a/src/protocol/tree/mod.rs +++ b/src/protocol/tree/mod.rs @@ -5,10 +5,10 @@ mod hook; mod routing; pub use endpoint::{ - ChildRoute, ConnectionState, Endpoint, EndpointError, EndpointOutcome, Ingress, LeafBehavior, - LeafSpec, LocalEvent, ProtocolEndpoint, + ChildRoute, ConnectionState, Endpoint, EndpointError, EndpointOutcome, Ingress, + LeafBehavior, LeafSpec, LocalEvent, ProtocolEndpoint, }; -pub use hook::{ActiveHook, HookKey, HookTable, PendingHook}; +pub use hook::{ActiveHook, HookConflict, HookKey, HookTable, PendingHook}; pub use routing::{ DefaultRouteProvider, LeafNode, RouteDecision, RouteProvider, TreeNode, is_prefix, route_destination, diff --git a/src/protocol/validation.rs b/src/protocol/validation.rs index 0911f82..a59e761 100644 --- a/src/protocol/validation.rs +++ b/src/protocol/validation.rs @@ -98,7 +98,3 @@ pub fn validate_call(header: &PacketHeader, call: &CallMessage) -> Result<(), Va Ok(()) } - -fn is_portable_procedure_char(ch: char) -> bool { - ch.is_ascii_lowercase() || ch.is_ascii_digit() || ch == '_' -}