refactor: split protocol endpoint into focused modules

This commit is contained in:
Michael Mikovsky
2026-04-24 14:25:35 -06:00
parent 71afc49ac9
commit fc8638d014
13 changed files with 818 additions and 666 deletions
-630
View File
@@ -1,630 +0,0 @@
//! Endpoint runtime and traits.
use alloc::{
collections::{BTreeMap, BTreeSet},
string::String,
vec,
vec::Vec,
};
use core::fmt;
use rkyv::{rancor::Error as RkyvError, to_bytes};
use crate::protocol::{
CallMessage, DataMessage, EndpointIntrospection, FaultMessage, FrameBytes, FrameError,
HookTarget, LeafIntrospection, LeafIntrospectionSummary, PacketHeader, PacketType,
ProtocolFault, ValidationError, decode_frame, encode_packet,
introspection::INTROSPECTION_PROCEDURE_ID, validate_call, validate_header,
validate_procedure_id,
};
use super::{ActiveHook, HookKey, HookTable, PendingHook, RouteDecision, route_destination};
/// Local connection state.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionState {
Unregistered,
Registered,
}
/// Registered child route.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChildRoute {
pub path: Vec<String>,
pub state: ConnectionState,
}
impl ChildRoute {
pub fn registered(path: Vec<String>) -> Self {
Self {
path,
state: ConnectionState::Registered,
}
}
}
/// Leaf behavior for test runtime.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LeafBehavior {
Echo,
}
/// Static leaf description.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LeafSpec {
pub name: String,
pub procedures: Vec<String>,
pub behavior: LeafBehavior,
}
/// Arrival side.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Ingress {
Parent,
Child(Vec<String>),
Local,
}
/// Local events.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LocalEvent {
Call {
header: PacketHeader,
message: CallMessage,
},
Data {
header: PacketHeader,
message: DataMessage,
},
Fault {
header: PacketHeader,
message: FaultMessage,
},
}
/// Processing outcome.
#[derive(Debug, Default)]
pub struct EndpointOutcome {
pub forwards: Vec<(RouteDecision, FrameBytes)>,
pub events: Vec<LocalEvent>,
pub dropped: bool,
}
/// Processing error.
#[derive(Debug)]
pub enum EndpointError {
Frame(FrameError),
Validation(ValidationError),
}
impl fmt::Display for EndpointError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Frame(error) => write!(f, "{error}"),
Self::Validation(error) => write!(f, "{error}"),
}
}
}
impl core::error::Error for EndpointError {}
impl From<FrameError> for EndpointError {
fn from(value: FrameError) -> Self {
Self::Frame(value)
}
}
impl From<ValidationError> for EndpointError {
fn from(value: ValidationError) -> Self {
Self::Validation(value)
}
}
/// Core trait for a protocol endpoint.
pub trait Endpoint {
fn path(&self) -> &[String];
fn receive(
&mut self,
ingress: &Ingress,
frame: FrameBytes,
) -> Result<EndpointOutcome, EndpointError>;
}
/// Default endpoint implementation.
#[derive(Debug, Default)]
pub struct ProtocolEndpoint {
path: Vec<String>,
parent_path: Option<Vec<String>>,
children: Vec<ChildRoute>,
leaves: BTreeMap<String, LeafSpec>,
endpoint_procedures: BTreeSet<String>,
hooks: HookTable,
}
impl ProtocolEndpoint {
pub fn new(
path: Vec<String>,
parent_path: Option<Vec<String>>,
children: Vec<ChildRoute>,
leaves: Vec<LeafSpec>,
) -> Self {
Self {
path,
parent_path,
children,
leaves: leaves
.into_iter()
.map(|leaf| (leaf.name.clone(), leaf))
.collect(),
endpoint_procedures: BTreeSet::new(),
hooks: HookTable::default(),
}
}
pub fn add_endpoint_procedure(
&mut self,
procedure_id: impl Into<String>,
) -> Result<(), EndpointError> {
let procedure_id = procedure_id.into();
validate_procedure_id(&procedure_id)?;
self.endpoint_procedures.insert(procedure_id);
Ok(())
}
pub fn allocate_hook_id(&mut self) -> u64 {
self.hooks.allocate_hook_id(&self.path)
}
pub fn make_call(
&mut self,
dst_path: Vec<String>,
dst_leaf: Option<String>,
procedure_id: impl Into<String>,
response_hook_id: Option<u64>,
data: Vec<u8>,
) -> Result<FrameBytes, EndpointError> {
let procedure_id = procedure_id.into();
validate_procedure_id(&procedure_id)?;
let response_hook = response_hook_id.map(|hook_id| HookTarget {
hook_id,
return_path: self.path.clone(),
});
let header = PacketHeader {
packet_type: PacketType::Call,
src_path: self.path.clone(),
dst_path: dst_path.clone(),
dst_leaf: dst_leaf.clone(),
hook_id: None,
};
let call = CallMessage {
procedure_id: procedure_id.clone(),
data,
response_hook,
};
validate_header(&header)?;
validate_call(&header, &call)?;
if let Some(hook) = &call.response_hook {
if self.hooks.insert_active(ActiveHook {
return_path: hook.return_path.clone(),
hook_id: hook.hook_id,
peer_path: dst_path,
procedure_id,
dst_leaf,
peer_finished: false,
}).is_err() {
return Err(EndpointError::Validation(ValidationError::InvalidHookId));
}
}
Ok(encode_packet(&header, &call)?)
}
pub fn make_data(
&self,
dst_path: Vec<String>,
hook_id: u64,
procedure_id: impl Into<String>,
data: Vec<u8>,
end_hook: bool,
) -> Result<FrameBytes, EndpointError> {
let procedure_id = procedure_id.into();
validate_procedure_id(&procedure_id)?;
let header = PacketHeader {
packet_type: PacketType::Data,
src_path: self.path.clone(),
dst_path,
dst_leaf: None,
hook_id: Some(hook_id),
};
let message = DataMessage {
procedure_id,
data,
end_hook,
};
validate_header(&header)?;
Ok(encode_packet(&header, &message)?)
}
fn handle_local_call(
&mut self,
header: PacketHeader,
message: CallMessage,
) -> Result<EndpointOutcome, EndpointError> {
let key = message
.response_hook
.as_ref()
.map(|hook| HookKey::new(hook.return_path.clone(), hook.hook_id));
if let Some(hook) = &message.response_hook {
if self.hooks.insert_pending(PendingHook {
caller_src_path: header.src_path.clone(),
return_path: hook.return_path.clone(),
hook_id: hook.hook_id,
procedure_id: message.procedure_id.clone(),
dst_leaf: header.dst_leaf.clone(),
}).is_err() {
return self.emit_fault_if_possible(key, ProtocolFault::INTERNAL_ERROR);
}
}
if message.procedure_id == INTROSPECTION_PROCEDURE_ID {
return self.handle_introspection(&header, key);
}
let supported = match &header.dst_leaf {
Some(leaf_name) => self
.leaves
.get(leaf_name)
.map(|leaf| leaf.procedures.iter().any(|p| p == &message.procedure_id))
.unwrap_or(false),
None => self.endpoint_procedures.contains(&message.procedure_id),
};
if !supported {
let fault = if header
.dst_leaf
.as_ref()
.is_some_and(|name| !self.leaves.contains_key(name))
{
ProtocolFault::UNKNOWN_LEAF
} else {
ProtocolFault::UNKNOWN_PROCEDURE
};
return self.emit_fault_if_possible(key, fault);
}
if let Some(key) = &key {
self.hooks.activate_pending(key, header.src_path.clone());
}
match header
.dst_leaf
.as_ref()
.and_then(|name| self.leaves.get(name))
{
Some(leaf) if leaf.behavior == LeafBehavior::Echo && key.is_some() => {
let hook = message.response_hook.expect("synchronized");
let response = DataMessage {
procedure_id: message.procedure_id.clone(),
data: message.data,
end_hook: true,
};
let response_header = PacketHeader {
packet_type: PacketType::Data,
src_path: self.path.clone(),
dst_path: hook.return_path.clone(),
dst_leaf: None,
hook_id: Some(hook.hook_id),
};
let frame = encode_packet(&response_header, &response)?;
let route = self.decide_route(&hook.return_path);
self.hooks
.remove_active(&HookKey::new(hook.return_path, hook.hook_id));
Ok(EndpointOutcome {
forwards: vec![(route, frame)],
..EndpointOutcome::default()
})
}
_ => Ok(EndpointOutcome {
events: vec![LocalEvent::Call { header, message }],
..EndpointOutcome::default()
}),
}
}
fn handle_introspection(
&mut self,
header: &PacketHeader,
key: Option<HookKey>,
) -> Result<EndpointOutcome, EndpointError> {
let Some(key) = key else {
return Ok(EndpointOutcome {
dropped: true,
..EndpointOutcome::default()
});
};
self.hooks.activate_pending(&key, header.src_path.clone());
let payload = if let Some(leaf_name) = &header.dst_leaf {
let Some(leaf) = self.leaves.get(leaf_name) else {
return self.emit_fault_if_possible(Some(key), ProtocolFault::UNKNOWN_LEAF);
};
to_bytes::<RkyvError>(&LeafIntrospection {
leaf_name: leaf_name.clone(),
procedures: leaf.procedures.clone(),
})
.map_err(|e| EndpointError::Frame(FrameError::Serialize(e)))?
.to_vec()
} else {
to_bytes::<RkyvError>(&EndpointIntrospection {
leaves: self
.leaves
.values()
.map(|leaf| LeafIntrospectionSummary {
leaf_name: leaf.name.clone(),
procedures: leaf.procedures.clone(),
})
.collect(),
})
.map_err(|e| EndpointError::Frame(FrameError::Serialize(e)))?
.to_vec()
};;
let response_header = PacketHeader {
packet_type: PacketType::Data,
src_path: self.path.clone(),
dst_path: key.return_path.clone(),
dst_leaf: None,
hook_id: Some(key.hook_id),
};
let route = self.decide_route(&key.return_path);
let response = DataMessage {
procedure_id: String::new(),
data: payload,
end_hook: true,
};
let frame = encode_packet(&response_header, &response)?;
self.hooks.remove_active(&key);
Ok(EndpointOutcome {
forwards: vec![(route, frame)],
..EndpointOutcome::default()
})
}
fn handle_local_data(
&mut self,
header: PacketHeader,
message: DataMessage,
) -> Result<EndpointOutcome, EndpointError> {
let key = HookKey::new(self.path.clone(), header.hook_id.expect("validated"));
if self.hooks.active(&key).is_none() {
let matches = self.hooks.pending(&key).is_some_and(|p| {
p.caller_src_path == header.src_path && p.procedure_id == message.procedure_id
});
if matches {
self.hooks.activate_pending(&key, header.src_path.clone());
}
}
let Some(active) = self.hooks.active(&key).cloned() else {
return Ok(EndpointOutcome {
dropped: true,
..EndpointOutcome::default()
});
};
if active.peer_path != header.src_path {
self.hooks.remove_active(&key);
self.hooks.remove_pending(&key);
return Ok(EndpointOutcome {
events: vec![LocalEvent::Fault {
header: PacketHeader {
packet_type: PacketType::Fault,
src_path: header.src_path,
dst_path: self.path.clone(),
dst_leaf: None,
hook_id: Some(key.hook_id),
},
message: FaultMessage {
fault: ProtocolFault::INVALID_HOOK_PEER,
},
}],
..EndpointOutcome::default()
});
}
if active.procedure_id != message.procedure_id {
return Ok(EndpointOutcome {
dropped: true,
..EndpointOutcome::default()
});
}
if message.end_hook {
self.hooks.remove_active(&key);
}
Ok(EndpointOutcome {
events: vec![LocalEvent::Data { header, message }],
..EndpointOutcome::default()
})
}
fn handle_local_fault(
&mut self,
header: PacketHeader,
message: FaultMessage,
) -> Result<EndpointOutcome, EndpointError> {
let key = HookKey::new(self.path.clone(), header.hook_id.expect("validated"));
let matches = self
.hooks
.active(&key)
.is_some_and(|a| a.peer_path == header.src_path)
|| self
.hooks
.pending(&key)
.is_some_and(|p| p.caller_src_path == header.src_path);
if !matches {
return Ok(EndpointOutcome {
dropped: true,
..EndpointOutcome::default()
});
}
self.hooks.remove_active(&key);
self.hooks.remove_pending(&key);
Ok(EndpointOutcome {
events: vec![LocalEvent::Fault { header, message }],
..EndpointOutcome::default()
})
}
fn emit_fault_if_possible(
&mut self,
key: Option<HookKey>,
fault: ProtocolFault,
) -> Result<EndpointOutcome, EndpointError> {
let Some(key) = key else {
return Ok(EndpointOutcome {
dropped: true,
..EndpointOutcome::default()
});
};
self.hooks.remove_pending(&key);
self.hooks.remove_active(&key);
let route = self.decide_route(&key.return_path);
let header = PacketHeader {
packet_type: PacketType::Fault,
src_path: self.path.clone(),
dst_path: key.return_path.clone(),
dst_leaf: None,
hook_id: Some(key.hook_id),
};
let frame = encode_packet(&header, &FaultMessage { fault })?;
Ok(EndpointOutcome {
forwards: vec![(route, frame)],
..EndpointOutcome::default()
})
}
fn decide_route(&self, dst_path: &[String]) -> RouteDecision {
let child_paths = self
.children
.iter()
.filter(|c| c.state == ConnectionState::Registered)
.map(|c| &c.path);
route_destination(
&self.path,
child_paths,
self.parent_path.is_some(),
dst_path,
)
}
fn valid_source_for_ingress(&self, ingress: &Ingress, src_path: &[String]) -> bool {
match ingress {
Ingress::Parent => {
// Valid if src_path is an ancestor, sibling, or the current node itself.
// Invalid if it's a descendant of the current node.
if src_path.len() < self.path.len() {
return true; // Ancestor or sibling in a different branch
}
if src_path.len() == self.path.len() {
return src_path == self.path; // Current node
}
// Check if it's a descendant
!src_path.starts_with(&self.path)
}
Ingress::Child(child_path) => {
// Valid if src_path is the child itself or any descendant of the child.
src_path.starts_with(child_path)
}
Ingress::Local => src_path == self.path,
}
}
}
impl Endpoint for ProtocolEndpoint {
fn path(&self) -> &[String] {
&self.path
}
fn receive(
&mut self,
ingress: &Ingress,
frame: FrameBytes,
) -> Result<EndpointOutcome, EndpointError> {
let parsed = decode_frame(&frame)?;
let header = parsed.header();
validate_header(header)?;
if !self.valid_source_for_ingress(ingress, &header.src_path) {
return Ok(EndpointOutcome {
dropped: true,
..EndpointOutcome::default()
});
}
match header.packet_type {
PacketType::Call => {
let message = parsed.deserialize_call()?;
if !matches!(ingress, Ingress::Parent | Ingress::Local) {
return Ok(EndpointOutcome {
dropped: true,
..EndpointOutcome::default()
});
}
validate_call(header, &message)?;
match self.decide_route(&header.dst_path) {
RouteDecision::Child(idx) => Ok(EndpointOutcome {
forwards: vec![(RouteDecision::Child(idx), frame)],
..EndpointOutcome::default()
}),
RouteDecision::Parent => Ok(EndpointOutcome {
forwards: vec![(RouteDecision::Parent, frame)],
..EndpointOutcome::default()
}),
RouteDecision::Drop => Ok(EndpointOutcome {
dropped: true,
..EndpointOutcome::default()
}),
RouteDecision::Local => self.handle_local_call(header.clone(), message),
}
}
PacketType::Data => {
let message = parsed.deserialize_data()?;
match self.decide_route(&header.dst_path) {
RouteDecision::Local => self.handle_local_data(header.clone(), message),
RouteDecision::Child(idx) => Ok(EndpointOutcome {
forwards: vec![(RouteDecision::Child(idx), frame)],
..EndpointOutcome::default()
}),
RouteDecision::Parent => Ok(EndpointOutcome {
forwards: vec![(RouteDecision::Parent, frame)],
..EndpointOutcome::default()
}),
RouteDecision::Drop => Ok(EndpointOutcome {
dropped: true,
..EndpointOutcome::default()
}),
}
}
PacketType::Fault => {
let message = parsed.deserialize_fault()?;
match self.decide_route(&header.dst_path) {
RouteDecision::Local => self.handle_local_fault(header.clone(), message),
RouteDecision::Child(idx) => Ok(EndpointOutcome {
forwards: vec![(RouteDecision::Child(idx), frame)],
..EndpointOutcome::default()
}),
RouteDecision::Parent => Ok(EndpointOutcome {
forwards: vec![(RouteDecision::Parent, frame)],
..EndpointOutcome::default()
}),
RouteDecision::Drop => Ok(EndpointOutcome {
dropped: true,
..EndpointOutcome::default()
}),
}
}
}
}
}
+140
View File
@@ -0,0 +1,140 @@
//! Packet builders and basic endpoint configuration.
//!
//! These helpers map to `PROTOCOL.md` sections covering packet construction,
//! call headers, and hook declaration fields.
use alloc::{collections::BTreeSet, string::String, vec::Vec};
use crate::protocol::{
validate_call, validate_header, validate_procedure_id, CallMessage, DataMessage, FrameBytes,
HookTarget, PacketHeader, PacketType, ValidationError, encode_packet,
};
use crate::protocol::tree::ActiveHook;
use super::core::{ChildRoute, EndpointError, ProtocolEndpoint};
use crate::protocol::tree::LeafSpec;
impl ProtocolEndpoint {
/// Creates a runtime endpoint with static tree topology and leaf metadata.
///
/// ```
/// use unshell::protocol::tree::{Endpoint, ProtocolEndpoint};
///
/// let endpoint = ProtocolEndpoint::new(Vec::new(), None, Vec::new(), Vec::new());
/// assert!(endpoint.path().is_empty());
/// ```
pub fn new(
path: Vec<String>,
parent_path: Option<Vec<String>>,
children: Vec<ChildRoute>,
leaves: Vec<LeafSpec>,
) -> Self {
Self {
path,
parent_path,
children,
leaves: leaves
.into_iter()
.map(|leaf| (leaf.name.clone(), leaf))
.collect(),
endpoint_procedures: BTreeSet::new(),
hooks: Default::default(),
}
}
/// Registers an endpoint-local procedure identifier.
pub fn add_endpoint_procedure(
&mut self,
procedure_id: impl Into<String>,
) -> Result<(), EndpointError> {
let procedure_id = procedure_id.into();
validate_procedure_id(&procedure_id)?;
self.endpoint_procedures.insert(procedure_id);
Ok(())
}
/// Allocates a locally unique hook id.
pub fn allocate_hook_id(&mut self) -> u64 {
self.hooks.allocate_hook_id(&self.path)
}
/// Builds an outbound `Call` packet and pre-registers active hook state when requested.
pub fn make_call(
&mut self,
dst_path: Vec<String>,
dst_leaf: Option<String>,
procedure_id: impl Into<String>,
response_hook_id: Option<u64>,
data: Vec<u8>,
) -> Result<FrameBytes, EndpointError> {
let procedure_id = procedure_id.into();
validate_procedure_id(&procedure_id)?;
let response_hook = response_hook_id.map(|hook_id| HookTarget {
hook_id,
return_path: self.path.clone(),
});
let header = PacketHeader {
packet_type: PacketType::Call,
src_path: self.path.clone(),
dst_path: dst_path.clone(),
dst_leaf: dst_leaf.clone(),
hook_id: None,
};
let call = CallMessage {
procedure_id: procedure_id.clone(),
data,
response_hook,
};
validate_header(&header)?;
validate_call(&header, &call)?;
if let Some(hook) = &call.response_hook
&& self
.hooks
.insert_active(ActiveHook {
return_path: hook.return_path.clone(),
hook_id: hook.hook_id,
peer_path: dst_path,
procedure_id,
dst_leaf,
peer_finished: false,
})
.is_err()
{
return Err(EndpointError::Validation(ValidationError::InvalidHookId));
}
Ok(encode_packet(&header, &call)?)
}
/// Builds an outbound `Data` packet for an existing hook.
pub fn make_data(
&self,
dst_path: Vec<String>,
hook_id: u64,
procedure_id: impl Into<String>,
data: Vec<u8>,
end_hook: bool,
) -> Result<FrameBytes, EndpointError> {
let procedure_id = procedure_id.into();
validate_procedure_id(&procedure_id)?;
let header = PacketHeader {
packet_type: PacketType::Data,
src_path: self.path.clone(),
dst_path,
dst_leaf: None,
hook_id: Some(hook_id),
};
let message = DataMessage {
procedure_id,
data,
end_hook,
};
validate_header(&header)?;
Ok(encode_packet(&header, &message)?)
}
}
+143
View File
@@ -0,0 +1,143 @@
//! Core endpoint state and externally visible types.
//!
//! This file maps to the protocol concepts described in `PROTOCOL.md`:
//! - Packet processing entry points and local delivery state: "Packet Types"
//! - Child registration state used during route selection: "Routing"
//! - Hook-hosting endpoint state: "Hooks"
use alloc::{
collections::{BTreeMap, BTreeSet},
string::String,
vec::Vec,
};
use core::fmt;
use crate::protocol::{CallMessage, DataMessage, FaultMessage, FrameBytes, FrameError, PacketHeader,
ValidationError};
use super::super::{HookTable, RouteDecision};
/// Local connection state used for child route eligibility.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionState {
Unregistered,
Registered,
}
/// Child path plus current registration state.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChildRoute {
pub path: Vec<String>,
pub state: ConnectionState,
}
impl ChildRoute {
/// Convenience constructor for the common registered-child case.
pub fn registered(path: Vec<String>) -> Self {
Self {
path,
state: ConnectionState::Registered,
}
}
}
/// Test leaf behavior implemented by the endpoint runtime.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LeafBehavior {
Echo,
}
/// Static leaf metadata used for procedure dispatch and introspection.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LeafSpec {
pub name: String,
pub procedures: Vec<String>,
pub behavior: LeafBehavior,
}
/// Where a frame entered the local endpoint.
///
/// This corresponds to the authority and ingress checks described in the
/// `PROTOCOL.md` routing and call sections.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Ingress {
Parent,
Child(Vec<String>),
Local,
}
/// Locally delivered protocol events.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LocalEvent {
Call {
header: PacketHeader,
message: CallMessage,
},
Data {
header: PacketHeader,
message: DataMessage,
},
Fault {
header: PacketHeader,
message: FaultMessage,
},
}
/// Result of processing one framed packet.
#[derive(Debug, Default)]
pub struct EndpointOutcome {
pub forwards: Vec<(RouteDecision, FrameBytes)>,
pub events: Vec<LocalEvent>,
pub dropped: bool,
}
/// Errors returned while decoding or validating a packet.
#[derive(Debug)]
pub enum EndpointError {
Frame(FrameError),
Validation(ValidationError),
}
impl fmt::Display for EndpointError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Frame(error) => write!(f, "{error}"),
Self::Validation(error) => write!(f, "{error}"),
}
}
}
impl core::error::Error for EndpointError {}
impl From<FrameError> for EndpointError {
fn from(value: FrameError) -> Self {
Self::Frame(value)
}
}
impl From<ValidationError> for EndpointError {
fn from(value: ValidationError) -> Self {
Self::Validation(value)
}
}
/// Public packet-processing trait exposed by the tree runtime.
pub trait Endpoint {
fn path(&self) -> &[String];
fn receive(
&mut self,
ingress: &Ingress,
frame: FrameBytes,
) -> Result<EndpointOutcome, EndpointError>;
}
/// Stateful endpoint runtime implementing routing, hooks, and local dispatch.
#[derive(Debug, Default)]
pub struct ProtocolEndpoint {
pub(crate) path: Vec<String>,
pub(crate) parent_path: Option<Vec<String>>,
pub(crate) children: Vec<ChildRoute>,
pub(crate) leaves: BTreeMap<String, LeafSpec>,
pub(crate) endpoint_procedures: BTreeSet<String>,
pub(crate) hooks: HookTable,
}
+189
View File
@@ -0,0 +1,189 @@
//! Hook-state transitions and route helpers.
//!
//! These methods implement the hook lifecycle described in `PROTOCOL.md`:
//! pending contexts, active contexts, peer validation, and fault emission.
use alloc::{string::String, vec};
use crate::protocol::{DataMessage, FaultMessage, PacketHeader, PacketType, ProtocolFault, encode_packet};
use super::core::{EndpointError, EndpointOutcome, Ingress, LocalEvent, ProtocolEndpoint};
use super::super::{HookKey, RouteDecision, route_destination};
impl ProtocolEndpoint {
/// Emits a protocol fault only when the original call declared a response hook.
pub(crate) fn emit_fault_if_possible(
&mut self,
key: Option<HookKey>,
fault: ProtocolFault,
) -> Result<EndpointOutcome, EndpointError> {
let Some(key) = key else {
return Ok(EndpointOutcome {
dropped: true,
..EndpointOutcome::default()
});
};
self.hooks.remove_pending(&key);
self.hooks.remove_active(&key);
let header = PacketHeader {
packet_type: PacketType::Fault,
src_path: self.path.clone(),
dst_path: key.return_path.clone(),
dst_leaf: None,
hook_id: Some(key.hook_id),
};
let message = FaultMessage { fault };
let route = self.decide_route(&key.return_path);
match route {
RouteDecision::Local => Ok(EndpointOutcome {
events: vec![LocalEvent::Fault { header, message }],
..EndpointOutcome::default()
}),
_ => {
let frame = encode_packet(&header, &message)?;
Ok(EndpointOutcome {
forwards: vec![(route, frame)],
..EndpointOutcome::default()
})
}
}
}
/// Handles locally delivered hook `Data` packets.
pub(crate) fn handle_local_data(
&mut self,
header: PacketHeader,
message: DataMessage,
) -> Result<EndpointOutcome, EndpointError> {
let key = HookKey::new(self.path.clone(), header.hook_id.expect("validated"));
if self.hooks.active(&key).is_none() {
let matches = self.hooks.pending(&key).is_some_and(|pending| {
pending.caller_src_path == header.src_path
&& pending.procedure_id == message.procedure_id
});
if matches {
self.hooks.activate_pending(&key, header.src_path.clone());
}
}
let Some(active) = self.hooks.active(&key) else {
return Ok(EndpointOutcome {
dropped: true,
..EndpointOutcome::default()
});
};
if active.peer_path != header.src_path {
self.hooks.remove_active(&key);
self.hooks.remove_pending(&key);
return Ok(EndpointOutcome {
events: vec![LocalEvent::Fault {
header: PacketHeader {
packet_type: PacketType::Fault,
src_path: header.src_path,
dst_path: self.path.clone(),
dst_leaf: None,
hook_id: Some(key.hook_id),
},
message: FaultMessage {
fault: ProtocolFault::INVALID_HOOK_PEER,
},
}],
..EndpointOutcome::default()
});
}
if active.procedure_id != message.procedure_id {
return Ok(EndpointOutcome {
dropped: true,
..EndpointOutcome::default()
});
}
if message.end_hook {
self.hooks.remove_active(&key);
}
Ok(EndpointOutcome {
events: vec![LocalEvent::Data { header, message }],
..EndpointOutcome::default()
})
}
/// Handles locally delivered hook `Fault` packets.
pub(crate) fn handle_local_fault(
&mut self,
header: PacketHeader,
message: FaultMessage,
) -> Result<EndpointOutcome, EndpointError> {
let key = HookKey::new(self.path.clone(), header.hook_id.expect("validated"));
let matches = self
.hooks
.active(&key)
.is_some_and(|active| active.peer_path == header.src_path)
|| self
.hooks
.pending(&key)
.is_some_and(|pending| pending.caller_src_path == header.src_path);
if !matches {
return Ok(EndpointOutcome {
dropped: true,
..EndpointOutcome::default()
});
}
self.hooks.remove_active(&key);
self.hooks.remove_pending(&key);
Ok(EndpointOutcome {
events: vec![LocalEvent::Fault { header, message }],
..EndpointOutcome::default()
})
}
/// Chooses the next hop using the protocol's longest-prefix routing rule.
pub(crate) fn decide_route(&self, dst_path: &[String]) -> RouteDecision {
let child_paths = self
.children
.iter()
.filter(|child| child.state == super::core::ConnectionState::Registered)
.map(|child| &child.path);
route_destination(
&self.path,
child_paths,
self.parent_path.is_some(),
dst_path,
)
}
/// Validates whether a source path is attributable to the ingress side.
///
/// Rationale: this looks backwards at first because parent ingress accepts
/// non-local source paths. That is required for multi-hop routing, where a
/// parent forwards traffic originating from ancestors or siblings.
pub(crate) fn valid_source_for_ingress(
&self,
ingress: &Ingress,
src_path: &[String],
) -> bool {
match ingress {
Ingress::Parent => {
if src_path.len() < self.path.len() {
return true;
}
if src_path.len() == self.path.len() {
return src_path == self.path;
}
!src_path.starts_with(&self.path)
}
Ingress::Child(child_path) => src_path.starts_with(child_path),
Ingress::Local => src_path == self.path,
}
}
}
@@ -0,0 +1,90 @@
//! Introspection response generation.
//!
//! This code implements the reserved empty-procedure behavior from the
//! introspection sections of `PROTOCOL.md`.
use alloc::{string::String, vec};
use rkyv::{rancor::Error as RkyvError, to_bytes};
use crate::protocol::{
DataMessage, EndpointIntrospection, FrameError, LeafIntrospection, LeafIntrospectionSummary,
PacketHeader, PacketType, ProtocolFault, encode_packet,
};
use super::core::{EndpointError, EndpointOutcome, ProtocolEndpoint};
use super::super::HookKey;
impl ProtocolEndpoint {
/// Handles the reserved introspection procedure.
pub(crate) fn handle_introspection(
&mut self,
header: &PacketHeader,
key: Option<HookKey>,
) -> Result<EndpointOutcome, EndpointError> {
let Some(key) = key else {
return Ok(EndpointOutcome {
dropped: true,
..EndpointOutcome::default()
});
};
self.hooks.activate_pending(&key, header.src_path.clone());
let payload = if let Some(leaf_name) = &header.dst_leaf {
let Some(leaf) = self.leaves.get(leaf_name) else {
return self.emit_fault_if_possible(Some(key), ProtocolFault::UNKNOWN_LEAF);
};
to_bytes::<RkyvError>(&LeafIntrospection {
leaf_name: leaf_name.clone(),
procedures: leaf.procedures.clone(),
})
.map_err(|error| EndpointError::Frame(FrameError::Serialize(error)))?
.to_vec()
} else {
to_bytes::<RkyvError>(&EndpointIntrospection {
leaves: self
.leaves
.values()
.map(|leaf| LeafIntrospectionSummary {
leaf_name: leaf.name.clone(),
procedures: leaf.procedures.clone(),
})
.collect(),
})
.map_err(|error| EndpointError::Frame(FrameError::Serialize(error)))?
.to_vec()
};
let response_header = PacketHeader {
packet_type: PacketType::Data,
src_path: self.path.clone(),
dst_path: key.return_path.clone(),
dst_leaf: None,
hook_id: Some(key.hook_id),
};
let response = DataMessage {
procedure_id: String::new(),
data: payload,
end_hook: true,
};
self.hooks.remove_active(&key);
let route = self.decide_route(&key.return_path);
match route {
super::super::RouteDecision::Local => Ok(EndpointOutcome {
events: vec![super::core::LocalEvent::Data {
header: response_header,
message: response,
}],
..EndpointOutcome::default()
}),
_ => {
let frame = encode_packet(&response_header, &response)?;
Ok(EndpointOutcome {
forwards: vec![(route, frame)],
..EndpointOutcome::default()
})
}
}
}
}
+22
View File
@@ -0,0 +1,22 @@
//! Endpoint runtime and traits.
//!
//! This module provides the core logic for a protocol endpoint, including
//! packet ingress, routing decisions, and hook lifecycle management.
//!
//! Protocol section mapping:
//! - `builders`: packet construction and outbound hook declaration
//! - `receive`: framed ingress, authority checks, and route selection
//! - `hooks`: hook lifecycle, peer validation, and fault emission
//! - `introspection`: reserved empty-procedure discovery responses
//! - `core`: externally visible endpoint state and result types
mod builders;
mod core;
mod hooks;
mod introspection;
mod receive;
pub use core::{
ChildRoute, ConnectionState, Endpoint, EndpointError, EndpointOutcome, Ingress,
LeafBehavior, LeafSpec, LocalEvent, ProtocolEndpoint,
};
+204
View File
@@ -0,0 +1,204 @@
//! Packet ingress and local call dispatch.
//!
//! This file implements the transport-facing packet entry point and maps it to
//! the `Call`, `Data`, and `Fault` sections of `PROTOCOL.md`.
use alloc::vec;
use crate::protocol::{
CallMessage, DataMessage, PacketType, ProtocolFault, decode_frame,
introspection::INTROSPECTION_PROCEDURE_ID, validate_call, validate_header,
};
use super::core::{Endpoint, EndpointError, EndpointOutcome, Ingress, LocalEvent, ProtocolEndpoint};
use super::super::{HookKey, PendingHook, RouteDecision};
impl ProtocolEndpoint {
/// Handles a locally delivered `Call` packet after routing selected `Local`.
pub(crate) fn handle_local_call(
&mut self,
header: crate::protocol::PacketHeader,
message: CallMessage,
) -> Result<EndpointOutcome, EndpointError> {
let key = message
.response_hook
.as_ref()
.map(|hook| HookKey::new(hook.return_path.clone(), hook.hook_id));
if let Some(hook) = &message.response_hook
&& hook.return_path != self.path
&& self
.hooks
.insert_pending(PendingHook {
caller_src_path: header.src_path.clone(),
return_path: hook.return_path.clone(),
hook_id: hook.hook_id,
procedure_id: message.procedure_id.clone(),
dst_leaf: header.dst_leaf.clone(),
})
.is_err()
{
return self.emit_fault_if_possible(key, ProtocolFault::INTERNAL_ERROR);
}
if message.procedure_id == INTROSPECTION_PROCEDURE_ID {
return self.handle_introspection(&header, key);
}
let supported = match &header.dst_leaf {
Some(leaf_name) => self
.leaves
.get(leaf_name)
.map(|leaf| leaf.procedures.iter().any(|procedure| procedure == &message.procedure_id))
.unwrap_or(false),
None => self.endpoint_procedures.contains(&message.procedure_id),
};
if !supported {
let fault = if header
.dst_leaf
.as_ref()
.is_some_and(|name| !self.leaves.contains_key(name))
{
ProtocolFault::UNKNOWN_LEAF
} else {
ProtocolFault::UNKNOWN_PROCEDURE
};
return self.emit_fault_if_possible(key, fault);
}
if let Some(key) = &key {
self.hooks.activate_pending(key, header.src_path.clone());
}
match header.dst_leaf.as_ref().and_then(|name| self.leaves.get(name)) {
Some(leaf) if leaf.behavior == super::core::LeafBehavior::Echo && key.is_some() => {
let hook = message.response_hook.expect("synchronized");
let response = DataMessage {
procedure_id: message.procedure_id.clone(),
data: message.data,
end_hook: true,
};
let response_header = crate::protocol::PacketHeader {
packet_type: PacketType::Data,
src_path: self.path.clone(),
dst_path: hook.return_path.clone(),
dst_leaf: None,
hook_id: Some(hook.hook_id),
};
let route = self.decide_route(&hook.return_path);
self.hooks
.remove_active(&HookKey::new(hook.return_path.clone(), hook.hook_id));
match route {
RouteDecision::Local => Ok(EndpointOutcome {
events: vec![LocalEvent::Data {
header: response_header,
message: response,
}],
..EndpointOutcome::default()
}),
_ => {
let frame = crate::protocol::encode_packet(&response_header, &response)?;
Ok(EndpointOutcome {
forwards: vec![(route, frame)],
..EndpointOutcome::default()
})
}
}
}
_ => Ok(EndpointOutcome {
events: vec![LocalEvent::Call { header, message }],
..EndpointOutcome::default()
}),
}
}
}
impl Endpoint for ProtocolEndpoint {
fn path(&self) -> &[alloc::string::String] {
&self.path
}
fn receive(
&mut self,
ingress: &Ingress,
frame: crate::protocol::FrameBytes,
) -> Result<EndpointOutcome, EndpointError> {
let parsed = decode_frame(&frame)?;
let header = parsed.header();
validate_header(header)?;
if !self.valid_source_for_ingress(ingress, &header.src_path) {
return Ok(EndpointOutcome {
dropped: true,
..EndpointOutcome::default()
});
}
match header.packet_type {
PacketType::Call => {
let message = parsed.deserialize_call()?;
if !matches!(ingress, Ingress::Parent | Ingress::Local) {
return Ok(EndpointOutcome {
dropped: true,
..EndpointOutcome::default()
});
}
validate_call(header, &message)?;
match self.decide_route(&header.dst_path) {
RouteDecision::Child(index) => Ok(EndpointOutcome {
forwards: vec![(RouteDecision::Child(index), frame)],
..EndpointOutcome::default()
}),
RouteDecision::Parent => Ok(EndpointOutcome {
forwards: vec![(RouteDecision::Parent, frame)],
..EndpointOutcome::default()
}),
RouteDecision::Drop => Ok(EndpointOutcome {
dropped: true,
..EndpointOutcome::default()
}),
RouteDecision::Local => self.handle_local_call(header.clone(), message),
}
}
PacketType::Data => {
let message = parsed.deserialize_data()?;
match self.decide_route(&header.dst_path) {
RouteDecision::Local => self.handle_local_data(header.clone(), message),
RouteDecision::Child(index) => Ok(EndpointOutcome {
forwards: vec![(RouteDecision::Child(index), frame)],
..EndpointOutcome::default()
}),
RouteDecision::Parent => Ok(EndpointOutcome {
forwards: vec![(RouteDecision::Parent, frame)],
..EndpointOutcome::default()
}),
RouteDecision::Drop => Ok(EndpointOutcome {
dropped: true,
..EndpointOutcome::default()
}),
}
}
PacketType::Fault => {
let message = parsed.deserialize_fault()?;
match self.decide_route(&header.dst_path) {
RouteDecision::Local => self.handle_local_fault(header.clone(), message),
RouteDecision::Child(index) => Ok(EndpointOutcome {
forwards: vec![(RouteDecision::Child(index), frame)],
..EndpointOutcome::default()
}),
RouteDecision::Parent => Ok(EndpointOutcome {
forwards: vec![(RouteDecision::Parent, frame)],
..EndpointOutcome::default()
}),
RouteDecision::Drop => Ok(EndpointOutcome {
dropped: true,
..EndpointOutcome::default()
}),
}
}
}
}
}
+8 -5
View File
@@ -41,7 +41,10 @@ pub struct ActiveHook {
pub peer_finished: bool,
}
/// Durable hook state tables.
/// Duplicate hook insertion error.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct HookConflict;
/// Durable hook state tables.
#[derive(Debug)]
pub struct HookTable {
@@ -67,19 +70,19 @@ impl HookTable {
id
}
pub fn insert_pending(&mut self, pending: PendingHook) -> Result<(), ()> {
pub fn insert_pending(&mut self, pending: PendingHook) -> Result<(), HookConflict> {
let key = HookKey::new(pending.return_path.clone(), pending.hook_id);
if self.pending.contains_key(&key) || self.active.contains_key(&key) {
return Err(());
return Err(HookConflict);
}
self.pending.insert(key, pending);
Ok(())
}
pub fn insert_active(&mut self, active: ActiveHook) -> Result<(), ()> {
pub fn insert_active(&mut self, active: ActiveHook) -> Result<(), HookConflict> {
let key = HookKey::new(active.return_path.clone(), active.hook_id);
if self.pending.contains_key(&key) || self.active.contains_key(&key) {
return Err(());
return Err(HookConflict);
}
self.active.insert(key, active);
Ok(())
+3 -3
View File
@@ -5,10 +5,10 @@ mod hook;
mod routing;
pub use endpoint::{
ChildRoute, ConnectionState, Endpoint, EndpointError, EndpointOutcome, Ingress, LeafBehavior,
LeafSpec, LocalEvent, ProtocolEndpoint,
ChildRoute, ConnectionState, Endpoint, EndpointError, EndpointOutcome, Ingress,
LeafBehavior, LeafSpec, LocalEvent, ProtocolEndpoint,
};
pub use hook::{ActiveHook, HookKey, HookTable, PendingHook};
pub use hook::{ActiveHook, HookConflict, HookKey, HookTable, PendingHook};
pub use routing::{
DefaultRouteProvider, LeafNode, RouteDecision, RouteProvider, TreeNode, is_prefix,
route_destination,