Clarify tree protocol runtime and routing

Document the hook lifecycle, ingress rules, and longest-prefix routing behavior so the tree endpoint code is easier to follow. Keep the pass behavior-neutral while tightening local names and comments around non-obvious protocol paths.
This commit is contained in:
Michael Mikovsky
2026-04-25 13:34:18 -06:00
parent d7a5a5d0e5
commit 9895248bbf
9 changed files with 152 additions and 35 deletions
+17 -5
View File
@@ -79,6 +79,9 @@ impl ProtocolEndpoint {
header: &PacketHeader,
call: &CallMessage,
) -> Result<(), EndpointError> {
// Outbound calls reserve their response hook before the frame is emitted so
// the endpoint can accept a synchronous local response path as well as a
// remote one.
if let Some(hook) = &call.response_hook
&& self
.hooks
@@ -99,20 +102,21 @@ impl ProtocolEndpoint {
}
#[must_use]
/// Creates an endpoint with compiled routing tables for its current topology.
pub fn new(
path: Vec<String>,
parent_path: Option<Vec<String>>,
children: Vec<ChildRoute>,
leaves: Vec<LeafSpec>,
) -> Self {
let registered_children = children
let registered_child_paths = children
.iter()
.filter(|child| child.state == super::core::ConnectionState::Registered)
.map(|child| child.path.clone())
.collect::<Vec<_>>();
Self {
routing: CompiledRoutes::new(&path, &registered_children, parent_path.is_some()),
routing: CompiledRoutes::new(&path, &registered_child_paths, parent_path.is_some()),
path,
children,
leaves: leaves
@@ -124,6 +128,7 @@ impl ProtocolEndpoint {
}
}
/// Registers a procedure that is handled directly by the endpoint.
pub fn add_endpoint_procedure(
&mut self,
procedure_id: impl Into<String>,
@@ -135,10 +140,12 @@ impl ProtocolEndpoint {
}
#[must_use]
/// Allocates a hook id scoped to this endpoint path.
pub fn allocate_hook_id(&mut self) -> u64 {
self.hooks.allocate_hook_id(&self.path)
}
/// Encodes a call frame without routing it through the local endpoint.
pub fn make_call(
&mut self,
dst_path: Vec<String>,
@@ -153,6 +160,7 @@ impl ProtocolEndpoint {
Ok(encode_packet(&header, &call)?)
}
/// Builds and immediately routes a call, producing either a forward or a local event.
pub fn send_call(
&mut self,
dst_path: Vec<String>,
@@ -174,6 +182,7 @@ impl ProtocolEndpoint {
}
}
/// Encodes a data frame without routing it through the local endpoint.
pub fn make_data(
&self,
dst_path: Vec<String>,
@@ -187,6 +196,7 @@ impl ProtocolEndpoint {
Ok(encode_packet(&header, &message)?)
}
/// Builds and immediately routes a data packet, updating local hook state for end-of-stream.
pub fn send_data(
&mut self,
dst_path: Vec<String>,
@@ -199,12 +209,14 @@ impl ProtocolEndpoint {
self.prepare_data(dst_path, hook_id, procedure_id, data, end_hook)?;
if end_hook {
let sender_key = self
// Locally-originated streams may not have been resolved against a peer yet,
// so fall back to the endpoint's own hook key shape when closing them.
let local_hook_key = self
.hooks
.resolve_active_key(&self.path, hook_id, &self.path)
.unwrap_or_else(|| HookKey::new(self.path.clone(), hook_id));
if self.hooks.mark_local_end(&sender_key) {
self.hooks.remove_active(&sender_key);
if self.hooks.mark_local_end(&local_hook_key) {
self.hooks.remove_active(&local_hook_key);
}
}
+18
View File
@@ -13,15 +13,19 @@ use crate::protocol::{
use super::super::{CompiledRoutes, HookTable, RouteDecision};
/// Registration state for a direct child endpoint.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionState {
Unregistered,
Registered,
}
/// Routing metadata for one direct child endpoint.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChildRoute {
/// Absolute path for the child endpoint inside the protocol tree.
pub path: Vec<String>,
/// Whether this child currently participates in routing decisions.
pub state: ConnectionState,
}
@@ -35,12 +39,16 @@ impl ChildRoute {
}
}
/// Procedures exposed by a named leaf attached to this endpoint.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LeafSpec {
/// Leaf identifier used in packet headers.
pub name: String,
/// Procedures this leaf accepts.
pub procedures: Vec<String>,
}
/// Where an inbound frame entered this endpoint.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Ingress {
Parent,
@@ -48,6 +56,7 @@ pub enum Ingress {
Local,
}
/// Event produced when the endpoint handles a packet locally.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LocalEvent {
Call {
@@ -64,10 +73,14 @@ pub enum LocalEvent {
},
}
/// Result of processing a frame or building a locally-sent packet.
#[derive(Debug, Default)]
pub struct EndpointOutcome {
/// Frame to forward, together with the next routing decision.
pub forward: Option<(RouteDecision, FrameBytes)>,
/// Locally-delivered protocol event.
pub event: Option<LocalEvent>,
/// Whether the packet was intentionally discarded.
pub dropped: bool,
}
@@ -100,6 +113,7 @@ impl EndpointOutcome {
}
}
/// Error surfaced while validating or encoding protocol frames.
#[derive(Debug)]
pub enum EndpointError {
Frame(FrameError),
@@ -129,9 +143,12 @@ impl From<ValidationError> for EndpointError {
}
}
/// Minimal interface implemented by protocol-tree endpoints.
pub trait Endpoint {
/// Returns this endpoint's absolute path.
fn path(&self) -> &[String];
/// Processes one inbound frame from the given ingress.
fn receive(
&mut self,
ingress: &Ingress,
@@ -139,6 +156,7 @@ pub trait Endpoint {
) -> Result<EndpointOutcome, EndpointError>;
}
/// Runtime state for one endpoint in the protocol tree.
#[derive(Debug, Default)]
pub struct ProtocolEndpoint {
pub(crate) path: Vec<String>,
+5
View File
@@ -61,6 +61,8 @@ impl ProtocolEndpoint {
};
if active.peer_path != header.src_path {
// A reused hook id from the wrong peer is treated as terminal for this hook,
// because the endpoint can no longer trust future traffic on it.
self.hooks.remove_active(&key);
return Ok(EndpointOutcome::event(LocalEvent::Fault {
header: PacketHeader {
@@ -77,6 +79,7 @@ impl ProtocolEndpoint {
}
if active.procedure_id != message.procedure_id {
// Data frames stay bound to the procedure chosen by the original call.
return Ok(EndpointOutcome::dropped());
}
@@ -127,6 +130,8 @@ impl ProtocolEndpoint {
pub(crate) fn valid_source_for_ingress(&self, ingress: &Ingress, src_path: &[String]) -> bool {
match ingress {
Ingress::Parent => {
// Parent ingress may carry packets from ancestors, siblings, or the endpoint
// itself, but not from descendants pretending to be upstream.
if src_path.len() < self.path.len() {
return true;
}
+3 -2
View File
@@ -21,7 +21,7 @@ impl ProtocolEndpoint {
return Ok(EndpointOutcome::dropped());
};
let payload = if let Some(leaf_name) = &header.dst_leaf {
let response_payload = if let Some(leaf_name) = &header.dst_leaf {
let Some(leaf) = self.leaves.get(leaf_name) else {
return self.emit_fault_if_possible(Some(key), ProtocolFault::UNKNOWN_LEAF);
};
@@ -61,10 +61,11 @@ impl ProtocolEndpoint {
};
let response = DataMessage {
procedure_id: String::new(),
data: payload,
data: response_payload,
end_hook: true,
};
// Introspection always completes in a single response frame.
if self.hooks.mark_local_end(&key) {
self.hooks.remove_active(&key);
}
+5 -1
View File
@@ -1,4 +1,8 @@
//! Endpoint runtime and traits.
//! Protocol-tree endpoint runtime.
//!
//! This module holds the state machine that validates ingress, decides whether a
//! packet should be handled locally or forwarded, and manages hook lifetimes for
//! call/data/fault exchanges.
mod builders;
mod core;
+21 -13
View File
@@ -12,6 +12,21 @@ use super::core::{
};
impl ProtocolEndpoint {
fn supports_local_procedure(&self, dst_leaf: Option<&str>, procedure_id: &str) -> bool {
match dst_leaf {
Some(leaf_name) => self
.leaves
.get(leaf_name)
.map(|leaf| {
leaf.procedures
.iter()
.any(|procedure| procedure == procedure_id)
})
.unwrap_or(false),
None => self.endpoint_procedures.contains(procedure_id),
}
}
pub(crate) fn handle_local_call(
&mut self,
header: crate::protocol::PacketHeader,
@@ -26,20 +41,10 @@ impl ProtocolEndpoint {
return self.handle_introspection(&header, key);
}
let supported = match &header.dst_leaf {
Some(leaf_name) => self
.leaves
.get(leaf_name)
.map(|leaf| {
leaf.procedures
.iter()
.any(|procedure| procedure == &message.procedure_id)
})
.unwrap_or(false),
None => self.endpoint_procedures.contains(&message.procedure_id),
};
let procedure_is_supported =
self.supports_local_procedure(header.dst_leaf.as_deref(), &message.procedure_id);
if !supported {
if !procedure_is_supported {
let fault = if header
.dst_leaf
.as_ref()
@@ -94,6 +99,9 @@ impl Endpoint for ProtocolEndpoint {
match header.packet_type {
PacketType::Call => {
// Calls only enter from the parent side of the tree or from the endpoint
// itself. Children can return data/faults, but they do not initiate new
// calls through this node.
if !matches!(ingress, Ingress::Parent | Ingress::Local) {
return Ok(EndpointOutcome::dropped());
}