mirror of
https://github.com/Astatin3/unshell.git
synced 2026-06-08 22:38:01 -06:00
Reorganize protocol.
This commit is contained in:
+10
-34
@@ -1,44 +1,20 @@
|
||||
//! UnShell core protocol crate.
|
||||
//! # UnShell Core
|
||||
//!
|
||||
//! The crate now models the draft protocol in `PROTOCOL.md` directly:
|
||||
//! This crate implements the UnShell protocol as a pure, `no_std` library.
|
||||
//! It provides a trait-based architecture for routed endpoint communication
|
||||
//! using an explicit tree topology.
|
||||
//!
|
||||
//! - [`protocol`] provides the canonical wire types, framing helpers, validation,
|
||||
//! and introspection payloads.
|
||||
//! - [`tree`] provides an explicit enum-based tree declaration, longest-prefix
|
||||
//! routing helpers, and a small endpoint runtime for tests.
|
||||
//! - [`transport`] provides framed transport implementations for simulated
|
||||
//! channel-based links and TCP links.
|
||||
//! - [`logger`] remains available for lightweight logging.
|
||||
//! ## Architecture
|
||||
//!
|
||||
//! ```rust
|
||||
//! use unshell::protocol::{CallMessage, HookTarget, PacketHeader, PacketType, encode_packet};
|
||||
//! - [`protocol`] - Wire types, framing, stateless validation, routing/runtime, and implementation traits.
|
||||
//!
|
||||
//! let header = PacketHeader {
|
||||
//! packet_type: PacketType::Call,
|
||||
//! src_path: Vec::new(),
|
||||
//! dst_path: vec!["child".into()],
|
||||
//! dst_leaf: Some("echo".into()),
|
||||
//! hook_id: None,
|
||||
//! };
|
||||
//! let call = CallMessage {
|
||||
//! procedure_id: "org.product.v1.echo.roundtrip".into(),
|
||||
//! data: b"ping".to_vec(),
|
||||
//! response_hook: Some(HookTarget {
|
||||
//! hook_id: 1,
|
||||
//! return_path: Vec::new(),
|
||||
//! }),
|
||||
//! };
|
||||
//!
|
||||
//! let frame = encode_packet(&header, &call).expect("call should encode");
|
||||
//! assert!(!frame.is_empty());
|
||||
//! ```
|
||||
//! The library requires `alloc` for path and payload management.
|
||||
|
||||
#![no_std]
|
||||
|
||||
#![cfg_attr(not(feature = "std"), no_std)]
|
||||
extern crate alloc;
|
||||
|
||||
pub mod logger;
|
||||
pub mod protocol;
|
||||
pub mod transport;
|
||||
pub mod tree;
|
||||
|
||||
pub use ush_obfuscate as obfuscate;
|
||||
// pub use ush_obfuscate as obfuscate;
|
||||
|
||||
+38
-65
@@ -1,14 +1,21 @@
|
||||
//! # Logger Module
|
||||
//!
|
||||
//! A lightweight, no_std-compatible logging system.
|
||||
//! A lightweight global logging system for core-only environments.
|
||||
//!
|
||||
//! ## Usage
|
||||
//!
|
||||
//! ```rust
|
||||
//! use unshell::{info, warn, error};
|
||||
//! use unshell::logger::Logger;
|
||||
//! use unshell::logger::{Logger, Record};
|
||||
//!
|
||||
//! struct Sink;
|
||||
//! impl Logger for Sink {
|
||||
//! fn log(&self, _record: &Record<'_>) {}
|
||||
//! }
|
||||
//!
|
||||
//! static LOGGER: Sink = Sink;
|
||||
//! unshell::logger::set_logger(&LOGGER);
|
||||
//!
|
||||
//! // Uses the default (no-op) logger until one is installed.
|
||||
//! info!("Starting up");
|
||||
//! warn!("Something is off");
|
||||
//! error!("Critical failure");
|
||||
@@ -18,19 +25,25 @@
|
||||
//!
|
||||
//! Call [`set_logger`] with any type that implements [`Logger`]:
|
||||
//!
|
||||
//! ```rust,no_run
|
||||
//! ```rust
|
||||
//! use unshell::logger::{Logger, LogLevel, Record, set_logger};
|
||||
//!
|
||||
//! struct StdoutLogger;
|
||||
//! impl Logger for StdoutLogger {
|
||||
//! struct MemoryLogger {
|
||||
//! min_level: LogLevel,
|
||||
//! }
|
||||
//!
|
||||
//! impl Logger for MemoryLogger {
|
||||
//! fn log(&self, record: &Record<'_>) {
|
||||
//! // In a no_std environment you would use the `unix-print` crate
|
||||
//! // or write to a pre-opened file descriptor.
|
||||
//! let _ = record; // placeholder
|
||||
//! if record.level < self.min_level {
|
||||
//! return;
|
||||
//! }
|
||||
//! let _ = record;
|
||||
//! }
|
||||
//! }
|
||||
//!
|
||||
//! static MY_LOGGER: StdoutLogger = StdoutLogger;
|
||||
//! static MY_LOGGER: MemoryLogger = MemoryLogger {
|
||||
//! min_level: LogLevel::Info,
|
||||
//! };
|
||||
//! set_logger(&MY_LOGGER);
|
||||
//! ```
|
||||
//!
|
||||
@@ -41,7 +54,7 @@
|
||||
//! because:
|
||||
//!
|
||||
//! 1. The payload is single-threaded.
|
||||
//! 2. The router and CLI set the logger before spawning node threads.
|
||||
//! 2. Integrators install the logger before concurrent execution begins.
|
||||
//!
|
||||
//! If you need to change the logger after threads start, synchronise access
|
||||
//! with a `Mutex` or an atomic pointer in your logger implementation.
|
||||
@@ -107,8 +120,8 @@ pub struct Record<'a> {
|
||||
|
||||
/// A sink for log records.
|
||||
///
|
||||
/// Implement this to direct log output wherever you want (stdout, a file,
|
||||
/// a TCP connection, a memory buffer for tests).
|
||||
/// Implement this to direct log output wherever you want, such as a device
|
||||
/// sink, a ring buffer, or a test collector.
|
||||
pub trait Logger: Sync {
|
||||
/// Receive and process a log record.
|
||||
fn log(&self, record: &Record<'_>);
|
||||
@@ -129,7 +142,7 @@ impl Logger for NullLogger {
|
||||
/// Written once at startup via [`set_logger`], then only read.
|
||||
/// # Safety
|
||||
/// This is `static mut` to avoid a dependency on synchronisation primitives
|
||||
/// in a no_std context. It is safe as long as `set_logger` is called before
|
||||
/// in a core-only context. It is safe as long as `set_logger` is called before
|
||||
/// any threads are spawned (see module-level docs).
|
||||
static mut GLOBAL_LOGGER: &dyn Logger = &NullLogger;
|
||||
|
||||
@@ -189,73 +202,33 @@ pub fn log(level: LogLevel, message: &str, file: Option<&'static str>, line: Opt
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// A minimal stdout logger for use in std binaries (router, CLI)
|
||||
// A minimal compatibility logger
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// A simple logger that prints to stderr.
|
||||
/// A simple filter-only logger.
|
||||
///
|
||||
/// Suitable for the router and operator CLI binaries.
|
||||
/// Do not use in the payload binary (which may not have stderr available).
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```rust,no_run
|
||||
/// use unshell::logger::{StderrLogger, set_logger};
|
||||
///
|
||||
/// static LOGGER: StderrLogger = StderrLogger::new(unshell::logger::LogLevel::Info);
|
||||
/// set_logger(&LOGGER);
|
||||
/// ```
|
||||
pub struct StderrLogger {
|
||||
/// Minimum level to log. Records below this level are discarded.
|
||||
/// This provides a small compatibility surface for installations that want a
|
||||
/// concrete logger type without defining their own sink yet.
|
||||
pub struct CompatibilityLogger {
|
||||
/// Minimum level to accept. Records below this level are discarded.
|
||||
min_level: LogLevel,
|
||||
}
|
||||
|
||||
impl StderrLogger {
|
||||
/// Create a new `StderrLogger` that logs records at `min_level` and above.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```rust
|
||||
/// use unshell::logger::{StderrLogger, LogLevel};
|
||||
/// let logger = StderrLogger::new(LogLevel::Info);
|
||||
/// ```
|
||||
impl CompatibilityLogger {
|
||||
/// Create a new `CompatibilityLogger` that accepts records at `min_level`
|
||||
/// and above.
|
||||
#[must_use]
|
||||
pub const fn new(min_level: LogLevel) -> Self {
|
||||
Self { min_level }
|
||||
}
|
||||
}
|
||||
|
||||
impl Logger for StderrLogger {
|
||||
impl Logger for CompatibilityLogger {
|
||||
fn log(&self, record: &Record<'_>) {
|
||||
if record.level < self.min_level {
|
||||
return;
|
||||
}
|
||||
// eprintln! and String require std (available only with the `tcp` feature).
|
||||
// In no_std builds this method is a no-op. The payload uses a different
|
||||
// logger (or the null logger) in no_std contexts.
|
||||
#[cfg(feature = "tcp")]
|
||||
{
|
||||
use alloc::string::String;
|
||||
let location = match (record.file, record.line) {
|
||||
(Some(f), Some(l)) => {
|
||||
let mut s = String::from(f);
|
||||
s.push(':');
|
||||
s.push_str(&format!("{l}"));
|
||||
s
|
||||
}
|
||||
_ => String::new(),
|
||||
};
|
||||
if location.is_empty() {
|
||||
eprintln!("[{}] {}", record.level.as_str(), record.message);
|
||||
} else {
|
||||
eprintln!(
|
||||
"[{}] {} - {}",
|
||||
record.level.as_str(),
|
||||
record.message,
|
||||
location
|
||||
);
|
||||
}
|
||||
}
|
||||
let _ = record;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
+112
-132
@@ -1,10 +1,13 @@
|
||||
//! Framed packet encoding and decoding.
|
||||
//!
|
||||
//! This module provides the `FrameCodec` trait, which abstracts the conversion
|
||||
//! between owned packet structures and the canonical length-prefixed wire format.
|
||||
|
||||
use alloc::{boxed::Box, vec::Vec};
|
||||
use core::fmt;
|
||||
use rkyv::{Serialize, access, deserialize, rancor::Error, to_bytes, util::AlignedVec};
|
||||
|
||||
use crate::protocol::types::{
|
||||
use super::types::{
|
||||
ArchivedCallMessage, ArchivedDataMessage, ArchivedFaultMessage, ArchivedPacketHeader,
|
||||
};
|
||||
use crate::protocol::{CallMessage, DataMessage, FaultMessage, PacketHeader, PacketType};
|
||||
@@ -39,154 +42,162 @@ impl fmt::Display for FrameError {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "std")]
|
||||
impl std::error::Error for FrameError {}
|
||||
impl core::error::Error for FrameError {}
|
||||
|
||||
/// Borrowed view over a framed packet.
|
||||
/// A view into a framed packet, providing access to archived sections.
|
||||
pub struct ParsedFrame<'a> {
|
||||
header: PacketHeader,
|
||||
payload_bytes: &'a [u8],
|
||||
}
|
||||
|
||||
impl<'a> ParsedFrame<'a> {
|
||||
/// Returns the decoded header.
|
||||
pub fn header(&self) -> &PacketHeader {
|
||||
&self.header
|
||||
}
|
||||
|
||||
/// Returns the packet type.
|
||||
pub fn packet_type(&self) -> PacketType {
|
||||
self.header.packet_type
|
||||
}
|
||||
|
||||
/// Returns the raw payload byte section.
|
||||
pub fn payload_bytes(&self) -> &'a [u8] {
|
||||
self.payload_bytes
|
||||
}
|
||||
|
||||
/// Returns an owned header copy.
|
||||
pub fn deserialize_header(&self) -> PacketHeader {
|
||||
self.header.clone()
|
||||
}
|
||||
|
||||
/// Decodes the payload as a call.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns [`FrameError`] when the payload bytes are not a valid archived call.
|
||||
pub fn deserialize_call(&self) -> Result<CallMessage, FrameError> {
|
||||
deserialize_archived_bytes::<ArchivedCallMessage, CallMessage>(self.payload_bytes)
|
||||
}
|
||||
|
||||
/// Decodes the payload as data.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns [`FrameError`] when the payload bytes are not a valid archived data packet.
|
||||
pub fn deserialize_data(&self) -> Result<DataMessage, FrameError> {
|
||||
deserialize_archived_bytes::<ArchivedDataMessage, DataMessage>(self.payload_bytes)
|
||||
}
|
||||
|
||||
/// Decodes the payload as a fault.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns [`FrameError`] when the payload bytes are not a valid archived fault.
|
||||
pub fn deserialize_fault(&self) -> Result<FaultMessage, FrameError> {
|
||||
deserialize_archived_bytes::<ArchivedFaultMessage, FaultMessage>(self.payload_bytes)
|
||||
}
|
||||
}
|
||||
|
||||
/// Encodes a packet header and payload into the canonical framed representation.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns [`FrameError`] when serialization fails or a framed section exceeds the wire limit.
|
||||
/// Trait for framing and unframing packets.
|
||||
pub trait FrameCodec {
|
||||
/// Encodes a packet header and payload into the canonical framed representation.
|
||||
fn encode_packet<P>(header: &PacketHeader, payload: &P) -> Result<FrameBytes, FrameError>
|
||||
where
|
||||
P: for<'a> Serialize<
|
||||
rkyv::api::high::HighSerializer<
|
||||
AlignedVec,
|
||||
rkyv::ser::allocator::ArenaHandle<'a>,
|
||||
Error,
|
||||
>,
|
||||
>;
|
||||
|
||||
/// Decodes a framed packet into a borrowed parsed view.
|
||||
fn decode_frame(bytes: &[u8]) -> Result<ParsedFrame<'_>, FrameError>;
|
||||
}
|
||||
|
||||
/// Default implementation of the `FrameCodec` using `rkyv`.
|
||||
pub struct RkyvCodec;
|
||||
|
||||
impl FrameCodec for RkyvCodec {
|
||||
fn encode_packet<P>(header: &PacketHeader, payload: &P) -> Result<FrameBytes, FrameError>
|
||||
where
|
||||
P: for<'a> Serialize<
|
||||
rkyv::api::high::HighSerializer<
|
||||
AlignedVec,
|
||||
rkyv::ser::allocator::ArenaHandle<'a>,
|
||||
Error,
|
||||
>,
|
||||
>,
|
||||
{
|
||||
// WARNING: framed packets move as one contiguous buffer across the core boundary.
|
||||
// Keeping ownership here avoids hidden copies later in routing code.
|
||||
let header_bytes = to_bytes::<Error>(header).map_err(FrameError::Serialize)?;
|
||||
let payload_bytes = to_bytes::<Error>(payload).map_err(FrameError::Serialize)?;
|
||||
let header_len =
|
||||
u32::try_from(header_bytes.len()).map_err(|_| FrameError::LengthOverflow)?;
|
||||
let payload_len =
|
||||
u32::try_from(payload_bytes.len()).map_err(|_| FrameError::LengthOverflow)?;
|
||||
|
||||
let mut frame = Vec::with_capacity(8 + header_bytes.len() + payload_bytes.len());
|
||||
frame.extend_from_slice(&header_len.to_be_bytes());
|
||||
frame.extend_from_slice(&header_bytes);
|
||||
frame.extend_from_slice(&payload_len.to_be_bytes());
|
||||
frame.extend_from_slice(&payload_bytes);
|
||||
Ok(frame.into_boxed_slice())
|
||||
}
|
||||
|
||||
fn decode_frame(bytes: &[u8]) -> Result<ParsedFrame<'_>, FrameError> {
|
||||
if bytes.len() < 8 {
|
||||
return Err(FrameError::Truncated);
|
||||
}
|
||||
|
||||
let header_len = u32::from_be_bytes(
|
||||
bytes
|
||||
.get(0..4)
|
||||
.ok_or(FrameError::Truncated)?
|
||||
.try_into()
|
||||
.expect("slice width checked"),
|
||||
) as usize;
|
||||
let header_start = 4usize;
|
||||
let header_end = header_start + header_len;
|
||||
if header_end + 4 > bytes.len() {
|
||||
return Err(FrameError::Truncated);
|
||||
}
|
||||
|
||||
let payload_len = u32::from_be_bytes(
|
||||
bytes
|
||||
.get(header_end..header_end + 4)
|
||||
.ok_or(FrameError::Truncated)?
|
||||
.try_into()
|
||||
.expect("slice width checked"),
|
||||
) as usize;
|
||||
let payload_start = header_end + 4;
|
||||
let payload_end = payload_start + payload_len;
|
||||
if payload_end != bytes.len() {
|
||||
return Err(FrameError::Truncated);
|
||||
}
|
||||
|
||||
// WARNING: the wire format puts a 4-byte length prefix before each archived section.
|
||||
// That means the section start is not guaranteed to satisfy rkyv's aligned-access
|
||||
// requirements. The header is copied into one temporary `AlignedVec` here because
|
||||
// routing cannot proceed safely without a validated header.
|
||||
let aligned_header = align_section(
|
||||
bytes
|
||||
.get(header_start..header_end)
|
||||
.ok_or(FrameError::Truncated)?,
|
||||
);
|
||||
let archived_header = access::<ArchivedPacketHeader, Error>(&aligned_header)
|
||||
.map_err(FrameError::InvalidHeader)?;
|
||||
let header = deserialize::<PacketHeader, Error>(archived_header)
|
||||
.map_err(FrameError::InvalidHeader)?;
|
||||
|
||||
Ok(ParsedFrame {
|
||||
header,
|
||||
payload_bytes: bytes
|
||||
.get(payload_start..payload_end)
|
||||
.ok_or(FrameError::Truncated)?,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Encodes a packet header and payload using the default codec.
|
||||
pub fn encode_packet<P>(header: &PacketHeader, payload: &P) -> Result<FrameBytes, FrameError>
|
||||
where
|
||||
P: for<'a> Serialize<
|
||||
rkyv::api::high::HighSerializer<AlignedVec, rkyv::ser::allocator::ArenaHandle<'a>, Error>,
|
||||
>,
|
||||
{
|
||||
// WARNING: the simulated and TCP transports both move complete framed packets.
|
||||
// One owned contiguous buffer at this boundary is therefore intentional and avoids
|
||||
// scattering later hidden copies through routing code.
|
||||
let header_bytes = to_bytes::<Error>(header).map_err(FrameError::Serialize)?;
|
||||
let payload_bytes = to_bytes::<Error>(payload).map_err(FrameError::Serialize)?;
|
||||
let header_len = u32::try_from(header_bytes.len()).map_err(|_| FrameError::LengthOverflow)?;
|
||||
let payload_len = u32::try_from(payload_bytes.len()).map_err(|_| FrameError::LengthOverflow)?;
|
||||
|
||||
let mut frame = Vec::with_capacity(8 + header_bytes.len() + payload_bytes.len());
|
||||
frame.extend_from_slice(&header_len.to_be_bytes());
|
||||
frame.extend_from_slice(&header_bytes);
|
||||
frame.extend_from_slice(&payload_len.to_be_bytes());
|
||||
frame.extend_from_slice(&payload_bytes);
|
||||
Ok(frame.into_boxed_slice())
|
||||
RkyvCodec::encode_packet(header, payload)
|
||||
}
|
||||
|
||||
/// Decodes a framed packet into a borrowed parsed view.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns [`FrameError`] when the frame is truncated or the header archive is invalid.
|
||||
/// Decodes a framed packet using the default codec.
|
||||
pub fn decode_frame(bytes: &[u8]) -> Result<ParsedFrame<'_>, FrameError> {
|
||||
if bytes.len() < 8 {
|
||||
return Err(FrameError::Truncated);
|
||||
}
|
||||
|
||||
let header_len = u32::from_be_bytes(
|
||||
bytes
|
||||
.get(0..4)
|
||||
.ok_or(FrameError::Truncated)?
|
||||
.try_into()
|
||||
.expect("slice width checked"),
|
||||
) as usize;
|
||||
let header_start = 4usize;
|
||||
let header_end = header_start + header_len;
|
||||
if header_end + 4 > bytes.len() {
|
||||
return Err(FrameError::Truncated);
|
||||
}
|
||||
|
||||
let payload_len = u32::from_be_bytes(
|
||||
bytes
|
||||
.get(header_end..header_end + 4)
|
||||
.ok_or(FrameError::Truncated)?
|
||||
.try_into()
|
||||
.expect("slice width checked"),
|
||||
) as usize;
|
||||
let payload_start = header_end + 4;
|
||||
let payload_end = payload_start + payload_len;
|
||||
if payload_end != bytes.len() {
|
||||
return Err(FrameError::Truncated);
|
||||
}
|
||||
|
||||
// WARNING: the wire format puts a 4-byte length prefix before each archived section.
|
||||
// That means the section start is not guaranteed to satisfy rkyv's aligned-access
|
||||
// requirements. The header is copied into one temporary `AlignedVec` here because
|
||||
// routing cannot proceed safely without a validated header.
|
||||
let aligned_header = align_section(
|
||||
bytes
|
||||
.get(header_start..header_end)
|
||||
.ok_or(FrameError::Truncated)?,
|
||||
);
|
||||
let archived_header = access::<ArchivedPacketHeader, Error>(&aligned_header)
|
||||
.map_err(FrameError::InvalidHeader)?;
|
||||
let header =
|
||||
deserialize::<PacketHeader, Error>(archived_header).map_err(FrameError::InvalidHeader)?;
|
||||
|
||||
Ok(ParsedFrame {
|
||||
header,
|
||||
payload_bytes: bytes
|
||||
.get(payload_start..payload_end)
|
||||
.ok_or(FrameError::Truncated)?,
|
||||
})
|
||||
RkyvCodec::decode_frame(bytes)
|
||||
}
|
||||
|
||||
/// Deserializes a standalone archived byte section.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns [`FrameError`] when the archived bytes are invalid for the requested type.
|
||||
pub fn deserialize_archived_bytes<A, T>(bytes: &[u8]) -> Result<T, FrameError>
|
||||
where
|
||||
A: rkyv::Portable
|
||||
@@ -204,34 +215,3 @@ fn align_section(bytes: &[u8]) -> AlignedVec {
|
||||
aligned.extend_from_slice(bytes);
|
||||
aligned
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::protocol::{HookTarget, PacketType};
|
||||
use alloc::{string::String, vec};
|
||||
|
||||
#[test]
|
||||
fn framing_roundtrip_preserves_call() {
|
||||
let header = PacketHeader {
|
||||
packet_type: PacketType::Call,
|
||||
src_path: Vec::new(),
|
||||
dst_path: vec![String::from("child")],
|
||||
dst_leaf: Some(String::from("echo")),
|
||||
hook_id: None,
|
||||
};
|
||||
let call = CallMessage {
|
||||
procedure_id: String::from("org.product.v1.echo.roundtrip"),
|
||||
data: b"ping".to_vec(),
|
||||
response_hook: Some(HookTarget {
|
||||
hook_id: 1,
|
||||
return_path: Vec::new(),
|
||||
}),
|
||||
};
|
||||
|
||||
let frame = encode_packet(&header, &call).expect("frame should encode");
|
||||
let parsed = decode_frame(&frame).expect("frame should decode");
|
||||
assert_eq!(parsed.deserialize_header(), header);
|
||||
assert_eq!(parsed.deserialize_call().expect("call should decode"), call);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
//! Required introspection payloads.
|
||||
//! Required introspection payloads for discovery.
|
||||
|
||||
use alloc::{string::String, vec::Vec};
|
||||
use rkyv::{Archive, Deserialize, Serialize};
|
||||
|
||||
+24
-1
@@ -4,14 +4,37 @@
|
||||
|
||||
pub mod codec;
|
||||
pub mod introspection;
|
||||
pub mod traits;
|
||||
pub mod tree;
|
||||
mod types;
|
||||
pub mod validation;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
pub use codec::{
|
||||
FrameBytes, FrameError, ParsedFrame, decode_frame, deserialize_archived_bytes, encode_packet,
|
||||
FrameBytes, FrameCodec, FrameError, ParsedFrame, RkyvCodec, deserialize_archived_bytes,
|
||||
};
|
||||
pub use introspection::{EndpointIntrospection, LeafIntrospection, LeafIntrospectionSummary};
|
||||
pub use traits::{HookStore, LeafMetadata, PacketFraming, PacketProcessor, RouteResolution};
|
||||
pub use types::{
|
||||
CallMessage, DataMessage, FaultMessage, HookTarget, PacketHeader, PacketType, ProtocolFault,
|
||||
};
|
||||
pub use validation::{ValidationError, validate_call, validate_header, validate_procedure_id};
|
||||
|
||||
pub fn encode_packet<P>(header: &PacketHeader, payload: &P) -> Result<FrameBytes, FrameError>
|
||||
where
|
||||
P: for<'a> rkyv::Serialize<
|
||||
rkyv::api::high::HighSerializer<
|
||||
rkyv::util::AlignedVec,
|
||||
rkyv::ser::allocator::ArenaHandle<'a>,
|
||||
rkyv::rancor::Error,
|
||||
>,
|
||||
>,
|
||||
{
|
||||
codec::encode_packet(header, payload)
|
||||
}
|
||||
|
||||
pub fn decode_frame(bytes: &[u8]) -> Result<ParsedFrame<'_>, FrameError> {
|
||||
codec::decode_frame(bytes)
|
||||
}
|
||||
|
||||
@@ -0,0 +1,2 @@
|
||||
mod protocol;
|
||||
mod tree;
|
||||
@@ -0,0 +1,118 @@
|
||||
use alloc::{borrow::ToOwned, string::String, vec, vec::Vec};
|
||||
|
||||
use crate::protocol::{
|
||||
CallMessage, FaultMessage, FrameError, HookTarget, PacketHeader, PacketType, ProtocolFault,
|
||||
ValidationError, decode_frame, encode_packet, validate_call, validate_header,
|
||||
validate_procedure_id,
|
||||
};
|
||||
|
||||
fn path(parts: &[&str]) -> Vec<String> {
|
||||
parts.iter().map(|part| (*part).to_owned()).collect()
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn packet_framing_roundtrip_preserves_header_and_payload() {
|
||||
let header = PacketHeader {
|
||||
packet_type: PacketType::Call,
|
||||
src_path: path(&["root", "caller"]),
|
||||
dst_path: path(&["root", "callee"]),
|
||||
dst_leaf: Some("echo".to_owned()),
|
||||
hook_id: None,
|
||||
};
|
||||
let call = CallMessage {
|
||||
procedure_id: "unshell.echo.v1.alpha.invoke".to_owned(),
|
||||
data: vec![1, 2, 3, 4],
|
||||
response_hook: Some(HookTarget {
|
||||
hook_id: 7,
|
||||
return_path: path(&["root", "caller"]),
|
||||
}),
|
||||
};
|
||||
|
||||
let frame = encode_packet(&header, &call).expect("frame should encode");
|
||||
let parsed = decode_frame(&frame).expect("frame should decode");
|
||||
|
||||
assert_eq!(parsed.header(), &header);
|
||||
assert_eq!(parsed.packet_type(), PacketType::Call);
|
||||
assert_eq!(
|
||||
parsed.deserialize_call().expect("call should deserialize"),
|
||||
call
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn header_and_call_validation_reject_invalid_combinations() {
|
||||
let invalid_header = PacketHeader {
|
||||
packet_type: PacketType::Data,
|
||||
src_path: path(&["peer"]),
|
||||
dst_path: path(&["host"]),
|
||||
dst_leaf: Some("echo".to_owned()),
|
||||
hook_id: None,
|
||||
};
|
||||
assert_eq!(
|
||||
validate_header(&invalid_header),
|
||||
Err(ValidationError::HeaderInvariant(
|
||||
"Data and Fault packets must not carry dst_leaf"
|
||||
))
|
||||
);
|
||||
|
||||
let header = PacketHeader {
|
||||
packet_type: PacketType::Call,
|
||||
src_path: path(&["caller"]),
|
||||
dst_path: path(&["callee"]),
|
||||
dst_leaf: Some("echo".to_owned()),
|
||||
hook_id: None,
|
||||
};
|
||||
let invalid_call = CallMessage {
|
||||
procedure_id: "unshell.echo.v1.alpha.invoke".to_owned(),
|
||||
data: Vec::new(),
|
||||
response_hook: Some(HookTarget {
|
||||
hook_id: 5,
|
||||
return_path: path(&["elsewhere"]),
|
||||
}),
|
||||
};
|
||||
assert_eq!(
|
||||
validate_call(&header, &invalid_call),
|
||||
Err(ValidationError::CallInvariant(
|
||||
"response_hook.return_path must equal header.src_path"
|
||||
))
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn procedure_validation_accepts_introspection_and_rejects_bad_shapes() {
|
||||
assert_eq!(validate_procedure_id(""), Ok(()));
|
||||
assert_eq!(
|
||||
validate_procedure_id("unshell.echo.v01.alpha.invoke"),
|
||||
Err(ValidationError::ProcedureId(
|
||||
"version segment must be v followed by a positive decimal integer"
|
||||
))
|
||||
);
|
||||
assert_eq!(
|
||||
validate_procedure_id("too.short.v1"),
|
||||
Err(ValidationError::ProcedureId(
|
||||
"must contain exactly 5 segments"
|
||||
))
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn truncated_frames_are_rejected() {
|
||||
let header = PacketHeader {
|
||||
packet_type: PacketType::Fault,
|
||||
src_path: path(&["src"]),
|
||||
dst_path: path(&["dst"]),
|
||||
dst_leaf: None,
|
||||
hook_id: Some(9),
|
||||
};
|
||||
let message = FaultMessage {
|
||||
fault: ProtocolFault::InternalError,
|
||||
};
|
||||
|
||||
let frame = encode_packet(&header, &message).expect("frame should encode");
|
||||
let truncated = &frame[..frame.len() - 1];
|
||||
|
||||
assert!(matches!(
|
||||
decode_frame(truncated),
|
||||
Err(FrameError::Truncated)
|
||||
));
|
||||
}
|
||||
@@ -0,0 +1,155 @@
|
||||
use alloc::{borrow::ToOwned, string::String, vec, vec::Vec};
|
||||
|
||||
use crate::protocol::tree::{
|
||||
DefaultRouteProvider, Endpoint, Ingress, LeafBehavior, LeafNode, LeafSpec, LocalEvent,
|
||||
ProtocolEndpoint, RouteDecision, RouteProvider, TreeNode,
|
||||
};
|
||||
use crate::protocol::{
|
||||
DataMessage, EndpointIntrospection, FaultMessage, PacketHeader, PacketType, ProtocolFault,
|
||||
decode_frame, deserialize_archived_bytes, encode_packet,
|
||||
};
|
||||
|
||||
fn path(parts: &[&str]) -> Vec<String> {
|
||||
parts.iter().map(|part| (*part).to_owned()).collect()
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn tree_node_paths_flatten_explicitly() {
|
||||
let tree = TreeNode::Root {
|
||||
children: vec![TreeNode::Endpoint {
|
||||
segment: "branch".to_owned(),
|
||||
leaves: vec![LeafNode {
|
||||
name: "echo".to_owned(),
|
||||
procedures: vec!["unshell.echo.v1.alpha.invoke".to_owned()],
|
||||
}],
|
||||
children: vec![TreeNode::Endpoint {
|
||||
segment: "leaf".to_owned(),
|
||||
leaves: Vec::new(),
|
||||
children: Vec::new(),
|
||||
}],
|
||||
}],
|
||||
};
|
||||
|
||||
assert_eq!(
|
||||
tree.paths(),
|
||||
vec![
|
||||
Vec::<String>::new(),
|
||||
path(&["branch"]),
|
||||
path(&["branch", "leaf"])
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn longest_prefix_routing_prefers_most_specific_child() {
|
||||
let provider = DefaultRouteProvider;
|
||||
let child_paths = vec![path(&["a"]), path(&["a", "b"]), path(&["x"])];
|
||||
|
||||
assert_eq!(
|
||||
provider.route_destination(&Vec::new(), &child_paths, true, &path(&["a", "b", "c"])),
|
||||
RouteDecision::Child(1)
|
||||
);
|
||||
assert_eq!(
|
||||
provider.route_destination(&path(&["a"]), &child_paths, true, &path(&["z"])),
|
||||
RouteDecision::Parent
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn protocol_endpoint_introspection_returns_leaf_summary() {
|
||||
let mut endpoint = ProtocolEndpoint::new(
|
||||
path(&["root"]),
|
||||
Some(Vec::new()),
|
||||
Vec::new(),
|
||||
vec![LeafSpec {
|
||||
name: "echo".to_owned(),
|
||||
procedures: vec!["unshell.echo.v1.alpha.invoke".to_owned()],
|
||||
behavior: LeafBehavior::Echo,
|
||||
}],
|
||||
);
|
||||
|
||||
let hook_id = endpoint.allocate_hook_id();
|
||||
let frame = endpoint
|
||||
.make_call(path(&["root"]), None, "", Some(hook_id), Vec::new())
|
||||
.expect("introspection call should encode");
|
||||
|
||||
let outcome = endpoint
|
||||
.receive(&Ingress::Local, frame)
|
||||
.expect("endpoint should handle introspection");
|
||||
|
||||
assert!(outcome.events.is_empty());
|
||||
assert_eq!(outcome.forwards.len(), 1);
|
||||
assert_eq!(outcome.forwards[0].0, RouteDecision::Parent);
|
||||
|
||||
let parsed = decode_frame(&outcome.forwards[0].1).expect("response should decode");
|
||||
let response = parsed
|
||||
.deserialize_data()
|
||||
.expect("response data should deserialize");
|
||||
let introspection = deserialize_archived_bytes::<
|
||||
rkyv::Archived<EndpointIntrospection>,
|
||||
EndpointIntrospection,
|
||||
>(&response.data)
|
||||
.expect("introspection payload should deserialize");
|
||||
|
||||
assert!(response.end_hook);
|
||||
assert_eq!(introspection.leaves.len(), 1);
|
||||
assert_eq!(introspection.leaves[0].leaf_name, "echo");
|
||||
assert_eq!(
|
||||
introspection.leaves[0].procedures,
|
||||
vec!["unshell.echo.v1.alpha.invoke".to_owned()]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn invalid_hook_peer_emits_local_fault_event() {
|
||||
let mut endpoint = ProtocolEndpoint::new(path(&["client"]), None, Vec::new(), Vec::new());
|
||||
let hook_id = endpoint.allocate_hook_id();
|
||||
|
||||
endpoint
|
||||
.make_call(
|
||||
path(&["server"]),
|
||||
None,
|
||||
"unshell.echo.v1.alpha.invoke",
|
||||
Some(hook_id),
|
||||
vec![1, 2, 3],
|
||||
)
|
||||
.expect("call should establish an active hook");
|
||||
|
||||
let frame = encode_packet(
|
||||
&PacketHeader {
|
||||
packet_type: PacketType::Data,
|
||||
src_path: path(&["client"]),
|
||||
dst_path: path(&["client"]),
|
||||
dst_leaf: None,
|
||||
hook_id: Some(hook_id),
|
||||
},
|
||||
&DataMessage {
|
||||
procedure_id: "unshell.echo.v1.alpha.invoke".to_owned(),
|
||||
data: vec![9],
|
||||
end_hook: false,
|
||||
},
|
||||
)
|
||||
.expect("data frame should encode");
|
||||
|
||||
let outcome = endpoint
|
||||
.receive(&Ingress::Local, frame)
|
||||
.expect("invalid peer should be handled");
|
||||
|
||||
assert!(outcome.forwards.is_empty());
|
||||
assert_eq!(outcome.events.len(), 1);
|
||||
assert!(!outcome.dropped);
|
||||
|
||||
match &outcome.events[0] {
|
||||
LocalEvent::Fault { header, message } => {
|
||||
assert_eq!(header.packet_type, PacketType::Fault);
|
||||
assert_eq!(header.hook_id, Some(hook_id));
|
||||
assert_eq!(
|
||||
message,
|
||||
&FaultMessage {
|
||||
fault: ProtocolFault::InvalidHookPeer,
|
||||
}
|
||||
);
|
||||
}
|
||||
other => panic!("expected fault event, got {other:?}"),
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,142 @@
|
||||
//! Protocol implementation traits exposed by the core crate.
|
||||
//!
|
||||
//! These traits collect the core contracts needed to plug framing, routing,
|
||||
//! hook storage, leaf metadata, and packet processing into an implementation.
|
||||
|
||||
use alloc::{string::String, vec::Vec};
|
||||
|
||||
use super::{
|
||||
FrameBytes, FrameCodec, LeafIntrospection, LeafIntrospectionSummary,
|
||||
tree::{
|
||||
ActiveHook, Endpoint, EndpointError, EndpointOutcome, HookKey, HookTable, Ingress,
|
||||
LeafNode, LeafSpec, PendingHook, RouteProvider,
|
||||
},
|
||||
};
|
||||
|
||||
/// Packet framing contract for the canonical wire format.
|
||||
pub trait PacketFraming: FrameCodec {}
|
||||
|
||||
impl<T> PacketFraming for T where T: FrameCodec + ?Sized {}
|
||||
|
||||
/// Route resolution contract for endpoint path delivery.
|
||||
pub trait RouteResolution: RouteProvider {}
|
||||
|
||||
impl<T> RouteResolution for T where T: RouteProvider + ?Sized {}
|
||||
|
||||
/// Hook storage contract for pending and active protocol flows.
|
||||
pub trait HookStore {
|
||||
fn allocate_hook_id(&self, return_path: &[String]) -> u64;
|
||||
fn insert_pending(&mut self, pending: PendingHook);
|
||||
fn insert_active(&mut self, active: ActiveHook);
|
||||
fn activate_pending(&mut self, key: &HookKey, peer_path: Vec<String>) -> Option<()>;
|
||||
fn remove_pending(&mut self, key: &HookKey) -> Option<PendingHook>;
|
||||
fn remove_active(&mut self, key: &HookKey) -> Option<ActiveHook>;
|
||||
fn pending(&self, key: &HookKey) -> Option<&PendingHook>;
|
||||
fn active(&self, key: &HookKey) -> Option<&ActiveHook>;
|
||||
fn active_mut(&mut self, key: &HookKey) -> Option<&mut ActiveHook>;
|
||||
}
|
||||
|
||||
impl HookStore for HookTable {
|
||||
fn allocate_hook_id(&self, return_path: &[String]) -> u64 {
|
||||
HookTable::allocate_hook_id(self, return_path)
|
||||
}
|
||||
|
||||
fn insert_pending(&mut self, pending: PendingHook) {
|
||||
HookTable::insert_pending(self, pending);
|
||||
}
|
||||
|
||||
fn insert_active(&mut self, active: ActiveHook) {
|
||||
HookTable::insert_active(self, active);
|
||||
}
|
||||
|
||||
fn activate_pending(&mut self, key: &HookKey, peer_path: Vec<String>) -> Option<()> {
|
||||
HookTable::activate_pending(self, key, peer_path)
|
||||
}
|
||||
|
||||
fn remove_pending(&mut self, key: &HookKey) -> Option<PendingHook> {
|
||||
HookTable::remove_pending(self, key)
|
||||
}
|
||||
|
||||
fn remove_active(&mut self, key: &HookKey) -> Option<ActiveHook> {
|
||||
HookTable::remove_active(self, key)
|
||||
}
|
||||
|
||||
fn pending(&self, key: &HookKey) -> Option<&PendingHook> {
|
||||
HookTable::pending(self, key)
|
||||
}
|
||||
|
||||
fn active(&self, key: &HookKey) -> Option<&ActiveHook> {
|
||||
HookTable::active(self, key)
|
||||
}
|
||||
|
||||
fn active_mut(&mut self, key: &HookKey) -> Option<&mut ActiveHook> {
|
||||
HookTable::active_mut(self, key)
|
||||
}
|
||||
}
|
||||
|
||||
/// Leaf metadata contract used for protocol discovery payloads.
|
||||
pub trait LeafMetadata {
|
||||
fn leaf_name(&self) -> &str;
|
||||
fn procedures(&self) -> &[String];
|
||||
|
||||
fn summary(&self) -> LeafIntrospectionSummary {
|
||||
LeafIntrospectionSummary {
|
||||
leaf_name: self.leaf_name().into(),
|
||||
procedures: self.procedures().to_vec(),
|
||||
}
|
||||
}
|
||||
|
||||
fn introspection(&self) -> LeafIntrospection {
|
||||
LeafIntrospection {
|
||||
leaf_name: self.leaf_name().into(),
|
||||
procedures: self.procedures().to_vec(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl LeafMetadata for LeafSpec {
|
||||
fn leaf_name(&self) -> &str {
|
||||
&self.name
|
||||
}
|
||||
|
||||
fn procedures(&self) -> &[String] {
|
||||
&self.procedures
|
||||
}
|
||||
}
|
||||
|
||||
impl LeafMetadata for LeafNode {
|
||||
fn leaf_name(&self) -> &str {
|
||||
&self.name
|
||||
}
|
||||
|
||||
fn procedures(&self) -> &[String] {
|
||||
&self.procedures
|
||||
}
|
||||
}
|
||||
|
||||
/// Packet processor and local runtime contract for framed protocol traffic.
|
||||
pub trait PacketProcessor {
|
||||
fn path(&self) -> &[String];
|
||||
fn receive(
|
||||
&mut self,
|
||||
ingress: &Ingress,
|
||||
frame: FrameBytes,
|
||||
) -> Result<EndpointOutcome, EndpointError>;
|
||||
}
|
||||
|
||||
impl<T> PacketProcessor for T
|
||||
where
|
||||
T: Endpoint + ?Sized,
|
||||
{
|
||||
fn path(&self) -> &[String] {
|
||||
Endpoint::path(self)
|
||||
}
|
||||
|
||||
fn receive(
|
||||
&mut self,
|
||||
ingress: &Ingress,
|
||||
frame: FrameBytes,
|
||||
) -> Result<EndpointOutcome, EndpointError> {
|
||||
Endpoint::receive(self, ingress, frame)
|
||||
}
|
||||
}
|
||||
@@ -1,4 +1,4 @@
|
||||
//! Minimal endpoint runtime for protocol tests.
|
||||
//! Endpoint runtime and traits.
|
||||
|
||||
use alloc::{
|
||||
collections::{BTreeMap, BTreeSet},
|
||||
@@ -9,36 +9,31 @@ use alloc::{
|
||||
use core::fmt;
|
||||
use rkyv::{rancor::Error as RkyvError, to_bytes};
|
||||
|
||||
use crate::{
|
||||
protocol::{
|
||||
CallMessage, DataMessage, EndpointIntrospection, FaultMessage, FrameBytes, FrameError,
|
||||
HookTarget, LeafIntrospection, LeafIntrospectionSummary, PacketHeader, PacketType,
|
||||
ProtocolFault, decode_frame, encode_packet, introspection::INTROSPECTION_PROCEDURE_ID,
|
||||
validate_call, validate_header, validate_procedure_id,
|
||||
},
|
||||
tree::{ActiveHook, HookKey, HookTable, PendingHook, RouteDecision, route_destination},
|
||||
use crate::protocol::{
|
||||
CallMessage, DataMessage, EndpointIntrospection, FaultMessage, FrameBytes, FrameError,
|
||||
HookTarget, LeafIntrospection, LeafIntrospectionSummary, PacketHeader, PacketType,
|
||||
ProtocolFault, ValidationError, decode_frame, encode_packet,
|
||||
introspection::INTROSPECTION_PROCEDURE_ID, validate_call, validate_header,
|
||||
validate_procedure_id,
|
||||
};
|
||||
|
||||
/// Local connection state defined by the protocol.
|
||||
use super::{ActiveHook, HookKey, HookTable, PendingHook, RouteDecision, route_destination};
|
||||
|
||||
/// Local connection state.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum ConnectionState {
|
||||
/// Connected but not routable.
|
||||
Unregistered,
|
||||
/// Admitted into local routing.
|
||||
Registered,
|
||||
}
|
||||
|
||||
/// Registered child route.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct ChildRoute {
|
||||
/// Child endpoint path.
|
||||
pub path: Vec<String>,
|
||||
/// Local connection state.
|
||||
pub state: ConnectionState,
|
||||
}
|
||||
|
||||
impl ChildRoute {
|
||||
/// Creates a registered child route.
|
||||
pub fn registered(path: Vec<String>) -> Self {
|
||||
Self {
|
||||
path,
|
||||
@@ -47,73 +42,58 @@ impl ChildRoute {
|
||||
}
|
||||
}
|
||||
|
||||
/// Basic leaf behavior used by the test protocol runtime.
|
||||
/// Leaf behavior for test runtime.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub enum LeafBehavior {
|
||||
/// Echoes the call data back in one `Data` packet.
|
||||
Echo,
|
||||
}
|
||||
|
||||
/// Static leaf description.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct LeafSpec {
|
||||
/// Local leaf name.
|
||||
pub name: String,
|
||||
/// Supported procedures.
|
||||
pub procedures: Vec<String>,
|
||||
/// Test behavior.
|
||||
pub behavior: LeafBehavior,
|
||||
}
|
||||
|
||||
/// How a packet arrived at the endpoint.
|
||||
/// Arrival side.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub enum Ingress {
|
||||
/// From the direct parent.
|
||||
Parent,
|
||||
/// From a direct child path.
|
||||
Child(Vec<String>),
|
||||
/// Originated locally.
|
||||
Local,
|
||||
}
|
||||
|
||||
/// Locally delivered events produced by protocol processing.
|
||||
/// Local events.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub enum LocalEvent {
|
||||
/// A supported local call with no response hook.
|
||||
Call {
|
||||
header: PacketHeader,
|
||||
message: CallMessage,
|
||||
},
|
||||
/// Locally delivered data.
|
||||
Data {
|
||||
header: PacketHeader,
|
||||
message: DataMessage,
|
||||
},
|
||||
/// Locally delivered or synthesized fault.
|
||||
Fault {
|
||||
header: PacketHeader,
|
||||
message: FaultMessage,
|
||||
},
|
||||
}
|
||||
|
||||
/// Output from processing one frame.
|
||||
/// Processing outcome.
|
||||
#[derive(Debug, Default)]
|
||||
pub struct EndpointOutcome {
|
||||
/// Frames to forward. The frame bytes are moved, not cloned.
|
||||
pub forwards: Vec<(RouteDecision, FrameBytes)>,
|
||||
/// Events delivered locally.
|
||||
pub events: Vec<LocalEvent>,
|
||||
/// Whether the packet was silently dropped.
|
||||
pub dropped: bool,
|
||||
}
|
||||
|
||||
/// Endpoint processing failure.
|
||||
/// Processing error.
|
||||
#[derive(Debug)]
|
||||
pub enum EndpointError {
|
||||
/// Frame parsing failed.
|
||||
Frame(FrameError),
|
||||
/// Validation failed.
|
||||
Validation(crate::protocol::ValidationError),
|
||||
Validation(ValidationError),
|
||||
}
|
||||
|
||||
impl fmt::Display for EndpointError {
|
||||
@@ -125,8 +105,7 @@ impl fmt::Display for EndpointError {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "std")]
|
||||
impl std::error::Error for EndpointError {}
|
||||
impl core::error::Error for EndpointError {}
|
||||
|
||||
impl From<FrameError> for EndpointError {
|
||||
fn from(value: FrameError) -> Self {
|
||||
@@ -134,15 +113,25 @@ impl From<FrameError> for EndpointError {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<crate::protocol::ValidationError> for EndpointError {
|
||||
fn from(value: crate::protocol::ValidationError) -> Self {
|
||||
impl From<ValidationError> for EndpointError {
|
||||
fn from(value: ValidationError) -> Self {
|
||||
Self::Validation(value)
|
||||
}
|
||||
}
|
||||
|
||||
/// Local endpoint model suitable for tests and later integration work.
|
||||
/// Core trait for a protocol endpoint.
|
||||
pub trait Endpoint {
|
||||
fn path(&self) -> &[String];
|
||||
fn receive(
|
||||
&mut self,
|
||||
ingress: &Ingress,
|
||||
frame: FrameBytes,
|
||||
) -> Result<EndpointOutcome, EndpointError>;
|
||||
}
|
||||
|
||||
/// Default endpoint implementation.
|
||||
#[derive(Debug, Default)]
|
||||
pub struct Endpoint {
|
||||
pub struct ProtocolEndpoint {
|
||||
path: Vec<String>,
|
||||
parent_path: Option<Vec<String>>,
|
||||
children: Vec<ChildRoute>,
|
||||
@@ -151,8 +140,7 @@ pub struct Endpoint {
|
||||
hooks: HookTable,
|
||||
}
|
||||
|
||||
impl Endpoint {
|
||||
/// Creates an endpoint with explicit path, parent, children, and leaves.
|
||||
impl ProtocolEndpoint {
|
||||
pub fn new(
|
||||
path: Vec<String>,
|
||||
parent_path: Option<Vec<String>>,
|
||||
@@ -172,21 +160,6 @@ impl Endpoint {
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the local endpoint path.
|
||||
pub fn path(&self) -> &[String] {
|
||||
&self.path
|
||||
}
|
||||
|
||||
/// Returns the hook table for assertions.
|
||||
pub fn hooks(&self) -> &HookTable {
|
||||
&self.hooks
|
||||
}
|
||||
|
||||
/// Registers an endpoint-level procedure.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns [`EndpointError`] when the procedure id is invalid.
|
||||
pub fn add_endpoint_procedure(
|
||||
&mut self,
|
||||
procedure_id: impl Into<String>,
|
||||
@@ -197,16 +170,10 @@ impl Endpoint {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Allocates a new local hook id.
|
||||
pub fn allocate_hook_id(&self) -> u64 {
|
||||
self.hooks.allocate_hook_id(&self.path)
|
||||
}
|
||||
|
||||
/// Creates an outbound `Call` frame and registers host-side hook state when needed.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns [`EndpointError`] when validation or framing fails.
|
||||
pub fn make_call(
|
||||
&mut self,
|
||||
dst_path: Vec<String>,
|
||||
@@ -250,11 +217,6 @@ impl Endpoint {
|
||||
Ok(encode_packet(&header, &call)?)
|
||||
}
|
||||
|
||||
/// Creates an outbound `Data` frame.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns [`EndpointError`] when validation or framing fails.
|
||||
pub fn make_data(
|
||||
&self,
|
||||
dst_path: Vec<String>,
|
||||
@@ -281,150 +243,6 @@ impl Endpoint {
|
||||
Ok(encode_packet(&header, &message)?)
|
||||
}
|
||||
|
||||
/// Processes one framed packet.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns [`EndpointError`] when frame decoding or validation fails.
|
||||
pub fn receive(
|
||||
&mut self,
|
||||
ingress: &Ingress,
|
||||
frame: FrameBytes,
|
||||
) -> Result<EndpointOutcome, EndpointError> {
|
||||
enum OwnedPayload {
|
||||
Call(PacketHeader, CallMessage),
|
||||
Data(PacketHeader, DataMessage),
|
||||
Fault(PacketHeader, FaultMessage),
|
||||
}
|
||||
|
||||
let owned = {
|
||||
let parsed = decode_frame(&frame)?;
|
||||
let header = parsed.deserialize_header();
|
||||
validate_header(&header)?;
|
||||
match header.packet_type {
|
||||
PacketType::Call => OwnedPayload::Call(header, parsed.deserialize_call()?),
|
||||
PacketType::Data => OwnedPayload::Data(header, parsed.deserialize_data()?),
|
||||
PacketType::Fault => OwnedPayload::Fault(header, parsed.deserialize_fault()?),
|
||||
}
|
||||
};
|
||||
|
||||
let src_path = match &owned {
|
||||
OwnedPayload::Call(header, _) => &header.src_path,
|
||||
OwnedPayload::Data(header, _) => &header.src_path,
|
||||
OwnedPayload::Fault(header, _) => &header.src_path,
|
||||
};
|
||||
|
||||
if !self.valid_source_for_ingress(ingress, src_path) {
|
||||
return Ok(EndpointOutcome {
|
||||
dropped: true,
|
||||
..EndpointOutcome::default()
|
||||
});
|
||||
}
|
||||
|
||||
match owned {
|
||||
OwnedPayload::Call(header, message) => {
|
||||
self.receive_call(ingress, frame, header, message)
|
||||
}
|
||||
OwnedPayload::Data(header, message) => self.receive_data(header, message),
|
||||
OwnedPayload::Fault(header, message) => self.receive_fault(header, message),
|
||||
}
|
||||
}
|
||||
|
||||
fn receive_call(
|
||||
&mut self,
|
||||
ingress: &Ingress,
|
||||
frame: FrameBytes,
|
||||
header: PacketHeader,
|
||||
message: CallMessage,
|
||||
) -> Result<EndpointOutcome, EndpointError> {
|
||||
if !matches!(ingress, Ingress::Parent | Ingress::Local) {
|
||||
return Ok(EndpointOutcome {
|
||||
dropped: true,
|
||||
..EndpointOutcome::default()
|
||||
});
|
||||
}
|
||||
|
||||
validate_call(&header, &message)?;
|
||||
match self.decide_route(&header.dst_path) {
|
||||
RouteDecision::Child(index) => Ok(EndpointOutcome {
|
||||
forwards: vec![(RouteDecision::Child(index), frame)],
|
||||
..EndpointOutcome::default()
|
||||
}),
|
||||
RouteDecision::Parent => Ok(EndpointOutcome {
|
||||
forwards: vec![(RouteDecision::Parent, frame)],
|
||||
..EndpointOutcome::default()
|
||||
}),
|
||||
RouteDecision::Drop => Ok(EndpointOutcome {
|
||||
dropped: true,
|
||||
..EndpointOutcome::default()
|
||||
}),
|
||||
RouteDecision::Local => self.handle_local_call(header, message),
|
||||
}
|
||||
}
|
||||
|
||||
fn receive_data(
|
||||
&mut self,
|
||||
header: PacketHeader,
|
||||
message: DataMessage,
|
||||
) -> Result<EndpointOutcome, EndpointError> {
|
||||
match self.decide_route(&header.dst_path) {
|
||||
RouteDecision::Child(_) | RouteDecision::Parent => Ok(EndpointOutcome {
|
||||
dropped: true,
|
||||
..EndpointOutcome::default()
|
||||
}),
|
||||
RouteDecision::Drop => Ok(EndpointOutcome {
|
||||
dropped: true,
|
||||
..EndpointOutcome::default()
|
||||
}),
|
||||
RouteDecision::Local => self.handle_local_data(header, message),
|
||||
}
|
||||
}
|
||||
|
||||
fn receive_fault(
|
||||
&mut self,
|
||||
header: PacketHeader,
|
||||
message: FaultMessage,
|
||||
) -> Result<EndpointOutcome, EndpointError> {
|
||||
match self.decide_route(&header.dst_path) {
|
||||
RouteDecision::Child(_) | RouteDecision::Parent => Ok(EndpointOutcome {
|
||||
dropped: true,
|
||||
..EndpointOutcome::default()
|
||||
}),
|
||||
RouteDecision::Drop => Ok(EndpointOutcome {
|
||||
dropped: true,
|
||||
..EndpointOutcome::default()
|
||||
}),
|
||||
RouteDecision::Local => {
|
||||
let key = HookKey::new(
|
||||
self.path.clone(),
|
||||
header.hook_id.expect("validated hook id"),
|
||||
);
|
||||
let matches_active = self
|
||||
.hooks
|
||||
.active(&key)
|
||||
.map(|active| active.peer_path == header.src_path)
|
||||
.unwrap_or(false);
|
||||
let matches_pending = self
|
||||
.hooks
|
||||
.pending(&key)
|
||||
.map(|pending| pending.caller_src_path == header.src_path)
|
||||
.unwrap_or(false);
|
||||
if !(matches_active || matches_pending) {
|
||||
return Ok(EndpointOutcome {
|
||||
dropped: true,
|
||||
..EndpointOutcome::default()
|
||||
});
|
||||
}
|
||||
self.hooks.remove_active(&key);
|
||||
self.hooks.remove_pending(&key);
|
||||
Ok(EndpointOutcome {
|
||||
events: vec![LocalEvent::Fault { header, message }],
|
||||
..EndpointOutcome::default()
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_local_call(
|
||||
&mut self,
|
||||
header: PacketHeader,
|
||||
@@ -453,11 +271,7 @@ impl Endpoint {
|
||||
Some(leaf_name) => self
|
||||
.leaves
|
||||
.get(leaf_name)
|
||||
.map(|leaf| {
|
||||
leaf.procedures
|
||||
.iter()
|
||||
.any(|candidate| candidate == &message.procedure_id)
|
||||
})
|
||||
.map(|leaf| leaf.procedures.iter().any(|p| p == &message.procedure_id))
|
||||
.unwrap_or(false),
|
||||
None => self.endpoint_procedures.contains(&message.procedure_id),
|
||||
};
|
||||
@@ -466,7 +280,7 @@ impl Endpoint {
|
||||
let fault = if header
|
||||
.dst_leaf
|
||||
.as_ref()
|
||||
.is_some_and(|leaf_name| !self.leaves.contains_key(leaf_name))
|
||||
.is_some_and(|name| !self.leaves.contains_key(name))
|
||||
{
|
||||
ProtocolFault::UnknownLeaf
|
||||
} else {
|
||||
@@ -482,15 +296,10 @@ impl Endpoint {
|
||||
match header
|
||||
.dst_leaf
|
||||
.as_ref()
|
||||
.and_then(|leaf_name| self.leaves.get(leaf_name))
|
||||
.and_then(|name| self.leaves.get(name))
|
||||
{
|
||||
Some(LeafSpec {
|
||||
behavior: LeafBehavior::Echo,
|
||||
..
|
||||
}) if key.is_some() => {
|
||||
let hook = message
|
||||
.response_hook
|
||||
.expect("key and hook are synchronized");
|
||||
Some(leaf) if leaf.behavior == LeafBehavior::Echo && key.is_some() => {
|
||||
let hook = message.response_hook.expect("synchronized");
|
||||
let response = DataMessage {
|
||||
procedure_id: message.procedure_id.clone(),
|
||||
data: message.data,
|
||||
@@ -535,13 +344,11 @@ impl Endpoint {
|
||||
let Some(leaf) = self.leaves.get(leaf_name) else {
|
||||
return self.emit_fault_if_possible(Some(key), ProtocolFault::UnknownLeaf);
|
||||
};
|
||||
// WARNING: introspection nests one archived payload inside `DataMessage.data`.
|
||||
// This inner allocation is required because the protocol defines `data` as opaque bytes.
|
||||
to_bytes::<RkyvError>(&LeafIntrospection {
|
||||
leaf_name: leaf_name.clone(),
|
||||
procedures: leaf.procedures.clone(),
|
||||
})
|
||||
.expect("leaf introspection should serialize")
|
||||
.expect("serialize")
|
||||
.to_vec()
|
||||
} else {
|
||||
to_bytes::<RkyvError>(&EndpointIntrospection {
|
||||
@@ -554,7 +361,7 @@ impl Endpoint {
|
||||
})
|
||||
.collect(),
|
||||
})
|
||||
.expect("endpoint introspection should serialize")
|
||||
.expect("serialize")
|
||||
.to_vec()
|
||||
};
|
||||
|
||||
@@ -583,21 +390,13 @@ impl Endpoint {
|
||||
header: PacketHeader,
|
||||
message: DataMessage,
|
||||
) -> Result<EndpointOutcome, EndpointError> {
|
||||
let key = HookKey::new(
|
||||
self.path.clone(),
|
||||
header.hook_id.expect("validated hook id"),
|
||||
);
|
||||
let key = HookKey::new(self.path.clone(), header.hook_id.expect("validated"));
|
||||
|
||||
if self.hooks.active(&key).is_none() {
|
||||
let pending_matches = self
|
||||
.hooks
|
||||
.pending(&key)
|
||||
.map(|pending| {
|
||||
pending.caller_src_path == header.src_path
|
||||
&& pending.procedure_id == message.procedure_id
|
||||
})
|
||||
.unwrap_or(false);
|
||||
if pending_matches {
|
||||
let matches = self.hooks.pending(&key).is_some_and(|p| {
|
||||
p.caller_src_path == header.src_path && p.procedure_id == message.procedure_id
|
||||
});
|
||||
if matches {
|
||||
self.hooks.activate_pending(&key, header.src_path.clone());
|
||||
}
|
||||
}
|
||||
@@ -632,13 +431,40 @@ impl Endpoint {
|
||||
if message.end_hook {
|
||||
self.hooks.remove_active(&key);
|
||||
}
|
||||
|
||||
Ok(EndpointOutcome {
|
||||
events: vec![LocalEvent::Data { header, message }],
|
||||
..EndpointOutcome::default()
|
||||
})
|
||||
}
|
||||
|
||||
fn handle_local_fault(
|
||||
&mut self,
|
||||
header: PacketHeader,
|
||||
message: FaultMessage,
|
||||
) -> Result<EndpointOutcome, EndpointError> {
|
||||
let key = HookKey::new(self.path.clone(), header.hook_id.expect("validated"));
|
||||
let matches = self
|
||||
.hooks
|
||||
.active(&key)
|
||||
.is_some_and(|a| a.peer_path == header.src_path)
|
||||
|| self
|
||||
.hooks
|
||||
.pending(&key)
|
||||
.is_some_and(|p| p.caller_src_path == header.src_path);
|
||||
if !matches {
|
||||
return Ok(EndpointOutcome {
|
||||
dropped: true,
|
||||
..EndpointOutcome::default()
|
||||
});
|
||||
}
|
||||
self.hooks.remove_active(&key);
|
||||
self.hooks.remove_pending(&key);
|
||||
Ok(EndpointOutcome {
|
||||
events: vec![LocalEvent::Fault { header, message }],
|
||||
..EndpointOutcome::default()
|
||||
})
|
||||
}
|
||||
|
||||
fn emit_fault_if_possible(
|
||||
&mut self,
|
||||
key: Option<HookKey>,
|
||||
@@ -659,8 +485,7 @@ impl Endpoint {
|
||||
dst_leaf: None,
|
||||
hook_id: Some(key.hook_id),
|
||||
};
|
||||
let message = FaultMessage { fault };
|
||||
let frame = encode_packet(&header, &message)?;
|
||||
let frame = encode_packet(&header, &FaultMessage { fault })?;
|
||||
Ok(EndpointOutcome {
|
||||
forwards: vec![(RouteDecision::Parent, frame)],
|
||||
..EndpointOutcome::default()
|
||||
@@ -671,8 +496,8 @@ impl Endpoint {
|
||||
let child_paths: Vec<Vec<String>> = self
|
||||
.children
|
||||
.iter()
|
||||
.filter(|child| child.state == ConnectionState::Registered)
|
||||
.map(|child| child.path.clone())
|
||||
.filter(|c| c.state == ConnectionState::Registered)
|
||||
.map(|c| c.path.clone())
|
||||
.collect();
|
||||
route_destination(
|
||||
&self.path,
|
||||
@@ -687,107 +512,79 @@ impl Endpoint {
|
||||
Ingress::Parent => self
|
||||
.parent_path
|
||||
.as_ref()
|
||||
.map_or(self.path.is_empty(), |path| path == src_path),
|
||||
.map_or(self.path.is_empty(), |p| p == src_path),
|
||||
Ingress::Child(path) => path == src_path,
|
||||
Ingress::Local => src_path == self.path,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::protocol::introspection::ArchivedEndpointIntrospection;
|
||||
use crate::protocol::{HookTarget, deserialize_archived_bytes};
|
||||
impl Endpoint for ProtocolEndpoint {
|
||||
fn path(&self) -> &[String] {
|
||||
&self.path
|
||||
}
|
||||
|
||||
fn echo_leaf() -> LeafSpec {
|
||||
LeafSpec {
|
||||
name: String::from("echo"),
|
||||
procedures: vec![String::from("org.product.v1.echo.roundtrip")],
|
||||
behavior: LeafBehavior::Echo,
|
||||
fn receive(
|
||||
&mut self,
|
||||
ingress: &Ingress,
|
||||
frame: FrameBytes,
|
||||
) -> Result<EndpointOutcome, EndpointError> {
|
||||
let parsed = decode_frame(&frame)?;
|
||||
let header = parsed.deserialize_header();
|
||||
validate_header(&header)?;
|
||||
if !self.valid_source_for_ingress(ingress, &header.src_path) {
|
||||
return Ok(EndpointOutcome {
|
||||
dropped: true,
|
||||
..EndpointOutcome::default()
|
||||
});
|
||||
}
|
||||
|
||||
match header.packet_type {
|
||||
PacketType::Call => {
|
||||
let message = parsed.deserialize_call()?;
|
||||
if !matches!(ingress, Ingress::Parent | Ingress::Local) {
|
||||
return Ok(EndpointOutcome {
|
||||
dropped: true,
|
||||
..EndpointOutcome::default()
|
||||
});
|
||||
}
|
||||
validate_call(&header, &message)?;
|
||||
match self.decide_route(&header.dst_path) {
|
||||
RouteDecision::Child(idx) => Ok(EndpointOutcome {
|
||||
forwards: vec![(RouteDecision::Child(idx), frame)],
|
||||
..EndpointOutcome::default()
|
||||
}),
|
||||
RouteDecision::Parent => Ok(EndpointOutcome {
|
||||
forwards: vec![(RouteDecision::Parent, frame)],
|
||||
..EndpointOutcome::default()
|
||||
}),
|
||||
RouteDecision::Drop => Ok(EndpointOutcome {
|
||||
dropped: true,
|
||||
..EndpointOutcome::default()
|
||||
}),
|
||||
RouteDecision::Local => self.handle_local_call(header, message),
|
||||
}
|
||||
}
|
||||
PacketType::Data => {
|
||||
let message = parsed.deserialize_data()?;
|
||||
match self.decide_route(&header.dst_path) {
|
||||
RouteDecision::Local => self.handle_local_data(header, message),
|
||||
_ => Ok(EndpointOutcome {
|
||||
dropped: true,
|
||||
..EndpointOutcome::default()
|
||||
}),
|
||||
}
|
||||
}
|
||||
PacketType::Fault => {
|
||||
let message = parsed.deserialize_fault()?;
|
||||
match self.decide_route(&header.dst_path) {
|
||||
RouteDecision::Local => self.handle_local_fault(header, message),
|
||||
_ => Ok(EndpointOutcome {
|
||||
dropped: true,
|
||||
..EndpointOutcome::default()
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn introspection_returns_payload_and_clears_hook() {
|
||||
let mut child = Endpoint::new(
|
||||
vec![String::from("child")],
|
||||
Some(Vec::new()),
|
||||
Vec::new(),
|
||||
vec![echo_leaf()],
|
||||
);
|
||||
let header = PacketHeader {
|
||||
packet_type: PacketType::Call,
|
||||
src_path: Vec::new(),
|
||||
dst_path: vec![String::from("child")],
|
||||
dst_leaf: None,
|
||||
hook_id: None,
|
||||
};
|
||||
let call = CallMessage {
|
||||
procedure_id: String::new(),
|
||||
data: Vec::new(),
|
||||
response_hook: Some(HookTarget {
|
||||
hook_id: 1,
|
||||
return_path: Vec::new(),
|
||||
}),
|
||||
};
|
||||
|
||||
let outcome = child
|
||||
.receive(
|
||||
&Ingress::Parent,
|
||||
encode_packet(&header, &call).expect("frame"),
|
||||
)
|
||||
.expect("receive should succeed");
|
||||
let (_, frame) = outcome
|
||||
.forwards
|
||||
.first()
|
||||
.expect("forwarded frame should exist");
|
||||
let parsed = decode_frame(frame).expect("data frame");
|
||||
let data = parsed.deserialize_data().expect("data payload");
|
||||
let payload = deserialize_archived_bytes::<
|
||||
ArchivedEndpointIntrospection,
|
||||
EndpointIntrospection,
|
||||
>(&data.data)
|
||||
.expect("introspection payload");
|
||||
assert_eq!(payload.leaves.len(), 1);
|
||||
assert_eq!(child.hooks().active_len(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn invalid_peer_generates_local_fault_event() {
|
||||
let mut root = Endpoint::new(Vec::new(), None, Vec::new(), Vec::new());
|
||||
let _call = root
|
||||
.make_call(
|
||||
vec![String::from("child")],
|
||||
None,
|
||||
String::from("org.product.v1.echo.roundtrip"),
|
||||
Some(7),
|
||||
Vec::new(),
|
||||
)
|
||||
.expect("call should encode");
|
||||
let frame = root
|
||||
.make_data(
|
||||
Vec::new(),
|
||||
7,
|
||||
String::from("org.product.v1.echo.roundtrip"),
|
||||
b"bad".to_vec(),
|
||||
false,
|
||||
)
|
||||
.expect("data should encode");
|
||||
let parsed = decode_frame(&frame).expect("frame should decode");
|
||||
let mut header = parsed.deserialize_header();
|
||||
header.src_path = vec![String::from("other")];
|
||||
let bad_frame = encode_packet(
|
||||
&header,
|
||||
&parsed.deserialize_data().expect("data should decode"),
|
||||
)
|
||||
.expect("bad frame should encode");
|
||||
let outcome = root
|
||||
.receive(&Ingress::Child(vec![String::from("other")]), bad_frame)
|
||||
.expect("receive should work");
|
||||
assert!(matches!(
|
||||
outcome.events.first(),
|
||||
Some(LocalEvent::Fault { .. })
|
||||
));
|
||||
}
|
||||
}
|
||||
@@ -12,7 +12,6 @@ pub struct HookKey {
|
||||
}
|
||||
|
||||
impl HookKey {
|
||||
/// Creates a new hook key.
|
||||
pub fn new(return_path: Vec<String>, hook_id: u64) -> Self {
|
||||
Self {
|
||||
return_path,
|
||||
@@ -24,32 +23,21 @@ impl HookKey {
|
||||
/// Pending hook context created by a received call.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct PendingHook {
|
||||
/// Original caller path.
|
||||
pub caller_src_path: Vec<String>,
|
||||
/// Hook host path.
|
||||
pub return_path: Vec<String>,
|
||||
/// Hook identifier.
|
||||
pub hook_id: u64,
|
||||
/// Procedure anchored to the call.
|
||||
pub procedure_id: String,
|
||||
/// Destination leaf from the call.
|
||||
pub dst_leaf: Option<String>,
|
||||
}
|
||||
|
||||
/// Active hook context used for ordinary data traffic.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct ActiveHook {
|
||||
/// Path of the endpoint hosting the hook.
|
||||
pub return_path: Vec<String>,
|
||||
/// Hook identifier.
|
||||
pub hook_id: u64,
|
||||
/// Expected direct peer for hook traffic.
|
||||
pub peer_path: Vec<String>,
|
||||
/// Procedure bound to the hook.
|
||||
pub procedure_id: String,
|
||||
/// Original destination leaf.
|
||||
pub dst_leaf: Option<String>,
|
||||
/// Whether the peer has indicated completion.
|
||||
pub peer_finished: bool,
|
||||
}
|
||||
|
||||
@@ -61,7 +49,6 @@ pub struct HookTable {
|
||||
}
|
||||
|
||||
impl HookTable {
|
||||
/// Allocates the lowest inactive hook id for a return path.
|
||||
pub fn allocate_hook_id(&self, return_path: &[String]) -> u64 {
|
||||
let mut hook_id = 0u64;
|
||||
loop {
|
||||
@@ -73,22 +60,18 @@ impl HookTable {
|
||||
}
|
||||
}
|
||||
|
||||
/// Inserts pending hook state.
|
||||
pub fn insert_pending(&mut self, pending: PendingHook) {
|
||||
// WARNING: hook tables intentionally own their path and procedure strings.
|
||||
// Hook state must outlive any individual frame buffer, so borrowing framed
|
||||
// transport memory here would be unsound.
|
||||
// Hook state must outlive any individual frame buffer.
|
||||
let key = HookKey::new(pending.return_path.clone(), pending.hook_id);
|
||||
self.pending.insert(key, pending);
|
||||
}
|
||||
|
||||
/// Inserts active hook state.
|
||||
pub fn insert_active(&mut self, active: ActiveHook) {
|
||||
let key = HookKey::new(active.return_path.clone(), active.hook_id);
|
||||
self.active.insert(key, active);
|
||||
}
|
||||
|
||||
/// Promotes pending hook state to active state.
|
||||
pub fn activate_pending(&mut self, key: &HookKey, peer_path: Vec<String>) -> Option<()> {
|
||||
let pending = self.pending.remove(key)?;
|
||||
self.active.insert(
|
||||
@@ -105,37 +88,30 @@ impl HookTable {
|
||||
Some(())
|
||||
}
|
||||
|
||||
/// Removes pending state.
|
||||
pub fn remove_pending(&mut self, key: &HookKey) -> Option<PendingHook> {
|
||||
self.pending.remove(key)
|
||||
}
|
||||
|
||||
/// Removes active state.
|
||||
pub fn remove_active(&mut self, key: &HookKey) -> Option<ActiveHook> {
|
||||
self.active.remove(key)
|
||||
}
|
||||
|
||||
/// Returns pending state.
|
||||
pub fn pending(&self, key: &HookKey) -> Option<&PendingHook> {
|
||||
self.pending.get(key)
|
||||
}
|
||||
|
||||
/// Returns active state.
|
||||
pub fn active(&self, key: &HookKey) -> Option<&ActiveHook> {
|
||||
self.active.get(key)
|
||||
}
|
||||
|
||||
/// Returns mutable active state.
|
||||
pub fn active_mut(&mut self, key: &HookKey) -> Option<&mut ActiveHook> {
|
||||
self.active.get_mut(key)
|
||||
}
|
||||
|
||||
/// Returns the number of pending hooks.
|
||||
pub fn pending_len(&self) -> usize {
|
||||
self.pending.len()
|
||||
}
|
||||
|
||||
/// Returns the number of active hooks.
|
||||
pub fn active_len(&self) -> usize {
|
||||
self.active.len()
|
||||
}
|
||||
@@ -6,7 +6,10 @@ mod routing;
|
||||
|
||||
pub use endpoint::{
|
||||
ChildRoute, ConnectionState, Endpoint, EndpointError, EndpointOutcome, Ingress, LeafBehavior,
|
||||
LeafSpec, LocalEvent,
|
||||
LeafSpec, LocalEvent, ProtocolEndpoint,
|
||||
};
|
||||
pub use hook::{ActiveHook, HookKey, HookTable, PendingHook};
|
||||
pub use routing::{LeafNode, RouteDecision, TreeNode, is_prefix, route_destination};
|
||||
pub use routing::{
|
||||
DefaultRouteProvider, LeafNode, RouteDecision, RouteProvider, TreeNode, is_prefix,
|
||||
route_destination,
|
||||
};
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
use alloc::{string::String, vec::Vec};
|
||||
|
||||
/// Explicit test tree declaration.
|
||||
/// Explicit test tree declaration used for configuration.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub enum TreeNode {
|
||||
/// The tree root.
|
||||
@@ -67,7 +67,7 @@ pub enum RouteDecision {
|
||||
Drop,
|
||||
}
|
||||
|
||||
/// Returns `true` if `prefix` is a prefix of `path`.
|
||||
/// Returns `true` if `prefix` is a path prefix of `path`.
|
||||
pub fn is_prefix(prefix: &[String], path: &[String]) -> bool {
|
||||
prefix.len() <= path.len()
|
||||
&& prefix
|
||||
@@ -76,30 +76,56 @@ pub fn is_prefix(prefix: &[String], path: &[String]) -> bool {
|
||||
.all(|(left, right)| left == right)
|
||||
}
|
||||
|
||||
/// Routes a destination path using the protocol's longest-prefix rule.
|
||||
/// Trait for resolving a destination path to a routing decision.
|
||||
pub trait RouteProvider {
|
||||
/// Computes the routing decision for a destination path.
|
||||
fn route_destination(
|
||||
&self,
|
||||
local_path: &[String],
|
||||
child_paths: &[Vec<String>],
|
||||
has_parent: bool,
|
||||
dst_path: &[String],
|
||||
) -> RouteDecision;
|
||||
}
|
||||
|
||||
/// Default routing implementation using the protocol's longest-prefix rule.
|
||||
pub struct DefaultRouteProvider;
|
||||
|
||||
impl RouteProvider for DefaultRouteProvider {
|
||||
fn route_destination(
|
||||
&self,
|
||||
local_path: &[String],
|
||||
child_paths: &[Vec<String>],
|
||||
has_parent: bool,
|
||||
dst_path: &[String],
|
||||
) -> RouteDecision {
|
||||
let child = child_paths
|
||||
.iter()
|
||||
.enumerate()
|
||||
.filter(|(_, child_path)| is_prefix(child_path, dst_path))
|
||||
.max_by_key(|(_, child_path)| child_path.len())
|
||||
.map(|(index, _)| index);
|
||||
|
||||
if let Some(index) = child {
|
||||
return RouteDecision::Child(index);
|
||||
}
|
||||
if local_path == dst_path {
|
||||
return RouteDecision::Local;
|
||||
}
|
||||
if has_parent && !is_prefix(local_path, dst_path) {
|
||||
return RouteDecision::Parent;
|
||||
}
|
||||
RouteDecision::Drop
|
||||
}
|
||||
}
|
||||
|
||||
pub fn route_destination(
|
||||
local_path: &[String],
|
||||
child_paths: &[Vec<String>],
|
||||
has_parent: bool,
|
||||
dst_path: &[String],
|
||||
) -> RouteDecision {
|
||||
let child = child_paths
|
||||
.iter()
|
||||
.enumerate()
|
||||
.filter(|(_, child_path)| is_prefix(child_path, dst_path))
|
||||
.max_by_key(|(_, child_path)| child_path.len())
|
||||
.map(|(index, _)| index);
|
||||
|
||||
if let Some(index) = child {
|
||||
return RouteDecision::Child(index);
|
||||
}
|
||||
if local_path == dst_path {
|
||||
return RouteDecision::Local;
|
||||
}
|
||||
if has_parent && !is_prefix(local_path, dst_path) {
|
||||
return RouteDecision::Parent;
|
||||
}
|
||||
RouteDecision::Drop
|
||||
DefaultRouteProvider.route_destination(local_path, child_paths, has_parent, dst_path)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -109,12 +135,13 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn longest_prefix_wins() {
|
||||
let provider = DefaultRouteProvider;
|
||||
let children = vec![
|
||||
vec![String::from("a")],
|
||||
vec![String::from("a"), String::from("b")],
|
||||
];
|
||||
assert_eq!(
|
||||
route_destination(
|
||||
provider.route_destination(
|
||||
&Vec::<String>::new(),
|
||||
&children,
|
||||
false,
|
||||
@@ -1,4 +1,7 @@
|
||||
//! Archived protocol message types.
|
||||
//! Canonical UnShell protocol message types.
|
||||
//!
|
||||
//! These types define the wire format and are designed for zero-copy
|
||||
//! access via `rkyv`.
|
||||
|
||||
use alloc::{string::String, vec::Vec};
|
||||
use rkyv::{Archive, Deserialize, Serialize};
|
||||
|
||||
@@ -1,10 +1,9 @@
|
||||
//! Stateless protocol validation.
|
||||
|
||||
use core::fmt;
|
||||
|
||||
use crate::protocol::{
|
||||
CallMessage, PacketHeader, PacketType, introspection::INTROSPECTION_PROCEDURE_ID,
|
||||
};
|
||||
use core::fmt;
|
||||
|
||||
/// Validation failures for protocol structures.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
@@ -27,14 +26,9 @@ impl fmt::Display for ValidationError {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "std")]
|
||||
impl std::error::Error for ValidationError {}
|
||||
impl core::error::Error for ValidationError {}
|
||||
|
||||
/// Validates packet header invariants from the protocol.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns [`ValidationError`] when the header shape does not match the packet type.
|
||||
pub fn validate_header(header: &PacketHeader) -> Result<(), ValidationError> {
|
||||
match header.packet_type {
|
||||
PacketType::Call => {
|
||||
@@ -57,15 +51,10 @@ pub fn validate_header(header: &PacketHeader) -> Result<(), ValidationError> {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Validates the canonical dotted `procedure_id` shape.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns [`ValidationError`] when the procedure id does not match the required format.
|
||||
pub fn validate_procedure_id(procedure_id: &str) -> Result<(), ValidationError> {
|
||||
if procedure_id == INTROSPECTION_PROCEDURE_ID {
|
||||
return Ok(());
|
||||
@@ -114,10 +103,6 @@ pub fn validate_procedure_id(procedure_id: &str) -> Result<(), ValidationError>
|
||||
}
|
||||
|
||||
/// Validates call-specific invariants that depend on both header and payload.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns [`ValidationError`] when the call payload conflicts with the header.
|
||||
pub fn validate_call(header: &PacketHeader, call: &CallMessage) -> Result<(), ValidationError> {
|
||||
validate_procedure_id(&call.procedure_id)?;
|
||||
|
||||
@@ -141,49 +126,3 @@ pub fn validate_call(header: &PacketHeader, call: &CallMessage) -> Result<(), Va
|
||||
fn is_portable_procedure_char(ch: char) -> bool {
|
||||
ch.is_ascii_lowercase() || ch.is_ascii_digit() || ch == '_'
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::protocol::{HookTarget, PacketType};
|
||||
use alloc::{string::String, vec};
|
||||
|
||||
#[test]
|
||||
fn rejects_invalid_data_header() {
|
||||
let header = PacketHeader {
|
||||
packet_type: PacketType::Data,
|
||||
src_path: Vec::new(),
|
||||
dst_path: Vec::new(),
|
||||
dst_leaf: Some(String::from("leaf")),
|
||||
hook_id: None,
|
||||
};
|
||||
assert!(validate_header(&header).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn validates_procedure_id_shape() {
|
||||
assert!(validate_procedure_id("org.product.v1.demo.echo").is_ok());
|
||||
assert!(validate_procedure_id("org.product.v01.demo.echo").is_err());
|
||||
assert!(validate_procedure_id("Org.product.v1.demo.echo").is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn validates_response_hook_return_path() {
|
||||
let header = PacketHeader {
|
||||
packet_type: PacketType::Call,
|
||||
src_path: vec![String::from("src")],
|
||||
dst_path: vec![String::from("dst")],
|
||||
dst_leaf: None,
|
||||
hook_id: None,
|
||||
};
|
||||
let call = CallMessage {
|
||||
procedure_id: String::from("org.product.v1.demo.echo"),
|
||||
data: Vec::new(),
|
||||
response_hook: Some(HookTarget {
|
||||
hook_id: 1,
|
||||
return_path: vec![String::from("other")],
|
||||
}),
|
||||
};
|
||||
assert!(validate_call(&header, &call).is_err());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,77 +0,0 @@
|
||||
//! Simulated transport built on `crossbeam-channel`.
|
||||
|
||||
use crossbeam_channel::{Receiver, Sender, unbounded};
|
||||
|
||||
use crate::{
|
||||
protocol::FrameBytes,
|
||||
transport::{Transport, TransportError},
|
||||
};
|
||||
|
||||
/// One endpoint of a simulated duplex transport.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ChannelTransport {
|
||||
sender: Sender<FrameBytes>,
|
||||
receiver: Receiver<FrameBytes>,
|
||||
}
|
||||
|
||||
impl ChannelTransport {
|
||||
/// Builds a connected pair of transports.
|
||||
pub fn pair() -> (Self, Self) {
|
||||
let (ab_tx, ab_rx) = unbounded();
|
||||
let (ba_tx, ba_rx) = unbounded();
|
||||
(
|
||||
Self {
|
||||
sender: ab_tx,
|
||||
receiver: ba_rx,
|
||||
},
|
||||
Self {
|
||||
sender: ba_tx,
|
||||
receiver: ab_rx,
|
||||
},
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl Transport for ChannelTransport {
|
||||
fn send_frame(&mut self, frame: FrameBytes) -> Result<(), TransportError> {
|
||||
self.sender
|
||||
.send(frame)
|
||||
.map_err(|_| TransportError::ChannelClosed)
|
||||
}
|
||||
|
||||
fn recv_frame(&mut self) -> Result<FrameBytes, TransportError> {
|
||||
self.receiver
|
||||
.recv()
|
||||
.map_err(|_| TransportError::ChannelClosed)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::protocol::{DataMessage, PacketHeader, PacketType, decode_frame, encode_packet};
|
||||
use alloc::{string::String, vec};
|
||||
|
||||
#[test]
|
||||
fn channel_roundtrip_moves_framed_bytes() {
|
||||
let (mut left, mut right) = ChannelTransport::pair();
|
||||
let header = PacketHeader {
|
||||
packet_type: PacketType::Data,
|
||||
src_path: vec![String::from("a")],
|
||||
dst_path: vec![String::from("b")],
|
||||
dst_leaf: None,
|
||||
hook_id: Some(7),
|
||||
};
|
||||
let data = DataMessage {
|
||||
procedure_id: String::from("org.product.v1.echo.roundtrip"),
|
||||
data: b"payload".to_vec(),
|
||||
end_hook: true,
|
||||
};
|
||||
let frame = encode_packet(&header, &data).expect("frame should encode");
|
||||
|
||||
left.send_frame(frame).expect("send should succeed");
|
||||
let received = right.recv_frame().expect("recv should succeed");
|
||||
let parsed = decode_frame(&received).expect("received frame should decode");
|
||||
assert_eq!(parsed.deserialize_data().expect("data should decode"), data);
|
||||
}
|
||||
}
|
||||
@@ -1,79 +0,0 @@
|
||||
//! Framed transport implementations.
|
||||
//!
|
||||
//! Transports move complete framed packets represented by [`crate::protocol::FrameBytes`].
|
||||
//! Packet parsing and validation live above this layer.
|
||||
|
||||
use crate::protocol::FrameBytes;
|
||||
|
||||
#[cfg(feature = "sim")]
|
||||
pub mod channel;
|
||||
#[cfg(feature = "tcp")]
|
||||
pub mod tcp;
|
||||
|
||||
/// Maximum allowed size for a serialized header section.
|
||||
pub const MAX_HEADER_BYTES: usize = 64 * 1024;
|
||||
|
||||
/// Maximum allowed size for a serialized payload section.
|
||||
pub const MAX_PAYLOAD_BYTES: usize = 64 * 1024 * 1024;
|
||||
|
||||
/// Transport-layer failure.
|
||||
#[derive(Debug)]
|
||||
pub enum TransportError {
|
||||
/// The peer disconnected cleanly.
|
||||
Disconnected,
|
||||
/// The announced header length exceeded the limit.
|
||||
HeaderTooLarge(usize, usize),
|
||||
/// The announced payload length exceeded the limit.
|
||||
PayloadTooLarge(usize, usize),
|
||||
/// Underlying I/O failure.
|
||||
#[cfg(feature = "tcp")]
|
||||
Io(std::io::Error),
|
||||
/// Channel send or receive failure.
|
||||
#[cfg(feature = "sim")]
|
||||
ChannelClosed,
|
||||
}
|
||||
|
||||
impl core::fmt::Display for TransportError {
|
||||
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
|
||||
match self {
|
||||
Self::Disconnected => f.write_str("transport disconnected"),
|
||||
Self::HeaderTooLarge(got, max) => {
|
||||
write!(f, "header too large: {got} bytes (limit {max})")
|
||||
}
|
||||
Self::PayloadTooLarge(got, max) => {
|
||||
write!(f, "payload too large: {got} bytes (limit {max})")
|
||||
}
|
||||
#[cfg(feature = "tcp")]
|
||||
Self::Io(error) => write!(f, "transport I/O error: {error}"),
|
||||
#[cfg(feature = "sim")]
|
||||
Self::ChannelClosed => f.write_str("channel transport closed"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "std")]
|
||||
impl std::error::Error for TransportError {}
|
||||
|
||||
#[cfg(feature = "tcp")]
|
||||
impl From<std::io::Error> for TransportError {
|
||||
fn from(value: std::io::Error) -> Self {
|
||||
Self::Io(value)
|
||||
}
|
||||
}
|
||||
|
||||
/// Duplex framed transport.
|
||||
pub trait Transport: Send {
|
||||
/// Sends one complete framed packet.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns [`TransportError`] when the underlying transport cannot deliver the frame.
|
||||
fn send_frame(&mut self, frame: FrameBytes) -> Result<(), TransportError>;
|
||||
|
||||
/// Receives one complete framed packet.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns [`TransportError`] when the transport disconnects or a frame cannot be read.
|
||||
fn recv_frame(&mut self) -> Result<FrameBytes, TransportError>;
|
||||
}
|
||||
@@ -1,132 +0,0 @@
|
||||
//! TCP framed transport.
|
||||
|
||||
use alloc::vec::Vec;
|
||||
use std::{
|
||||
io::{ErrorKind, Read, Write},
|
||||
net::{TcpStream, ToSocketAddrs},
|
||||
};
|
||||
|
||||
use crate::{
|
||||
protocol::FrameBytes,
|
||||
transport::{MAX_HEADER_BYTES, MAX_PAYLOAD_BYTES, Transport, TransportError},
|
||||
};
|
||||
|
||||
/// Framed TCP transport.
|
||||
pub struct TcpTransport {
|
||||
stream: TcpStream,
|
||||
}
|
||||
|
||||
impl TcpTransport {
|
||||
/// Connects to a remote address.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns [`TransportError`] when the TCP connection cannot be established.
|
||||
pub fn connect<A: ToSocketAddrs>(addr: A) -> Result<Self, TransportError> {
|
||||
Ok(Self {
|
||||
stream: TcpStream::connect(addr)?,
|
||||
})
|
||||
}
|
||||
|
||||
/// Wraps an existing TCP stream.
|
||||
pub fn from_stream(stream: TcpStream) -> Self {
|
||||
Self { stream }
|
||||
}
|
||||
}
|
||||
|
||||
impl Transport for TcpTransport {
|
||||
fn send_frame(&mut self, frame: FrameBytes) -> Result<(), TransportError> {
|
||||
self.stream.write_all(&frame).map_err(map_io_error)
|
||||
}
|
||||
|
||||
fn recv_frame(&mut self) -> Result<FrameBytes, TransportError> {
|
||||
let header_len = read_u32(&mut self.stream)?;
|
||||
if header_len > MAX_HEADER_BYTES {
|
||||
return Err(TransportError::HeaderTooLarge(header_len, MAX_HEADER_BYTES));
|
||||
}
|
||||
|
||||
let mut header = vec![0u8; header_len];
|
||||
read_exact(&mut self.stream, &mut header)?;
|
||||
|
||||
let payload_len = read_u32(&mut self.stream)?;
|
||||
if payload_len > MAX_PAYLOAD_BYTES {
|
||||
return Err(TransportError::PayloadTooLarge(
|
||||
payload_len,
|
||||
MAX_PAYLOAD_BYTES,
|
||||
));
|
||||
}
|
||||
|
||||
let mut payload = vec![0u8; payload_len];
|
||||
read_exact(&mut self.stream, &mut payload)?;
|
||||
|
||||
let mut frame = Vec::with_capacity(8 + header_len + payload_len);
|
||||
frame.extend_from_slice(&(header_len as u32).to_be_bytes());
|
||||
frame.extend_from_slice(&header);
|
||||
frame.extend_from_slice(&(payload_len as u32).to_be_bytes());
|
||||
frame.extend_from_slice(&payload);
|
||||
Ok(frame.into_boxed_slice())
|
||||
}
|
||||
}
|
||||
|
||||
fn read_u32(stream: &mut TcpStream) -> Result<usize, TransportError> {
|
||||
let mut bytes = [0u8; 4];
|
||||
read_exact(stream, &mut bytes)?;
|
||||
Ok(u32::from_be_bytes(bytes) as usize)
|
||||
}
|
||||
|
||||
fn read_exact(stream: &mut TcpStream, buffer: &mut [u8]) -> Result<(), TransportError> {
|
||||
stream.read_exact(buffer).map_err(map_io_error)
|
||||
}
|
||||
|
||||
fn map_io_error(error: std::io::Error) -> TransportError {
|
||||
match error.kind() {
|
||||
ErrorKind::UnexpectedEof | ErrorKind::BrokenPipe | ErrorKind::ConnectionReset => {
|
||||
TransportError::Disconnected
|
||||
}
|
||||
_ => TransportError::Io(error),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::protocol::{DataMessage, PacketHeader, PacketType, decode_frame, encode_packet};
|
||||
use alloc::{string::String, vec};
|
||||
use std::{net::TcpListener, thread};
|
||||
|
||||
#[test]
|
||||
fn tcp_roundtrip_preserves_frame() {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").expect("bind should succeed");
|
||||
let addr = listener.local_addr().expect("local address should exist");
|
||||
|
||||
let header = PacketHeader {
|
||||
packet_type: PacketType::Data,
|
||||
src_path: vec![String::from("a")],
|
||||
dst_path: vec![String::from("b")],
|
||||
dst_leaf: None,
|
||||
hook_id: Some(9),
|
||||
};
|
||||
let payload = DataMessage {
|
||||
procedure_id: String::from("org.product.v1.echo.roundtrip"),
|
||||
data: b"payload".to_vec(),
|
||||
end_hook: true,
|
||||
};
|
||||
let frame = encode_packet(&header, &payload).expect("frame should encode");
|
||||
|
||||
let sender = thread::spawn(move || {
|
||||
let mut transport = TcpTransport::connect(addr).expect("connect should succeed");
|
||||
transport.send_frame(frame).expect("send should succeed");
|
||||
});
|
||||
|
||||
let (stream, _) = listener.accept().expect("accept should succeed");
|
||||
let mut transport = TcpTransport::from_stream(stream);
|
||||
let received = transport.recv_frame().expect("recv should succeed");
|
||||
let parsed = decode_frame(&received).expect("frame should decode");
|
||||
|
||||
sender.join().expect("sender should not panic");
|
||||
assert_eq!(
|
||||
parsed.deserialize_data().expect("data should decode"),
|
||||
payload
|
||||
);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user