where
A: rkyv::Portable
@@ -204,34 +215,3 @@ fn align_section(bytes: &[u8]) -> AlignedVec {
aligned.extend_from_slice(bytes);
aligned
}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use crate::protocol::{HookTarget, PacketType};
- use alloc::{string::String, vec};
-
- #[test]
- fn framing_roundtrip_preserves_call() {
- let header = PacketHeader {
- packet_type: PacketType::Call,
- src_path: Vec::new(),
- dst_path: vec![String::from("child")],
- dst_leaf: Some(String::from("echo")),
- hook_id: None,
- };
- let call = CallMessage {
- procedure_id: String::from("org.product.v1.echo.roundtrip"),
- data: b"ping".to_vec(),
- response_hook: Some(HookTarget {
- hook_id: 1,
- return_path: Vec::new(),
- }),
- };
-
- let frame = encode_packet(&header, &call).expect("frame should encode");
- let parsed = decode_frame(&frame).expect("frame should decode");
- assert_eq!(parsed.deserialize_header(), header);
- assert_eq!(parsed.deserialize_call().expect("call should decode"), call);
- }
-}
diff --git a/src/protocol/introspection.rs b/src/protocol/introspection.rs
index e2b34e1..1f046e1 100644
--- a/src/protocol/introspection.rs
+++ b/src/protocol/introspection.rs
@@ -1,4 +1,4 @@
-//! Required introspection payloads.
+//! Required introspection payloads for discovery.
use alloc::{string::String, vec::Vec};
use rkyv::{Archive, Deserialize, Serialize};
diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs
index 5cbc603..f2d8229 100644
--- a/src/protocol/mod.rs
+++ b/src/protocol/mod.rs
@@ -4,14 +4,37 @@
pub mod codec;
pub mod introspection;
+pub mod traits;
+pub mod tree;
mod types;
pub mod validation;
+#[cfg(test)]
+mod tests;
+
pub use codec::{
- FrameBytes, FrameError, ParsedFrame, decode_frame, deserialize_archived_bytes, encode_packet,
+ FrameBytes, FrameCodec, FrameError, ParsedFrame, RkyvCodec, deserialize_archived_bytes,
};
pub use introspection::{EndpointIntrospection, LeafIntrospection, LeafIntrospectionSummary};
+pub use traits::{HookStore, LeafMetadata, PacketFraming, PacketProcessor, RouteResolution};
pub use types::{
CallMessage, DataMessage, FaultMessage, HookTarget, PacketHeader, PacketType, ProtocolFault,
};
pub use validation::{ValidationError, validate_call, validate_header, validate_procedure_id};
+
+pub fn encode_packet(header: &PacketHeader, payload: &P) -> Result
+where
+ P: for<'a> rkyv::Serialize<
+ rkyv::api::high::HighSerializer<
+ rkyv::util::AlignedVec,
+ rkyv::ser::allocator::ArenaHandle<'a>,
+ rkyv::rancor::Error,
+ >,
+ >,
+{
+ codec::encode_packet(header, payload)
+}
+
+pub fn decode_frame(bytes: &[u8]) -> Result, FrameError> {
+ codec::decode_frame(bytes)
+}
diff --git a/src/protocol/tests/mod.rs b/src/protocol/tests/mod.rs
new file mode 100644
index 0000000..e90d000
--- /dev/null
+++ b/src/protocol/tests/mod.rs
@@ -0,0 +1,2 @@
+mod protocol;
+mod tree;
diff --git a/src/protocol/tests/protocol.rs b/src/protocol/tests/protocol.rs
new file mode 100644
index 0000000..0664c33
--- /dev/null
+++ b/src/protocol/tests/protocol.rs
@@ -0,0 +1,118 @@
+use alloc::{borrow::ToOwned, string::String, vec, vec::Vec};
+
+use crate::protocol::{
+ CallMessage, FaultMessage, FrameError, HookTarget, PacketHeader, PacketType, ProtocolFault,
+ ValidationError, decode_frame, encode_packet, validate_call, validate_header,
+ validate_procedure_id,
+};
+
+fn path(parts: &[&str]) -> Vec {
+ parts.iter().map(|part| (*part).to_owned()).collect()
+}
+
+#[test]
+fn packet_framing_roundtrip_preserves_header_and_payload() {
+ let header = PacketHeader {
+ packet_type: PacketType::Call,
+ src_path: path(&["root", "caller"]),
+ dst_path: path(&["root", "callee"]),
+ dst_leaf: Some("echo".to_owned()),
+ hook_id: None,
+ };
+ let call = CallMessage {
+ procedure_id: "unshell.echo.v1.alpha.invoke".to_owned(),
+ data: vec![1, 2, 3, 4],
+ response_hook: Some(HookTarget {
+ hook_id: 7,
+ return_path: path(&["root", "caller"]),
+ }),
+ };
+
+ let frame = encode_packet(&header, &call).expect("frame should encode");
+ let parsed = decode_frame(&frame).expect("frame should decode");
+
+ assert_eq!(parsed.header(), &header);
+ assert_eq!(parsed.packet_type(), PacketType::Call);
+ assert_eq!(
+ parsed.deserialize_call().expect("call should deserialize"),
+ call
+ );
+}
+
+#[test]
+fn header_and_call_validation_reject_invalid_combinations() {
+ let invalid_header = PacketHeader {
+ packet_type: PacketType::Data,
+ src_path: path(&["peer"]),
+ dst_path: path(&["host"]),
+ dst_leaf: Some("echo".to_owned()),
+ hook_id: None,
+ };
+ assert_eq!(
+ validate_header(&invalid_header),
+ Err(ValidationError::HeaderInvariant(
+ "Data and Fault packets must not carry dst_leaf"
+ ))
+ );
+
+ let header = PacketHeader {
+ packet_type: PacketType::Call,
+ src_path: path(&["caller"]),
+ dst_path: path(&["callee"]),
+ dst_leaf: Some("echo".to_owned()),
+ hook_id: None,
+ };
+ let invalid_call = CallMessage {
+ procedure_id: "unshell.echo.v1.alpha.invoke".to_owned(),
+ data: Vec::new(),
+ response_hook: Some(HookTarget {
+ hook_id: 5,
+ return_path: path(&["elsewhere"]),
+ }),
+ };
+ assert_eq!(
+ validate_call(&header, &invalid_call),
+ Err(ValidationError::CallInvariant(
+ "response_hook.return_path must equal header.src_path"
+ ))
+ );
+}
+
+#[test]
+fn procedure_validation_accepts_introspection_and_rejects_bad_shapes() {
+ assert_eq!(validate_procedure_id(""), Ok(()));
+ assert_eq!(
+ validate_procedure_id("unshell.echo.v01.alpha.invoke"),
+ Err(ValidationError::ProcedureId(
+ "version segment must be v followed by a positive decimal integer"
+ ))
+ );
+ assert_eq!(
+ validate_procedure_id("too.short.v1"),
+ Err(ValidationError::ProcedureId(
+ "must contain exactly 5 segments"
+ ))
+ );
+}
+
+#[test]
+fn truncated_frames_are_rejected() {
+ let header = PacketHeader {
+ packet_type: PacketType::Fault,
+ src_path: path(&["src"]),
+ dst_path: path(&["dst"]),
+ dst_leaf: None,
+ hook_id: Some(9),
+ };
+ let message = FaultMessage {
+ fault: ProtocolFault::InternalError,
+ };
+
+ let frame = encode_packet(&header, &message).expect("frame should encode");
+ let truncated = &frame[..frame.len() - 1];
+
+ assert!(matches!(
+ decode_frame(truncated),
+ Err(FrameError::Truncated)
+ ));
+}
diff --git a/src/protocol/tests/tree.rs b/src/protocol/tests/tree.rs
new file mode 100644
index 0000000..d97b07c
--- /dev/null
+++ b/src/protocol/tests/tree.rs
@@ -0,0 +1,155 @@
+use alloc::{borrow::ToOwned, string::String, vec, vec::Vec};
+
+use crate::protocol::tree::{
+ DefaultRouteProvider, Endpoint, Ingress, LeafBehavior, LeafNode, LeafSpec, LocalEvent,
+ ProtocolEndpoint, RouteDecision, RouteProvider, TreeNode,
+};
+use crate::protocol::{
+ DataMessage, EndpointIntrospection, FaultMessage, PacketHeader, PacketType, ProtocolFault,
+ decode_frame, deserialize_archived_bytes, encode_packet,
+};
+
+fn path(parts: &[&str]) -> Vec {
+ parts.iter().map(|part| (*part).to_owned()).collect()
+}
+
+#[test]
+fn tree_node_paths_flatten_explicitly() {
+ let tree = TreeNode::Root {
+ children: vec![TreeNode::Endpoint {
+ segment: "branch".to_owned(),
+ leaves: vec![LeafNode {
+ name: "echo".to_owned(),
+ procedures: vec!["unshell.echo.v1.alpha.invoke".to_owned()],
+ }],
+ children: vec![TreeNode::Endpoint {
+ segment: "leaf".to_owned(),
+ leaves: Vec::new(),
+ children: Vec::new(),
+ }],
+ }],
+ };
+
+ assert_eq!(
+ tree.paths(),
+ vec![
+ Vec::::new(),
+ path(&["branch"]),
+ path(&["branch", "leaf"])
+ ]
+ );
+}
+
+#[test]
+fn longest_prefix_routing_prefers_most_specific_child() {
+ let provider = DefaultRouteProvider;
+ let child_paths = vec![path(&["a"]), path(&["a", "b"]), path(&["x"])];
+
+ assert_eq!(
+ provider.route_destination(&Vec::new(), &child_paths, true, &path(&["a", "b", "c"])),
+ RouteDecision::Child(1)
+ );
+ assert_eq!(
+ provider.route_destination(&path(&["a"]), &child_paths, true, &path(&["z"])),
+ RouteDecision::Parent
+ );
+}
+
+#[test]
+fn protocol_endpoint_introspection_returns_leaf_summary() {
+ let mut endpoint = ProtocolEndpoint::new(
+ path(&["root"]),
+ Some(Vec::new()),
+ Vec::new(),
+ vec![LeafSpec {
+ name: "echo".to_owned(),
+ procedures: vec!["unshell.echo.v1.alpha.invoke".to_owned()],
+ behavior: LeafBehavior::Echo,
+ }],
+ );
+
+ let hook_id = endpoint.allocate_hook_id();
+ let frame = endpoint
+ .make_call(path(&["root"]), None, "", Some(hook_id), Vec::new())
+ .expect("introspection call should encode");
+
+ let outcome = endpoint
+ .receive(&Ingress::Local, frame)
+ .expect("endpoint should handle introspection");
+
+ assert!(outcome.events.is_empty());
+ assert_eq!(outcome.forwards.len(), 1);
+ assert_eq!(outcome.forwards[0].0, RouteDecision::Parent);
+
+ let parsed = decode_frame(&outcome.forwards[0].1).expect("response should decode");
+ let response = parsed
+ .deserialize_data()
+ .expect("response data should deserialize");
+ let introspection = deserialize_archived_bytes::<
+ rkyv::Archived,
+ EndpointIntrospection,
+ >(&response.data)
+ .expect("introspection payload should deserialize");
+
+ assert!(response.end_hook);
+ assert_eq!(introspection.leaves.len(), 1);
+ assert_eq!(introspection.leaves[0].leaf_name, "echo");
+ assert_eq!(
+ introspection.leaves[0].procedures,
+ vec!["unshell.echo.v1.alpha.invoke".to_owned()]
+ );
+}
+
+#[test]
+fn invalid_hook_peer_emits_local_fault_event() {
+ let mut endpoint = ProtocolEndpoint::new(path(&["client"]), None, Vec::new(), Vec::new());
+ let hook_id = endpoint.allocate_hook_id();
+
+ endpoint
+ .make_call(
+ path(&["server"]),
+ None,
+ "unshell.echo.v1.alpha.invoke",
+ Some(hook_id),
+ vec![1, 2, 3],
+ )
+ .expect("call should establish an active hook");
+
+ let frame = encode_packet(
+ &PacketHeader {
+ packet_type: PacketType::Data,
+ src_path: path(&["client"]),
+ dst_path: path(&["client"]),
+ dst_leaf: None,
+ hook_id: Some(hook_id),
+ },
+ &DataMessage {
+ procedure_id: "unshell.echo.v1.alpha.invoke".to_owned(),
+ data: vec![9],
+ end_hook: false,
+ },
+ )
+ .expect("data frame should encode");
+
+ let outcome = endpoint
+ .receive(&Ingress::Local, frame)
+ .expect("invalid peer should be handled");
+
+ assert!(outcome.forwards.is_empty());
+ assert_eq!(outcome.events.len(), 1);
+ assert!(!outcome.dropped);
+
+ match &outcome.events[0] {
+ LocalEvent::Fault { header, message } => {
+ assert_eq!(header.packet_type, PacketType::Fault);
+ assert_eq!(header.hook_id, Some(hook_id));
+ assert_eq!(
+ message,
+ &FaultMessage {
+ fault: ProtocolFault::InvalidHookPeer,
+ }
+ );
+ }
+ other => panic!("expected fault event, got {other:?}"),
+ }
+}
diff --git a/src/protocol/traits.rs b/src/protocol/traits.rs
new file mode 100644
index 0000000..629f9bb
--- /dev/null
+++ b/src/protocol/traits.rs
@@ -0,0 +1,142 @@
+//! Protocol implementation traits exposed by the core crate.
+//!
+//! These traits collect the core contracts needed to plug framing, routing,
+//! hook storage, leaf metadata, and packet processing into an implementation.
+
+use alloc::{string::String, vec::Vec};
+
+use super::{
+ FrameBytes, FrameCodec, LeafIntrospection, LeafIntrospectionSummary,
+ tree::{
+ ActiveHook, Endpoint, EndpointError, EndpointOutcome, HookKey, HookTable, Ingress,
+ LeafNode, LeafSpec, PendingHook, RouteProvider,
+ },
+};
+
+/// Packet framing contract for the canonical wire format.
+pub trait PacketFraming: FrameCodec {}
+
+impl PacketFraming for T where T: FrameCodec + ?Sized {}
+
+/// Route resolution contract for endpoint path delivery.
+pub trait RouteResolution: RouteProvider {}
+
+impl RouteResolution for T where T: RouteProvider + ?Sized {}
+
+/// Hook storage contract for pending and active protocol flows.
+pub trait HookStore {
+ fn allocate_hook_id(&self, return_path: &[String]) -> u64;
+ fn insert_pending(&mut self, pending: PendingHook);
+ fn insert_active(&mut self, active: ActiveHook);
+ fn activate_pending(&mut self, key: &HookKey, peer_path: Vec) -> Option<()>;
+ fn remove_pending(&mut self, key: &HookKey) -> Option;
+ fn remove_active(&mut self, key: &HookKey) -> Option;
+ fn pending(&self, key: &HookKey) -> Option<&PendingHook>;
+ fn active(&self, key: &HookKey) -> Option<&ActiveHook>;
+ fn active_mut(&mut self, key: &HookKey) -> Option<&mut ActiveHook>;
+}
+
+impl HookStore for HookTable {
+ fn allocate_hook_id(&self, return_path: &[String]) -> u64 {
+ HookTable::allocate_hook_id(self, return_path)
+ }
+
+ fn insert_pending(&mut self, pending: PendingHook) {
+ HookTable::insert_pending(self, pending);
+ }
+
+ fn insert_active(&mut self, active: ActiveHook) {
+ HookTable::insert_active(self, active);
+ }
+
+ fn activate_pending(&mut self, key: &HookKey, peer_path: Vec) -> Option<()> {
+ HookTable::activate_pending(self, key, peer_path)
+ }
+
+ fn remove_pending(&mut self, key: &HookKey) -> Option {
+ HookTable::remove_pending(self, key)
+ }
+
+ fn remove_active(&mut self, key: &HookKey) -> Option {
+ HookTable::remove_active(self, key)
+ }
+
+ fn pending(&self, key: &HookKey) -> Option<&PendingHook> {
+ HookTable::pending(self, key)
+ }
+
+ fn active(&self, key: &HookKey) -> Option<&ActiveHook> {
+ HookTable::active(self, key)
+ }
+
+ fn active_mut(&mut self, key: &HookKey) -> Option<&mut ActiveHook> {
+ HookTable::active_mut(self, key)
+ }
+}
+
+/// Leaf metadata contract used for protocol discovery payloads.
+pub trait LeafMetadata {
+ fn leaf_name(&self) -> &str;
+ fn procedures(&self) -> &[String];
+
+ fn summary(&self) -> LeafIntrospectionSummary {
+ LeafIntrospectionSummary {
+ leaf_name: self.leaf_name().into(),
+ procedures: self.procedures().to_vec(),
+ }
+ }
+
+ fn introspection(&self) -> LeafIntrospection {
+ LeafIntrospection {
+ leaf_name: self.leaf_name().into(),
+ procedures: self.procedures().to_vec(),
+ }
+ }
+}
+
+impl LeafMetadata for LeafSpec {
+ fn leaf_name(&self) -> &str {
+ &self.name
+ }
+
+ fn procedures(&self) -> &[String] {
+ &self.procedures
+ }
+}
+
+impl LeafMetadata for LeafNode {
+ fn leaf_name(&self) -> &str {
+ &self.name
+ }
+
+ fn procedures(&self) -> &[String] {
+ &self.procedures
+ }
+}
+
+/// Packet processor and local runtime contract for framed protocol traffic.
+pub trait PacketProcessor {
+ fn path(&self) -> &[String];
+ fn receive(
+ &mut self,
+ ingress: &Ingress,
+ frame: FrameBytes,
+ ) -> Result;
+}
+
+impl PacketProcessor for T
+where
+ T: Endpoint + ?Sized,
+{
+ fn path(&self) -> &[String] {
+ Endpoint::path(self)
+ }
+
+ fn receive(
+ &mut self,
+ ingress: &Ingress,
+ frame: FrameBytes,
+ ) -> Result {
+ Endpoint::receive(self, ingress, frame)
+ }
+}
diff --git a/src/tree/endpoint.rs b/src/protocol/tree/endpoint.rs
similarity index 53%
rename from src/tree/endpoint.rs
rename to src/protocol/tree/endpoint.rs
index 0e721f2..215a3c6 100644
--- a/src/tree/endpoint.rs
+++ b/src/protocol/tree/endpoint.rs
@@ -1,4 +1,4 @@
-//! Minimal endpoint runtime for protocol tests.
+//! Endpoint runtime and traits.
use alloc::{
collections::{BTreeMap, BTreeSet},
@@ -9,36 +9,31 @@ use alloc::{
use core::fmt;
use rkyv::{rancor::Error as RkyvError, to_bytes};
-use crate::{
- protocol::{
- CallMessage, DataMessage, EndpointIntrospection, FaultMessage, FrameBytes, FrameError,
- HookTarget, LeafIntrospection, LeafIntrospectionSummary, PacketHeader, PacketType,
- ProtocolFault, decode_frame, encode_packet, introspection::INTROSPECTION_PROCEDURE_ID,
- validate_call, validate_header, validate_procedure_id,
- },
- tree::{ActiveHook, HookKey, HookTable, PendingHook, RouteDecision, route_destination},
+use crate::protocol::{
+ CallMessage, DataMessage, EndpointIntrospection, FaultMessage, FrameBytes, FrameError,
+ HookTarget, LeafIntrospection, LeafIntrospectionSummary, PacketHeader, PacketType,
+ ProtocolFault, ValidationError, decode_frame, encode_packet,
+ introspection::INTROSPECTION_PROCEDURE_ID, validate_call, validate_header,
+ validate_procedure_id,
};
-/// Local connection state defined by the protocol.
+use super::{ActiveHook, HookKey, HookTable, PendingHook, RouteDecision, route_destination};
+
+/// Local connection state.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionState {
- /// Connected but not routable.
Unregistered,
- /// Admitted into local routing.
Registered,
}
/// Registered child route.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChildRoute {
- /// Child endpoint path.
pub path: Vec,
- /// Local connection state.
pub state: ConnectionState,
}
impl ChildRoute {
- /// Creates a registered child route.
pub fn registered(path: Vec) -> Self {
Self {
path,
@@ -47,73 +42,58 @@ impl ChildRoute {
}
}
-/// Basic leaf behavior used by the test protocol runtime.
+/// Leaf behavior for test runtime.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LeafBehavior {
- /// Echoes the call data back in one `Data` packet.
Echo,
}
/// Static leaf description.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LeafSpec {
- /// Local leaf name.
pub name: String,
- /// Supported procedures.
pub procedures: Vec,
- /// Test behavior.
pub behavior: LeafBehavior,
}
-/// How a packet arrived at the endpoint.
+/// Arrival side.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Ingress {
- /// From the direct parent.
Parent,
- /// From a direct child path.
Child(Vec),
- /// Originated locally.
Local,
}
-/// Locally delivered events produced by protocol processing.
+/// Local events.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LocalEvent {
- /// A supported local call with no response hook.
Call {
header: PacketHeader,
message: CallMessage,
},
- /// Locally delivered data.
Data {
header: PacketHeader,
message: DataMessage,
},
- /// Locally delivered or synthesized fault.
Fault {
header: PacketHeader,
message: FaultMessage,
},
}
-/// Output from processing one frame.
+/// Processing outcome.
#[derive(Debug, Default)]
pub struct EndpointOutcome {
- /// Frames to forward. The frame bytes are moved, not cloned.
pub forwards: Vec<(RouteDecision, FrameBytes)>,
- /// Events delivered locally.
pub events: Vec,
- /// Whether the packet was silently dropped.
pub dropped: bool,
}
-/// Endpoint processing failure.
+/// Processing error.
#[derive(Debug)]
pub enum EndpointError {
- /// Frame parsing failed.
Frame(FrameError),
- /// Validation failed.
- Validation(crate::protocol::ValidationError),
+ Validation(ValidationError),
}
impl fmt::Display for EndpointError {
@@ -125,8 +105,7 @@ impl fmt::Display for EndpointError {
}
}
-#[cfg(feature = "std")]
-impl std::error::Error for EndpointError {}
+impl core::error::Error for EndpointError {}
impl From for EndpointError {
fn from(value: FrameError) -> Self {
@@ -134,15 +113,25 @@ impl From for EndpointError {
}
}
-impl From for EndpointError {
- fn from(value: crate::protocol::ValidationError) -> Self {
+impl From for EndpointError {
+ fn from(value: ValidationError) -> Self {
Self::Validation(value)
}
}
-/// Local endpoint model suitable for tests and later integration work.
+/// Core trait for a protocol endpoint.
+pub trait Endpoint {
+ fn path(&self) -> &[String];
+ fn receive(
+ &mut self,
+ ingress: &Ingress,
+ frame: FrameBytes,
+ ) -> Result;
+}
+
+/// Default endpoint implementation.
#[derive(Debug, Default)]
-pub struct Endpoint {
+pub struct ProtocolEndpoint {
path: Vec,
parent_path: Option>,
children: Vec,
@@ -151,8 +140,7 @@ pub struct Endpoint {
hooks: HookTable,
}
-impl Endpoint {
- /// Creates an endpoint with explicit path, parent, children, and leaves.
+impl ProtocolEndpoint {
pub fn new(
path: Vec,
parent_path: Option>,
@@ -172,21 +160,6 @@ impl Endpoint {
}
}
- /// Returns the local endpoint path.
- pub fn path(&self) -> &[String] {
- &self.path
- }
-
- /// Returns the hook table for assertions.
- pub fn hooks(&self) -> &HookTable {
- &self.hooks
- }
-
- /// Registers an endpoint-level procedure.
- ///
- /// # Errors
- ///
- /// Returns [`EndpointError`] when the procedure id is invalid.
pub fn add_endpoint_procedure(
&mut self,
procedure_id: impl Into,
@@ -197,16 +170,10 @@ impl Endpoint {
Ok(())
}
- /// Allocates a new local hook id.
pub fn allocate_hook_id(&self) -> u64 {
self.hooks.allocate_hook_id(&self.path)
}
- /// Creates an outbound `Call` frame and registers host-side hook state when needed.
- ///
- /// # Errors
- ///
- /// Returns [`EndpointError`] when validation or framing fails.
pub fn make_call(
&mut self,
dst_path: Vec,
@@ -250,11 +217,6 @@ impl Endpoint {
Ok(encode_packet(&header, &call)?)
}
- /// Creates an outbound `Data` frame.
- ///
- /// # Errors
- ///
- /// Returns [`EndpointError`] when validation or framing fails.
pub fn make_data(
&self,
dst_path: Vec,
@@ -281,150 +243,6 @@ impl Endpoint {
Ok(encode_packet(&header, &message)?)
}
- /// Processes one framed packet.
- ///
- /// # Errors
- ///
- /// Returns [`EndpointError`] when frame decoding or validation fails.
- pub fn receive(
- &mut self,
- ingress: &Ingress,
- frame: FrameBytes,
- ) -> Result {
- enum OwnedPayload {
- Call(PacketHeader, CallMessage),
- Data(PacketHeader, DataMessage),
- Fault(PacketHeader, FaultMessage),
- }
-
- let owned = {
- let parsed = decode_frame(&frame)?;
- let header = parsed.deserialize_header();
- validate_header(&header)?;
- match header.packet_type {
- PacketType::Call => OwnedPayload::Call(header, parsed.deserialize_call()?),
- PacketType::Data => OwnedPayload::Data(header, parsed.deserialize_data()?),
- PacketType::Fault => OwnedPayload::Fault(header, parsed.deserialize_fault()?),
- }
- };
-
- let src_path = match &owned {
- OwnedPayload::Call(header, _) => &header.src_path,
- OwnedPayload::Data(header, _) => &header.src_path,
- OwnedPayload::Fault(header, _) => &header.src_path,
- };
-
- if !self.valid_source_for_ingress(ingress, src_path) {
- return Ok(EndpointOutcome {
- dropped: true,
- ..EndpointOutcome::default()
- });
- }
-
- match owned {
- OwnedPayload::Call(header, message) => {
- self.receive_call(ingress, frame, header, message)
- }
- OwnedPayload::Data(header, message) => self.receive_data(header, message),
- OwnedPayload::Fault(header, message) => self.receive_fault(header, message),
- }
- }
-
- fn receive_call(
- &mut self,
- ingress: &Ingress,
- frame: FrameBytes,
- header: PacketHeader,
- message: CallMessage,
- ) -> Result {
- if !matches!(ingress, Ingress::Parent | Ingress::Local) {
- return Ok(EndpointOutcome {
- dropped: true,
- ..EndpointOutcome::default()
- });
- }
-
- validate_call(&header, &message)?;
- match self.decide_route(&header.dst_path) {
- RouteDecision::Child(index) => Ok(EndpointOutcome {
- forwards: vec![(RouteDecision::Child(index), frame)],
- ..EndpointOutcome::default()
- }),
- RouteDecision::Parent => Ok(EndpointOutcome {
- forwards: vec![(RouteDecision::Parent, frame)],
- ..EndpointOutcome::default()
- }),
- RouteDecision::Drop => Ok(EndpointOutcome {
- dropped: true,
- ..EndpointOutcome::default()
- }),
- RouteDecision::Local => self.handle_local_call(header, message),
- }
- }
-
- fn receive_data(
- &mut self,
- header: PacketHeader,
- message: DataMessage,
- ) -> Result {
- match self.decide_route(&header.dst_path) {
- RouteDecision::Child(_) | RouteDecision::Parent => Ok(EndpointOutcome {
- dropped: true,
- ..EndpointOutcome::default()
- }),
- RouteDecision::Drop => Ok(EndpointOutcome {
- dropped: true,
- ..EndpointOutcome::default()
- }),
- RouteDecision::Local => self.handle_local_data(header, message),
- }
- }
-
- fn receive_fault(
- &mut self,
- header: PacketHeader,
- message: FaultMessage,
- ) -> Result {
- match self.decide_route(&header.dst_path) {
- RouteDecision::Child(_) | RouteDecision::Parent => Ok(EndpointOutcome {
- dropped: true,
- ..EndpointOutcome::default()
- }),
- RouteDecision::Drop => Ok(EndpointOutcome {
- dropped: true,
- ..EndpointOutcome::default()
- }),
- RouteDecision::Local => {
- let key = HookKey::new(
- self.path.clone(),
- header.hook_id.expect("validated hook id"),
- );
- let matches_active = self
- .hooks
- .active(&key)
- .map(|active| active.peer_path == header.src_path)
- .unwrap_or(false);
- let matches_pending = self
- .hooks
- .pending(&key)
- .map(|pending| pending.caller_src_path == header.src_path)
- .unwrap_or(false);
- if !(matches_active || matches_pending) {
- return Ok(EndpointOutcome {
- dropped: true,
- ..EndpointOutcome::default()
- });
- }
- self.hooks.remove_active(&key);
- self.hooks.remove_pending(&key);
- Ok(EndpointOutcome {
- events: vec![LocalEvent::Fault { header, message }],
- ..EndpointOutcome::default()
- })
- }
- }
- }
-
fn handle_local_call(
&mut self,
header: PacketHeader,
@@ -453,11 +271,7 @@ impl Endpoint {
Some(leaf_name) => self
.leaves
.get(leaf_name)
- .map(|leaf| {
- leaf.procedures
- .iter()
- .any(|candidate| candidate == &message.procedure_id)
- })
+ .map(|leaf| leaf.procedures.iter().any(|p| p == &message.procedure_id))
.unwrap_or(false),
None => self.endpoint_procedures.contains(&message.procedure_id),
};
@@ -466,7 +280,7 @@ impl Endpoint {
let fault = if header
.dst_leaf
.as_ref()
- .is_some_and(|leaf_name| !self.leaves.contains_key(leaf_name))
+ .is_some_and(|name| !self.leaves.contains_key(name))
{
ProtocolFault::UnknownLeaf
} else {
@@ -482,15 +296,10 @@ impl Endpoint {
match header
.dst_leaf
.as_ref()
- .and_then(|leaf_name| self.leaves.get(leaf_name))
+ .and_then(|name| self.leaves.get(name))
{
- Some(LeafSpec {
- behavior: LeafBehavior::Echo,
- ..
- }) if key.is_some() => {
- let hook = message
- .response_hook
- .expect("key and hook are synchronized");
+ Some(leaf) if leaf.behavior == LeafBehavior::Echo && key.is_some() => {
+ let hook = message.response_hook.expect("synchronized");
let response = DataMessage {
procedure_id: message.procedure_id.clone(),
data: message.data,
@@ -535,13 +344,11 @@ impl Endpoint {
let Some(leaf) = self.leaves.get(leaf_name) else {
return self.emit_fault_if_possible(Some(key), ProtocolFault::UnknownLeaf);
};
- // WARNING: introspection nests one archived payload inside `DataMessage.data`.
- // This inner allocation is required because the protocol defines `data` as opaque bytes.
to_bytes::(&LeafIntrospection {
leaf_name: leaf_name.clone(),
procedures: leaf.procedures.clone(),
})
- .expect("leaf introspection should serialize")
+ .expect("serialize")
.to_vec()
} else {
to_bytes::(&EndpointIntrospection {
@@ -554,7 +361,7 @@ impl Endpoint {
})
.collect(),
})
- .expect("endpoint introspection should serialize")
+ .expect("serialize")
.to_vec()
};
@@ -583,21 +390,13 @@ impl Endpoint {
header: PacketHeader,
message: DataMessage,
) -> Result {
- let key = HookKey::new(
- self.path.clone(),
- header.hook_id.expect("validated hook id"),
- );
+ let key = HookKey::new(self.path.clone(), header.hook_id.expect("validated"));
if self.hooks.active(&key).is_none() {
- let pending_matches = self
- .hooks
- .pending(&key)
- .map(|pending| {
- pending.caller_src_path == header.src_path
- && pending.procedure_id == message.procedure_id
- })
- .unwrap_or(false);
- if pending_matches {
+ let matches = self.hooks.pending(&key).is_some_and(|p| {
+ p.caller_src_path == header.src_path && p.procedure_id == message.procedure_id
+ });
+ if matches {
self.hooks.activate_pending(&key, header.src_path.clone());
}
}
@@ -632,13 +431,40 @@ impl Endpoint {
if message.end_hook {
self.hooks.remove_active(&key);
}
-
Ok(EndpointOutcome {
events: vec![LocalEvent::Data { header, message }],
..EndpointOutcome::default()
})
}
+ fn handle_local_fault(
+ &mut self,
+ header: PacketHeader,
+ message: FaultMessage,
+ ) -> Result {
+ let key = HookKey::new(self.path.clone(), header.hook_id.expect("validated"));
+ let matches = self
+ .hooks
+ .active(&key)
+ .is_some_and(|a| a.peer_path == header.src_path)
+ || self
+ .hooks
+ .pending(&key)
+ .is_some_and(|p| p.caller_src_path == header.src_path);
+ if !matches {
+ return Ok(EndpointOutcome {
+ dropped: true,
+ ..EndpointOutcome::default()
+ });
+ }
+ self.hooks.remove_active(&key);
+ self.hooks.remove_pending(&key);
+ Ok(EndpointOutcome {
+ events: vec![LocalEvent::Fault { header, message }],
+ ..EndpointOutcome::default()
+ })
+ }
+
fn emit_fault_if_possible(
&mut self,
key: Option,
@@ -659,8 +485,7 @@ impl Endpoint {
dst_leaf: None,
hook_id: Some(key.hook_id),
};
- let message = FaultMessage { fault };
- let frame = encode_packet(&header, &message)?;
+ let frame = encode_packet(&header, &FaultMessage { fault })?;
Ok(EndpointOutcome {
forwards: vec![(RouteDecision::Parent, frame)],
..EndpointOutcome::default()
@@ -671,8 +496,8 @@ impl Endpoint {
let child_paths: Vec> = self
.children
.iter()
- .filter(|child| child.state == ConnectionState::Registered)
- .map(|child| child.path.clone())
+ .filter(|c| c.state == ConnectionState::Registered)
+ .map(|c| c.path.clone())
.collect();
route_destination(
&self.path,
@@ -687,107 +512,79 @@ impl Endpoint {
Ingress::Parent => self
.parent_path
.as_ref()
- .map_or(self.path.is_empty(), |path| path == src_path),
+ .map_or(self.path.is_empty(), |p| p == src_path),
Ingress::Child(path) => path == src_path,
Ingress::Local => src_path == self.path,
}
}
}
-#[cfg(test)]
-mod tests {
- use super::*;
- use crate::protocol::introspection::ArchivedEndpointIntrospection;
- use crate::protocol::{HookTarget, deserialize_archived_bytes};
+impl Endpoint for ProtocolEndpoint {
+ fn path(&self) -> &[String] {
+ &self.path
+ }
- fn echo_leaf() -> LeafSpec {
- LeafSpec {
- name: String::from("echo"),
- procedures: vec![String::from("org.product.v1.echo.roundtrip")],
- behavior: LeafBehavior::Echo,
+ fn receive(
+ &mut self,
+ ingress: &Ingress,
+ frame: FrameBytes,
+ ) -> Result {
+ let parsed = decode_frame(&frame)?;
+ let header = parsed.deserialize_header();
+ validate_header(&header)?;
+ if !self.valid_source_for_ingress(ingress, &header.src_path) {
+ return Ok(EndpointOutcome {
+ dropped: true,
+ ..EndpointOutcome::default()
+ });
+ }
+
+ match header.packet_type {
+ PacketType::Call => {
+ let message = parsed.deserialize_call()?;
+ if !matches!(ingress, Ingress::Parent | Ingress::Local) {
+ return Ok(EndpointOutcome {
+ dropped: true,
+ ..EndpointOutcome::default()
+ });
+ }
+ validate_call(&header, &message)?;
+ match self.decide_route(&header.dst_path) {
+ RouteDecision::Child(idx) => Ok(EndpointOutcome {
+ forwards: vec![(RouteDecision::Child(idx), frame)],
+ ..EndpointOutcome::default()
+ }),
+ RouteDecision::Parent => Ok(EndpointOutcome {
+ forwards: vec![(RouteDecision::Parent, frame)],
+ ..EndpointOutcome::default()
+ }),
+ RouteDecision::Drop => Ok(EndpointOutcome {
+ dropped: true,
+ ..EndpointOutcome::default()
+ }),
+ RouteDecision::Local => self.handle_local_call(header, message),
+ }
+ }
+ PacketType::Data => {
+ let message = parsed.deserialize_data()?;
+ match self.decide_route(&header.dst_path) {
+ RouteDecision::Local => self.handle_local_data(header, message),
+ _ => Ok(EndpointOutcome {
+ dropped: true,
+ ..EndpointOutcome::default()
+ }),
+ }
+ }
+ PacketType::Fault => {
+ let message = parsed.deserialize_fault()?;
+ match self.decide_route(&header.dst_path) {
+ RouteDecision::Local => self.handle_local_fault(header, message),
+ _ => Ok(EndpointOutcome {
+ dropped: true,
+ ..EndpointOutcome::default()
+ }),
+ }
+ }
}
}
-
- #[test]
- fn introspection_returns_payload_and_clears_hook() {
- let mut child = Endpoint::new(
- vec![String::from("child")],
- Some(Vec::new()),
- Vec::new(),
- vec![echo_leaf()],
- );
- let header = PacketHeader {
- packet_type: PacketType::Call,
- src_path: Vec::new(),
- dst_path: vec![String::from("child")],
- dst_leaf: None,
- hook_id: None,
- };
- let call = CallMessage {
- procedure_id: String::new(),
- data: Vec::new(),
- response_hook: Some(HookTarget {
- hook_id: 1,
- return_path: Vec::new(),
- }),
- };
-
- let outcome = child
- .receive(
- &Ingress::Parent,
- encode_packet(&header, &call).expect("frame"),
- )
- .expect("receive should succeed");
- let (_, frame) = outcome
- .forwards
- .first()
- .expect("forwarded frame should exist");
- let parsed = decode_frame(frame).expect("data frame");
- let data = parsed.deserialize_data().expect("data payload");
- let payload = deserialize_archived_bytes::<
- ArchivedEndpointIntrospection,
- EndpointIntrospection,
- >(&data.data)
- .expect("introspection payload");
- assert_eq!(payload.leaves.len(), 1);
- assert_eq!(child.hooks().active_len(), 0);
- }
-
- #[test]
- fn invalid_peer_generates_local_fault_event() {
- let mut root = Endpoint::new(Vec::new(), None, Vec::new(), Vec::new());
- let _call = root
- .make_call(
- vec![String::from("child")],
- None,
- String::from("org.product.v1.echo.roundtrip"),
- Some(7),
- Vec::new(),
- )
- .expect("call should encode");
- let frame = root
- .make_data(
- Vec::new(),
- 7,
- String::from("org.product.v1.echo.roundtrip"),
- b"bad".to_vec(),
- false,
- )
- .expect("data should encode");
- let parsed = decode_frame(&frame).expect("frame should decode");
- let mut header = parsed.deserialize_header();
- header.src_path = vec![String::from("other")];
- let bad_frame = encode_packet(
- &header,
- &parsed.deserialize_data().expect("data should decode"),
- )
- .expect("bad frame should encode");
- let outcome = root
- .receive(&Ingress::Child(vec![String::from("other")]), bad_frame)
- .expect("receive should work");
- assert!(matches!(
- outcome.events.first(),
- Some(LocalEvent::Fault { .. })
- ));
- }
}
diff --git a/src/tree/hook.rs b/src/protocol/tree/hook.rs
similarity index 77%
rename from src/tree/hook.rs
rename to src/protocol/tree/hook.rs
index ae4599a..ccc4b34 100644
--- a/src/tree/hook.rs
+++ b/src/protocol/tree/hook.rs
@@ -12,7 +12,6 @@ pub struct HookKey {
}
impl HookKey {
- /// Creates a new hook key.
pub fn new(return_path: Vec, hook_id: u64) -> Self {
Self {
return_path,
@@ -24,32 +23,21 @@ impl HookKey {
/// Pending hook context created by a received call.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PendingHook {
- /// Original caller path.
pub caller_src_path: Vec,
- /// Hook host path.
pub return_path: Vec,
- /// Hook identifier.
pub hook_id: u64,
- /// Procedure anchored to the call.
pub procedure_id: String,
- /// Destination leaf from the call.
pub dst_leaf: Option,
}
/// Active hook context used for ordinary data traffic.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ActiveHook {
- /// Path of the endpoint hosting the hook.
pub return_path: Vec,
- /// Hook identifier.
pub hook_id: u64,
- /// Expected direct peer for hook traffic.
pub peer_path: Vec,
- /// Procedure bound to the hook.
pub procedure_id: String,
- /// Original destination leaf.
pub dst_leaf: Option,
- /// Whether the peer has indicated completion.
pub peer_finished: bool,
}
@@ -61,7 +49,6 @@ pub struct HookTable {
}
impl HookTable {
- /// Allocates the lowest inactive hook id for a return path.
pub fn allocate_hook_id(&self, return_path: &[String]) -> u64 {
let mut hook_id = 0u64;
loop {
@@ -73,22 +60,18 @@ impl HookTable {
}
}
- /// Inserts pending hook state.
pub fn insert_pending(&mut self, pending: PendingHook) {
// WARNING: hook tables intentionally own their path and procedure strings.
- // Hook state must outlive any individual frame buffer, so borrowing framed
- // transport memory here would be unsound.
+ // Hook state must outlive any individual frame buffer.
let key = HookKey::new(pending.return_path.clone(), pending.hook_id);
self.pending.insert(key, pending);
}
- /// Inserts active hook state.
pub fn insert_active(&mut self, active: ActiveHook) {
let key = HookKey::new(active.return_path.clone(), active.hook_id);
self.active.insert(key, active);
}
- /// Promotes pending hook state to active state.
pub fn activate_pending(&mut self, key: &HookKey, peer_path: Vec) -> Option<()> {
let pending = self.pending.remove(key)?;
self.active.insert(
@@ -105,37 +88,30 @@ impl HookTable {
Some(())
}
- /// Removes pending state.
pub fn remove_pending(&mut self, key: &HookKey) -> Option {
self.pending.remove(key)
}
- /// Removes active state.
pub fn remove_active(&mut self, key: &HookKey) -> Option {
self.active.remove(key)
}
- /// Returns pending state.
pub fn pending(&self, key: &HookKey) -> Option<&PendingHook> {
self.pending.get(key)
}
- /// Returns active state.
pub fn active(&self, key: &HookKey) -> Option<&ActiveHook> {
self.active.get(key)
}
- /// Returns mutable active state.
pub fn active_mut(&mut self, key: &HookKey) -> Option<&mut ActiveHook> {
self.active.get_mut(key)
}
- /// Returns the number of pending hooks.
pub fn pending_len(&self) -> usize {
self.pending.len()
}
- /// Returns the number of active hooks.
pub fn active_len(&self) -> usize {
self.active.len()
}
diff --git a/src/tree/mod.rs b/src/protocol/tree/mod.rs
similarity index 62%
rename from src/tree/mod.rs
rename to src/protocol/tree/mod.rs
index fc593ca..a436512 100644
--- a/src/tree/mod.rs
+++ b/src/protocol/tree/mod.rs
@@ -6,7 +6,10 @@ mod routing;
pub use endpoint::{
ChildRoute, ConnectionState, Endpoint, EndpointError, EndpointOutcome, Ingress, LeafBehavior,
- LeafSpec, LocalEvent,
+ LeafSpec, LocalEvent, ProtocolEndpoint,
};
pub use hook::{ActiveHook, HookKey, HookTable, PendingHook};
-pub use routing::{LeafNode, RouteDecision, TreeNode, is_prefix, route_destination};
+pub use routing::{
+ DefaultRouteProvider, LeafNode, RouteDecision, RouteProvider, TreeNode, is_prefix,
+ route_destination,
+};
diff --git a/src/tree/routing.rs b/src/protocol/tree/routing.rs
similarity index 68%
rename from src/tree/routing.rs
rename to src/protocol/tree/routing.rs
index c54c8ba..3c27d8c 100644
--- a/src/tree/routing.rs
+++ b/src/protocol/tree/routing.rs
@@ -2,7 +2,7 @@
use alloc::{string::String, vec::Vec};
-/// Explicit test tree declaration.
+/// Explicit test tree declaration used for configuration.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TreeNode {
/// The tree root.
@@ -67,7 +67,7 @@ pub enum RouteDecision {
Drop,
}
-/// Returns `true` if `prefix` is a prefix of `path`.
+/// Returns `true` if `prefix` is a path prefix of `path`.
pub fn is_prefix(prefix: &[String], path: &[String]) -> bool {
prefix.len() <= path.len()
&& prefix
@@ -76,30 +76,56 @@ pub fn is_prefix(prefix: &[String], path: &[String]) -> bool {
.all(|(left, right)| left == right)
}
-/// Routes a destination path using the protocol's longest-prefix rule.
+/// Trait for resolving a destination path to a routing decision.
+pub trait RouteProvider {
+ /// Computes the routing decision for a destination path.
+ fn route_destination(
+ &self,
+ local_path: &[String],
+ child_paths: &[Vec],
+ has_parent: bool,
+ dst_path: &[String],
+ ) -> RouteDecision;
+}
+
+/// Default routing implementation using the protocol's longest-prefix rule.
+pub struct DefaultRouteProvider;
+
+impl RouteProvider for DefaultRouteProvider {
+ fn route_destination(
+ &self,
+ local_path: &[String],
+ child_paths: &[Vec],
+ has_parent: bool,
+ dst_path: &[String],
+ ) -> RouteDecision {
+ let child = child_paths
+ .iter()
+ .enumerate()
+ .filter(|(_, child_path)| is_prefix(child_path, dst_path))
+ .max_by_key(|(_, child_path)| child_path.len())
+ .map(|(index, _)| index);
+
+ if let Some(index) = child {
+ return RouteDecision::Child(index);
+ }
+ if local_path == dst_path {
+ return RouteDecision::Local;
+ }
+ if has_parent && !is_prefix(local_path, dst_path) {
+ return RouteDecision::Parent;
+ }
+ RouteDecision::Drop
+ }
+}
+
pub fn route_destination(
local_path: &[String],
child_paths: &[Vec],
has_parent: bool,
dst_path: &[String],
) -> RouteDecision {
- let child = child_paths
- .iter()
- .enumerate()
- .filter(|(_, child_path)| is_prefix(child_path, dst_path))
- .max_by_key(|(_, child_path)| child_path.len())
- .map(|(index, _)| index);
-
- if let Some(index) = child {
- return RouteDecision::Child(index);
- }
- if local_path == dst_path {
- return RouteDecision::Local;
- }
- if has_parent && !is_prefix(local_path, dst_path) {
- return RouteDecision::Parent;
- }
- RouteDecision::Drop
+ DefaultRouteProvider.route_destination(local_path, child_paths, has_parent, dst_path)
}
#[cfg(test)]
@@ -109,12 +135,13 @@ mod tests {
#[test]
fn longest_prefix_wins() {
+ let provider = DefaultRouteProvider;
let children = vec![
vec![String::from("a")],
vec![String::from("a"), String::from("b")],
];
assert_eq!(
- route_destination(
+ provider.route_destination(
&Vec::::new(),
&children,
false,
diff --git a/src/protocol/types.rs b/src/protocol/types.rs
index 82cbe33..9bcc937 100644
--- a/src/protocol/types.rs
+++ b/src/protocol/types.rs
@@ -1,4 +1,7 @@
-//! Archived protocol message types.
+//! Canonical UnShell protocol message types.
+//!
+//! These types define the wire format and are designed for zero-copy
+//! access via `rkyv`.
use alloc::{string::String, vec::Vec};
use rkyv::{Archive, Deserialize, Serialize};
diff --git a/src/protocol/validation.rs b/src/protocol/validation.rs
index ebae7c3..9551b76 100644
--- a/src/protocol/validation.rs
+++ b/src/protocol/validation.rs
@@ -1,10 +1,9 @@
//! Stateless protocol validation.
-use core::fmt;
-
use crate::protocol::{
CallMessage, PacketHeader, PacketType, introspection::INTROSPECTION_PROCEDURE_ID,
};
+use core::fmt;
/// Validation failures for protocol structures.
#[derive(Debug, Clone, PartialEq, Eq)]
@@ -27,14 +26,9 @@ impl fmt::Display for ValidationError {
}
}
-#[cfg(feature = "std")]
-impl std::error::Error for ValidationError {}
+impl core::error::Error for ValidationError {}
/// Validates packet header invariants from the protocol.
-///
-/// # Errors
-///
-/// Returns [`ValidationError`] when the header shape does not match the packet type.
pub fn validate_header(header: &PacketHeader) -> Result<(), ValidationError> {
match header.packet_type {
PacketType::Call => {
@@ -57,15 +51,10 @@ pub fn validate_header(header: &PacketHeader) -> Result<(), ValidationError> {
}
}
}
-
Ok(())
}
/// Validates the canonical dotted `procedure_id` shape.
-///
-/// # Errors
-///
-/// Returns [`ValidationError`] when the procedure id does not match the required format.
pub fn validate_procedure_id(procedure_id: &str) -> Result<(), ValidationError> {
if procedure_id == INTROSPECTION_PROCEDURE_ID {
return Ok(());
@@ -114,10 +103,6 @@ pub fn validate_procedure_id(procedure_id: &str) -> Result<(), ValidationError>
}
/// Validates call-specific invariants that depend on both header and payload.
-///
-/// # Errors
-///
-/// Returns [`ValidationError`] when the call payload conflicts with the header.
pub fn validate_call(header: &PacketHeader, call: &CallMessage) -> Result<(), ValidationError> {
validate_procedure_id(&call.procedure_id)?;
@@ -141,49 +126,3 @@ pub fn validate_call(header: &PacketHeader, call: &CallMessage) -> Result<(), Va
fn is_portable_procedure_char(ch: char) -> bool {
ch.is_ascii_lowercase() || ch.is_ascii_digit() || ch == '_'
}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use crate::protocol::{HookTarget, PacketType};
- use alloc::{string::String, vec};
-
- #[test]
- fn rejects_invalid_data_header() {
- let header = PacketHeader {
- packet_type: PacketType::Data,
- src_path: Vec::new(),
- dst_path: Vec::new(),
- dst_leaf: Some(String::from("leaf")),
- hook_id: None,
- };
- assert!(validate_header(&header).is_err());
- }
-
- #[test]
- fn validates_procedure_id_shape() {
- assert!(validate_procedure_id("org.product.v1.demo.echo").is_ok());
- assert!(validate_procedure_id("org.product.v01.demo.echo").is_err());
- assert!(validate_procedure_id("Org.product.v1.demo.echo").is_err());
- }
-
- #[test]
- fn validates_response_hook_return_path() {
- let header = PacketHeader {
- packet_type: PacketType::Call,
- src_path: vec![String::from("src")],
- dst_path: vec![String::from("dst")],
- dst_leaf: None,
- hook_id: None,
- };
- let call = CallMessage {
- procedure_id: String::from("org.product.v1.demo.echo"),
- data: Vec::new(),
- response_hook: Some(HookTarget {
- hook_id: 1,
- return_path: vec![String::from("other")],
- }),
- };
- assert!(validate_call(&header, &call).is_err());
- }
-}
diff --git a/src/transport/channel.rs b/src/transport/channel.rs
deleted file mode 100644
index 5693778..0000000
--- a/src/transport/channel.rs
+++ /dev/null
@@ -1,77 +0,0 @@
-//! Simulated transport built on `crossbeam-channel`.
-
-use crossbeam_channel::{Receiver, Sender, unbounded};
-
-use crate::{
- protocol::FrameBytes,
- transport::{Transport, TransportError},
-};
-
-/// One endpoint of a simulated duplex transport.
-#[derive(Debug, Clone)]
-pub struct ChannelTransport {
- sender: Sender,
- receiver: Receiver,
-}
-
-impl ChannelTransport {
- /// Builds a connected pair of transports.
- pub fn pair() -> (Self, Self) {
- let (ab_tx, ab_rx) = unbounded();
- let (ba_tx, ba_rx) = unbounded();
- (
- Self {
- sender: ab_tx,
- receiver: ba_rx,
- },
- Self {
- sender: ba_tx,
- receiver: ab_rx,
- },
- )
- }
-}
-
-impl Transport for ChannelTransport {
- fn send_frame(&mut self, frame: FrameBytes) -> Result<(), TransportError> {
- self.sender
- .send(frame)
- .map_err(|_| TransportError::ChannelClosed)
- }
-
- fn recv_frame(&mut self) -> Result {
- self.receiver
- .recv()
- .map_err(|_| TransportError::ChannelClosed)
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use crate::protocol::{DataMessage, PacketHeader, PacketType, decode_frame, encode_packet};
- use alloc::{string::String, vec};
-
- #[test]
- fn channel_roundtrip_moves_framed_bytes() {
- let (mut left, mut right) = ChannelTransport::pair();
- let header = PacketHeader {
- packet_type: PacketType::Data,
- src_path: vec![String::from("a")],
- dst_path: vec![String::from("b")],
- dst_leaf: None,
- hook_id: Some(7),
- };
- let data = DataMessage {
- procedure_id: String::from("org.product.v1.echo.roundtrip"),
- data: b"payload".to_vec(),
- end_hook: true,
- };
- let frame = encode_packet(&header, &data).expect("frame should encode");
-
- left.send_frame(frame).expect("send should succeed");
- let received = right.recv_frame().expect("recv should succeed");
- let parsed = decode_frame(&received).expect("received frame should decode");
- assert_eq!(parsed.deserialize_data().expect("data should decode"), data);
- }
-}
diff --git a/src/transport/mod.rs b/src/transport/mod.rs
deleted file mode 100644
index cc159b4..0000000
--- a/src/transport/mod.rs
+++ /dev/null
@@ -1,79 +0,0 @@
-//! Framed transport implementations.
-//!
-//! Transports move complete framed packets represented by [`crate::protocol::FrameBytes`].
-//! Packet parsing and validation live above this layer.
-
-use crate::protocol::FrameBytes;
-
-#[cfg(feature = "sim")]
-pub mod channel;
-#[cfg(feature = "tcp")]
-pub mod tcp;
-
-/// Maximum allowed size for a serialized header section.
-pub const MAX_HEADER_BYTES: usize = 64 * 1024;
-
-/// Maximum allowed size for a serialized payload section.
-pub const MAX_PAYLOAD_BYTES: usize = 64 * 1024 * 1024;
-
-/// Transport-layer failure.
-#[derive(Debug)]
-pub enum TransportError {
- /// The peer disconnected cleanly.
- Disconnected,
- /// The announced header length exceeded the limit.
- HeaderTooLarge(usize, usize),
- /// The announced payload length exceeded the limit.
- PayloadTooLarge(usize, usize),
- /// Underlying I/O failure.
- #[cfg(feature = "tcp")]
- Io(std::io::Error),
- /// Channel send or receive failure.
- #[cfg(feature = "sim")]
- ChannelClosed,
-}
-
-impl core::fmt::Display for TransportError {
- fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
- match self {
- Self::Disconnected => f.write_str("transport disconnected"),
- Self::HeaderTooLarge(got, max) => {
- write!(f, "header too large: {got} bytes (limit {max})")
- }
- Self::PayloadTooLarge(got, max) => {
- write!(f, "payload too large: {got} bytes (limit {max})")
- }
- #[cfg(feature = "tcp")]
- Self::Io(error) => write!(f, "transport I/O error: {error}"),
- #[cfg(feature = "sim")]
- Self::ChannelClosed => f.write_str("channel transport closed"),
- }
- }
-}
-
-#[cfg(feature = "std")]
-impl std::error::Error for TransportError {}
-
-#[cfg(feature = "tcp")]
-impl From for TransportError {
- fn from(value: std::io::Error) -> Self {
- Self::Io(value)
- }
-}
-
-/// Duplex framed transport.
-pub trait Transport: Send {
- /// Sends one complete framed packet.
- ///
- /// # Errors
- ///
- /// Returns [`TransportError`] when the underlying transport cannot deliver the frame.
- fn send_frame(&mut self, frame: FrameBytes) -> Result<(), TransportError>;
-
- /// Receives one complete framed packet.
- ///
- /// # Errors
- ///
- /// Returns [`TransportError`] when the transport disconnects or a frame cannot be read.
- fn recv_frame(&mut self) -> Result;
-}
diff --git a/src/transport/tcp.rs b/src/transport/tcp.rs
deleted file mode 100644
index 151a3c6..0000000
--- a/src/transport/tcp.rs
+++ /dev/null
@@ -1,132 +0,0 @@
-//! TCP framed transport.
-
-use alloc::vec::Vec;
-use std::{
- io::{ErrorKind, Read, Write},
- net::{TcpStream, ToSocketAddrs},
-};
-
-use crate::{
- protocol::FrameBytes,
- transport::{MAX_HEADER_BYTES, MAX_PAYLOAD_BYTES, Transport, TransportError},
-};
-
-/// Framed TCP transport.
-pub struct TcpTransport {
- stream: TcpStream,
-}
-
-impl TcpTransport {
- /// Connects to a remote address.
- ///
- /// # Errors
- ///
- /// Returns [`TransportError`] when the TCP connection cannot be established.
- pub fn connect(addr: A) -> Result {
- Ok(Self {
- stream: TcpStream::connect(addr)?,
- })
- }
-
- /// Wraps an existing TCP stream.
- pub fn from_stream(stream: TcpStream) -> Self {
- Self { stream }
- }
-}
-
-impl Transport for TcpTransport {
- fn send_frame(&mut self, frame: FrameBytes) -> Result<(), TransportError> {
- self.stream.write_all(&frame).map_err(map_io_error)
- }
-
- fn recv_frame(&mut self) -> Result {
- let header_len = read_u32(&mut self.stream)?;
- if header_len > MAX_HEADER_BYTES {
- return Err(TransportError::HeaderTooLarge(header_len, MAX_HEADER_BYTES));
- }
-
- let mut header = vec![0u8; header_len];
- read_exact(&mut self.stream, &mut header)?;
-
- let payload_len = read_u32(&mut self.stream)?;
- if payload_len > MAX_PAYLOAD_BYTES {
- return Err(TransportError::PayloadTooLarge(
- payload_len,
- MAX_PAYLOAD_BYTES,
- ));
- }
-
- let mut payload = vec![0u8; payload_len];
- read_exact(&mut self.stream, &mut payload)?;
-
- let mut frame = Vec::with_capacity(8 + header_len + payload_len);
- frame.extend_from_slice(&(header_len as u32).to_be_bytes());
- frame.extend_from_slice(&header);
- frame.extend_from_slice(&(payload_len as u32).to_be_bytes());
- frame.extend_from_slice(&payload);
- Ok(frame.into_boxed_slice())
- }
-}
-
-fn read_u32(stream: &mut TcpStream) -> Result {
- let mut bytes = [0u8; 4];
- read_exact(stream, &mut bytes)?;
- Ok(u32::from_be_bytes(bytes) as usize)
-}
-
-fn read_exact(stream: &mut TcpStream, buffer: &mut [u8]) -> Result<(), TransportError> {
- stream.read_exact(buffer).map_err(map_io_error)
-}
-
-fn map_io_error(error: std::io::Error) -> TransportError {
- match error.kind() {
- ErrorKind::UnexpectedEof | ErrorKind::BrokenPipe | ErrorKind::ConnectionReset => {
- TransportError::Disconnected
- }
- _ => TransportError::Io(error),
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use crate::protocol::{DataMessage, PacketHeader, PacketType, decode_frame, encode_packet};
- use alloc::{string::String, vec};
- use std::{net::TcpListener, thread};
-
- #[test]
- fn tcp_roundtrip_preserves_frame() {
- let listener = TcpListener::bind("127.0.0.1:0").expect("bind should succeed");
- let addr = listener.local_addr().expect("local address should exist");
-
- let header = PacketHeader {
- packet_type: PacketType::Data,
- src_path: vec![String::from("a")],
- dst_path: vec![String::from("b")],
- dst_leaf: None,
- hook_id: Some(9),
- };
- let payload = DataMessage {
- procedure_id: String::from("org.product.v1.echo.roundtrip"),
- data: b"payload".to_vec(),
- end_hook: true,
- };
- let frame = encode_packet(&header, &payload).expect("frame should encode");
-
- let sender = thread::spawn(move || {
- let mut transport = TcpTransport::connect(addr).expect("connect should succeed");
- transport.send_frame(frame).expect("send should succeed");
- });
-
- let (stream, _) = listener.accept().expect("accept should succeed");
- let mut transport = TcpTransport::from_stream(stream);
- let received = transport.recv_frame().expect("recv should succeed");
- let parsed = decode_frame(&received).expect("frame should decode");
-
- sender.join().expect("sender should not panic");
- assert_eq!(
- parsed.deserialize_data().expect("data should decode"),
- payload
- );
- }
-}