mirror of
https://github.com/Astatin3/unshell.git
synced 2026-06-09 06:47:59 -06:00
Split protocol and leaf surfaces into crates
Move the protocol runtime into unshell-protocol and remote shell leaf code into unshell-leaves so endpoint and TUI roles can compile independently without circular dependencies.
This commit is contained in:
@@ -0,0 +1,357 @@
|
||||
//! Packet builders and endpoint construction.
|
||||
|
||||
use alloc::{collections::BTreeSet, string::String, vec::Vec};
|
||||
|
||||
use crate::protocol::tree::{HookKey, PendingHook};
|
||||
use crate::protocol::{
|
||||
CallMessage, DataMessage, FrameBytes, HookTarget, PacketHeader, PacketType, ValidationError,
|
||||
encode_packet, validate_call, validate_header, validate_procedure_id,
|
||||
};
|
||||
|
||||
use super::super::{CompiledRoutes, RouteDecision};
|
||||
use super::core::{ChildRoute, EndpointError, EndpointOutcome, ProtocolEndpoint};
|
||||
use crate::protocol::tree::LeafSpec;
|
||||
|
||||
impl ProtocolEndpoint {
|
||||
fn prepare_call(
|
||||
&self,
|
||||
dst_path: Vec<String>,
|
||||
dst_leaf: Option<String>,
|
||||
procedure_id: impl Into<String>,
|
||||
response_hook_id: Option<u64>,
|
||||
data: Vec<u8>,
|
||||
) -> Result<(PacketHeader, CallMessage), EndpointError> {
|
||||
let procedure_id = procedure_id.into();
|
||||
validate_procedure_id(&procedure_id)?;
|
||||
|
||||
let response_hook = response_hook_id.map(|hook_id| HookTarget {
|
||||
hook_id,
|
||||
return_path: self.path.clone(),
|
||||
});
|
||||
let header = PacketHeader {
|
||||
packet_type: PacketType::Call,
|
||||
src_path: self.path.clone(),
|
||||
dst_path,
|
||||
dst_leaf,
|
||||
hook_id: None,
|
||||
};
|
||||
let call = CallMessage {
|
||||
procedure_id,
|
||||
data,
|
||||
response_hook,
|
||||
};
|
||||
|
||||
validate_header(&header)?;
|
||||
validate_call(&header, &call)?;
|
||||
Ok((header, call))
|
||||
}
|
||||
|
||||
fn prepare_data(
|
||||
&self,
|
||||
dst_path: Vec<String>,
|
||||
hook_id: u64,
|
||||
procedure_id: impl Into<String>,
|
||||
data: Vec<u8>,
|
||||
end_hook: bool,
|
||||
) -> Result<(PacketHeader, DataMessage), EndpointError> {
|
||||
let procedure_id = procedure_id.into();
|
||||
validate_procedure_id(&procedure_id)?;
|
||||
|
||||
let header = PacketHeader {
|
||||
packet_type: PacketType::Data,
|
||||
src_path: self.path.clone(),
|
||||
dst_path,
|
||||
dst_leaf: None,
|
||||
hook_id: Some(hook_id),
|
||||
};
|
||||
let message = DataMessage {
|
||||
procedure_id,
|
||||
data,
|
||||
end_hook,
|
||||
};
|
||||
|
||||
validate_header(&header)?;
|
||||
Ok((header, message))
|
||||
}
|
||||
|
||||
fn register_outbound_call_hook(
|
||||
&mut self,
|
||||
header: &PacketHeader,
|
||||
call: &CallMessage,
|
||||
) -> Result<(), EndpointError> {
|
||||
// Outbound calls reserve their response hook before the frame is emitted so
|
||||
// the endpoint can attribute returned Fault packets even before the callee
|
||||
// accepts the call. The hook only becomes active once valid hook traffic
|
||||
// comes back from the expected peer.
|
||||
if let Some(hook) = &call.response_hook
|
||||
&& let key = HookKey::new(hook.return_path.clone(), hook.hook_id)
|
||||
&& self
|
||||
.hooks
|
||||
.insert_pending(
|
||||
key,
|
||||
PendingHook {
|
||||
caller_src_path: header.dst_path.clone(),
|
||||
procedure_id: call.procedure_id.clone(),
|
||||
local_ended: false,
|
||||
},
|
||||
)
|
||||
.is_err()
|
||||
{
|
||||
return Err(EndpointError::Validation(ValidationError::InvalidHookId));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
/// Creates an endpoint with compiled routing tables for its current topology.
|
||||
///
|
||||
/// `parent_path` is currently used only as a presence flag. The endpoint stores its own
|
||||
/// absolute `path`, and routing only needs to know whether an upward route exists.
|
||||
///
|
||||
/// # Example
|
||||
/// ```rust
|
||||
/// use unshell::protocol::tree::{ChildRoute, LeafSpec, ProtocolEndpoint};
|
||||
/// let endpoint = ProtocolEndpoint::new(
|
||||
/// vec!["worker".into()],
|
||||
/// Some(Vec::new()),
|
||||
/// vec![ChildRoute::registered(vec!["worker".into(), "child".into()])],
|
||||
/// vec![LeafSpec {
|
||||
/// name: "service".into(),
|
||||
/// procedures: vec!["example.service.v1.invoke".into()],
|
||||
/// }],
|
||||
/// );
|
||||
/// let _ = endpoint;
|
||||
/// ```
|
||||
pub fn new(
|
||||
path: Vec<String>,
|
||||
parent_path: Option<Vec<String>>,
|
||||
children: Vec<ChildRoute>,
|
||||
leaves: Vec<LeafSpec>,
|
||||
) -> Self {
|
||||
let registered_child_paths = children
|
||||
.iter()
|
||||
.filter(|child| child.registered)
|
||||
.map(|child| child.path.clone())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
Self {
|
||||
routing: CompiledRoutes::new(&path, ®istered_child_paths, parent_path.is_some()),
|
||||
path,
|
||||
children,
|
||||
leaves: leaves
|
||||
.into_iter()
|
||||
.map(|leaf| (leaf.name.clone(), leaf))
|
||||
.collect(),
|
||||
endpoint_procedures: BTreeSet::new(),
|
||||
hooks: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Registers a procedure that is handled directly by the endpoint.
|
||||
///
|
||||
/// Endpoint-level procedures exist for protocol services that are not attached to one leaf,
|
||||
/// such as built-in runtime behavior.
|
||||
///
|
||||
/// # Example
|
||||
/// ```rust
|
||||
/// use unshell::protocol::tree::ProtocolEndpoint;
|
||||
/// let mut endpoint = ProtocolEndpoint::new(Vec::new(), None, Vec::new(), Vec::new());
|
||||
/// endpoint.add_endpoint_procedure("example.endpoint.v1.health")?;
|
||||
/// # Ok::<(), unshell::protocol::tree::EndpointError>(())
|
||||
/// ```
|
||||
pub fn add_endpoint_procedure(
|
||||
&mut self,
|
||||
procedure_id: impl Into<String>,
|
||||
) -> Result<(), EndpointError> {
|
||||
let procedure_id = procedure_id.into();
|
||||
validate_procedure_id(&procedure_id)?;
|
||||
self.endpoint_procedures.insert(procedure_id);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
/// Allocates a hook id scoped to this endpoint path.
|
||||
///
|
||||
/// # Example
|
||||
/// ```rust
|
||||
/// use unshell::protocol::tree::ProtocolEndpoint;
|
||||
/// let mut endpoint = ProtocolEndpoint::new(Vec::new(), None, Vec::new(), Vec::new());
|
||||
/// let hook_id = endpoint.allocate_hook_id();
|
||||
/// assert_ne!(hook_id, 0);
|
||||
/// ```
|
||||
pub fn allocate_hook_id(&mut self) -> u64 {
|
||||
self.hooks.allocate_hook_id(&self.path)
|
||||
}
|
||||
|
||||
/// Encodes a call frame without routing it through the local endpoint.
|
||||
///
|
||||
/// This exists for callers that want a fully encoded outbound frame while handling transport
|
||||
/// themselves.
|
||||
///
|
||||
/// # Example
|
||||
/// ```rust
|
||||
/// use unshell::protocol::tree::ProtocolEndpoint;
|
||||
/// let mut endpoint = ProtocolEndpoint::new(Vec::new(), None, Vec::new(), Vec::new());
|
||||
/// let frame = endpoint.make_call(
|
||||
/// vec!["worker".into()],
|
||||
/// Some("service".into()),
|
||||
/// "example.service.v1.invoke",
|
||||
/// None,
|
||||
/// vec![1, 2, 3],
|
||||
/// )?;
|
||||
/// assert!(!frame.is_empty());
|
||||
/// # Ok::<(), unshell::protocol::tree::EndpointError>(())
|
||||
/// ```
|
||||
pub fn make_call(
|
||||
&mut self,
|
||||
dst_path: Vec<String>,
|
||||
dst_leaf: Option<String>,
|
||||
procedure_id: impl Into<String>,
|
||||
response_hook_id: Option<u64>,
|
||||
data: Vec<u8>,
|
||||
) -> Result<FrameBytes, EndpointError> {
|
||||
let (header, call) =
|
||||
self.prepare_call(dst_path, dst_leaf, procedure_id, response_hook_id, data)?;
|
||||
self.register_outbound_call_hook(&header, &call)?;
|
||||
Ok(encode_packet(&header, &call)?)
|
||||
}
|
||||
|
||||
/// Builds and immediately routes a call, producing either a forward or a local event.
|
||||
///
|
||||
/// # Example
|
||||
/// ```rust
|
||||
/// use unshell::protocol::tree::{ChildRoute, EndpointOutcome, ProtocolEndpoint};
|
||||
/// let mut endpoint = ProtocolEndpoint::new(
|
||||
/// Vec::new(),
|
||||
/// None,
|
||||
/// vec![ChildRoute::registered(vec!["worker".into()])],
|
||||
/// Vec::new(),
|
||||
/// );
|
||||
/// let outcome = endpoint.send_call(
|
||||
/// vec!["worker".into()],
|
||||
/// Some("service".into()),
|
||||
/// "example.service.v1.invoke",
|
||||
/// None,
|
||||
/// vec![],
|
||||
/// )?;
|
||||
/// assert!(matches!(outcome, EndpointOutcome::Forward { .. } | EndpointOutcome::Dropped | EndpointOutcome::Local(_)));
|
||||
/// # Ok::<(), unshell::protocol::tree::EndpointError>(())
|
||||
/// ```
|
||||
pub fn send_call(
|
||||
&mut self,
|
||||
dst_path: Vec<String>,
|
||||
dst_leaf: Option<String>,
|
||||
procedure_id: impl Into<String>,
|
||||
response_hook_id: Option<u64>,
|
||||
data: Vec<u8>,
|
||||
) -> Result<EndpointOutcome, EndpointError> {
|
||||
let (header, call) =
|
||||
self.prepare_call(dst_path, dst_leaf, procedure_id, response_hook_id, data)?;
|
||||
self.register_outbound_call_hook(&header, &call)?;
|
||||
|
||||
match self.decide_route(&header.dst_path) {
|
||||
RouteDecision::Local => self.handle_local_call(header, call),
|
||||
RouteDecision::Drop => {
|
||||
self.rollback_pending_call_hook(&call);
|
||||
Ok(EndpointOutcome::Dropped)
|
||||
}
|
||||
route => Ok(EndpointOutcome::Forward {
|
||||
route,
|
||||
frame: encode_packet(&header, &call)?,
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
/// Encodes a data frame without routing it through the local endpoint.
|
||||
///
|
||||
/// # Example
|
||||
/// ```rust
|
||||
/// use unshell::protocol::tree::ProtocolEndpoint;
|
||||
/// let endpoint = ProtocolEndpoint::new(Vec::new(), None, Vec::new(), Vec::new());
|
||||
/// let frame = endpoint.make_data(vec!["root".into()], 7, "example.service.v1.invoke", vec![1], false)?;
|
||||
/// assert!(!frame.is_empty());
|
||||
/// # Ok::<(), unshell::protocol::tree::EndpointError>(())
|
||||
/// ```
|
||||
pub fn make_data(
|
||||
&self,
|
||||
dst_path: Vec<String>,
|
||||
hook_id: u64,
|
||||
procedure_id: impl Into<String>,
|
||||
data: Vec<u8>,
|
||||
end_hook: bool,
|
||||
) -> Result<FrameBytes, EndpointError> {
|
||||
let (header, message) =
|
||||
self.prepare_data(dst_path, hook_id, procedure_id, data, end_hook)?;
|
||||
Ok(encode_packet(&header, &message)?)
|
||||
}
|
||||
|
||||
/// Builds and immediately routes a data packet, updating local hook state for end-of-stream.
|
||||
///
|
||||
/// # Example
|
||||
/// ```rust
|
||||
/// use unshell::protocol::tree::ProtocolEndpoint;
|
||||
/// let mut endpoint = ProtocolEndpoint::new(Vec::new(), None, Vec::new(), Vec::new());
|
||||
/// let _ = endpoint.send_data(vec!["root".into()], 7, "example.service.v1.invoke", vec![], false);
|
||||
/// # Ok::<(), unshell::protocol::tree::EndpointError>(())
|
||||
/// ```
|
||||
pub fn send_data(
|
||||
&mut self,
|
||||
dst_path: Vec<String>,
|
||||
hook_id: u64,
|
||||
procedure_id: impl Into<String>,
|
||||
data: Vec<u8>,
|
||||
end_hook: bool,
|
||||
) -> Result<EndpointOutcome, EndpointError> {
|
||||
if let Some(active_key) = self
|
||||
.hooks
|
||||
.resolve_active_key(&dst_path, hook_id, &self.path)
|
||||
&& self
|
||||
.hooks
|
||||
.active(&active_key)
|
||||
.is_some_and(|active| active.local_ended)
|
||||
{
|
||||
return Err(EndpointError::Validation(ValidationError::HookInvariant(
|
||||
"local side already closed this hook",
|
||||
)));
|
||||
}
|
||||
|
||||
let local_end_dst_path = dst_path.clone();
|
||||
let host_key = HookKey::new(self.path.clone(), hook_id);
|
||||
let (header, message) =
|
||||
self.prepare_data(dst_path, hook_id, procedure_id, data, end_hook)?;
|
||||
|
||||
if end_hook {
|
||||
self.mark_local_stream_end(&local_end_dst_path, hook_id, &host_key);
|
||||
}
|
||||
|
||||
match self.decide_route(&header.dst_path) {
|
||||
RouteDecision::Local => self.handle_local_data(header, message),
|
||||
RouteDecision::Drop => Ok(EndpointOutcome::Dropped),
|
||||
route => Ok(EndpointOutcome::Forward {
|
||||
route,
|
||||
frame: encode_packet(&header, &message)?,
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
fn rollback_pending_call_hook(&mut self, call: &CallMessage) {
|
||||
if let Some(hook) = &call.response_hook {
|
||||
self.hooks
|
||||
.remove_pending(&HookKey::new(hook.return_path.clone(), hook.hook_id));
|
||||
}
|
||||
}
|
||||
|
||||
fn mark_local_stream_end(&mut self, dst_path: &[String], hook_id: u64, host_key: &HookKey) {
|
||||
// Locally-originated streams may not have been resolved against a peer yet, so fall
|
||||
// back to the endpoint's own hook key shape when closing them.
|
||||
let local_hook_key = self
|
||||
.hooks
|
||||
.resolve_active_key(dst_path, hook_id, &self.path)
|
||||
.unwrap_or_else(|| host_key.clone());
|
||||
if self.hooks.pending(host_key).is_some() {
|
||||
self.hooks.mark_pending_local_end(host_key);
|
||||
} else if self.hooks.mark_local_end(&local_hook_key) {
|
||||
self.hooks.remove_active(&local_hook_key);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,295 @@
|
||||
//! Core endpoint state and externally visible types.
|
||||
|
||||
use alloc::{
|
||||
collections::{BTreeMap, BTreeSet},
|
||||
string::String,
|
||||
vec::Vec,
|
||||
};
|
||||
use core::fmt;
|
||||
|
||||
use crate::protocol::{
|
||||
CallMessage, DataMessage, FaultMessage, FrameBytes, FrameError, PacketHeader, ValidationError,
|
||||
};
|
||||
|
||||
use super::super::{CompiledRoutes, HookKey, HookTable, RouteDecision};
|
||||
|
||||
/// Routing metadata for one direct child endpoint.
|
||||
///
|
||||
/// This exists so one endpoint can distinguish topology from registration state. A child path may
|
||||
/// be known structurally while still being excluded from route decisions.
|
||||
///
|
||||
/// # Example
|
||||
/// ```rust
|
||||
/// use unshell::protocol::tree::ChildRoute;
|
||||
/// let route = ChildRoute::registered(vec!["root".into(), "worker".into()]);
|
||||
/// assert!(route.registered);
|
||||
/// ```
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct ChildRoute {
|
||||
/// Absolute path for the child endpoint inside the protocol tree.
|
||||
pub path: Vec<String>,
|
||||
/// Whether this child currently participates in routing decisions.
|
||||
pub registered: bool,
|
||||
}
|
||||
|
||||
impl ChildRoute {
|
||||
#[must_use]
|
||||
/// Builds one child route that is immediately eligible for routing decisions.
|
||||
///
|
||||
/// # Example
|
||||
/// ```rust
|
||||
/// use unshell::protocol::tree::ChildRoute;
|
||||
/// let route = ChildRoute::registered(vec!["worker".into()]);
|
||||
/// assert!(route.registered);
|
||||
/// ```
|
||||
pub fn registered(path: Vec<String>) -> Self {
|
||||
Self {
|
||||
path,
|
||||
registered: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Procedures exposed by a named leaf attached to this endpoint.
|
||||
///
|
||||
/// This exists so endpoint construction can advertise one leaf's callable procedure ids up front,
|
||||
/// before any runtime packets arrive.
|
||||
///
|
||||
/// # Example
|
||||
/// ```rust
|
||||
/// use unshell::protocol::tree::LeafSpec;
|
||||
/// let leaf = LeafSpec {
|
||||
/// name: "service".into(),
|
||||
/// procedures: vec!["example.service.v1.invoke".into()],
|
||||
/// };
|
||||
/// assert_eq!(leaf.procedures.len(), 1);
|
||||
/// ```
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct LeafSpec {
|
||||
/// Leaf identifier used in packet headers.
|
||||
pub name: String,
|
||||
/// Procedures this leaf accepts.
|
||||
pub procedures: Vec<String>,
|
||||
}
|
||||
|
||||
/// Where an inbound frame entered this endpoint.
|
||||
///
|
||||
/// This exists because protocol validation depends on whether a packet arrived from the parent,
|
||||
/// one child subtree, or the endpoint itself.
|
||||
///
|
||||
/// # Example
|
||||
/// ```rust
|
||||
/// use unshell::protocol::tree::Ingress;
|
||||
/// let ingress = Ingress::Child(vec!["root".into(), "worker".into()]);
|
||||
/// assert!(matches!(ingress, Ingress::Child(_)));
|
||||
/// ```
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub enum Ingress {
|
||||
/// The frame arrived from the parent side of the tree.
|
||||
Parent,
|
||||
/// The frame arrived from one direct child, identified by that child's absolute path.
|
||||
Child(Vec<String>),
|
||||
/// The frame originated locally at this endpoint.
|
||||
Local,
|
||||
}
|
||||
|
||||
/// Event produced when the endpoint handles a packet locally.
|
||||
///
|
||||
/// This is the validated handoff boundary between transport/routing code and application-facing
|
||||
/// runtimes layered on top of `ProtocolEndpoint`.
|
||||
///
|
||||
/// # Example
|
||||
/// ```rust
|
||||
/// use unshell::protocol::{CallMessage, PacketHeader, PacketType};
|
||||
/// use unshell::protocol::tree::LocalEvent;
|
||||
/// let event = LocalEvent::Call {
|
||||
/// header: PacketHeader {
|
||||
/// packet_type: PacketType::Call,
|
||||
/// src_path: vec!["root".into()],
|
||||
/// dst_path: vec!["worker".into()],
|
||||
/// dst_leaf: None,
|
||||
/// hook_id: None,
|
||||
/// },
|
||||
/// message: CallMessage {
|
||||
/// procedure_id: "example.invoke".into(),
|
||||
/// data: vec![],
|
||||
/// response_hook: None,
|
||||
/// },
|
||||
/// };
|
||||
/// assert!(matches!(event, LocalEvent::Call { .. }));
|
||||
/// ```
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub enum LocalEvent {
|
||||
/// One opening `Call` packet validated and delivered to local code.
|
||||
Call {
|
||||
/// Validated protocol header for the packet.
|
||||
header: PacketHeader,
|
||||
/// Deserialized call payload.
|
||||
message: CallMessage,
|
||||
},
|
||||
/// One hook-associated `Data` packet validated and delivered locally.
|
||||
Data {
|
||||
/// Validated protocol header for the packet.
|
||||
header: PacketHeader,
|
||||
/// Deserialized data payload.
|
||||
message: DataMessage,
|
||||
/// Canonical host-scoped hook key resolved for this hook stream.
|
||||
hook_key: HookKey,
|
||||
},
|
||||
/// One hook-associated `Fault` packet validated and delivered locally.
|
||||
Fault {
|
||||
/// Validated protocol header for the packet.
|
||||
header: PacketHeader,
|
||||
/// Deserialized fault payload.
|
||||
message: FaultMessage,
|
||||
/// Canonical host-scoped hook key resolved for this hook stream.
|
||||
hook_key: HookKey,
|
||||
},
|
||||
}
|
||||
|
||||
/// Result of processing a frame or building a locally-sent packet.
|
||||
///
|
||||
/// This exists so callers can distinguish forwarding, local delivery, and intentional drops
|
||||
/// without treating normal protocol routing outcomes as errors.
|
||||
///
|
||||
/// # Example
|
||||
/// ```rust
|
||||
/// use unshell::protocol::FrameBytes;
|
||||
/// use unshell::protocol::tree::{EndpointOutcome, RouteDecision};
|
||||
/// let outcome = EndpointOutcome::Forward {
|
||||
/// route: RouteDecision::Parent,
|
||||
/// frame: FrameBytes::new(),
|
||||
/// };
|
||||
/// assert!(matches!(outcome, EndpointOutcome::Forward { .. }));
|
||||
/// ```
|
||||
#[derive(Debug)]
|
||||
pub enum EndpointOutcome {
|
||||
/// Frame to forward, together with the next routing decision.
|
||||
Forward {
|
||||
/// The next routing decision chosen for the forwarded frame.
|
||||
route: RouteDecision,
|
||||
/// The encoded frame bytes to send along that route.
|
||||
frame: FrameBytes,
|
||||
},
|
||||
/// Locally-delivered protocol event.
|
||||
Local(LocalEvent),
|
||||
/// Packet intentionally discarded.
|
||||
Dropped,
|
||||
}
|
||||
|
||||
/// Error surfaced while validating or encoding protocol frames.
|
||||
///
|
||||
/// This exists so endpoint callers can preserve the distinction between malformed wire/archive
|
||||
/// data and semantic protocol invariant failures.
|
||||
///
|
||||
/// # Example
|
||||
/// ```rust
|
||||
/// use unshell::protocol::{FrameError, ValidationError};
|
||||
/// use unshell::protocol::tree::EndpointError;
|
||||
/// let error = EndpointError::Frame(FrameError::Truncated);
|
||||
/// assert!(matches!(error, EndpointError::Frame(_)));
|
||||
/// let validation = EndpointError::Validation(ValidationError::InvalidHookId);
|
||||
/// assert!(matches!(validation, EndpointError::Validation(_)));
|
||||
/// ```
|
||||
#[derive(Debug)]
|
||||
pub enum EndpointError {
|
||||
/// Framing, archive decode, or archive encode failed.
|
||||
Frame(FrameError),
|
||||
/// One protocol invariant failed validation.
|
||||
Validation(ValidationError),
|
||||
}
|
||||
|
||||
impl fmt::Display for EndpointError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
Self::Frame(error) => write!(f, "{error}"),
|
||||
Self::Validation(error) => write!(f, "{error}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl core::error::Error for EndpointError {}
|
||||
|
||||
impl From<FrameError> for EndpointError {
|
||||
fn from(value: FrameError) -> Self {
|
||||
Self::Frame(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ValidationError> for EndpointError {
|
||||
fn from(value: ValidationError) -> Self {
|
||||
Self::Validation(value)
|
||||
}
|
||||
}
|
||||
|
||||
/// Minimal interface implemented by protocol-tree endpoints.
|
||||
///
|
||||
/// This exists so higher-level runtimes can depend on one small receive/path surface instead of a
|
||||
/// concrete endpoint implementation.
|
||||
///
|
||||
/// # Example
|
||||
/// ```rust
|
||||
/// use unshell::protocol::tree::{ChildRoute, Endpoint, Ingress, ProtocolEndpoint};
|
||||
/// let endpoint = ProtocolEndpoint::new(Vec::new(), None, vec![ChildRoute::registered(vec!["worker".into()])], Vec::new());
|
||||
/// assert_eq!(endpoint.path(), &Vec::<String>::new());
|
||||
/// let _ = Ingress::Local;
|
||||
/// ```
|
||||
pub trait Endpoint {
|
||||
/// Returns this endpoint's absolute path.
|
||||
///
|
||||
/// # Example
|
||||
/// ```rust
|
||||
/// use unshell::protocol::tree::{ChildRoute, Endpoint, ProtocolEndpoint};
|
||||
/// let endpoint = ProtocolEndpoint::new(Vec::new(), None, vec![ChildRoute::registered(vec!["worker".into()])], Vec::new());
|
||||
/// assert!(endpoint.path().is_empty());
|
||||
/// ```
|
||||
fn path(&self) -> &[String];
|
||||
|
||||
/// Processes one inbound frame from the given ingress.
|
||||
///
|
||||
/// # Example
|
||||
/// ```rust
|
||||
/// use unshell::protocol::{CallMessage, PacketHeader, PacketType, encode_packet};
|
||||
/// use unshell::protocol::tree::{Endpoint, Ingress, ProtocolEndpoint};
|
||||
/// let mut endpoint = ProtocolEndpoint::new(vec!["worker".into()], Some(Vec::new()), Vec::new(), Vec::new());
|
||||
/// let frame = encode_packet(&PacketHeader {
|
||||
/// packet_type: PacketType::Call,
|
||||
/// src_path: Vec::new(),
|
||||
/// dst_path: vec!["worker".into()],
|
||||
/// dst_leaf: None,
|
||||
/// hook_id: None,
|
||||
/// }, &CallMessage {
|
||||
/// procedure_id: "example.invoke".into(),
|
||||
/// data: vec![],
|
||||
/// response_hook: None,
|
||||
/// })?;
|
||||
/// let _outcome = endpoint.receive(&Ingress::Parent, frame);
|
||||
/// # Ok::<(), unshell::protocol::FrameError>(())
|
||||
/// ```
|
||||
fn receive(
|
||||
&mut self,
|
||||
ingress: &Ingress,
|
||||
frame: FrameBytes,
|
||||
) -> Result<EndpointOutcome, EndpointError>;
|
||||
}
|
||||
|
||||
/// Runtime state for one endpoint in the protocol tree.
|
||||
///
|
||||
/// This exists as the central protocol node that owns route tables, local leaf metadata, and hook
|
||||
/// lifecycle state for one endpoint path.
|
||||
///
|
||||
/// # Example
|
||||
/// ```rust
|
||||
/// use unshell::protocol::tree::ProtocolEndpoint;
|
||||
/// let endpoint = ProtocolEndpoint::new(vec!["worker".into()], Some(Vec::new()), Vec::new(), Vec::new());
|
||||
/// let _ = endpoint;
|
||||
/// ```
|
||||
#[derive(Debug, Default)]
|
||||
pub struct ProtocolEndpoint {
|
||||
pub(crate) path: Vec<String>,
|
||||
pub(crate) children: Vec<ChildRoute>,
|
||||
pub(crate) routing: CompiledRoutes,
|
||||
pub(crate) leaves: BTreeMap<String, LeafSpec>,
|
||||
pub(crate) endpoint_procedures: BTreeSet<String>,
|
||||
pub(crate) hooks: HookTable,
|
||||
}
|
||||
@@ -0,0 +1,163 @@
|
||||
//! Hook-state transitions and route helpers.
|
||||
|
||||
use alloc::string::String;
|
||||
|
||||
use crate::protocol::{
|
||||
DataMessage, FaultMessage, PacketHeader, PacketType, ProtocolFault, encode_packet,
|
||||
};
|
||||
|
||||
use super::super::{HookKey, RouteDecision};
|
||||
use super::core::{EndpointError, EndpointOutcome, Ingress, LocalEvent, ProtocolEndpoint};
|
||||
|
||||
impl ProtocolEndpoint {
|
||||
pub(crate) fn emit_fault_if_possible(
|
||||
&mut self,
|
||||
key: Option<HookKey>,
|
||||
fault: ProtocolFault,
|
||||
) -> Result<EndpointOutcome, EndpointError> {
|
||||
let Some(key) = key else {
|
||||
return Ok(EndpointOutcome::Dropped);
|
||||
};
|
||||
|
||||
self.hooks.remove_pending(&key);
|
||||
self.hooks.remove_active(&key);
|
||||
|
||||
let header = PacketHeader {
|
||||
packet_type: PacketType::Fault,
|
||||
src_path: self.path.clone(),
|
||||
dst_path: key.return_path.clone(),
|
||||
dst_leaf: None,
|
||||
hook_id: Some(key.hook_id),
|
||||
};
|
||||
let message = FaultMessage { fault };
|
||||
|
||||
match self.decide_route(&key.return_path) {
|
||||
RouteDecision::Local => Ok(EndpointOutcome::Local(LocalEvent::Fault {
|
||||
header,
|
||||
message,
|
||||
hook_key: key,
|
||||
})),
|
||||
route => Ok(EndpointOutcome::Forward {
|
||||
route,
|
||||
frame: encode_packet(&header, &message)?,
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn handle_local_data(
|
||||
&mut self,
|
||||
header: PacketHeader,
|
||||
message: DataMessage,
|
||||
) -> Result<EndpointOutcome, EndpointError> {
|
||||
let hook_id = header.hook_id.expect("validated");
|
||||
let key = if let Some(key) =
|
||||
self.hooks
|
||||
.resolve_active_key(&self.path, hook_id, &header.src_path)
|
||||
{
|
||||
key
|
||||
} else {
|
||||
let pending_key = HookKey::new(self.path.clone(), hook_id);
|
||||
if self.hooks.pending(&pending_key).is_some_and(|pending| {
|
||||
pending.caller_src_path == header.src_path
|
||||
&& pending.procedure_id == message.procedure_id
|
||||
}) {
|
||||
self.hooks.activate_pending(&pending_key);
|
||||
pending_key
|
||||
} else {
|
||||
return Ok(EndpointOutcome::Dropped);
|
||||
}
|
||||
};
|
||||
|
||||
let Some(active) = self.hooks.active(&key) else {
|
||||
return Ok(EndpointOutcome::Dropped);
|
||||
};
|
||||
|
||||
if active.peer_path != header.src_path {
|
||||
// A reused hook id from the wrong peer is treated as terminal for this hook,
|
||||
// because the endpoint can no longer trust future traffic on it.
|
||||
self.hooks.remove_active(&key);
|
||||
return self.emit_fault_if_possible(Some(key), ProtocolFault::INVALID_HOOK_PEER);
|
||||
}
|
||||
|
||||
if active.procedure_id != message.procedure_id {
|
||||
// Data frames stay bound to the procedure chosen by the original call.
|
||||
// A procedure mismatch is dropped rather than faulted because the wrong peer may be
|
||||
// replaying stale traffic, and converting that into a terminal hook fault would let a
|
||||
// stray packet tear down an otherwise valid stream.
|
||||
return Ok(EndpointOutcome::Dropped);
|
||||
}
|
||||
|
||||
if message.end_hook && self.hooks.mark_peer_end(&key) {
|
||||
self.hooks.remove_active(&key);
|
||||
}
|
||||
|
||||
Ok(EndpointOutcome::Local(LocalEvent::Data {
|
||||
header,
|
||||
message,
|
||||
hook_key: key,
|
||||
}))
|
||||
}
|
||||
|
||||
pub(crate) fn handle_local_fault(
|
||||
&mut self,
|
||||
header: PacketHeader,
|
||||
message: FaultMessage,
|
||||
) -> Result<EndpointOutcome, EndpointError> {
|
||||
let hook_id = header.hook_id.expect("validated");
|
||||
if let Some(key) = self
|
||||
.hooks
|
||||
.resolve_active_key(&self.path, hook_id, &header.src_path)
|
||||
{
|
||||
self.hooks.remove_active(&key);
|
||||
return Ok(EndpointOutcome::Local(LocalEvent::Fault {
|
||||
header,
|
||||
message,
|
||||
hook_key: key,
|
||||
}));
|
||||
}
|
||||
|
||||
let pending_key = HookKey::new(self.path.clone(), hook_id);
|
||||
if self
|
||||
.hooks
|
||||
.pending(&pending_key)
|
||||
.is_some_and(|pending| pending.caller_src_path == header.src_path)
|
||||
{
|
||||
self.hooks.remove_pending(&pending_key);
|
||||
return Ok(EndpointOutcome::Local(LocalEvent::Fault {
|
||||
header,
|
||||
message,
|
||||
hook_key: pending_key,
|
||||
}));
|
||||
}
|
||||
|
||||
Ok(EndpointOutcome::Dropped)
|
||||
}
|
||||
|
||||
pub(crate) fn decide_route(&self, dst_path: &[String]) -> RouteDecision {
|
||||
self.routing.route(dst_path)
|
||||
}
|
||||
|
||||
/// Returns whether one `src_path` is topologically valid for the ingress side that delivered
|
||||
/// the frame.
|
||||
///
|
||||
/// Parent ingress may carry packets from ancestors, siblings, or the endpoint itself, but not
|
||||
/// from descendants pretending to be upstream. Child ingress may only carry packets from that
|
||||
/// child subtree, and local ingress must exactly match the endpoint path.
|
||||
pub(crate) fn valid_source_for_ingress(&self, ingress: &Ingress, src_path: &[String]) -> bool {
|
||||
match ingress {
|
||||
Ingress::Parent => {
|
||||
// Parent ingress may carry packets from ancestors, siblings, or the endpoint
|
||||
// itself, but not from descendants pretending to be upstream.
|
||||
if src_path.len() < self.path.len() {
|
||||
return true;
|
||||
}
|
||||
if src_path.len() == self.path.len() {
|
||||
return src_path == self.path;
|
||||
}
|
||||
!src_path.starts_with(&self.path)
|
||||
}
|
||||
Ingress::Child(child_path) => src_path.starts_with(child_path),
|
||||
Ingress::Local => src_path == self.path,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,103 @@
|
||||
//! Introspection response generation.
|
||||
|
||||
use alloc::{string::String, vec::Vec};
|
||||
use rkyv::{rancor::Error as RkyvError, to_bytes};
|
||||
|
||||
use crate::protocol::{
|
||||
DataMessage, EndpointIntrospection, FrameError, LeafIntrospection, LeafIntrospectionSummary,
|
||||
PacketHeader, PacketType, ProtocolFault, encode_packet,
|
||||
};
|
||||
|
||||
use super::super::HookKey;
|
||||
use super::core::{EndpointError, EndpointOutcome, ProtocolEndpoint};
|
||||
|
||||
impl ProtocolEndpoint {
|
||||
pub(crate) fn handle_introspection(
|
||||
&mut self,
|
||||
header: &PacketHeader,
|
||||
key: Option<HookKey>,
|
||||
) -> Result<EndpointOutcome, EndpointError> {
|
||||
let Some(key) = key else {
|
||||
return Ok(EndpointOutcome::Dropped);
|
||||
};
|
||||
|
||||
let response_payload = if let Some(leaf_name) = &header.dst_leaf {
|
||||
let Some(leaf) = self.leaves.get(leaf_name) else {
|
||||
return self.emit_fault_if_possible(Some(key), ProtocolFault::UNKNOWN_LEAF);
|
||||
};
|
||||
self.serialize_introspection(&LeafIntrospection {
|
||||
leaf_name: leaf_name.clone(),
|
||||
procedures: leaf.procedures.clone(),
|
||||
})?
|
||||
} else {
|
||||
self.serialize_introspection(&EndpointIntrospection {
|
||||
sub_endpoints: self.direct_registered_child_names(),
|
||||
leaves: self
|
||||
.leaves
|
||||
.values()
|
||||
.map(|leaf| LeafIntrospectionSummary {
|
||||
leaf_name: leaf.name.clone(),
|
||||
procedures: leaf.procedures.clone(),
|
||||
})
|
||||
.collect(),
|
||||
})?
|
||||
};
|
||||
|
||||
let response_header = PacketHeader {
|
||||
packet_type: PacketType::Data,
|
||||
src_path: self.path.clone(),
|
||||
dst_path: key.return_path.clone(),
|
||||
dst_leaf: None,
|
||||
hook_id: Some(key.hook_id),
|
||||
};
|
||||
let response = DataMessage {
|
||||
procedure_id: String::new(),
|
||||
data: response_payload,
|
||||
end_hook: true,
|
||||
};
|
||||
|
||||
// Introspection always completes in a single response frame.
|
||||
if self.hooks.mark_local_end(&key) {
|
||||
self.hooks.remove_active(&key);
|
||||
}
|
||||
|
||||
match self.decide_route(&key.return_path) {
|
||||
super::super::RouteDecision::Local => {
|
||||
Ok(EndpointOutcome::Local(super::core::LocalEvent::Data {
|
||||
header: response_header,
|
||||
message: response,
|
||||
hook_key: key,
|
||||
}))
|
||||
}
|
||||
route => Ok(EndpointOutcome::Forward {
|
||||
route,
|
||||
frame: encode_packet(&response_header, &response)?,
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
fn direct_registered_child_names(&self) -> Vec<String> {
|
||||
self.children
|
||||
.iter()
|
||||
.filter(|child| child.registered)
|
||||
// Child routes store absolute endpoint paths. Index the first segment below the
|
||||
// current endpoint so discovery only reports direct descendants.
|
||||
.filter_map(|child| child.path.get(self.path.len()).cloned())
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn serialize_introspection<T>(&self, value: &T) -> Result<Vec<u8>, EndpointError>
|
||||
where
|
||||
T: for<'a> rkyv::Serialize<
|
||||
rkyv::api::high::HighSerializer<
|
||||
rkyv::util::AlignedVec,
|
||||
rkyv::ser::allocator::ArenaHandle<'a>,
|
||||
RkyvError,
|
||||
>,
|
||||
>,
|
||||
{
|
||||
to_bytes::<RkyvError>(value)
|
||||
.map_err(|error| EndpointError::Frame(FrameError::Serialize(error)))
|
||||
.map(|bytes| bytes.to_vec())
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,16 @@
|
||||
//! Protocol-tree endpoint runtime.
|
||||
//!
|
||||
//! This module holds the state machine that validates ingress, decides whether a
|
||||
//! packet should be handled locally or forwarded, and manages hook lifetimes for
|
||||
//! call/data/fault exchanges.
|
||||
|
||||
mod builders;
|
||||
mod core;
|
||||
mod hooks;
|
||||
mod introspection;
|
||||
mod receive;
|
||||
|
||||
pub use core::{
|
||||
ChildRoute, Endpoint, EndpointError, EndpointOutcome, Ingress, LeafSpec, LocalEvent,
|
||||
ProtocolEndpoint,
|
||||
};
|
||||
@@ -0,0 +1,171 @@
|
||||
//! Packet ingress and local call dispatch.
|
||||
|
||||
use crate::protocol::types::{ArchivedCallMessage, ArchivedDataMessage, ArchivedFaultMessage};
|
||||
use crate::protocol::{
|
||||
CallMessage, ProtocolFault, decode_frame, deserialize_archived_bytes,
|
||||
introspection::INTROSPECTION_PROCEDURE_ID, validate_call, validate_header,
|
||||
};
|
||||
|
||||
use super::super::{ActiveHook, HookKey, RouteDecision};
|
||||
use super::core::{
|
||||
Endpoint, EndpointError, EndpointOutcome, Ingress, LocalEvent, ProtocolEndpoint,
|
||||
};
|
||||
|
||||
impl ProtocolEndpoint {
|
||||
fn local_procedure_fault(
|
||||
&self,
|
||||
dst_leaf: Option<&str>,
|
||||
procedure_id: &str,
|
||||
) -> Option<ProtocolFault> {
|
||||
match dst_leaf {
|
||||
Some(leaf_name) => match self.leaves.get(leaf_name) {
|
||||
Some(leaf) => (!leaf
|
||||
.procedures
|
||||
.iter()
|
||||
.any(|procedure| procedure == procedure_id))
|
||||
.then_some(ProtocolFault::UNKNOWN_PROCEDURE),
|
||||
None => Some(ProtocolFault::UNKNOWN_LEAF),
|
||||
},
|
||||
None => (!self.endpoint_procedures.contains(procedure_id))
|
||||
.then_some(ProtocolFault::UNKNOWN_PROCEDURE),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn handle_local_call(
|
||||
&mut self,
|
||||
header: crate::protocol::PacketHeader,
|
||||
message: CallMessage,
|
||||
) -> Result<EndpointOutcome, EndpointError> {
|
||||
let key = message
|
||||
.response_hook
|
||||
.as_ref()
|
||||
.map(|hook| HookKey::new(hook.return_path.clone(), hook.hook_id));
|
||||
|
||||
if message.procedure_id == INTROSPECTION_PROCEDURE_ID {
|
||||
return self.handle_introspection(&header, key);
|
||||
}
|
||||
|
||||
if let Some(fault) =
|
||||
self.local_procedure_fault(header.dst_leaf.as_deref(), &message.procedure_id)
|
||||
{
|
||||
return self.emit_fault_if_possible(key, fault);
|
||||
}
|
||||
|
||||
if let Some(hook) = &message.response_hook
|
||||
&& hook.return_path != self.path
|
||||
{
|
||||
// Calls targeting this endpoint may still ask another endpoint to host the response
|
||||
// hook. Only register a local active hook when the response path escapes this node.
|
||||
let Some(key) = key.clone() else {
|
||||
unreachable!("response_hook checked above");
|
||||
};
|
||||
if self
|
||||
.hooks
|
||||
.insert_active(
|
||||
key.clone(),
|
||||
ActiveHook {
|
||||
peer_path: header.src_path.clone(),
|
||||
procedure_id: message.procedure_id.clone(),
|
||||
local_ended: false,
|
||||
peer_ended: false,
|
||||
},
|
||||
)
|
||||
.is_err()
|
||||
{
|
||||
return self.emit_fault_if_possible(Some(key), ProtocolFault::INTERNAL_ERROR);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(EndpointOutcome::Local(LocalEvent::Call { header, message }))
|
||||
}
|
||||
|
||||
fn receive_call(
|
||||
&mut self,
|
||||
ingress: &Ingress,
|
||||
parsed: crate::protocol::ParsedFrame<'_>,
|
||||
) -> Result<EndpointOutcome, EndpointError> {
|
||||
// Calls only enter from the parent side of the tree or from the endpoint itself.
|
||||
// Children can return data/faults, but they do not initiate new calls through this node.
|
||||
if !matches!(ingress, Ingress::Parent | Ingress::Local) {
|
||||
return Ok(EndpointOutcome::Dropped);
|
||||
}
|
||||
|
||||
let (header, payload) = parsed.into_parts();
|
||||
let message = deserialize_archived_bytes::<ArchivedCallMessage, CallMessage>(payload)?;
|
||||
validate_call(&header, &message)?;
|
||||
self.handle_local_call(header, message)
|
||||
}
|
||||
|
||||
fn receive_data(
|
||||
&mut self,
|
||||
parsed: crate::protocol::ParsedFrame<'_>,
|
||||
) -> Result<EndpointOutcome, EndpointError> {
|
||||
let (header, payload) = parsed.into_parts();
|
||||
let message = deserialize_archived_bytes::<
|
||||
ArchivedDataMessage,
|
||||
crate::protocol::DataMessage,
|
||||
>(payload)?;
|
||||
self.handle_local_data(header, message)
|
||||
}
|
||||
|
||||
fn receive_fault(
|
||||
&mut self,
|
||||
parsed: crate::protocol::ParsedFrame<'_>,
|
||||
) -> Result<EndpointOutcome, EndpointError> {
|
||||
let (header, payload) = parsed.into_parts();
|
||||
let message = deserialize_archived_bytes::<
|
||||
ArchivedFaultMessage,
|
||||
crate::protocol::FaultMessage,
|
||||
>(payload)?;
|
||||
self.handle_local_fault(header, message)
|
||||
}
|
||||
|
||||
fn forward_or_drop(
|
||||
route: RouteDecision,
|
||||
frame: crate::protocol::FrameBytes,
|
||||
) -> EndpointOutcome {
|
||||
match route {
|
||||
RouteDecision::Child(index) => EndpointOutcome::Forward {
|
||||
route: RouteDecision::Child(index),
|
||||
frame,
|
||||
},
|
||||
RouteDecision::Parent => EndpointOutcome::Forward {
|
||||
route: RouteDecision::Parent,
|
||||
frame,
|
||||
},
|
||||
RouteDecision::Drop => EndpointOutcome::Dropped,
|
||||
RouteDecision::Local => unreachable!("local routes are handled before forwarding"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Endpoint for ProtocolEndpoint {
|
||||
fn path(&self) -> &[alloc::string::String] {
|
||||
&self.path
|
||||
}
|
||||
|
||||
fn receive(
|
||||
&mut self,
|
||||
ingress: &Ingress,
|
||||
frame: crate::protocol::FrameBytes,
|
||||
) -> Result<EndpointOutcome, EndpointError> {
|
||||
let parsed = decode_frame(&frame)?;
|
||||
let header = parsed.header();
|
||||
validate_header(header)?;
|
||||
|
||||
if !self.valid_source_for_ingress(ingress, &header.src_path) {
|
||||
return Ok(EndpointOutcome::Dropped);
|
||||
}
|
||||
|
||||
let route = self.decide_route(&header.dst_path);
|
||||
if route != RouteDecision::Local {
|
||||
return Ok(Self::forward_or_drop(route, frame));
|
||||
}
|
||||
|
||||
match header.packet_type {
|
||||
crate::protocol::PacketType::Call => self.receive_call(ingress, parsed),
|
||||
crate::protocol::PacketType::Data => self.receive_data(parsed),
|
||||
crate::protocol::PacketType::Fault => self.receive_fault(parsed),
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user