Improve protocol documentation and runtime structure

This commit is contained in:
Michael Mikovsky
2026-04-26 01:53:37 -06:00
parent 01faebc44b
commit 17be0f9daa
21 changed files with 676 additions and 353 deletions
+44 -3
View File
@@ -18,10 +18,15 @@ pub type FrameBytes = AlignedVec<SECTION_ALIGN>;
/// Framing or archive failure.
#[derive(Debug)]
pub enum FrameError {
/// The byte slice ended before a full frame could be decoded.
Truncated,
/// The archived header bytes failed validation or deserialization.
InvalidHeader(Error),
/// The archived payload bytes failed validation or deserialization.
InvalidPayload(Error),
/// Serializing one header or payload section failed.
Serialize(Error),
/// One archived section grew beyond the `u32` length prefix supported by the format.
LengthOverflow,
}
@@ -40,6 +45,9 @@ impl fmt::Display for FrameError {
impl core::error::Error for FrameError {}
/// Parsed frame with one owned header and a borrowed payload section.
///
/// The frame decoder eagerly materializes the routing header into owned Rust values, but keeps
/// the payload section borrowed so callers can choose which concrete payload type to decode.
pub struct ParsedFrame<'a> {
header: PacketHeader,
payload_bytes: &'a [u8],
@@ -47,39 +55,60 @@ pub struct ParsedFrame<'a> {
impl<'a> ParsedFrame<'a> {
#[must_use]
/// Returns the decoded packet header.
pub fn header(&self) -> &PacketHeader {
&self.header
}
#[must_use]
/// Returns the packet class from the decoded header.
pub fn packet_type(&self) -> PacketType {
self.header.packet_type
}
#[must_use]
/// Returns the borrowed payload section bytes.
pub fn payload_bytes(&self) -> &'a [u8] {
self.payload_bytes
}
#[must_use]
/// Splits the parsed frame into its owned header and borrowed payload bytes.
pub fn into_parts(self) -> (PacketHeader, &'a [u8]) {
(self.header, self.payload_bytes)
}
/// Deserializes the payload section as a [`CallMessage`].
pub fn deserialize_call(&self) -> Result<CallMessage, FrameError> {
deserialize_archived_bytes::<ArchivedCallMessage, CallMessage>(self.payload_bytes)
self.deserialize_payload::<ArchivedCallMessage, CallMessage>()
}
/// Deserializes the payload section as a [`DataMessage`].
pub fn deserialize_data(&self) -> Result<DataMessage, FrameError> {
deserialize_archived_bytes::<ArchivedDataMessage, DataMessage>(self.payload_bytes)
self.deserialize_payload::<ArchivedDataMessage, DataMessage>()
}
/// Deserializes the payload section as a [`FaultMessage`].
pub fn deserialize_fault(&self) -> Result<FaultMessage, FrameError> {
deserialize_archived_bytes::<ArchivedFaultMessage, FaultMessage>(self.payload_bytes)
self.deserialize_payload::<ArchivedFaultMessage, FaultMessage>()
}
fn deserialize_payload<A, T>(&self) -> Result<T, FrameError>
where
A: rkyv::Portable
+ for<'b> rkyv::bytecheck::CheckBytes<rkyv::api::high::HighValidator<'b, Error>>,
T: rkyv::Archive,
A: rkyv::Deserialize<T, rkyv::api::high::HighDeserializer<Error>>,
{
deserialize_archived_bytes::<A, T>(self.payload_bytes)
}
}
/// Encodes a packet header and payload using the aligned two-section frame format.
///
/// The frame starts with two big-endian `u32` lengths, followed by an aligned archived header
/// section and an aligned archived payload section. Both sections use [`SECTION_ALIGN`] so the
/// archived bytes can usually be accessed without a fallback copy on decode.
pub fn encode_packet<P>(header: &PacketHeader, payload: &P) -> Result<FrameBytes, FrameError>
where
P: for<'a> Serialize<
@@ -107,6 +136,9 @@ where
}
/// Decodes one aligned two-section frame.
///
/// This rejects trailing bytes instead of silently ignoring them, so callers can treat one byte
/// slice as exactly one protocol frame.
pub fn decode_frame(bytes: &[u8]) -> Result<ParsedFrame<'_>, FrameError> {
let (header_bytes, payload_bytes) = split_frame_sections(bytes)?;
let header = deserialize_section::<ArchivedPacketHeader, PacketHeader>(
@@ -121,6 +153,10 @@ pub fn decode_frame(bytes: &[u8]) -> Result<ParsedFrame<'_>, FrameError> {
}
/// Deserializes one archived byte section.
///
/// Payload bytes normally come from [`decode_frame`] or one of [`ParsedFrame`]`'s`
/// `deserialize_*` helpers. This function remains public for callers that archive nested
/// application payloads inside protocol `data` fields.
pub fn deserialize_archived_bytes<A, T>(bytes: &[u8]) -> Result<T, FrameError>
where
A: rkyv::Portable
@@ -158,6 +194,8 @@ fn split_frame_sections(bytes: &[u8]) -> Result<(&[u8], &[u8]), FrameError> {
let payload_start = align_up(header_end, SECTION_ALIGN);
let payload_end = payload_start + payload_len;
if payload_end != bytes.len() {
// Framed packets do not permit trailing bytes. Treating the slice as exactly one frame
// keeps stream framing bugs visible instead of silently accepting concatenated payloads.
return Err(FrameError::Truncated);
}
@@ -191,6 +229,9 @@ where
return deserialize::<T, Error>(archived).map_err(invalid);
}
// Archived types may require stronger alignment than a borrowed byte slice can guarantee.
// Copy into an aligned buffer so callers can still decode valid frames from arbitrary input
// sources instead of rejecting them purely for allocation layout reasons.
let mut aligned: FrameBytes = FrameBytes::with_capacity(bytes.len());
aligned.extend_from_slice(bytes);
let archived = access::<A, Error>(&aligned).map_err(invalid)?;
+13
View File
@@ -4,25 +4,38 @@ use alloc::{string::String, vec::Vec};
use rkyv::{Archive, Deserialize, Serialize};
/// Reserved procedure id for protocol introspection.
///
/// The protocol uses the empty string here so discovery traffic stays outside the normal
/// application procedure namespace. [`crate::protocol::validate_procedure_id`] reserves that
/// value exclusively for introspection.
pub const INTROSPECTION_PROCEDURE_ID: &str = "";
/// Endpoint-wide introspection payload.
#[derive(Archive, Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub struct EndpointIntrospection {
/// Direct child endpoint segment names hosted immediately below this endpoint.
pub sub_endpoints: Vec<String>,
/// Leaf summaries hosted directly at this endpoint.
pub leaves: Vec<LeafIntrospectionSummary>,
}
/// Shared per-leaf discovery record.
#[derive(Archive, Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub struct LeafIntrospectionSummary {
/// Canonical dotted leaf identifier.
pub leaf_name: String,
/// Exhaustive canonical procedure ids currently exposed by the leaf.
pub procedures: Vec<String>,
}
/// Leaf-specific introspection payload.
///
/// This duplicates [`LeafIntrospectionSummary`] intentionally because the leaf-only response is
/// a distinct wire payload from the endpoint-wide discovery response.
#[derive(Archive, Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub struct LeafIntrospection {
/// Canonical dotted leaf identifier.
pub leaf_name: String,
/// Exhaustive canonical procedure ids currently exposed by the leaf.
pub procedures: Vec<String>,
}
+12 -1
View File
@@ -1,4 +1,15 @@
//! Canonical UnShell protocol modules.
//! Canonical UnShell protocol surface.
//!
//! This module is the stable facade for wire-level protocol types, framing, and
//! stateless validation helpers. Callers normally:
//! - build one [`PacketHeader`] plus payload type from this module,
//! - encode it with [`encode_packet`],
//! - decode inbound bytes with [`decode_frame`], and
//! - validate message/header shape with [`validate_header`], [`validate_call`], and
//! [`validate_procedure_id`].
//!
//! The concrete wire structs live in the private `types` module and are re-exported here so the
//! public API stays flat while internal archived-type details remain hidden.
pub mod codec;
pub mod introspection;
+2 -1
View File
@@ -244,7 +244,8 @@ fn procedure_runtime_keeps_session_after_local_end_until_explicit_close() {
)
.expect("local end trigger should encode");
let EndpointOutcome::Forward {
frame: local_end_frame, ..
frame: local_end_frame,
..
} = local_end
else {
panic!("controller should forward local end trigger");
+95 -68
View File
@@ -221,6 +221,7 @@ impl<L> LeafRuntime<L>
where
L: CallLeaf + super::CallProcedures<Error = <L as CallLeaf>::Error>,
{
/// Delivers one inbound frame into the stateful leaf runtime.
pub fn receive(
&mut self,
ingress: &Ingress,
@@ -230,6 +231,7 @@ where
self.process_endpoint_outcome(outcome)
}
/// Polls the leaf for locally-generated hook traffic and routes any emitted frames.
pub fn poll(&mut self) -> Result<RuntimeOutcome, LeafRuntimeError<<L as CallLeaf>::Error>> {
let outgoing = self.leaf.poll().map_err(LeafRuntimeError::Leaf)?;
self.emit_outgoing(outgoing)
@@ -248,80 +250,105 @@ where
frames: Vec::new(),
dropped: true,
}),
crate::protocol::tree::EndpointOutcome::Local(event) => {
let mut runtime = RuntimeOutcome::default();
crate::protocol::tree::EndpointOutcome::Local(event) => self.process_local_event(event),
}
}
match event {
LocalEvent::Call { header, message } => {
let CallMessage {
procedure_id,
data,
response_hook,
} = message;
let fault_hook = response_hook.as_ref();
let incoming = IncomingCall {
header,
message: CallMessage {
procedure_id: procedure_id.clone(),
data,
response_hook: response_hook.clone(),
},
};
match self.leaf.dispatch_call(incoming) {
Ok(CallReply::Reply(bytes)) => {
if let Some(hook) = response_hook {
runtime.frames.extend(self.send_reply_data(
hook,
procedure_id,
bytes,
true,
)?);
}
}
Ok(CallReply::NoReply) => {}
Err(error) => {
runtime
.frames
.extend(self.emit_internal_fault_if_possible(fault_hook)?);
return Err(LeafRuntimeError::Dispatch(error));
}
}
}
LocalEvent::Data {
header,
message,
hook_key,
} => {
let outgoing = self
.leaf
.on_data(IncomingData {
header,
message,
hook_key,
})
.map_err(LeafRuntimeError::Leaf)?;
runtime.frames.extend(self.emit_outgoing(outgoing)?.frames);
}
LocalEvent::Fault {
header,
message,
hook_key,
} => {
self.leaf
.on_fault(IncomingFault {
header,
fault: message,
hook_key,
})
.map_err(LeafRuntimeError::Leaf)?;
}
}
fn process_local_event(
&mut self,
event: LocalEvent,
) -> Result<RuntimeOutcome, LeafRuntimeError<<L as CallLeaf>::Error>> {
match event {
LocalEvent::Call { header, message } => self.process_local_call(header, message),
LocalEvent::Data {
header,
message,
hook_key,
} => self.process_local_data(header, message, hook_key),
LocalEvent::Fault {
header,
message,
hook_key,
} => self.process_local_fault(header, message, hook_key),
}
}
Ok(runtime)
fn process_local_call(
&mut self,
header: PacketHeader,
message: CallMessage,
) -> Result<RuntimeOutcome, LeafRuntimeError<<L as CallLeaf>::Error>> {
let CallMessage {
procedure_id,
data,
response_hook,
} = message;
let fault_hook = response_hook.as_ref();
let incoming = IncomingCall {
header,
// Split the payload apart so the reply path can reuse the owned procedure id and
// response hook without re-decoding the incoming bytes.
message: CallMessage {
procedure_id: procedure_id.clone(),
data,
response_hook: response_hook.clone(),
},
};
match self.leaf.dispatch_call(incoming) {
Ok(CallReply::Reply(bytes)) => {
let frames = if let Some(hook) = response_hook {
self.send_reply_data(hook, procedure_id, bytes, true)?
} else {
Vec::new()
};
Ok(RuntimeOutcome {
frames,
dropped: false,
})
}
Ok(CallReply::NoReply) => Ok(RuntimeOutcome::default()),
Err(error) => {
let frames = self.emit_internal_fault_if_possible(fault_hook)?;
let _ = frames;
Err(LeafRuntimeError::Dispatch(error))
}
}
}
fn process_local_data(
&mut self,
header: PacketHeader,
message: DataMessage,
hook_key: HookKey,
) -> Result<RuntimeOutcome, LeafRuntimeError<<L as CallLeaf>::Error>> {
let outgoing = self
.leaf
.on_data(IncomingData {
header,
message,
hook_key,
})
.map_err(LeafRuntimeError::Leaf)?;
self.emit_outgoing(outgoing)
}
fn process_local_fault(
&mut self,
header: PacketHeader,
message: crate::protocol::FaultMessage,
hook_key: HookKey,
) -> Result<RuntimeOutcome, LeafRuntimeError<<L as CallLeaf>::Error>> {
self.leaf
.on_fault(IncomingFault {
header,
fault: message,
hook_key,
})
.map_err(LeafRuntimeError::Leaf)?;
Ok(RuntimeOutcome::default())
}
fn emit_outgoing(
&mut self,
outgoing: Vec<OutgoingData>,
+26 -15
View File
@@ -104,6 +104,9 @@ impl ProtocolEndpoint {
#[must_use]
/// Creates an endpoint with compiled routing tables for its current topology.
///
/// `parent_path` is currently used only as a presence flag. The endpoint stores its own
/// absolute `path`, and routing only needs to know whether an upward route exists.
pub fn new(
path: Vec<String>,
parent_path: Option<Vec<String>>,
@@ -177,10 +180,7 @@ impl ProtocolEndpoint {
match self.decide_route(&header.dst_path) {
RouteDecision::Local => self.handle_local_call(header, call),
RouteDecision::Drop => {
if let Some(hook) = &call.response_hook {
self.hooks
.remove_pending(&HookKey::new(hook.return_path.clone(), hook.hook_id));
}
self.rollback_pending_call_hook(&call);
Ok(EndpointOutcome::Dropped)
}
route => Ok(EndpointOutcome::Forward {
@@ -232,17 +232,7 @@ impl ProtocolEndpoint {
self.prepare_data(dst_path, hook_id, procedure_id, data, end_hook)?;
if end_hook {
// Locally-originated streams may not have been resolved against a peer yet,
// so fall back to the endpoint's own hook key shape when closing them.
let local_hook_key = self
.hooks
.resolve_active_key(&local_end_dst_path, hook_id, &self.path)
.unwrap_or_else(|| host_key.clone());
if self.hooks.pending(&host_key).is_some() {
self.hooks.mark_pending_local_end(&host_key);
} else if self.hooks.mark_local_end(&local_hook_key) {
self.hooks.remove_active(&local_hook_key);
}
self.mark_local_stream_end(&local_end_dst_path, hook_id, &host_key);
}
match self.decide_route(&header.dst_path) {
@@ -254,4 +244,25 @@ impl ProtocolEndpoint {
}),
}
}
fn rollback_pending_call_hook(&mut self, call: &CallMessage) {
if let Some(hook) = &call.response_hook {
self.hooks
.remove_pending(&HookKey::new(hook.return_path.clone(), hook.hook_id));
}
}
fn mark_local_stream_end(&mut self, dst_path: &[String], hook_id: u64, host_key: &HookKey) {
// Locally-originated streams may not have been resolved against a peer yet, so fall
// back to the endpoint's own hook key shape when closing them.
let local_hook_key = self
.hooks
.resolve_active_key(dst_path, hook_id, &self.path)
.unwrap_or_else(|| host_key.clone());
if self.hooks.pending(host_key).is_some() {
self.hooks.mark_pending_local_end(host_key);
} else if self.hooks.mark_local_end(&local_hook_key) {
self.hooks.remove_active(&local_hook_key);
}
}
}
+21 -1
View File
@@ -24,6 +24,7 @@ pub struct ChildRoute {
impl ChildRoute {
#[must_use]
/// Builds one child route that is immediately eligible for routing decisions.
pub fn registered(path: Vec<String>) -> Self {
Self {
path,
@@ -44,26 +45,40 @@ pub struct LeafSpec {
/// Where an inbound frame entered this endpoint.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Ingress {
/// The frame arrived from the parent side of the tree.
Parent,
/// The frame arrived from one direct child, identified by that child's absolute path.
Child(Vec<String>),
/// The frame originated locally at this endpoint.
Local,
}
/// Event produced when the endpoint handles a packet locally.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LocalEvent {
/// One opening `Call` packet validated and delivered to local code.
Call {
/// Validated protocol header for the packet.
header: PacketHeader,
/// Deserialized call payload.
message: CallMessage,
},
/// One hook-associated `Data` packet validated and delivered locally.
Data {
/// Validated protocol header for the packet.
header: PacketHeader,
/// Deserialized data payload.
message: DataMessage,
/// Canonical host-scoped hook key resolved for this hook stream.
hook_key: HookKey,
},
/// One hook-associated `Fault` packet validated and delivered locally.
Fault {
/// Validated protocol header for the packet.
header: PacketHeader,
/// Deserialized fault payload.
message: FaultMessage,
/// Canonical host-scoped hook key resolved for this hook stream.
hook_key: HookKey,
},
}
@@ -72,7 +87,10 @@ pub enum LocalEvent {
#[derive(Debug)]
pub enum EndpointOutcome {
/// Frame to forward, together with the next routing decision.
Forward { route: RouteDecision, frame: FrameBytes },
Forward {
route: RouteDecision,
frame: FrameBytes,
},
/// Locally-delivered protocol event.
Local(LocalEvent),
/// Packet intentionally discarded.
@@ -82,7 +100,9 @@ pub enum EndpointOutcome {
/// Error surfaced while validating or encoding protocol frames.
#[derive(Debug)]
pub enum EndpointError {
/// Framing, archive decode, or archive encode failed.
Frame(FrameError),
/// One protocol invariant failed validation.
Validation(ValidationError),
}
+9
View File
@@ -81,6 +81,9 @@ impl ProtocolEndpoint {
if active.procedure_id != message.procedure_id {
// Data frames stay bound to the procedure chosen by the original call.
// A procedure mismatch is dropped rather than faulted because the wrong peer may be
// replaying stale traffic, and converting that into a terminal hook fault would let a
// stray packet tear down an otherwise valid stream.
return Ok(EndpointOutcome::Dropped);
}
@@ -134,6 +137,12 @@ impl ProtocolEndpoint {
self.routing.route(dst_path)
}
/// Returns whether one `src_path` is topologically valid for the ingress side that delivered
/// the frame.
///
/// Parent ingress may carry packets from ancestors, siblings, or the endpoint itself, but not
/// from descendants pretending to be upstream. Child ingress may only carry packets from that
/// child subtree, and local ingress must exactly match the endpoint path.
pub(crate) fn valid_source_for_ingress(&self, ingress: &Ingress, src_path: &[String]) -> bool {
match ingress {
Ingress::Parent => {
+31 -15
View File
@@ -1,6 +1,6 @@
//! Introspection response generation.
use alloc::string::String;
use alloc::{string::String, vec::Vec};
use rkyv::{rancor::Error as RkyvError, to_bytes};
use crate::protocol::{
@@ -25,20 +25,13 @@ impl ProtocolEndpoint {
let Some(leaf) = self.leaves.get(leaf_name) else {
return self.emit_fault_if_possible(Some(key), ProtocolFault::UNKNOWN_LEAF);
};
to_bytes::<RkyvError>(&LeafIntrospection {
self.serialize_introspection(&LeafIntrospection {
leaf_name: leaf_name.clone(),
procedures: leaf.procedures.clone(),
})
.map_err(|error| EndpointError::Frame(FrameError::Serialize(error)))?
.to_vec()
})?
} else {
to_bytes::<RkyvError>(&EndpointIntrospection {
sub_endpoints: self
.children
.iter()
.filter(|child| child.registered)
.filter_map(|child| child.path.get(self.path.len()).cloned())
.collect(),
self.serialize_introspection(&EndpointIntrospection {
sub_endpoints: self.direct_registered_child_names(),
leaves: self
.leaves
.values()
@@ -47,9 +40,7 @@ impl ProtocolEndpoint {
procedures: leaf.procedures.clone(),
})
.collect(),
})
.map_err(|error| EndpointError::Frame(FrameError::Serialize(error)))?
.to_vec()
})?
};
let response_header = PacketHeader {
@@ -84,4 +75,29 @@ impl ProtocolEndpoint {
}),
}
}
fn direct_registered_child_names(&self) -> Vec<String> {
self.children
.iter()
.filter(|child| child.registered)
// Child routes store absolute endpoint paths. Index the first segment below the
// current endpoint so discovery only reports direct descendants.
.filter_map(|child| child.path.get(self.path.len()).cloned())
.collect()
}
fn serialize_introspection<T>(&self, value: &T) -> Result<Vec<u8>, EndpointError>
where
T: for<'a> rkyv::Serialize<
rkyv::api::high::HighSerializer<
rkyv::util::AlignedVec,
rkyv::ser::allocator::ArenaHandle<'a>,
RkyvError,
>,
>,
{
to_bytes::<RkyvError>(value)
.map_err(|error| EndpointError::Frame(FrameError::Serialize(error)))
.map(|bytes| bytes.to_vec())
}
}
+69 -66
View File
@@ -54,6 +54,8 @@ impl ProtocolEndpoint {
if let Some(hook) = &message.response_hook
&& hook.return_path != self.path
{
// Calls targeting this endpoint may still ask another endpoint to host the response
// hook. Only register a local active hook when the response path escapes this node.
let Some(key) = key.clone() else {
unreachable!("response_hook checked above");
};
@@ -76,6 +78,65 @@ impl ProtocolEndpoint {
Ok(EndpointOutcome::Local(LocalEvent::Call { header, message }))
}
fn receive_call(
&mut self,
ingress: &Ingress,
parsed: crate::protocol::ParsedFrame<'_>,
) -> Result<EndpointOutcome, EndpointError> {
// Calls only enter from the parent side of the tree or from the endpoint itself.
// Children can return data/faults, but they do not initiate new calls through this node.
if !matches!(ingress, Ingress::Parent | Ingress::Local) {
return Ok(EndpointOutcome::Dropped);
}
let (header, payload) = parsed.into_parts();
let message = deserialize_archived_bytes::<ArchivedCallMessage, CallMessage>(payload)?;
validate_call(&header, &message)?;
self.handle_local_call(header, message)
}
fn receive_data(
&mut self,
parsed: crate::protocol::ParsedFrame<'_>,
) -> Result<EndpointOutcome, EndpointError> {
let (header, payload) = parsed.into_parts();
let message = deserialize_archived_bytes::<
ArchivedDataMessage,
crate::protocol::DataMessage,
>(payload)?;
self.handle_local_data(header, message)
}
fn receive_fault(
&mut self,
parsed: crate::protocol::ParsedFrame<'_>,
) -> Result<EndpointOutcome, EndpointError> {
let (header, payload) = parsed.into_parts();
let message = deserialize_archived_bytes::<
ArchivedFaultMessage,
crate::protocol::FaultMessage,
>(payload)?;
self.handle_local_fault(header, message)
}
fn forward_or_drop(
route: RouteDecision,
frame: crate::protocol::FrameBytes,
) -> EndpointOutcome {
match route {
RouteDecision::Child(index) => EndpointOutcome::Forward {
route: RouteDecision::Child(index),
frame,
},
RouteDecision::Parent => EndpointOutcome::Forward {
route: RouteDecision::Parent,
frame,
},
RouteDecision::Drop => EndpointOutcome::Dropped,
RouteDecision::Local => unreachable!("local routes are handled before forwarding"),
}
}
}
impl Endpoint for ProtocolEndpoint {
@@ -96,73 +157,15 @@ impl Endpoint for ProtocolEndpoint {
return Ok(EndpointOutcome::Dropped);
}
match header.packet_type {
crate::protocol::PacketType::Call => {
// Calls only enter from the parent side of the tree or from the endpoint
// itself. Children can return data/faults, but they do not initiate new
// calls through this node.
if !matches!(ingress, Ingress::Parent | Ingress::Local) {
return Ok(EndpointOutcome::Dropped);
}
let route = self.decide_route(&header.dst_path);
if route != RouteDecision::Local {
return Ok(Self::forward_or_drop(route, frame));
}
match self.decide_route(&header.dst_path) {
RouteDecision::Child(index) => Ok(EndpointOutcome::Forward {
route: RouteDecision::Child(index),
frame,
}),
RouteDecision::Parent => Ok(EndpointOutcome::Forward {
route: RouteDecision::Parent,
frame,
}),
RouteDecision::Drop => Ok(EndpointOutcome::Dropped),
RouteDecision::Local => {
let (header, payload) = parsed.into_parts();
let message = deserialize_archived_bytes::<ArchivedCallMessage, CallMessage>(
payload,
)?;
validate_call(&header, &message)?;
self.handle_local_call(header, message)
}
}
}
crate::protocol::PacketType::Data => match self.decide_route(&header.dst_path) {
RouteDecision::Local => {
let (header, payload) = parsed.into_parts();
let message = deserialize_archived_bytes::<
ArchivedDataMessage,
crate::protocol::DataMessage,
>(payload)?;
self.handle_local_data(header, message)
}
RouteDecision::Child(index) => Ok(EndpointOutcome::Forward {
route: RouteDecision::Child(index),
frame,
}),
RouteDecision::Parent => Ok(EndpointOutcome::Forward {
route: RouteDecision::Parent,
frame,
}),
RouteDecision::Drop => Ok(EndpointOutcome::Dropped),
},
crate::protocol::PacketType::Fault => match self.decide_route(&header.dst_path) {
RouteDecision::Local => {
let (header, payload) = parsed.into_parts();
let message = deserialize_archived_bytes::<
ArchivedFaultMessage,
crate::protocol::FaultMessage,
>(payload)?;
self.handle_local_fault(header, message)
}
RouteDecision::Child(index) => Ok(EndpointOutcome::Forward {
route: RouteDecision::Child(index),
frame,
}),
RouteDecision::Parent => Ok(EndpointOutcome::Forward {
route: RouteDecision::Parent,
frame,
}),
RouteDecision::Drop => Ok(EndpointOutcome::Dropped),
},
match header.packet_type {
crate::protocol::PacketType::Call => self.receive_call(ingress, parsed),
crate::protocol::PacketType::Data => self.receive_data(parsed),
crate::protocol::PacketType::Fault => self.receive_fault(parsed),
}
}
}
+10
View File
@@ -73,6 +73,10 @@ impl HookTable {
///
/// Hook ids are scoped by host path, so this only needs to guarantee uniqueness within the
/// local table. The wrapped increment keeps allocation infallible for long-lived runtimes.
///
/// The table currently uses one counter shared across all host paths. The `return_path`
/// parameter remains in the API because hook ids are still interpreted as host-scoped by the
/// rest of the protocol surface.
#[must_use]
pub fn allocate_hook_id(&mut self, _return_path: &[String]) -> u64 {
let id = self.next_id.max(1);
@@ -197,6 +201,9 @@ impl HookTable {
}
/// Marks the local side finished and returns `true` once both sides are finished.
///
/// This does not remove the hook. Callers use the boolean to decide whether cleanup should
/// happen immediately or whether the peer side is still expected to send more traffic.
pub fn mark_local_end(&mut self, key: &HookKey) -> bool {
let Some(active) = self.active_mut(key) else {
return false;
@@ -206,6 +213,9 @@ impl HookTable {
}
/// Marks the peer side finished and returns `true` once both sides are finished.
///
/// This mirrors [`mark_local_end`](Self::mark_local_end): it only reports completion, leaving
/// final removal to the caller so higher layers can decide when to tear down hook state.
pub fn mark_peer_end(&mut self, key: &HookKey) -> bool {
let Some(active) = self.active_mut(key) else {
return false;
+28
View File
@@ -16,6 +16,7 @@ pub trait ProtocolLeaf {
/// Generated call metadata and initial `Call` dispatch for one leaf.
pub trait CallProcedures: ProtocolLeaf {
/// Leaf-specific error surfaced when generated call dispatch fails.
type Error;
/// Returns the local procedure suffixes supported by this leaf.
@@ -50,6 +51,10 @@ pub trait CallProcedures: ProtocolLeaf {
}
/// Dispatches one initial `Call` that targeted this leaf.
///
/// Implementations may assume the endpoint already proved the call targets this leaf.
/// They are still responsible for decoding the typed input payload and deciding which local
/// procedure suffix should run.
fn dispatch_call(
&mut self,
call: crate::protocol::tree::IncomingCall,
@@ -66,6 +71,26 @@ pub trait CallProcedures: ProtocolLeaf {
/// casing into protocol-visible names. Deterministic is not the same as stable
/// across refactors, so shipped protocol surfaces should prefer explicit `id`
/// overrides.
///
/// # Example
/// ```rust
/// use unshell::protocol::tree::derive_leaf_name;
///
/// let leaf = derive_leaf_name(
/// "unshell-core",
/// "0",
/// "1",
/// "0",
/// "unshell_core::examples::demo_shell",
/// "ShellLeaf",
/// None,
/// None,
/// None,
/// None,
/// None,
/// );
/// assert_eq!(leaf, "unshell_core.unshell_core.v0_1_0.examples.demo_shell.shell_leaf");
/// ```
#[allow(clippy::too_many_arguments)]
// This helper mirrors derive-macro inputs directly so callers do not have to allocate an
// intermediate metadata struct just to compute one deterministic protocol identifier.
@@ -138,6 +163,7 @@ fn normalize_leaf_segment(value: &str) -> String {
for character in value.chars() {
if character.is_ascii_uppercase() {
// Preserve CamelCase word boundaries in a snake_case protocol identifier.
if !normalized.is_empty() && !previous_was_separator {
normalized.push('_');
}
@@ -163,6 +189,8 @@ fn normalize_leaf_segment(value: &str) -> String {
}
if normalized.is_empty() {
// Protocol identifiers still need a stable non-empty placeholder when user input is all
// punctuation or whitespace.
String::from("leaf")
} else {
normalized
+2
View File
@@ -4,6 +4,8 @@
//! - `routing` contains static path declarations and longest-prefix routing helpers.
//! - `hook` contains the pending/active hook lifecycle tables used by endpoint runtime code.
//! - `endpoint` ties those pieces together into the runtime-facing protocol endpoint API.
//! - `leaf` defines application-facing metadata and generated call-dispatch traits.
//! - `call` and `procedure` layer higher-level runtimes on top of validated endpoint events.
mod call;
mod endpoint;
+190 -143
View File
@@ -185,7 +185,10 @@ impl ProcedureEffect {
pub enum ProcedureRuntimeError<E> {
/// Protocol endpoint routing or framing failed.
Endpoint(EndpointError),
/// The opening call failed to decode or open cleanly.
/// The opening call failed to decode or open cleanly before a session existed.
///
/// Once a session is already live, runtime failures prefer emitting protocol faults and
/// tearing down that session rather than surfacing leaf errors directly.
Decode(super::DispatchError<E>),
}
@@ -298,35 +301,12 @@ where
.collect::<Vec<_>>();
for key in keys {
let Some(mut session) = self.leaf.procedure_sessions().remove(&key) else {
let Some(session) = self.leaf.procedure_sessions().remove(&key) else {
continue;
};
let effect = match P::poll(&mut self.leaf, &mut session) {
Ok(effect) => self.ensure_terminal_packet(&key, effect),
Err(error) => {
let _ = P::close(&mut self.leaf, session);
frames.extend(self.emit_internal_fault(Some(key.clone()))?);
let _ = error;
continue;
}
};
match self.emit_outgoing(effect.outgoing) {
Ok(outgoing) => frames.extend(outgoing.frames),
Err(error) => {
if !effect.close_session {
self.leaf.procedure_sessions().insert(key, session);
} else {
let _ = P::close(&mut self.leaf, session);
}
return Err(error);
}
}
if !effect.close_session {
self.leaf.procedure_sessions().insert(key, session);
} else {
let _ = P::close(&mut self.leaf, session);
match self.poll_session(key, session)? {
Some(session_frames) => frames.extend(session_frames),
None => continue,
}
}
@@ -349,124 +329,187 @@ where
frames: Vec::new(),
dropped: true,
}),
super::EndpointOutcome::Local(event) => {
let mut runtime = ProcedureRuntimeOutcome::default();
match event {
LocalEvent::Call { header, message } => {
if message.procedure_id != P::procedure_id() {
runtime.frames.extend(
self.emit_internal_fault_if_possible(
message.response_hook.as_ref(),
)?,
);
return Ok(runtime);
}
let Some(hook) = message.response_hook.as_ref() else {
return Ok(runtime);
};
let hook_key = HookKey::new(hook.return_path.clone(), hook.hook_id);
let session = match self.open_session(header, message) {
Ok(session) => session,
Err(error) => {
runtime
.frames
.extend(self.emit_internal_fault(Some(hook_key.clone()))?);
let _ = error;
return Ok(runtime);
}
};
self.leaf.procedure_sessions().insert(hook_key, session);
}
LocalEvent::Data {
header,
message,
hook_key,
} => {
let Some(mut session) = self.leaf.procedure_sessions().remove(&hook_key)
else {
return Ok(runtime);
};
let effect = match P::on_data(
&mut self.leaf,
&mut session,
IncomingData {
header,
message,
hook_key: hook_key.clone(),
},
) {
Ok(effect) => self.ensure_terminal_packet(&hook_key, effect),
Err(error) => {
let _ = P::close(&mut self.leaf, session);
runtime
.frames
.extend(self.emit_internal_fault(Some(hook_key.clone()))?);
let _ = error;
return Ok(runtime);
}
};
match self.emit_outgoing(effect.outgoing) {
Ok(outgoing) => runtime.frames.extend(outgoing.frames),
Err(error) => {
if !effect.close_session {
self.leaf.procedure_sessions().insert(hook_key, session);
} else {
let _ = P::close(&mut self.leaf, session);
}
return Err(error);
}
}
if !effect.close_session {
self.leaf.procedure_sessions().insert(hook_key, session);
} else {
let _ = P::close(&mut self.leaf, session);
}
}
LocalEvent::Fault {
header,
message,
hook_key,
} => {
let Some(mut session) = self.leaf.procedure_sessions().remove(&hook_key)
else {
return Ok(runtime);
};
let on_fault_result = P::on_fault(
&mut self.leaf,
&mut session,
IncomingFault {
header,
fault: message,
hook_key: hook_key.clone(),
},
);
let close_result = P::close(&mut self.leaf, session);
if let Err(error) = on_fault_result {
let _ = close_result;
runtime
.frames
.extend(self.emit_internal_fault(Some(hook_key.clone()))?);
let _ = error;
return Ok(runtime);
}
if let Err(error) = close_result {
runtime
.frames
.extend(self.emit_internal_fault(Some(hook_key))?);
let _ = error;
return Ok(runtime);
}
}
}
Ok(runtime)
}
super::EndpointOutcome::Local(event) => self.process_local_event(event),
}
}
fn poll_session(
&mut self,
key: HookKey,
mut session: P,
) -> Result<Option<Vec<FrameBytes>>, ProcedureRuntimeError<P::Error>> {
let effect = match P::poll(&mut self.leaf, &mut session) {
Ok(effect) => self.ensure_terminal_packet(&key, effect),
Err(error) => {
let _ = P::close(&mut self.leaf, session);
let frames = self.emit_internal_fault(Some(key.clone()))?;
let _ = error;
return Ok(Some(frames));
}
};
let outgoing = match self.emit_outgoing(effect.outgoing) {
Ok(outgoing) => outgoing.frames,
Err(error) => {
if !effect.close_session {
self.leaf.procedure_sessions().insert(key, session);
} else {
let _ = P::close(&mut self.leaf, session);
}
return Err(error);
}
};
if !effect.close_session {
self.leaf.procedure_sessions().insert(key, session);
} else {
let _ = P::close(&mut self.leaf, session);
}
Ok(Some(outgoing))
}
fn process_local_event(
&mut self,
event: LocalEvent,
) -> Result<ProcedureRuntimeOutcome, ProcedureRuntimeError<P::Error>> {
match event {
LocalEvent::Call { header, message } => self.process_local_call(header, message),
LocalEvent::Data {
header,
message,
hook_key,
} => self.process_local_data(header, message, hook_key),
LocalEvent::Fault {
header,
message,
hook_key,
} => self.process_local_fault(header, message, hook_key),
}
}
fn process_local_call(
&mut self,
header: crate::protocol::PacketHeader,
message: CallMessage,
) -> Result<ProcedureRuntimeOutcome, ProcedureRuntimeError<P::Error>> {
let mut runtime = ProcedureRuntimeOutcome::default();
if message.procedure_id != P::procedure_id() {
runtime
.frames
.extend(self.emit_internal_fault_if_possible(message.response_hook.as_ref())?);
return Ok(runtime);
}
let Some(hook) = message.response_hook.as_ref() else {
return Ok(runtime);
};
let hook_key = HookKey::new(hook.return_path.clone(), hook.hook_id);
let session = match self.open_session(header, message) {
Ok(session) => session,
Err(error) => {
runtime
.frames
.extend(self.emit_internal_fault(Some(hook_key.clone()))?);
let _ = error;
return Ok(runtime);
}
};
self.leaf.procedure_sessions().insert(hook_key, session);
Ok(runtime)
}
fn process_local_data(
&mut self,
header: crate::protocol::PacketHeader,
message: crate::protocol::DataMessage,
hook_key: HookKey,
) -> Result<ProcedureRuntimeOutcome, ProcedureRuntimeError<P::Error>> {
let Some(mut session) = self.leaf.procedure_sessions().remove(&hook_key) else {
return Ok(ProcedureRuntimeOutcome::default());
};
let effect = match P::on_data(
&mut self.leaf,
&mut session,
IncomingData {
header,
message,
hook_key: hook_key.clone(),
},
) {
Ok(effect) => self.ensure_terminal_packet(&hook_key, effect),
Err(error) => {
let _ = P::close(&mut self.leaf, session);
let frames = self.emit_internal_fault(Some(hook_key.clone()))?;
let _ = error;
return Ok(ProcedureRuntimeOutcome {
frames,
dropped: false,
});
}
};
let outgoing = match self.emit_outgoing(effect.outgoing) {
Ok(outgoing) => outgoing.frames,
Err(error) => {
if !effect.close_session {
self.leaf.procedure_sessions().insert(hook_key, session);
} else {
let _ = P::close(&mut self.leaf, session);
}
return Err(error);
}
};
if !effect.close_session {
self.leaf.procedure_sessions().insert(hook_key, session);
} else {
let _ = P::close(&mut self.leaf, session);
}
Ok(ProcedureRuntimeOutcome {
frames: outgoing,
dropped: false,
})
}
fn process_local_fault(
&mut self,
header: crate::protocol::PacketHeader,
message: crate::protocol::FaultMessage,
hook_key: HookKey,
) -> Result<ProcedureRuntimeOutcome, ProcedureRuntimeError<P::Error>> {
let Some(mut session) = self.leaf.procedure_sessions().remove(&hook_key) else {
return Ok(ProcedureRuntimeOutcome::default());
};
let on_fault_result = P::on_fault(
&mut self.leaf,
&mut session,
IncomingFault {
header,
fault: message,
hook_key: hook_key.clone(),
},
);
let close_result = P::close(&mut self.leaf, session);
if let Err(error) = on_fault_result {
let _ = close_result;
let frames = self.emit_internal_fault(Some(hook_key.clone()))?;
let _ = error;
return Ok(ProcedureRuntimeOutcome {
frames,
dropped: false,
});
}
if let Err(error) = close_result {
let frames = self.emit_internal_fault(Some(hook_key))?;
let _ = error;
return Ok(ProcedureRuntimeOutcome {
frames,
dropped: false,
});
}
Ok(ProcedureRuntimeOutcome::default())
}
fn open_session(
&mut self,
header: crate::protocol::PacketHeader,
@@ -543,6 +586,10 @@ where
Ok(self.process_endpoint_outcome(outcome)?.frames)
}
/// Ensures a closing session leaves the protocol hook in a fully terminated state.
///
/// If leaf code requests `close_session` without emitting an explicit terminal packet, the
/// runtime synthesizes an empty final `Data` frame so the hook closes cleanly on the wire.
fn ensure_terminal_packet(
&self,
hook_key: &HookKey,
+20 -3
View File
@@ -5,15 +5,21 @@
use alloc::{collections::BTreeMap, string::String, vec, vec::Vec};
/// Explicit test tree declaration used for configuration.
/// Explicit tree declaration used for configuration and tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TreeNode {
/// The protocol root. Its path is always empty.
Root { children: Vec<Self> },
Root {
/// Direct child endpoints hosted below the root.
children: Vec<Self>,
},
/// An addressable endpoint segment in the tree.
Endpoint {
/// Path segment contributed by this endpoint.
segment: String,
/// Leaves hosted directly at this endpoint.
leaves: Vec<LeafNode>,
/// Direct child endpoints hosted below this endpoint.
children: Vec<Self>,
},
}
@@ -29,6 +35,8 @@ pub struct LeafNode {
impl TreeNode {
/// Flattens the explicit tree into the set of endpoint paths it declares.
///
/// The returned list always includes the protocol root as `[]`.
pub fn paths(&self) -> Vec<Vec<String>> {
let mut paths = Vec::new();
self.collect_paths(&[], &mut paths);
@@ -88,6 +96,9 @@ struct RouteTrieNode {
impl CompiledRoutes {
/// Compiles child endpoint paths into a trie rooted at `local_path`.
///
/// Only strict descendants of `local_path` participate in the compiled trie. Paths outside
/// the local subtree, or equal to `local_path` itself, are ignored.
#[must_use]
pub fn new(local_path: &[String], child_paths: &[Vec<String>], has_parent: bool) -> Self {
let mut routes = Self {
@@ -169,8 +180,11 @@ pub fn is_prefix(prefix: &[String], path: &[String]) -> bool {
.zip(path.iter())
.all(|(left, right)| left == right)
}
/// Trait for resolving a destination path to a routing decision.
///
/// The default policy is longest-prefix routing: exact matches stay local, the deepest matching
/// descendant wins for child forwarding, destinations outside the local subtree go to the parent
/// when one exists, and everything else drops.
pub trait RouteProvider {
/// Returns the route decision for `dst_path` from the perspective of `local_path`.
fn route_destination<I>(
@@ -209,6 +223,9 @@ impl RouteProvider for DefaultRouteProvider {
}
/// Resolves `dst_path` with the default longest-prefix route provider.
///
/// Exact matches return [`RouteDecision::Local`]. Destinations outside the local subtree return
/// [`RouteDecision::Parent`] when `has_parent` is `true`, otherwise [`RouteDecision::Drop`].
pub fn route_destination<I>(
local_path: &[String],
child_paths: I,
+31 -1
View File
@@ -18,50 +18,80 @@ pub enum PacketType {
/// Header fields used for routing and hook attribution.
#[derive(Archive, Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub struct PacketHeader {
/// Wire-level packet class, which determines which payload type follows.
pub packet_type: PacketType,
/// Absolute endpoint path that sent the packet.
pub src_path: Vec<String>,
/// Absolute endpoint path the packet is trying to reach.
pub dst_path: Vec<String>,
/// Optional leaf name inside `dst_path` that should receive a `Call` packet.
///
/// `Data` and `Fault` packets must leave this unset.
pub dst_leaf: Option<String>,
/// Hook identifier scoped to the receiving endpoint.
///
/// `Call` packets must leave this unset. `Data` and `Fault` packets must fill it in.
pub hook_id: Option<u64>,
}
/// Hook declaration embedded inside a call.
#[derive(Archive, Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub struct HookTarget {
/// Hook identifier reserved by the caller for returned `Data` or `Fault` traffic.
pub hook_id: u64,
/// Absolute endpoint path that should receive the response stream.
///
/// Protocol validation requires this to exactly match the enclosing call header's
/// `src_path`.
pub return_path: Vec<String>,
}
/// Downwards call payload.
#[derive(Archive, Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub struct CallMessage {
/// Canonical procedure identifier chosen by the caller.
pub procedure_id: String,
/// Opaque application payload for the target procedure.
pub data: Vec<u8>,
/// Optional response hook reservation for returned hook traffic.
pub response_hook: Option<HookTarget>,
}
/// Hook data payload.
#[derive(Archive, Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub struct DataMessage {
/// Canonical procedure identifier that owns the hook stream.
pub procedure_id: String,
/// Opaque application payload for the hook message.
pub data: Vec<u8>,
/// Whether this packet closes the peer side of the hook stream.
pub end_hook: bool,
}
/// Protocol fault payload.
#[derive(Archive, Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub struct FaultMessage {
/// Stable protocol-level reason code for the failure.
pub fault: ProtocolFault,
}
/// Stable protocol fault set.
/// Stable protocol fault code.
///
/// The raw numeric value is public so callers can persist, compare, or forward fault codes
/// without knowing every symbolic constant in advance. Unknown values are allowed so newer
/// peers can extend the set without breaking older runtimes.
#[derive(Archive, Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
pub struct ProtocolFault(pub u8);
impl ProtocolFault {
/// The addressed leaf name does not exist at the destination endpoint.
pub const UNKNOWN_LEAF: Self = Self(0x01);
/// The destination exists, but it does not expose the requested procedure id.
pub const UNKNOWN_PROCEDURE: Self = Self(0x02);
/// The packet source path is not valid for the ingress side where it arrived.
pub const INVALID_SOURCE_PATH: Self = Self(0x03);
/// Hook traffic arrived from a peer that does not own the active hook relationship.
pub const INVALID_HOOK_PEER: Self = Self(0x04);
/// The runtime hit an internal protocol failure and could only surface a generic fault.
pub const INTERNAL_ERROR: Self = Self(0x05);
}
+16 -1
View File
@@ -8,10 +8,15 @@ use core::fmt;
/// Validation failures for protocol structures.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ValidationError {
/// One header field combination is invalid for the chosen packet type.
HeaderInvariant(&'static str),
/// The procedure identifier violates the protocol's minimal reserved-id rules.
ProcedureId(&'static str),
/// The call payload contradicts the surrounding packet header.
CallInvariant(&'static str),
/// A hook lifecycle transition would break protocol state invariants.
HookInvariant(&'static str),
/// A hook id collided with existing endpoint-local state.
InvalidHookId,
}
@@ -29,7 +34,10 @@ impl fmt::Display for ValidationError {
impl core::error::Error for ValidationError {}
/// Validates packet header invariants from the protocol.
/// Validates stateless packet-header invariants.
///
/// This checks wire-shape rules only. It does not verify route existence, leaf existence,
/// hook ownership, or whether the destination actually supports the requested procedure.
pub fn validate_header(header: &PacketHeader) -> Result<(), ValidationError> {
match header.packet_type {
PacketType::Call => {
@@ -56,6 +64,9 @@ pub fn validate_header(header: &PacketHeader) -> Result<(), ValidationError> {
}
/// Validates the protocol-level `procedure_id` invariant.
///
/// This is intentionally permissive. The protocol reserves only the empty string for
/// introspection; every other non-empty identifier is treated as opaque application data.
pub fn validate_procedure_id(procedure_id: &str) -> Result<(), ValidationError> {
if procedure_id == INTROSPECTION_PROCEDURE_ID {
return Ok(());
@@ -69,6 +80,9 @@ pub fn validate_procedure_id(procedure_id: &str) -> Result<(), ValidationError>
}
/// Validates call-specific invariants that depend on both header and payload.
///
/// This complements [`validate_header`]. It does not verify destination reachability or leaf
/// support, only consistency between the opening `Call` header and payload.
pub fn validate_call(header: &PacketHeader, call: &CallMessage) -> Result<(), ValidationError> {
validate_procedure_id(&call.procedure_id)?;
@@ -81,6 +95,7 @@ pub fn validate_call(header: &PacketHeader, call: &CallMessage) -> Result<(), Va
}
if call.procedure_id == INTROSPECTION_PROCEDURE_ID && call.response_hook.is_none() {
// Introspection is defined as a request/response exchange, never a fire-and-forget call.
return Err(ValidationError::CallInvariant(
"introspection requires a response hook",
));