Make new error structs, improve tests, remake file structure.

This commit is contained in:
Michael Mikovsky
2026-05-28 12:41:32 -06:00
parent 3973589a35
commit 65a7f675a9
14 changed files with 958 additions and 385 deletions
+20 -17
View File
@@ -23,10 +23,12 @@ pub mod logger;
/// proc-macro output and downstream code do not need a second migration. /// proc-macro output and downstream code do not need a second migration.
pub use unshell_protocol as protocol; pub use unshell_protocol as protocol;
/// Re-export the leaf library crate behind the historical `unshell::leaves` path // Re-export the leaf library crate behind the historical `unshell::leaves` path
// once the leaf crate is part of the active workspace again.
// pub use unshell_leaves as leaves; // pub use unshell_leaves as leaves;
/// Re-export the runtime crate behind the `unshell::runtime` path. // Re-export the runtime crate behind the `unshell::runtime` path once the runtime
// crate is part of the active workspace again.
// pub use unshell_runtime as runtime; // pub use unshell_runtime as runtime;
// pub use unshell_macros::{Procedure, leaf, procedures}; // pub use unshell_macros::{Procedure, leaf, procedures};
@@ -40,21 +42,22 @@ pub use unshell_protocol as protocol;
/// Why it exists: the common bootstrap case should not require callers to manually construct an /// Why it exists: the common bootstrap case should not require callers to manually construct an
/// empty path, `Vec<ChildRoute>`, and a `Vec<LeafSpec>` when they already have leaf host values. /// empty path, `Vec<ChildRoute>`, and a `Vec<LeafSpec>` when they already have leaf host values.
/// ///
// # Example /// # Example
// ```rust ///
// use unshell::{create_endpoint, leaf}; /// ```rust,ignore
// use unshell::protocol::tree::Endpoint; /// use unshell::{create_endpoint, leaf};
/// use unshell::protocol::tree::Endpoint;
// #[derive(Default)] ///
// struct DemoLeaf; /// #[derive(Default)]
/// struct DemoLeaf;
// #[leaf(id = "org.example.v1.demo", procedures = ["ping"], endpoint_struct = DemoLeaf)] ///
// struct Demo; /// #[leaf(id = "org.example.v1.demo", procedures = ["ping"], endpoint_struct = DemoLeaf)]
/// struct Demo;
// let endpoint = create_endpoint!("demo", DemoLeaf::default()); ///
// assert!(endpoint.path().is_empty()); /// let endpoint = create_endpoint!("demo", DemoLeaf::default());
// assert_eq!(endpoint.local_id(), Some("demo")); /// assert!(endpoint.path().is_empty());
// ``` /// assert_eq!(endpoint.local_id(), Some("demo"));
/// ```
#[macro_export] #[macro_export]
macro_rules! create_endpoint { macro_rules! create_endpoint {
($id:expr $(, $leaf:expr )* $(,)?) => {{ ($id:expr $(, $leaf:expr )* $(,)?) => {{
-9
View File
@@ -1,9 +0,0 @@
#[derive(Debug)]
pub enum EndpointError {
NoAbsoultePathYet,
IncorrectAbsolutePath,
RouteNotExist,
HookDuplicate,
HookNotExist,
}
+1 -6
View File
@@ -1,13 +1,8 @@
pub mod error;
mod routing; mod routing;
use alloc::{boxed::Box, vec::Vec}; use alloc::{boxed::Box, vec::Vec};
use crate::{ use crate::{ConnectionSet, HookMap, Leaf, Packet, Path, RouteMap};
leaf::Leaf,
packet::Packet,
types::{ConnectionSet, HookMap, Path, RouteMap},
};
pub struct Endpoint { pub struct Endpoint {
// This endpoint's identifier // This endpoint's identifier
+93 -71
View File
@@ -1,105 +1,127 @@
use crate::{ use crate::{Endpoint, EndpointError, Packet, RouteDirection};
endpoint::{Endpoint, error::EndpointError},
packet::Packet,
};
impl Endpoint { impl Endpoint {
/// Register an inbound packet and route it /// Register an inbound packet and route it through the local endpoint state.
///
/// Inbound transport data still uses the same local routing rules as packets
/// generated by leaves: local destinations are delivered to `inbound`, and
/// transit destinations are queued by their immediate next hop.
pub fn add_inbound(&mut self, packet: Packet) -> Result<(), EndpointError> { pub fn add_inbound(&mut self, packet: Packet) -> Result<(), EndpointError> {
// In case some leaf hasn't assigned the endpoint a path yet. self.route_packet(packet)
if self.path.is_empty() {
return Err(EndpointError::NoAbsoultePathYet);
}
// If the packet is routed towards this endpoint
if packet.path == *self.path {
// Get the last segment of the path
let local_id = self
.path
.last()
.cloned()
.ok_or(EndpointError::IncorrectAbsolutePath)?;
self.inbound.entry(local_id).or_default().push_back(packet);
Ok(())
} else {
let (next_hop, is_upward) = self.next_hop_for(&packet)?;
if !self.connections.contains(&(next_hop, is_upward)) {
return Err(EndpointError::RouteNotExist);
}
self.queue_outbound(packet, next_hop)
}
} }
/// Register an outbound packet produced locally and route it to the next queue.
///
/// This intentionally shares the same implementation as [`Self::add_inbound`]
/// so local leaf output and received transport packets cannot drift into subtly
/// different route semantics.
pub fn add_outbound(&mut self, packet: Packet) -> Result<(), EndpointError> { pub fn add_outbound(&mut self, packet: Packet) -> Result<(), EndpointError> {
// In case some leaf hasn't assigned the endpoint a path yet. self.route_packet(packet)
if self.path.is_empty() { }
return Err(EndpointError::NoAbsoultePathYet);
}
// If this packet is routed towards this node /// Route a packet by classifying its destination and mutating exactly one queue.
if packet.path == *self.path { ///
// Grab the last endpoint ID /// Hook cleanup is deliberately last. A packet with `end_hook = true` should not
/// tear down local hook state unless the packet has a valid route and is actually
/// queued for forwarding. The route branches are kept inline rather than using
/// an intermediate decision enum so size-focused builds have less structure to
/// optimize away.
fn route_packet(&mut self, packet: Packet) -> Result<(), EndpointError> {
self.ensure_path_is_set()?;
if packet.path == self.path {
let local_id = self let local_id = self
.path .path
.last() .last()
.cloned() .copied()
.ok_or(EndpointError::IncorrectAbsolutePath)?; .ok_or(EndpointError::EndpointPathUnset)?;
// Add it to the inbound queue
self.inbound.entry(local_id).or_default().push_back(packet); self.inbound.entry(local_id).or_default().push_back(packet);
return Ok(()); return Ok(());
} }
let (next_hop, is_upward) = self.next_hop_for(&packet)?;
if !self.connections.contains(&(next_hop, is_upward)) {
return Err(EndpointError::RouteNotExist);
}
self.queue_outbound(packet, next_hop)
}
fn queue_outbound(&mut self, packet: Packet, next_hop: u32) -> Result<(), EndpointError> {
if packet.end_hook {
self.hooks.remove(&packet.hook_id);
}
self.outbound.entry(next_hop).or_default().push_back(packet);
Ok(())
}
fn next_hop_for(&self, packet: &Packet) -> Result<(u32, bool), EndpointError> {
// Direction is derived from the local path. The packet never gets to declare // Direction is derived from the local path. The packet never gets to declare
// whether it is moving upward, because that would make the trust boundary spoofable. // whether it is moving upward, because that would make the trust boundary spoofable.
if packet.path.starts_with(&self.path) { if packet.path.starts_with(&self.path) {
let next_hop = packet let next_hop = packet
.path .path
.get(self.path.len()) .get(self.path.len())
.cloned() .copied()
.ok_or(EndpointError::IncorrectAbsolutePath)?; .ok_or(EndpointError::DestinationOutsideLocalTree)?;
Ok((next_hop, false)) self.ensure_registered_connection(next_hop, RouteDirection::Downward)?;
} else if self.path.starts_with(&packet.path) { self.queue_outbound(packet, next_hop, RouteDirection::Downward);
// SECURITY: All upward-routed packets must be checked against local hook state. return Ok(());
}
if self.path.starts_with(&packet.path) {
// Upward-routed packets must be tied to local hook state. Otherwise a
// peer could forge a packet to an ancestor by choosing an older path.
if !self.hooks.contains_key(&packet.hook_id) { if !self.hooks.contains_key(&packet.hook_id) {
return Err(EndpointError::HookNotExist); return Err(EndpointError::UnknownHook {
hook_id: packet.hook_id,
});
} }
let parent_index = self let parent_index = self
.path .path
.len() .len()
.checked_sub(2) .checked_sub(2)
.ok_or(EndpointError::RouteNotExist)?; .ok_or(EndpointError::MissingParentRoute)?;
Ok((self.path[parent_index], true)) let next_hop = self.path[parent_index];
self.ensure_registered_connection(next_hop, RouteDirection::Upward)?;
self.queue_outbound(packet, next_hop, RouteDirection::Upward);
return Ok(());
}
Err(EndpointError::DestinationOutsideLocalTree)
}
/// Reject routing before path-relative decisions when no absolute path is known.
///
/// This preserves the current runtime sentinel where an empty path means the
/// endpoint has not been attached to the tree yet.
fn ensure_path_is_set(&self) -> Result<(), EndpointError> {
if self.path.is_empty() {
Err(EndpointError::EndpointPathUnset)
} else { } else {
Err(EndpointError::IncorrectAbsolutePath) Ok(())
} }
} }
/// Verify that the derived adjacent endpoint is registered in this direction.
///
/// The current connection table stores direction as a boolean. Keeping the bool
/// conversion here confines that legacy representation to one place in routing.
fn ensure_registered_connection(
&self,
next_hop: u32,
direction: RouteDirection,
) -> Result<(), EndpointError> {
let is_upward = matches!(direction, RouteDirection::Upward);
if self.connections.contains(&(next_hop, is_upward)) {
Ok(())
} else {
Err(EndpointError::MissingConnection {
next_hop,
direction,
})
}
}
/// Queue `packet` after all route validation has already succeeded.
///
/// `end_hook` closes local hook state only when hook traffic is moving upward
/// toward the hook host. Downward calls may carry a response hook id, but that
/// id is only a promise for future upward traffic and must not delete local
/// state if it happens to collide with an existing hook id.
fn queue_outbound(&mut self, packet: Packet, next_hop: u32, direction: RouteDirection) {
if matches!(direction, RouteDirection::Upward) && packet.end_hook {
self.hooks.remove(&packet.hook_id);
}
self.outbound.entry(next_hop).or_default().push_back(packet);
}
} }
+134
View File
@@ -0,0 +1,134 @@
/// Direction across the next local routing boundary.
///
/// The endpoint derives this from its own absolute path and the packet's
/// destination path. Packets are never trusted to declare their direction because
/// that would let an untrusted peer spoof the local routing boundary.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RouteDirection {
/// The packet moves toward this endpoint's direct parent.
Upward,
/// The packet moves toward one of this endpoint's direct children.
Downward,
}
/// Top-level endpoint failure for packet conversion and local routing.
///
/// These are local processing failures, not protocol fault packets. A transport or
/// leaf may choose to drop the packet, log it, or translate it into a higher-level
/// fault depending on where the packet came from. Route variants stay flat so the
/// hot route path does not need a second nested enum just to explain the failure.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EndpointError {
/// This endpoint cannot route because its absolute path has not been assigned.
///
/// The current runtime uses an empty path as "not initialized". If the protocol
/// later supports an empty root path, route initialization should become an
/// explicit flag instead of being inferred from `path.is_empty()`.
EndpointPathUnset,
/// The packet destination is not local, below this endpoint, or above this endpoint.
///
/// This catches sideways or forged paths, for example local `/a/b` receiving a
/// packet addressed to `/a/c`.
DestinationOutsideLocalTree,
/// A route points upward, but this endpoint has no parent segment to forward to.
///
/// This means the path topology is internally inconsistent for upward routing.
MissingParentRoute,
/// The packet needs a registered connection for the computed next hop, but none exists.
///
/// Route derivation succeeded. Delivery fails only because the local connection
/// table does not contain the adjacent endpoint in the required direction.
MissingConnection {
/// Adjacent endpoint that should receive the packet next.
next_hop: u32,
/// Direction that the local connection must be registered for.
direction: RouteDirection,
},
/// The packet is trying to move upward without known hook state.
///
/// Upward hook traffic is gated by local hook state so a peer cannot forge a
/// return path just by choosing an ancestor destination.
UnknownHook {
/// Hook id claimed by the upward packet.
hook_id: u16,
},
/// A packet could not be converted into bytes for transport.
///
/// Endpoint-level code that drains outbound queues often wants one error type
/// for both routing and framing. Keeping the source error preserves the exact
/// packet-size invariant that failed.
PacketSerialize {
/// Exact packet serialization failure.
source: SerializeError,
},
/// Incoming bytes could not be parsed into a packet.
///
/// This represents a frame rejection before routing begins. The source error is
/// retained so callers can distinguish truncation from malformed body fields.
PacketDeserialize {
/// Exact packet deserialization failure.
source: DeserializeError,
},
}
/// Errors produced while converting a [`Packet`] into its wire representation.
///
/// These failures are size-bound checks rather than transport errors. They protect
/// the length fields in the frame from integer overflow or values that cannot be
/// represented by the protocol's current `u32` length fields.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SerializeError {
/// The packet path contains more bytes than the frame length field can represent.
PathTooLarge,
/// The procedure identifier is too large to encode in a `u32` length field.
ProcIdTooLarge,
/// The body section is too large to encode in a `u32` length field.
BodyTooLarge,
}
/// Errors produced while parsing a [`Packet`] from untrusted wire bytes.
///
/// Deserialization rejects partial, inconsistent, or invalid UTF-8 frames before
/// endpoint routing sees them. Keeping these separate from route failures makes it
/// clear whether a packet failed before or after it became structured data.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DeserializeError {
/// The buffer ended before the parser could read the required field.
BufferTooShort,
/// The advertised body length does not fit inside the provided buffer.
BodyLengthMismatch,
/// The path length overflowed while computing the path byte range.
PathTooLong,
/// The procedure id length overflowed while computing the body byte range.
ProcIdTooLong,
/// The encoded procedure id was not valid UTF-8.
InvalidUtf8,
}
impl From<SerializeError> for EndpointError {
/// Wraps packet serialization failures for endpoint-level callers.
fn from(source: SerializeError) -> Self {
Self::PacketSerialize { source }
}
}
impl From<DeserializeError> for EndpointError {
/// Wraps packet deserialization failures for endpoint-level callers.
fn from(source: DeserializeError) -> Self {
Self::PacketDeserialize { source }
}
}
-9
View File
@@ -1,9 +0,0 @@
use crate::endpoint::Endpoint;
pub trait Leaf {
// Identifier for this leaf
fn get_id(&self) -> u32;
// Gets called every program loop
fn update(&mut self, _: &mut Endpoint);
}
+33 -5
View File
@@ -2,10 +2,38 @@
extern crate alloc; extern crate alloc;
pub mod endpoint; mod endpoint;
pub mod leaf; mod error;
pub mod packet; mod packet;
mod types;
pub use endpoint::Endpoint;
pub use error::*;
pub use packet::Packet;
pub trait Leaf {
// Identifier for this leaf
fn get_id(&self) -> u32;
// Gets called every program loop
fn update(&mut self, _: &mut Endpoint);
}
// Various named types used for brevity
use alloc::{
collections::{btree_map::BTreeMap, btree_set::BTreeSet, vec_deque::VecDeque},
vec::Vec,
};
type Path = Vec<u32>;
type EndpointName = u32;
type HookID = u16;
type ConnectionSet = BTreeSet<(EndpointName, bool)>;
type HookMap = BTreeMap<HookID, EndpointName>;
type PacketQueue = VecDeque<Packet>;
type RouteMap = BTreeMap<EndpointName, PacketQueue>;
#[cfg(test)] #[cfg(test)]
mod tests; mod tests {
mod oneshot;
mod packet;
}
@@ -1,11 +1,10 @@
#[cfg(test)]
mod tests;
extern crate alloc; extern crate alloc;
use alloc::string::String; use alloc::string::String;
use alloc::vec::Vec; use alloc::vec::Vec;
use crate::{DeserializeError, SerializeError};
#[derive(Debug)] #[derive(Debug)]
pub struct Packet { pub struct Packet {
pub hook_id: u16, pub hook_id: u16,
@@ -27,22 +26,6 @@ pub struct HeaderRef<'buf> {
pub body_remainder: &'buf [u8], pub body_remainder: &'buf [u8],
} }
#[derive(Debug)]
pub enum SerializeError {
PathTooLarge,
ProcIdTooLarge,
BodyTooLarge,
}
#[derive(Debug, PartialEq)]
pub enum DeserializeError {
BufferTooShort,
BodyLengthMismatch,
PathTooLong,
ProcIdTooLong,
InvalidUtf8,
}
impl Packet { impl Packet {
pub fn serialize(&self) -> Result<Vec<u8>, SerializeError> { pub fn serialize(&self) -> Result<Vec<u8>, SerializeError> {
let proc_id_bytes = self.procedure_id.as_bytes(); let proc_id_bytes = self.procedure_id.as_bytes();
-1
View File
@@ -1 +0,0 @@
mod oneshot;
-231
View File
@@ -1,231 +0,0 @@
use crate::{
endpoint::{Endpoint, error::EndpointError},
leaf::Leaf,
packet::Packet,
};
use alloc::{boxed::Box, string::ToString, vec, vec::Vec};
use crossbeam_channel::{Receiver, Sender};
const ENDPOINT_A: u32 = 0;
const ENDPOINT_B: u32 = 1;
const LEAF_CONTROLLER: u32 = 100;
const LEAF_COMMS: u32 = 101;
const LEAF_RESPONDER: u32 = 102;
// const HOOK_ECHO: u16 = 500;
fn echo_packet(path: Vec<u32>, hook_id: u16) -> Packet {
Packet {
hook_id,
end_hook: false,
path,
procedure_id: "echo".to_string(),
data: "ABC123".as_bytes().to_vec(),
}
}
struct ControllerLeaf {
has_run: bool,
}
struct CommsLeaf {
tx: Sender<Vec<u8>>,
rx: Receiver<Vec<u8>>,
remote_id: u32,
is_authority: bool,
started: bool,
}
struct ResponderLeaf;
impl Leaf for ControllerLeaf {
fn get_id(&self) -> u32 {
LEAF_CONTROLLER
}
fn update(&mut self, endpoint: &mut Endpoint) {
if !self.has_run {
// Get next free available hook id
let hook_id = endpoint.get_hook_id();
// Create packet
let packet = echo_packet(vec![ENDPOINT_A, ENDPOINT_B], hook_id);
// Add packet to queue
let _ = endpoint.add_outbound(packet);
// Don't run again
self.has_run = true;
}
}
}
impl Leaf for CommsLeaf {
fn get_id(&self) -> u32 {
LEAF_COMMS
}
fn update(&mut self, endpoint: &mut Endpoint) {
if !self.started {
endpoint
.connections
.insert((self.remote_id, self.is_authority));
}
while !self.rx.is_empty() {
let packet = Packet::deserialize(&self.rx.recv().unwrap()).unwrap();
let _ = endpoint.add_inbound(packet);
}
endpoint.take_outbound_clear(self.remote_id, |packet| {
let data = packet.serialize().unwrap();
let _ = self.tx.send(data);
});
}
}
impl Leaf for ResponderLeaf {
fn get_id(&self) -> u32 {
LEAF_RESPONDER
}
fn update(&mut self, endpoint: &mut Endpoint) {
let local_id = endpoint.path.last().cloned().unwrap_or(0);
let mut packets = Vec::new();
endpoint.take_inbound_clear(local_id, |packet| {
let mut response = echo_packet(vec![ENDPOINT_A], packet.hook_id);
response.hook_id = packet.hook_id;
response.data = packet.data.clone();
packets.push(response);
});
for packet in packets {
endpoint.hooks.insert(packet.hook_id, 0);
let _ = endpoint.add_outbound(packet);
}
}
}
#[test]
fn test_oneshot() {
let (tx_a, rx_a) = crossbeam_channel::unbounded();
let (tx_b, rx_b) = crossbeam_channel::unbounded();
let mut endpoint_a = crate::endpoint::Endpoint::new(
ENDPOINT_A,
vec![
Box::new(ControllerLeaf { has_run: false }),
Box::new(CommsLeaf {
tx: tx_b,
rx: rx_a,
remote_id: ENDPOINT_B,
is_authority: false,
started: false,
}),
],
);
endpoint_a.path = vec![ENDPOINT_A];
let mut endpoint_b = crate::endpoint::Endpoint::new(
ENDPOINT_B,
vec![
Box::new(ResponderLeaf),
Box::new(CommsLeaf {
tx: tx_a,
rx: rx_b,
remote_id: ENDPOINT_A,
is_authority: true,
started: false,
}),
],
);
endpoint_b.path = vec![ENDPOINT_A, ENDPOINT_B];
// Connections are registered routing state. The comms leaves also insert them
// during updates, but the first application packet should not depend on leaf order.
endpoint_a.connections.insert((ENDPOINT_B, false));
endpoint_b.connections.insert((ENDPOINT_A, true));
// Cycle 1: A sends request to B
endpoint_a.update();
endpoint_b.update();
// Cycle 2: B receives request and sends response to A
endpoint_b.update();
endpoint_a.update();
// Cycle 3: A's CommsLeaf needs one more update to pull the packet from the channel
// and put it into the inbound queue.
endpoint_a.update();
// Assertions on state
assert!(
endpoint_a.inbound.contains_key(&ENDPOINT_A),
"Endpoint A should have received response"
);
assert_eq!(
endpoint_a.inbound.get(&ENDPOINT_A).unwrap().len(),
1,
"Endpoint A should have exactly one packet"
);
let response = &endpoint_a
.inbound
.get(&ENDPOINT_A)
.unwrap()
.front()
.unwrap();
assert_eq!(response.data, "ABC123".as_bytes());
// assert_eq!(response.hook_id, HOOK_ECHO);
}
#[test]
fn upward_outbound_without_hook_is_rejected() {
let mut endpoint = Endpoint::new(ENDPOINT_B, vec![]);
endpoint.path = vec![ENDPOINT_A, ENDPOINT_B];
endpoint.connections.insert((ENDPOINT_A, true));
let new_hook = endpoint.get_hook_id();
let error = endpoint
.add_outbound(echo_packet(vec![ENDPOINT_A], new_hook))
.unwrap_err();
assert!(matches!(error, EndpointError::HookNotExist));
assert!(endpoint.outbound.is_empty());
}
#[test]
fn downward_outbound_without_hook_is_allowed() {
let mut endpoint = crate::endpoint::Endpoint::new(ENDPOINT_A, vec![]);
endpoint.path = vec![ENDPOINT_A];
endpoint.connections.insert((ENDPOINT_B, false));
let new_hook = endpoint.get_hook_id();
endpoint
.add_outbound(echo_packet(vec![ENDPOINT_A, ENDPOINT_B], new_hook))
.unwrap();
assert_eq!(endpoint.outbound.get(&ENDPOINT_B).unwrap().len(), 1);
}
#[test]
fn deeper_upward_route_uses_parent_as_next_hop() {
const ENDPOINT_C: u32 = 2;
let mut endpoint = crate::endpoint::Endpoint::new(ENDPOINT_C, vec![]);
let new_hook = endpoint.get_hook_id();
endpoint.path = vec![ENDPOINT_A, ENDPOINT_B, ENDPOINT_C];
endpoint.hooks.insert(new_hook, ENDPOINT_A);
endpoint.connections.insert((ENDPOINT_B, true));
endpoint
.add_outbound(echo_packet(vec![ENDPOINT_A], new_hook))
.unwrap();
assert!(endpoint.outbound.contains_key(&ENDPOINT_B));
assert!(!endpoint.outbound.contains_key(&ENDPOINT_A));
}
+472
View File
@@ -0,0 +1,472 @@
mod support;
use crate::{Endpoint, EndpointError, RouteDirection};
use alloc::{boxed::Box, vec};
use support::{
CommsLeaf, ControllerLeaf, ENDPOINT_A, ENDPOINT_B, ENDPOINT_C, ResponderLeaf,
assert_hook_present, assert_hook_removed, echo_packet, endpoint_at, single_inbound_packet,
single_outbound_packet,
};
#[test]
fn test_oneshot() {
let (tx_a, rx_a) = crossbeam_channel::unbounded();
let (tx_b, rx_b) = crossbeam_channel::unbounded();
let mut endpoint_a = crate::endpoint::Endpoint::new(
ENDPOINT_A,
vec![
Box::new(ControllerLeaf { has_run: false }),
Box::new(CommsLeaf {
tx: tx_b,
rx: rx_a,
remote_id: ENDPOINT_B,
is_authority: false,
started: false,
}),
],
);
endpoint_a.path = vec![ENDPOINT_A];
let mut endpoint_b = crate::endpoint::Endpoint::new(
ENDPOINT_B,
vec![
Box::new(ResponderLeaf),
Box::new(CommsLeaf {
tx: tx_a,
rx: rx_b,
remote_id: ENDPOINT_A,
is_authority: true,
started: false,
}),
],
);
endpoint_b.path = vec![ENDPOINT_A, ENDPOINT_B];
// Connections are registered routing state. The comms leaves also insert them
// during updates, but the first application packet should not depend on leaf order.
endpoint_a.connections.insert((ENDPOINT_B, false));
endpoint_b.connections.insert((ENDPOINT_A, true));
// Cycle 1: A sends request to B
endpoint_a.update();
endpoint_b.update();
// Cycle 2: B receives request and sends response to A
endpoint_b.update();
endpoint_a.update();
// Cycle 3: A's CommsLeaf needs one more update to pull the packet from the channel
// and put it into the inbound queue.
endpoint_a.update();
// Assertions on state
assert!(
endpoint_a.inbound.contains_key(&ENDPOINT_A),
"Endpoint A should have received response"
);
assert_eq!(
endpoint_a.inbound.get(&ENDPOINT_A).unwrap().len(),
1,
"Endpoint A should have exactly one packet"
);
let response = &endpoint_a
.inbound
.get(&ENDPOINT_A)
.unwrap()
.front()
.unwrap();
assert!(response.end_hook);
assert_eq!(response.data, "ABC123".as_bytes());
assert!(
endpoint_b.hooks.is_empty(),
"responder hook should be cleaned after the upward response"
);
// assert_eq!(response.hook_id, HOOK_ECHO);
}
#[test]
fn inbound_packet_for_local_endpoint_is_delivered_locally() {
let mut endpoint = endpoint_at(ENDPOINT_B, vec![ENDPOINT_A, ENDPOINT_B]);
let hook_id = endpoint.get_hook_id();
endpoint.hooks.insert(hook_id, ENDPOINT_A);
endpoint
.add_inbound(echo_packet(vec![ENDPOINT_A, ENDPOINT_B], hook_id))
.unwrap();
let packet = single_inbound_packet(&endpoint, ENDPOINT_B);
assert!(packet.end_hook);
assert_eq!(packet.path, vec![ENDPOINT_A, ENDPOINT_B]);
assert_hook_present(&endpoint, hook_id);
assert!(endpoint.outbound.is_empty());
}
#[test]
fn outbound_packet_for_local_endpoint_is_delivered_locally() {
let mut endpoint = endpoint_at(ENDPOINT_B, vec![ENDPOINT_A, ENDPOINT_B]);
let hook_id = endpoint.get_hook_id();
endpoint.hooks.insert(hook_id, ENDPOINT_A);
endpoint
.add_outbound(echo_packet(vec![ENDPOINT_A, ENDPOINT_B], hook_id))
.unwrap();
let packet = single_inbound_packet(&endpoint, ENDPOINT_B);
assert!(packet.end_hook);
assert_eq!(packet.data, "ABC123".as_bytes());
assert_hook_present(&endpoint, hook_id);
assert!(endpoint.outbound.is_empty());
}
#[test]
fn inbound_downward_packet_routes_to_immediate_child() {
let mut endpoint = endpoint_at(ENDPOINT_A, vec![ENDPOINT_A]);
let hook_id = endpoint.get_hook_id();
endpoint.hooks.insert(hook_id, ENDPOINT_B);
endpoint.connections.insert((ENDPOINT_B, false));
endpoint
.add_inbound(echo_packet(
vec![ENDPOINT_A, ENDPOINT_B, ENDPOINT_C],
hook_id,
))
.unwrap();
let packet = single_outbound_packet(&endpoint, ENDPOINT_B);
assert!(packet.end_hook);
assert_eq!(packet.path, vec![ENDPOINT_A, ENDPOINT_B, ENDPOINT_C]);
assert_hook_present(&endpoint, hook_id);
assert!(!endpoint.outbound.contains_key(&ENDPOINT_C));
}
#[test]
fn outbound_downward_packet_routes_to_immediate_child() {
let mut endpoint = endpoint_at(ENDPOINT_A, vec![ENDPOINT_A]);
let hook_id = endpoint.get_hook_id();
endpoint.hooks.insert(hook_id, ENDPOINT_B);
endpoint.connections.insert((ENDPOINT_B, false));
endpoint
.add_outbound(echo_packet(
vec![ENDPOINT_A, ENDPOINT_B, ENDPOINT_C],
hook_id,
))
.unwrap();
let packet = single_outbound_packet(&endpoint, ENDPOINT_B);
assert!(packet.end_hook);
assert_eq!(packet.path, vec![ENDPOINT_A, ENDPOINT_B, ENDPOINT_C]);
assert_hook_present(&endpoint, hook_id);
assert!(!endpoint.outbound.contains_key(&ENDPOINT_C));
}
#[test]
fn inbound_upward_packet_with_hook_routes_to_parent() {
let mut endpoint = endpoint_at(ENDPOINT_C, vec![ENDPOINT_A, ENDPOINT_B, ENDPOINT_C]);
let hook_id = endpoint.get_hook_id();
endpoint.hooks.insert(hook_id, ENDPOINT_A);
endpoint.connections.insert((ENDPOINT_B, true));
endpoint
.add_inbound(echo_packet(vec![ENDPOINT_A], hook_id))
.unwrap();
let packet = single_outbound_packet(&endpoint, ENDPOINT_B);
assert!(packet.end_hook);
assert_eq!(packet.hook_id, hook_id);
assert_hook_removed(&endpoint, hook_id);
assert!(!endpoint.outbound.contains_key(&ENDPOINT_A));
}
#[test]
fn inbound_upward_packet_without_hook_is_rejected() {
let mut endpoint = endpoint_at(ENDPOINT_B, vec![ENDPOINT_A, ENDPOINT_B]);
let hook_id = endpoint.get_hook_id();
endpoint.connections.insert((ENDPOINT_A, true));
let error = endpoint
.add_inbound(echo_packet(vec![ENDPOINT_A], hook_id))
.unwrap_err();
assert!(matches!(
error,
EndpointError::UnknownHook { hook_id: observed_hook_id } if observed_hook_id == hook_id
));
assert!(endpoint.inbound.is_empty());
assert!(endpoint.outbound.is_empty());
}
#[test]
fn forged_upward_packet_with_unknown_hook_is_rejected() {
let mut endpoint = endpoint_at(ENDPOINT_C, vec![ENDPOINT_A, ENDPOINT_B, ENDPOINT_C]);
endpoint.hooks.insert(7, ENDPOINT_A);
endpoint.connections.insert((ENDPOINT_B, true));
let error = endpoint
.add_inbound(echo_packet(vec![ENDPOINT_A], 99))
.unwrap_err();
assert!(matches!(error, EndpointError::UnknownHook { hook_id: 99 }));
assert_hook_present(&endpoint, 7);
assert!(endpoint.outbound.is_empty());
}
#[test]
fn forged_sideways_packet_is_rejected_as_incorrect_path() {
let mut endpoint = endpoint_at(ENDPOINT_B, vec![ENDPOINT_A, ENDPOINT_B]);
let hook_id = endpoint.get_hook_id();
endpoint.hooks.insert(hook_id, ENDPOINT_A);
endpoint.connections.insert((ENDPOINT_A, true));
let error = endpoint
.add_inbound(echo_packet(vec![ENDPOINT_A, ENDPOINT_C], hook_id))
.unwrap_err();
assert!(matches!(error, EndpointError::DestinationOutsideLocalTree));
assert_hook_present(&endpoint, hook_id);
assert!(endpoint.inbound.is_empty());
assert!(endpoint.outbound.is_empty());
}
#[test]
fn malformed_frame_is_dropped_by_comms_leaf() {
let (tx_to_endpoint, rx_for_endpoint) = crossbeam_channel::unbounded();
let (tx_unused, _rx_unused) = crossbeam_channel::unbounded();
let mut endpoint = Endpoint::new(
ENDPOINT_B,
vec![Box::new(CommsLeaf {
tx: tx_unused,
rx: rx_for_endpoint,
remote_id: ENDPOINT_A,
is_authority: true,
started: false,
})],
);
endpoint.path = vec![ENDPOINT_A, ENDPOINT_B];
tx_to_endpoint.send(vec![0, 1, 2, 3]).unwrap();
endpoint.update();
assert!(endpoint.inbound.is_empty());
assert!(endpoint.outbound.is_empty());
}
#[test]
fn malformed_frame_does_not_block_following_valid_packet() {
let (tx_to_endpoint, rx_for_endpoint) = crossbeam_channel::unbounded();
let (tx_unused, _rx_unused) = crossbeam_channel::unbounded();
let hook_id = 42;
let mut endpoint = Endpoint::new(
ENDPOINT_B,
vec![Box::new(CommsLeaf {
tx: tx_unused,
rx: rx_for_endpoint,
remote_id: ENDPOINT_A,
is_authority: true,
started: false,
})],
);
endpoint.path = vec![ENDPOINT_A, ENDPOINT_B];
tx_to_endpoint.send(vec![0, 1, 2, 3]).unwrap();
tx_to_endpoint
.send(
echo_packet(vec![ENDPOINT_A, ENDPOINT_B], hook_id)
.serialize()
.unwrap(),
)
.unwrap();
endpoint.update();
let packet = single_inbound_packet(&endpoint, ENDPOINT_B);
assert!(packet.end_hook);
assert_eq!(packet.hook_id, hook_id);
}
#[test]
fn forged_frame_without_required_hook_is_dropped_by_comms_leaf() {
let (tx_to_endpoint, rx_for_endpoint) = crossbeam_channel::unbounded();
let (tx_unused, _rx_unused) = crossbeam_channel::unbounded();
let mut endpoint = Endpoint::new(
ENDPOINT_B,
vec![Box::new(CommsLeaf {
tx: tx_unused,
rx: rx_for_endpoint,
remote_id: ENDPOINT_A,
is_authority: true,
started: false,
})],
);
endpoint.path = vec![ENDPOINT_A, ENDPOINT_B];
endpoint.hooks.insert(7, ENDPOINT_A);
tx_to_endpoint
.send(echo_packet(vec![ENDPOINT_A], 12).serialize().unwrap())
.unwrap();
endpoint.update();
assert_hook_present(&endpoint, 7);
assert!(endpoint.inbound.is_empty());
assert!(endpoint.outbound.is_empty());
}
#[test]
fn upward_outbound_without_hook_is_rejected() {
let mut endpoint = endpoint_at(ENDPOINT_B, vec![ENDPOINT_A, ENDPOINT_B]);
endpoint.hooks.insert(7, ENDPOINT_A);
endpoint.connections.insert((ENDPOINT_A, true));
let new_hook = endpoint.get_hook_id();
let error = endpoint
.add_outbound(echo_packet(vec![ENDPOINT_A], new_hook))
.unwrap_err();
assert!(matches!(
error,
EndpointError::UnknownHook { hook_id: observed_hook_id } if observed_hook_id == new_hook
));
assert_hook_present(&endpoint, 7);
assert!(endpoint.outbound.is_empty());
}
#[test]
fn downward_outbound_without_hook_is_allowed() {
let mut endpoint = endpoint_at(ENDPOINT_A, vec![ENDPOINT_A]);
endpoint.connections.insert((ENDPOINT_B, false));
let new_hook = endpoint.get_hook_id();
endpoint.hooks.insert(new_hook, ENDPOINT_B);
endpoint
.add_outbound(echo_packet(vec![ENDPOINT_A, ENDPOINT_B], new_hook))
.unwrap();
assert_eq!(endpoint.outbound.get(&ENDPOINT_B).unwrap().len(), 1);
assert_hook_present(&endpoint, new_hook);
}
#[test]
fn deeper_upward_route_uses_parent_as_next_hop() {
let mut endpoint = endpoint_at(ENDPOINT_C, vec![ENDPOINT_A, ENDPOINT_B, ENDPOINT_C]);
let new_hook = endpoint.get_hook_id();
endpoint.hooks.insert(new_hook, ENDPOINT_A);
endpoint.connections.insert((ENDPOINT_B, true));
endpoint
.add_outbound(echo_packet(vec![ENDPOINT_A], new_hook))
.unwrap();
assert!(endpoint.outbound.contains_key(&ENDPOINT_B));
assert!(!endpoint.outbound.contains_key(&ENDPOINT_A));
assert_hook_removed(&endpoint, new_hook);
}
#[test]
fn downward_route_without_connection_is_rejected() {
let mut endpoint = endpoint_at(ENDPOINT_A, vec![ENDPOINT_A]);
let hook_id = endpoint.get_hook_id();
endpoint.hooks.insert(hook_id, ENDPOINT_B);
let error = endpoint
.add_outbound(echo_packet(vec![ENDPOINT_A, ENDPOINT_B], hook_id))
.unwrap_err();
assert!(matches!(
error,
EndpointError::MissingConnection {
next_hop: ENDPOINT_B,
direction: RouteDirection::Downward,
}
));
assert_hook_present(&endpoint, hook_id);
assert!(endpoint.outbound.is_empty());
}
#[test]
fn upward_route_without_connection_is_rejected_even_with_hook() {
let mut endpoint = endpoint_at(ENDPOINT_B, vec![ENDPOINT_A, ENDPOINT_B]);
let hook_id = endpoint.get_hook_id();
endpoint.hooks.insert(hook_id, ENDPOINT_A);
let error = endpoint
.add_outbound(echo_packet(vec![ENDPOINT_A], hook_id))
.unwrap_err();
assert!(matches!(
error,
EndpointError::MissingConnection {
next_hop: ENDPOINT_A,
direction: RouteDirection::Upward,
}
));
assert_hook_present(&endpoint, hook_id);
assert!(endpoint.outbound.is_empty());
}
#[test]
fn end_hook_removes_hook_after_packet_is_queued() {
let mut endpoint = endpoint_at(ENDPOINT_B, vec![ENDPOINT_A, ENDPOINT_B]);
let hook_id = endpoint.get_hook_id();
endpoint.hooks.insert(hook_id, ENDPOINT_A);
endpoint.connections.insert((ENDPOINT_A, true));
endpoint
.add_outbound(echo_packet(vec![ENDPOINT_A], hook_id))
.unwrap();
assert_hook_removed(&endpoint, hook_id);
assert_eq!(
single_outbound_packet(&endpoint, ENDPOINT_A).hook_id,
hook_id
);
}
#[test]
fn failed_end_hook_route_keeps_hook_state() {
let mut endpoint = endpoint_at(ENDPOINT_B, vec![ENDPOINT_A, ENDPOINT_B]);
let hook_id = endpoint.get_hook_id();
endpoint.hooks.insert(hook_id, ENDPOINT_A);
let error = endpoint
.add_outbound(echo_packet(vec![ENDPOINT_A], hook_id))
.unwrap_err();
assert!(matches!(
error,
EndpointError::MissingConnection {
next_hop: ENDPOINT_A,
direction: RouteDirection::Upward,
}
));
assert_hook_present(&endpoint, hook_id);
assert!(endpoint.outbound.is_empty());
}
#[test]
fn inbound_without_absolute_path_is_rejected() {
let mut endpoint = Endpoint::new(ENDPOINT_A, vec![]);
let error = endpoint
.add_inbound(echo_packet(vec![ENDPOINT_A], 1))
.unwrap_err();
assert!(matches!(error, EndpointError::EndpointPathUnset));
assert!(endpoint.inbound.is_empty());
}
#[test]
fn outbound_without_absolute_path_is_rejected() {
let mut endpoint = Endpoint::new(ENDPOINT_A, vec![]);
let error = endpoint
.add_outbound(echo_packet(vec![ENDPOINT_A], 1))
.unwrap_err();
assert!(matches!(error, EndpointError::EndpointPathUnset));
assert!(endpoint.outbound.is_empty());
}
@@ -0,0 +1,176 @@
use crate::{Endpoint, Leaf, Packet};
use alloc::{string::ToString, vec, vec::Vec};
use crossbeam_channel::{Receiver, Sender};
pub(super) const ENDPOINT_A: u32 = 0;
pub(super) const ENDPOINT_B: u32 = 1;
pub(super) const ENDPOINT_C: u32 = 2;
const LEAF_CONTROLLER: u32 = 100;
const LEAF_COMMS: u32 = 101;
const LEAF_RESPONDER: u32 = 102;
/// Builds a test packet whose route is the only field varied by routing tests.
///
/// Keeping the payload stable makes each assertion about endpoint behavior rather
/// than packet construction, which is important because forged and malformed cases
/// should fail before any leaf-level procedure handling would matter.
pub(super) fn echo_packet(path: Vec<u32>, hook_id: u16) -> Packet {
Packet {
hook_id,
end_hook: true,
path,
procedure_id: "echo".to_string(),
data: "ABC123".as_bytes().to_vec(),
}
}
/// Creates a bare endpoint at a known absolute path.
///
/// Most routing tests do not need leaves; they only need the endpoint's local path,
/// connection table, and hook table. This helper keeps that setup explicit without
/// hiding the routing state that each test is validating.
pub(super) fn endpoint_at(id: u32, path: Vec<u32>) -> Endpoint {
let mut endpoint = Endpoint::new(id, vec![]);
endpoint.path = path;
endpoint
}
/// Returns the only outbound packet queued for `next_hop`.
///
/// Routing bugs often show up as packets being sent to the final destination rather
/// than the immediate neighbor. Tests use this helper to assert both that exactly one
/// packet exists and that it was queued for the expected adjacent endpoint.
pub(super) fn single_outbound_packet(endpoint: &Endpoint, next_hop: u32) -> &Packet {
let queue = endpoint
.outbound
.get(&next_hop)
.unwrap_or_else(|| panic!("expected one outbound queue for {next_hop}"));
assert_eq!(queue.len(), 1, "expected exactly one outbound packet");
queue.front().unwrap()
}
/// Returns the only inbound packet delivered to `local_id`.
///
/// Local delivery is intentionally separate from transit forwarding, so the tests
/// assert against the local inbound queue instead of only checking that routing did
/// not produce an error.
pub(super) fn single_inbound_packet(endpoint: &Endpoint, local_id: u32) -> &Packet {
let queue = endpoint
.inbound
.get(&local_id)
.unwrap_or_else(|| panic!("expected one inbound queue for {local_id}"));
assert_eq!(queue.len(), 1, "expected exactly one inbound packet");
queue.front().unwrap()
}
/// Asserts that local hook state still contains `hook_id`.
///
/// Tests use this instead of open-coded map checks so every lifecycle assertion
/// explains the intended routing invariant when it fails.
pub(super) fn assert_hook_present(endpoint: &Endpoint, hook_id: u16) {
assert!(
endpoint.hooks.contains_key(&hook_id),
"expected hook {hook_id} to remain registered"
);
}
/// Asserts that local hook state no longer contains `hook_id`.
///
/// Upward `end_hook` packets are the only cases that should remove hook state;
/// downward and local packets with the same flag must leave hooks alone.
pub(super) fn assert_hook_removed(endpoint: &Endpoint, hook_id: u16) {
assert!(
!endpoint.hooks.contains_key(&hook_id),
"expected hook {hook_id} to be cleaned up"
);
}
pub(super) struct ControllerLeaf {
pub(super) has_run: bool,
}
pub(super) struct CommsLeaf {
pub(super) tx: Sender<Vec<u8>>,
pub(super) rx: Receiver<Vec<u8>>,
pub(super) remote_id: u32,
pub(super) is_authority: bool,
pub(super) started: bool,
}
pub(super) struct ResponderLeaf;
impl Leaf for ControllerLeaf {
fn get_id(&self) -> u32 {
LEAF_CONTROLLER
}
fn update(&mut self, endpoint: &mut Endpoint) {
if !self.has_run {
// The controller starts exactly one request so the end-to-end test can
// assert deterministic routing without accumulating retries.
let hook_id = endpoint.get_hook_id();
let packet = echo_packet(vec![ENDPOINT_A, ENDPOINT_B], hook_id);
let _ = endpoint.add_outbound(packet);
self.has_run = true;
}
}
}
impl Leaf for CommsLeaf {
fn get_id(&self) -> u32 {
LEAF_COMMS
}
fn update(&mut self, endpoint: &mut Endpoint) {
if !self.started {
endpoint
.connections
.insert((self.remote_id, self.is_authority));
self.started = true;
}
while !self.rx.is_empty() {
let data = self.rx.recv().unwrap();
// Transport bytes are untrusted. Dropping malformed frames here keeps
// the oneshot harness faithful to a router boundary: invalid wire data
// must not panic or poison later valid packets on the same connection.
if let Ok(packet) = Packet::deserialize(&data) {
let _ = endpoint.add_inbound(packet);
}
}
endpoint.take_outbound_clear(self.remote_id, |packet| {
let data = packet.serialize().unwrap();
let _ = self.tx.send(data);
});
}
}
impl Leaf for ResponderLeaf {
fn get_id(&self) -> u32 {
LEAF_RESPONDER
}
fn update(&mut self, endpoint: &mut Endpoint) {
let local_id = endpoint.path.last().cloned().unwrap_or(0);
let mut packets = Vec::new();
endpoint.take_inbound_clear(local_id, |packet| {
let mut response = echo_packet(vec![ENDPOINT_A], packet.hook_id);
response.hook_id = packet.hook_id;
response.data = packet.data.clone();
packets.push(response);
});
for packet in packets {
// Upward responses require local hook state before routing; this mirrors
// a callee accepting the call and authorizing the matching response hook.
endpoint.hooks.insert(packet.hook_id, 0);
let _ = endpoint.add_outbound(packet);
}
}
}
@@ -1,6 +1,6 @@
use super::*; use alloc::{string::ToString, vec, vec::Vec};
use alloc::string::ToString;
use alloc::vec; use crate::{DeserializeError, EndpointError, Packet, SerializeError};
// ── Helpers ─────────────────────────────────────────────────────────────── // ── Helpers ───────────────────────────────────────────────────────────────
@@ -226,3 +226,27 @@ fn invalid_utf8_in_procedure_id() {
DeserializeError::InvalidUtf8 DeserializeError::InvalidUtf8
); );
} }
#[test]
fn serialize_error_wraps_into_endpoint_error() {
let error: EndpointError = SerializeError::BodyTooLarge.into();
assert_eq!(
error,
EndpointError::PacketSerialize {
source: SerializeError::BodyTooLarge,
}
);
}
#[test]
fn deserialize_error_wraps_into_endpoint_error() {
let error: EndpointError = DeserializeError::BufferTooShort.into();
assert_eq!(
error,
EndpointError::PacketDeserialize {
source: DeserializeError::BufferTooShort,
}
);
}
-14
View File
@@ -1,14 +0,0 @@
use alloc::{
collections::{btree_map::BTreeMap, btree_set::BTreeSet, vec_deque::VecDeque},
vec::Vec,
};
use crate::packet::Packet;
pub type Path = Vec<u32>;
pub type EndpointName = u32;
pub type HookID = u16;
pub type ConnectionSet = BTreeSet<(EndpointName, bool)>;
pub type HookMap = BTreeMap<HookID, EndpointName>;
pub type PacketQueue = VecDeque<Packet>;
pub type RouteMap = BTreeMap<EndpointName, PacketQueue>;