Move protocol to workspace root.

This commit is contained in:
Michael Mikovsky
2026-05-31 08:58:08 -06:00
parent ca1daedebe
commit 0a44bc93de
29 changed files with 844 additions and 71 deletions
+79
View File
@@ -0,0 +1,79 @@
use alloc::vec::Vec;
use super::tree::{BlockChunk, ChildKind, ChildSummary};
/// Encodes one `u32` request or response payload.
pub(super) fn encode_u32(value: u32) -> Vec<u8> {
value.to_le_bytes().to_vec()
}
/// Decodes one exact `u32` payload.
pub(super) fn decode_u32(data: &[u8]) -> Option<u32> {
if data.len() == 4 {
Some(read_u32(data, 0))
} else {
None
}
}
/// Encodes one streamed child hash entry.
pub(super) fn encode_child_summary(summary: ChildSummary) -> Vec<u8> {
let mut data = Vec::with_capacity(12);
data.extend_from_slice(&summary.id.to_le_bytes());
data.extend_from_slice(&summary.kind.discriminant().to_le_bytes());
data.extend_from_slice(&summary.hash.to_le_bytes());
data
}
/// Decodes one streamed child hash entry.
pub(super) fn decode_child_summary(data: &[u8]) -> Option<ChildSummary> {
if data.len() != 12 {
return None;
}
Some(ChildSummary {
id: read_u32(data, 0),
kind: ChildKind::from_discriminant(read_u32(data, 4))?,
hash: read_u32(data, 8),
})
}
/// Encodes one streamed block chunk.
pub(super) fn encode_block_chunk(chunk: &BlockChunk) -> Vec<u8> {
let mut data = Vec::with_capacity(16 + chunk.data.len());
data.extend_from_slice(&chunk.block_id.to_le_bytes());
data.extend_from_slice(&chunk.index.to_le_bytes());
data.extend_from_slice(&chunk.total.to_le_bytes());
data.extend_from_slice(&(chunk.data.len() as u32).to_le_bytes());
data.extend_from_slice(&chunk.data);
data
}
/// Decodes one streamed block chunk.
pub(super) fn decode_block_chunk(data: &[u8]) -> Option<BlockChunk> {
if data.len() < 16 {
return None;
}
let len = read_u32(data, 12) as usize;
if data.len() != 16 + len {
return None;
}
Some(BlockChunk {
block_id: read_u32(data, 0),
index: read_u32(data, 4),
total: read_u32(data, 8),
data: data[16..].to_vec(),
})
}
/// Reads a little-endian `u32` at a known-valid offset.
fn read_u32(data: &[u8], offset: usize) -> u32 {
u32::from_le_bytes([
data[offset],
data[offset + 1],
data[offset + 2],
data[offset + 3],
])
}
@@ -0,0 +1,27 @@
//! Shared ids for the Merkle sync protocol test.
//!
//! Keeping ids in one file makes the manually managed leaf state easier to audit
//! and mirrors the table a future leaf-state macro would generate from annotated
//! RPC definitions.
pub(super) const ENDPOINT_CALLER: u32 = 0;
pub(super) const ENDPOINT_RESPONDENT: u32 = 1;
pub(super) const LEAF_MERKLE_CALLER: u32 = 300;
pub(super) const LEAF_MERKLE_RESPONDENT: u32 = 301;
pub(super) const LEAF_MOCK_CONNECTION: u32 = 302;
pub(super) const PROC_GET_ROOT_HASH: u32 = 10;
pub(super) const PROC_GET_CHILD_HASHES: u32 = 11;
pub(super) const PROC_GET_BLOCK_STREAM: u32 = 12;
pub(super) const PROC_ROOT_HASH: u32 = 20;
pub(super) const PROC_CHILD_HASH_ENTRY: u32 = 21;
pub(super) const PROC_BLOCK_CHUNK: u32 = 22;
pub(super) const ROOT_NODE: u32 = 0;
pub(super) const BRANCH_LEFT: u32 = 1;
pub(super) const BRANCH_RIGHT: u32 = 2;
pub(super) const BLOCK_ALPHA: u32 = 10;
pub(super) const BLOCK_BRAVO: u32 = 11;
pub(super) const BLOCK_CHARLIE: u32 = 20;
pub(super) const BLOCK_DELTA: u32 = 21;
+119
View File
@@ -0,0 +1,119 @@
use alloc::{boxed::Box, rc::Rc, vec};
use core::cell::RefCell;
use crate::protocol::Endpoint;
use super::{
constants::{ENDPOINT_CALLER, ENDPOINT_RESPONDENT},
leaves::{MerkleCallerLeaf, MerkleRespondentLeaf, MockConnectionLeaf},
state::{CallerReport, RespondentReport},
tree::{MerkleStore, local_fixture, remote_fixture},
};
/// Complete two-endpoint Merkle sync test harness.
///
/// Endpoint A owns the caller leaf and one mock connection leaf. Endpoint B owns the
/// respondent leaf and the opposite mock connection leaf. Reports are shared out of
/// the boxed leaf objects so tests can assert state without downcasting trait
/// objects.
pub(super) struct MerkleHarness {
pub(super) endpoint_a: Endpoint,
pub(super) endpoint_b: Endpoint,
pub(super) caller_report: Rc<RefCell<CallerReport>>,
pub(super) respondent_report: Rc<RefCell<RespondentReport>>,
pub(super) remote_root_hash: u32,
}
impl MerkleHarness {
/// Creates the divergent fixture used by the main sync test.
pub(super) fn divergent() -> Self {
Self::with_stores(local_fixture(), remote_fixture())
}
/// Creates a custom caller/respondent fixture.
pub(super) fn with_stores(local: MerkleStore, remote: MerkleStore) -> Self {
let remote_root_hash = remote.root_hash();
let caller_report = Rc::new(RefCell::new(CallerReport::default()));
let respondent_report = Rc::new(RefCell::new(RespondentReport::default()));
let (tx_a, rx_a) = crossbeam_channel::unbounded();
let (tx_b, rx_b) = crossbeam_channel::unbounded();
let mut endpoint_a = Endpoint::new(
ENDPOINT_CALLER,
vec![
Box::new(MerkleCallerLeaf::new(local, caller_report.clone())),
Box::new(MockConnectionLeaf::new(
tx_b,
rx_a,
ENDPOINT_RESPONDENT,
false,
)),
],
);
endpoint_a.path = vec![ENDPOINT_CALLER];
let mut endpoint_b = Endpoint::new(
ENDPOINT_RESPONDENT,
vec![
Box::new(MerkleRespondentLeaf::new(remote, respondent_report.clone())),
Box::new(MockConnectionLeaf::new(tx_a, rx_b, ENDPOINT_CALLER, true)),
],
);
endpoint_b.path = vec![ENDPOINT_CALLER, ENDPOINT_RESPONDENT];
// Register routes before the first caller update so initial packet delivery
// does not depend on leaf ordering.
endpoint_a.connections.insert((ENDPOINT_RESPONDENT, false));
endpoint_b.connections.insert((ENDPOINT_CALLER, true));
Self {
endpoint_a,
endpoint_b,
caller_report,
respondent_report,
remote_root_hash,
}
}
/// Drives one deterministic protocol loop.
pub(super) fn tick(&mut self) {
self.endpoint_a.update();
self.endpoint_b.update();
}
/// Runs until the caller reports completion.
pub(super) fn run_until_done(&mut self, max_ticks: usize) -> usize {
for tick in 1..=max_ticks {
self.tick();
if self.caller_report.borrow().done {
return tick;
}
}
panic!("Merkle sync did not finish within {max_ticks} ticks");
}
/// Runs until the respondent has sent at least `target_frames` frames.
pub(super) fn run_until_respondent_frames(
&mut self,
target_frames: usize,
max_ticks: usize,
) -> usize {
for tick in 1..=max_ticks {
self.tick();
if self.respondent_report.borrow().frames_sent >= target_frames {
return tick;
}
}
panic!("respondent did not send {target_frames} frames within {max_ticks} ticks");
}
/// Verifies the requested four-leaf topology.
pub(super) fn assert_four_leaf_topology(&self) {
assert_eq!(self.endpoint_a.leaves.len(), 2);
assert_eq!(self.endpoint_b.leaves.len(), 2);
}
}
+403
View File
@@ -0,0 +1,403 @@
use alloc::{collections::VecDeque, rc::Rc, vec, vec::Vec};
use core::cell::RefCell;
use crossbeam_channel::{Receiver, Sender};
use crate::protocol::{Endpoint, Leaf, Packet};
use super::{
codec::{decode_block_chunk, decode_child_summary, decode_u32},
constants::{
ENDPOINT_CALLER, ENDPOINT_RESPONDENT, LEAF_MERKLE_CALLER, LEAF_MERKLE_RESPONDENT,
LEAF_MOCK_CONNECTION, PROC_BLOCK_CHUNK, PROC_CHILD_HASH_ENTRY, PROC_GET_BLOCK_STREAM,
PROC_GET_CHILD_HASHES, PROC_GET_ROOT_HASH, PROC_ROOT_HASH, ROOT_NODE,
},
rpc::{
block_chunk_frame, block_stream_request, child_hash_frame, child_hashes_request,
root_hash_frame, root_hash_request,
},
state::{CallerPhase, CallerReport, RespondentReport, ResponseStream},
tree::{BlockChunk, ChildKind, MerkleStore},
};
/// Leaf that simulates a serialized transport connection with crossbeam channels.
///
/// This is intentionally tiny and reusable. Both endpoints in the Merkle test have
/// exactly one of these leaves, giving the requested four-leaf topology: caller,
/// respondent, and two mock connections.
pub(super) struct MockConnectionLeaf {
pub(super) tx: Sender<Vec<u8>>,
pub(super) rx: Receiver<Vec<u8>>,
pub(super) remote_id: u32,
pub(super) is_authority: bool,
pub(super) started: bool,
}
/// Caller leaf that drives the Merkle synchronization algorithm.
pub(super) struct MerkleCallerLeaf {
local: MerkleStore,
phase: CallerPhase,
pending_nodes: VecDeque<u32>,
pending_blocks: VecDeque<u32>,
report: Rc<RefCell<CallerReport>>,
}
/// Respondent leaf that serves Merkle hash and block streams.
pub(super) struct MerkleRespondentLeaf {
remote: MerkleStore,
active_stream: Option<ResponseStream>,
report: Rc<RefCell<RespondentReport>>,
}
impl MockConnectionLeaf {
/// Creates one side of a mock connection.
pub(super) fn new(
tx: Sender<Vec<u8>>,
rx: Receiver<Vec<u8>>,
remote_id: u32,
is_authority: bool,
) -> Self {
Self {
tx,
rx,
remote_id,
is_authority,
started: false,
}
}
}
impl MerkleCallerLeaf {
/// Creates a caller with a local store and externally visible report.
pub(super) fn new(local: MerkleStore, report: Rc<RefCell<CallerReport>>) -> Self {
Self {
local,
phase: CallerPhase::NeedRoot,
pending_nodes: VecDeque::new(),
pending_blocks: VecDeque::new(),
report,
}
}
}
impl MerkleRespondentLeaf {
/// Creates a respondent backed by the authoritative remote store.
pub(super) fn new(remote: MerkleStore, report: Rc<RefCell<RespondentReport>>) -> Self {
Self {
remote,
active_stream: None,
report,
}
}
}
impl Leaf for MockConnectionLeaf {
fn get_id(&self) -> u32 {
LEAF_MOCK_CONNECTION
}
fn update(&mut self, endpoint: &mut Endpoint) {
if !self.started {
endpoint
.connections
.insert((self.remote_id, self.is_authority));
self.started = true;
}
while !self.rx.is_empty() {
let data = self.rx.recv().unwrap();
// Mock transports move untrusted bytes. Malformed frames are dropped so
// the sync state machine is tested only after packet parsing succeeds.
if let Ok(packet) = Packet::deserialize(&data) {
let _ = endpoint.add_inbound_from(self.remote_id, packet);
}
}
endpoint.take_outbound_clear(self.remote_id, |packet| {
let data = packet.serialize().unwrap();
let _ = self.tx.send(data);
});
}
}
impl Leaf for MerkleCallerLeaf {
fn get_id(&self) -> u32 {
LEAF_MERKLE_CALLER
}
fn update(&mut self, endpoint: &mut Endpoint) {
self.receive_responses(endpoint);
self.dispatch_next_request(endpoint);
}
}
impl Leaf for MerkleRespondentLeaf {
fn get_id(&self) -> u32 {
LEAF_MERKLE_RESPONDENT
}
fn update(&mut self, endpoint: &mut Endpoint) {
self.open_stream_from_request(endpoint);
self.send_one_response_frame(endpoint);
}
}
impl MerkleCallerLeaf {
/// Consumes all response packets currently delivered to endpoint A.
fn receive_responses(&mut self, endpoint: &mut Endpoint) {
endpoint.take_inbound_clear(ENDPOINT_CALLER, |packet| {
self.report
.borrow_mut()
.received_procedures
.push(packet.procedure_id);
self.handle_response_packet(packet);
});
}
/// Handles one response packet according to the current caller phase.
fn handle_response_packet(&mut self, packet: &Packet) {
match &mut self.phase {
CallerPhase::AwaitRoot { hook_id } => {
assert_eq!(packet.hook_id, *hook_id);
assert_eq!(packet.procedure_id, PROC_ROOT_HASH);
let remote_root = decode_u32(&packet.data).expect("root hash payload");
if packet.end_hook {
self.finish_root_response(remote_root);
}
}
CallerPhase::AwaitChildren {
hook_id,
node_id: _,
entries,
} => {
assert_eq!(packet.hook_id, *hook_id);
assert_eq!(packet.procedure_id, PROC_CHILD_HASH_ENTRY);
entries.push(decode_child_summary(&packet.data).expect("child summary payload"));
if packet.end_hook {
self.finish_child_response();
}
}
CallerPhase::AwaitBlock {
hook_id,
block_id: _,
chunks,
} => {
assert_eq!(packet.hook_id, *hook_id);
assert_eq!(packet.procedure_id, PROC_BLOCK_CHUNK);
chunks.push(decode_block_chunk(&packet.data).expect("block chunk payload"));
if packet.end_hook {
self.finish_block_response();
}
}
CallerPhase::NeedRoot | CallerPhase::Ready | CallerPhase::Done => {
panic!("unexpected Merkle response in phase {:?}", self.phase);
}
}
}
/// Applies the completed root response and decides whether tree walking is needed.
fn finish_root_response(&mut self, remote_root: u32) {
if self.local.root_hash() == remote_root {
self.mark_done();
} else {
self.pending_nodes.push_back(ROOT_NODE);
self.phase = CallerPhase::Ready;
}
}
/// Applies a completed child-hash stream.
fn finish_child_response(&mut self) {
let CallerPhase::AwaitChildren {
hook_id: _,
node_id: _,
entries,
} = core::mem::replace(&mut self.phase, CallerPhase::Ready)
else {
unreachable!();
};
for entry in entries {
if self.local.hash_for(entry.kind, entry.id) == entry.hash {
continue;
}
match entry.kind {
ChildKind::Branch => self.pending_nodes.push_back(entry.id),
ChildKind::Block => self.pending_blocks.push_back(entry.id),
}
}
}
/// Applies a completed block stream to the local store.
fn finish_block_response(&mut self) {
let CallerPhase::AwaitBlock {
hook_id: _,
block_id,
mut chunks,
} = core::mem::replace(&mut self.phase, CallerPhase::Ready)
else {
unreachable!();
};
chunks.sort_by_key(|chunk| chunk.index);
assert_eq!(
chunks.len(),
chunks.first().map(|chunk| chunk.total).unwrap_or(0) as usize
);
let new_chunks: Vec<Vec<u8>> = chunks.into_iter().map(|chunk| chunk.data).collect();
self.local.replace_block(block_id, new_chunks.clone());
let mut report = self.report.borrow_mut();
report.synchronized_blocks.push(block_id);
report.applied_block_chunks.push((block_id, new_chunks));
}
/// Sends the next request if the caller is not waiting on a response stream.
fn dispatch_next_request(&mut self, endpoint: &mut Endpoint) {
match self.phase {
CallerPhase::NeedRoot => {
let hook_id = self.send_request(endpoint, PROC_GET_ROOT_HASH, Vec::new());
endpoint.add_outbound(root_hash_request(hook_id)).unwrap();
self.phase = CallerPhase::AwaitRoot { hook_id };
}
CallerPhase::Ready => {
if let Some(node_id) = self.pending_nodes.pop_front() {
let hook_id = self.send_request(endpoint, PROC_GET_CHILD_HASHES, Vec::new());
endpoint
.add_outbound(child_hashes_request(hook_id, node_id))
.unwrap();
self.phase = CallerPhase::AwaitChildren {
hook_id,
node_id,
entries: Vec::new(),
};
} else if let Some(block_id) = self.pending_blocks.pop_front() {
let hook_id = self.send_request(endpoint, PROC_GET_BLOCK_STREAM, Vec::new());
endpoint
.add_outbound(block_stream_request(hook_id, block_id))
.unwrap();
self.phase = CallerPhase::AwaitBlock {
hook_id,
block_id,
chunks: Vec::new(),
};
} else {
self.mark_done();
}
}
CallerPhase::AwaitRoot { .. }
| CallerPhase::AwaitChildren { .. }
| CallerPhase::AwaitBlock { .. }
| CallerPhase::Done => {}
}
}
/// Reserves a hook id and records the logical RPC request.
fn send_request(&mut self, endpoint: &mut Endpoint, procedure_id: u32, _data: Vec<u8>) -> u16 {
let hook_id = endpoint.get_hook_id();
self.report
.borrow_mut()
.requested_procedures
.push(procedure_id);
hook_id
}
/// Marks the synchronization complete and records the final local root.
fn mark_done(&mut self) {
self.phase = CallerPhase::Done;
let mut report = self.report.borrow_mut();
report.done = true;
report.final_root_hash = Some(self.local.root_hash());
}
}
impl MerkleRespondentLeaf {
/// Opens one response stream from the first pending local request.
fn open_stream_from_request(&mut self, endpoint: &mut Endpoint) {
if self.active_stream.is_some() {
return;
}
let mut request = None;
endpoint.take_inbound_clear(ENDPOINT_RESPONDENT, |packet| {
if request.is_none() {
request = Some((packet.hook_id, packet.procedure_id, packet.data.clone()));
}
});
let Some((hook_id, procedure_id, data)) = request else {
return;
};
let frames = self.frames_for_request(procedure_id, &data);
self.report.borrow_mut().requests_seen.push(procedure_id);
if !frames.is_empty() {
self.report.borrow_mut().streams_started += 1;
self.active_stream = Some(ResponseStream::new(hook_id, frames));
}
}
/// Builds response frames for one request procedure.
fn frames_for_request(&self, procedure_id: u32, data: &[u8]) -> Vec<super::rpc::OutgoingFrame> {
match procedure_id {
PROC_GET_ROOT_HASH => vec![root_hash_frame(self.remote.root_hash())],
PROC_GET_CHILD_HASHES => {
let node_id = decode_u32(data).expect("child hash request node id");
self.remote
.child_summaries(node_id)
.into_iter()
.map(child_hash_frame)
.collect()
}
PROC_GET_BLOCK_STREAM => {
let block_id = decode_u32(data).expect("block stream request block id");
let chunks = self.remote.block_chunks(block_id);
let total = chunks.len() as u32;
chunks
.into_iter()
.enumerate()
.map(|(index, data)| {
block_chunk_frame(BlockChunk {
block_id,
index: index as u32,
total,
data,
})
})
.collect()
}
_ => Vec::new(),
}
}
/// Sends at most one response frame per update loop.
fn send_one_response_frame(&mut self, endpoint: &mut Endpoint) {
let Some(stream) = self.active_stream.as_mut() else {
return;
};
if stream.is_empty() {
self.active_stream = None;
return;
}
let packet = stream.next_packet().expect("active stream frame");
if endpoint.add_outbound(packet).is_err() {
return;
}
self.report.borrow_mut().frames_sent += 1;
stream.advance();
if stream.is_complete() {
self.report.borrow_mut().streams_completed += 1;
self.active_stream = None;
}
}
}
+8
View File
@@ -0,0 +1,8 @@
mod codec;
mod constants;
mod harness;
mod leaves;
mod rpc;
mod state;
mod tests;
mod tree;
+86
View File
@@ -0,0 +1,86 @@
use alloc::{vec, vec::Vec};
use crate::protocol::Packet;
use super::{
codec::{encode_block_chunk, encode_child_summary, encode_u32},
constants::{
ENDPOINT_CALLER, ENDPOINT_RESPONDENT, PROC_BLOCK_CHUNK, PROC_CHILD_HASH_ENTRY,
PROC_GET_BLOCK_STREAM, PROC_GET_CHILD_HASHES, PROC_GET_ROOT_HASH, PROC_ROOT_HASH,
},
tree::{BlockChunk, ChildSummary},
};
/// One outbound response frame before it is wrapped in endpoint routing fields.
///
/// A response stream owns a list of these frames and asks each frame to become a
/// packet only when the loop is ready to send it. That keeps retry behavior simple:
/// a failed send does not consume the frame.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(super) struct OutgoingFrame {
procedure_id: u32,
data: Vec<u8>,
}
impl OutgoingFrame {
/// Wraps the frame in an upward packet for `hook_id`.
pub(super) fn to_packet(&self, hook_id: u16, end_hook: bool) -> Packet {
Packet {
hook_id,
end_hook,
path: vec![ENDPOINT_CALLER],
procedure_id: self.procedure_id,
data: self.data.clone(),
}
}
}
/// Builds the initial root-hash request.
pub(super) fn root_hash_request(hook_id: u16) -> Packet {
request_packet(PROC_GET_ROOT_HASH, hook_id, Vec::new())
}
/// Builds a request for one branch node's child hashes.
pub(super) fn child_hashes_request(hook_id: u16, node_id: u32) -> Packet {
request_packet(PROC_GET_CHILD_HASHES, hook_id, encode_u32(node_id))
}
/// Builds a request for one mismatched block's data stream.
pub(super) fn block_stream_request(hook_id: u16, block_id: u32) -> Packet {
request_packet(PROC_GET_BLOCK_STREAM, hook_id, encode_u32(block_id))
}
/// Builds a single root-hash response frame.
pub(super) fn root_hash_frame(root_hash: u32) -> OutgoingFrame {
OutgoingFrame {
procedure_id: PROC_ROOT_HASH,
data: encode_u32(root_hash),
}
}
/// Builds one streamed child hash entry response frame.
pub(super) fn child_hash_frame(summary: ChildSummary) -> OutgoingFrame {
OutgoingFrame {
procedure_id: PROC_CHILD_HASH_ENTRY,
data: encode_child_summary(summary),
}
}
/// Builds one streamed block chunk response frame.
pub(super) fn block_chunk_frame(chunk: BlockChunk) -> OutgoingFrame {
OutgoingFrame {
procedure_id: PROC_BLOCK_CHUNK,
data: encode_block_chunk(&chunk),
}
}
/// Builds a downward request packet.
fn request_packet(procedure_id: u32, hook_id: u16, data: Vec<u8>) -> Packet {
Packet {
hook_id,
end_hook: false,
path: vec![ENDPOINT_CALLER, ENDPOINT_RESPONDENT],
procedure_id,
data,
}
}
+100
View File
@@ -0,0 +1,100 @@
use alloc::vec::Vec;
use crate::protocol::Packet;
use super::{
rpc::OutgoingFrame,
tree::{BlockChunk, ChildSummary},
};
/// Caller-side synchronization phase.
///
/// This is the manual state machine a future macro should be able to derive from
/// RPC declarations. Each awaiting state owns the partial stream it is collecting,
/// making it clear which packets are legal at each step.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(super) enum CallerPhase {
NeedRoot,
AwaitRoot {
hook_id: u16,
},
Ready,
AwaitChildren {
hook_id: u16,
node_id: u32,
entries: Vec<ChildSummary>,
},
AwaitBlock {
hook_id: u16,
block_id: u32,
chunks: Vec<BlockChunk>,
},
Done,
}
/// Test-visible caller observations.
///
/// The leaf itself lives behind `Box<dyn Leaf>`, so the harness keeps a shared
/// report handle for assertions without needing downcasts.
#[derive(Debug, Default)]
pub(super) struct CallerReport {
pub(super) done: bool,
pub(super) requested_procedures: Vec<u32>,
pub(super) received_procedures: Vec<u32>,
pub(super) synchronized_blocks: Vec<u32>,
pub(super) applied_block_chunks: Vec<(u32, Vec<Vec<u8>>)>,
pub(super) final_root_hash: Option<u32>,
}
/// Test-visible respondent observations.
#[derive(Debug, Default)]
pub(super) struct RespondentReport {
pub(super) requests_seen: Vec<u32>,
pub(super) streams_started: usize,
pub(super) streams_completed: usize,
pub(super) frames_sent: usize,
}
/// Respondent-owned response stream.
///
/// It stores encoded frames and exposes packet construction one frame at a time.
/// Since `next_packet` does not advance, a failed route can be retried by calling it
/// again on the next loop.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(super) struct ResponseStream {
hook_id: u16,
frames: Vec<OutgoingFrame>,
next_index: usize,
}
impl ResponseStream {
/// Creates a response stream for one request hook.
pub(super) fn new(hook_id: u16, frames: Vec<OutgoingFrame>) -> Self {
Self {
hook_id,
frames,
next_index: 0,
}
}
/// Builds the next packet without advancing the stream.
pub(super) fn next_packet(&self) -> Option<Packet> {
let frame = self.frames.get(self.next_index)?;
Some(frame.to_packet(self.hook_id, self.next_index + 1 == self.frames.len()))
}
/// Marks the current frame as successfully sent.
pub(super) fn advance(&mut self) {
self.next_index += 1;
}
/// Returns true once every frame has been sent.
pub(super) fn is_complete(&self) -> bool {
self.next_index >= self.frames.len()
}
/// Returns true when the request generated no frames.
pub(super) fn is_empty(&self) -> bool {
self.frames.is_empty()
}
}
+78
View File
@@ -0,0 +1,78 @@
use super::{
constants::{
BLOCK_BRAVO, BLOCK_CHARLIE, PROC_GET_BLOCK_STREAM, PROC_GET_CHILD_HASHES,
PROC_GET_ROOT_HASH,
},
harness::MerkleHarness,
tree::remote_fixture,
};
#[test]
fn merkle_sync_walks_hash_tree_and_streams_changed_blocks() {
let mut harness = MerkleHarness::divergent();
harness.assert_four_leaf_topology();
let ticks = harness.run_until_done(100);
assert!(
ticks > 20,
"sync should require many request/stream iterations"
);
let caller = harness.caller_report.borrow();
assert_eq!(caller.final_root_hash, Some(harness.remote_root_hash));
assert_eq!(caller.synchronized_blocks, [BLOCK_BRAVO, BLOCK_CHARLIE]);
assert_eq!(
caller.requested_procedures,
[
PROC_GET_ROOT_HASH,
PROC_GET_CHILD_HASHES,
PROC_GET_CHILD_HASHES,
PROC_GET_CHILD_HASHES,
PROC_GET_BLOCK_STREAM,
PROC_GET_BLOCK_STREAM,
]
);
let respondent = harness.respondent_report.borrow();
assert_eq!(respondent.requests_seen, caller.requested_procedures);
assert_eq!(respondent.streams_started, 6);
assert_eq!(respondent.streams_completed, 6);
assert_eq!(respondent.frames_sent, 12);
assert_eq!(harness.endpoint_b.hook_count(), 0);
}
#[test]
fn identical_tree_stops_after_root_hash() {
let remote = remote_fixture();
let mut harness = MerkleHarness::with_stores(remote.clone(), remote);
harness.run_until_done(20);
let caller = harness.caller_report.borrow();
assert_eq!(caller.final_root_hash, Some(harness.remote_root_hash));
assert_eq!(caller.requested_procedures, [PROC_GET_ROOT_HASH]);
assert!(caller.synchronized_blocks.is_empty());
let respondent = harness.respondent_report.borrow();
assert_eq!(respondent.frames_sent, 1);
assert_eq!(respondent.streams_started, 1);
assert_eq!(respondent.streams_completed, 1);
}
#[test]
fn block_stream_hook_persists_until_final_frame() {
let mut harness = MerkleHarness::divergent();
harness.run_until_respondent_frames(8, 100);
assert_eq!(
harness.endpoint_b.hook_count(),
1,
"first block stream should keep its hook after a non-final chunk"
);
harness.run_until_done(100);
assert!(
harness.endpoint_b.hook_count() == 0,
"final block stream packet should clean respondent hook state"
);
}
+255
View File
@@ -0,0 +1,255 @@
use alloc::{collections::BTreeMap, vec, vec::Vec};
use super::constants::{
BLOCK_ALPHA, BLOCK_BRAVO, BLOCK_CHARLIE, BLOCK_DELTA, BRANCH_LEFT, BRANCH_RIGHT, ROOT_NODE,
};
/// Type of child referenced by a Merkle node summary.
///
/// The sync caller uses this to decide whether a mismatched child should recurse
/// with `GET_CHILD_HASHES` or transfer data with `GET_BLOCK_STREAM`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum ChildKind {
Branch,
Block,
}
/// One child entry in a streamed Merkle summary response.
///
/// A respondent streams these one per loop. The caller compares each `hash` with
/// its local store and queues either another node walk or a block transfer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) struct ChildSummary {
pub(super) id: u32,
pub(super) kind: ChildKind,
pub(super) hash: u32,
}
/// One chunk in a streamed block response.
///
/// Chunks carry their total so the caller can replace the local block only after
/// the final stream packet arrives. This keeps partially received data out of the
/// Merkle hash until the hook completes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(super) struct BlockChunk {
pub(super) block_id: u32,
pub(super) index: u32,
pub(super) total: u32,
pub(super) data: Vec<u8>,
}
/// Static edge in the test Merkle tree.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct TreeChild {
id: u32,
kind: ChildKind,
}
/// In-memory Merkle store used by the caller and respondent leaves.
///
/// This is deliberately small but extensible: adding wider trees, extra branches,
/// or different block chunking only changes this store, not the endpoint routing
/// harness. The hash is not cryptographic; it is deterministic test content used to
/// exercise the protocol state machine.
#[derive(Debug, Clone)]
pub(super) struct MerkleStore {
root_id: u32,
children: BTreeMap<u32, Vec<TreeChild>>,
blocks: BTreeMap<u32, Vec<Vec<u8>>>,
}
impl MerkleStore {
/// Creates an empty store with the standard root id.
fn new() -> Self {
Self {
root_id: ROOT_NODE,
children: BTreeMap::new(),
blocks: BTreeMap::new(),
}
}
/// Returns the deterministic root hash for the current tree contents.
pub(super) fn root_hash(&self) -> u32 {
self.node_hash(self.root_id)
}
/// Returns child summaries for `node_id` in stable order.
pub(super) fn child_summaries(&self, node_id: u32) -> Vec<ChildSummary> {
self.children
.get(&node_id)
.map(|children| {
children
.iter()
.map(|child| ChildSummary {
id: child.id,
kind: child.kind,
hash: self.hash_for(child.kind, child.id),
})
.collect()
})
.unwrap_or_default()
}
/// Returns the local hash for a branch or block child.
pub(super) fn hash_for(&self, kind: ChildKind, id: u32) -> u32 {
match kind {
ChildKind::Branch => self.node_hash(id),
ChildKind::Block => self.block_hash(id),
}
}
/// Returns the stored chunks for a block, preserving stream order.
pub(super) fn block_chunks(&self, block_id: u32) -> Vec<Vec<u8>> {
self.blocks.get(&block_id).cloned().unwrap_or_default()
}
/// Replaces one local block after a complete block stream arrives.
pub(super) fn replace_block(&mut self, block_id: u32, chunks: Vec<Vec<u8>>) {
self.blocks.insert(block_id, chunks);
}
/// Computes a deterministic hash for a branch node.
fn node_hash(&self, node_id: u32) -> u32 {
let mut hash = mix_u32(0x4E4F_4445, node_id);
if let Some(children) = self.children.get(&node_id) {
for child in children {
hash = mix_u32(hash, child.id);
hash = mix_u32(hash, child.kind.discriminant());
hash = mix_u32(hash, self.hash_for(child.kind, child.id));
}
}
hash
}
/// Computes a deterministic hash for a data block.
fn block_hash(&self, block_id: u32) -> u32 {
let mut hash = mix_u32(0x424C_4F43, block_id);
if let Some(chunks) = self.blocks.get(&block_id) {
for chunk in chunks {
hash = mix_u32(hash, chunk.len() as u32);
hash = hash_bytes(hash, chunk);
}
}
hash
}
}
impl ChildKind {
/// Stable wire discriminant for streamed child summaries.
pub(super) fn discriminant(self) -> u32 {
match self {
ChildKind::Branch => 0,
ChildKind::Block => 1,
}
}
/// Decodes a stable wire discriminant.
pub(super) fn from_discriminant(value: u32) -> Option<Self> {
match value {
0 => Some(Self::Branch),
1 => Some(Self::Block),
_ => None,
}
}
}
/// Remote store containing the authoritative content.
pub(super) fn remote_fixture() -> MerkleStore {
let mut store = base_tree();
store
.blocks
.insert(BLOCK_ALPHA, chunks(&["alpha-", "same"]));
store
.blocks
.insert(BLOCK_BRAVO, chunks(&["bravo-", "remote-", "v2"]));
store
.blocks
.insert(BLOCK_CHARLIE, chunks(&["charlie-", "remote"]));
store.blocks.insert(BLOCK_DELTA, chunks(&["delta-same"]));
store
}
/// Local store with two stale blocks and two already matching blocks.
pub(super) fn local_fixture() -> MerkleStore {
let mut store = base_tree();
store
.blocks
.insert(BLOCK_ALPHA, chunks(&["alpha-", "same"]));
store
.blocks
.insert(BLOCK_BRAVO, chunks(&["bravo-", "local-", "v1"]));
store
.blocks
.insert(BLOCK_CHARLIE, chunks(&["charlie-", "local"]));
store.blocks.insert(BLOCK_DELTA, chunks(&["delta-same"]));
store
}
/// Tree topology shared by the local and remote fixtures.
fn base_tree() -> MerkleStore {
let mut store = MerkleStore::new();
store.children.insert(
ROOT_NODE,
vec![
TreeChild {
id: BRANCH_LEFT,
kind: ChildKind::Branch,
},
TreeChild {
id: BRANCH_RIGHT,
kind: ChildKind::Branch,
},
],
);
store.children.insert(
BRANCH_LEFT,
vec![
TreeChild {
id: BLOCK_ALPHA,
kind: ChildKind::Block,
},
TreeChild {
id: BLOCK_BRAVO,
kind: ChildKind::Block,
},
],
);
store.children.insert(
BRANCH_RIGHT,
vec![
TreeChild {
id: BLOCK_CHARLIE,
kind: ChildKind::Block,
},
TreeChild {
id: BLOCK_DELTA,
kind: ChildKind::Block,
},
],
);
store
}
/// Converts string slices into owned byte chunks.
fn chunks(parts: &[&str]) -> Vec<Vec<u8>> {
parts.iter().map(|part| part.as_bytes().to_vec()).collect()
}
/// FNV-like byte mixing used only for deterministic test hashes.
fn hash_bytes(mut hash: u32, bytes: &[u8]) -> u32 {
for byte in bytes {
hash ^= u32::from(*byte);
hash = hash.wrapping_mul(16_777_619);
}
hash
}
/// Mixes one little-endian integer into the deterministic test hash.
fn mix_u32(hash: u32, value: u32) -> u32 {
hash_bytes(hash, &value.to_le_bytes())
}
+495
View File
@@ -0,0 +1,495 @@
mod streams;
mod support;
use crate::protocol::{Endpoint, EndpointError, RouteDirection};
use alloc::{boxed::Box, vec};
use support::{
CommsLeaf, ControllerLeaf, ENDPOINT_A, ENDPOINT_B, ENDPOINT_C, ResponderLeaf,
assert_hook_present, assert_hook_removed, echo_packet, echo_packet_with_end, endpoint_at,
single_inbound_packet, single_outbound_packet,
};
#[test]
fn test_oneshot() {
let (tx_a, rx_a) = crossbeam_channel::unbounded();
let (tx_b, rx_b) = crossbeam_channel::unbounded();
let mut endpoint_a = Endpoint::new(
ENDPOINT_A,
vec![
Box::new(ControllerLeaf { has_run: false }),
Box::new(CommsLeaf {
tx: tx_b,
rx: rx_a,
remote_id: ENDPOINT_B,
is_authority: false,
started: false,
}),
],
);
endpoint_a.path = vec![ENDPOINT_A];
let mut endpoint_b = Endpoint::new(
ENDPOINT_B,
vec![
Box::new(ResponderLeaf),
Box::new(CommsLeaf {
tx: tx_a,
rx: rx_b,
remote_id: ENDPOINT_A,
is_authority: true,
started: false,
}),
],
);
endpoint_b.path = vec![ENDPOINT_A, ENDPOINT_B];
// Connections are registered routing state. The comms leaves also insert them
// during updates, but the first application packet should not depend on leaf order.
endpoint_a.connections.insert((ENDPOINT_B, false));
endpoint_b.connections.insert((ENDPOINT_A, true));
// Cycle 1: A sends request to B
endpoint_a.update();
endpoint_b.update();
// Cycle 2: B receives request and sends response to A
endpoint_b.update();
endpoint_a.update();
// Cycle 3: A's CommsLeaf needs one more update to pull the packet from the channel
// and put it into the inbound queue.
endpoint_a.update();
// Assertions on state
assert!(
endpoint_a.inbound.contains_key(&ENDPOINT_A),
"Endpoint A should have received response"
);
assert_eq!(
endpoint_a.inbound.get(&ENDPOINT_A).unwrap().len(),
1,
"Endpoint A should have exactly one packet"
);
let response = &endpoint_a
.inbound
.get(&ENDPOINT_A)
.unwrap()
.front()
.unwrap();
assert!(response.end_hook);
assert_eq!(response.data, "ABC123".as_bytes());
assert!(
endpoint_b.hook_count() == 0,
"responder hook should be cleaned after the upward response"
);
// assert_eq!(response.hook_id, HOOK_ECHO);
}
#[test]
fn inbound_downward_packet_for_local_endpoint_opens_hook() {
let mut endpoint = endpoint_at(ENDPOINT_B, vec![ENDPOINT_A, ENDPOINT_B]);
let hook_id = endpoint.get_hook_id();
endpoint.connections.insert((ENDPOINT_A, true));
endpoint
.add_inbound_from(
ENDPOINT_A,
echo_packet(vec![ENDPOINT_A, ENDPOINT_B], hook_id),
)
.unwrap();
let packet = single_inbound_packet(&endpoint, ENDPOINT_B);
assert!(!packet.end_hook);
assert_eq!(packet.path, vec![ENDPOINT_A, ENDPOINT_B]);
assert_hook_present(&endpoint, hook_id);
assert_eq!(endpoint.hook_peer(hook_id), Some(ENDPOINT_A));
assert!(endpoint.outbound.is_empty());
}
#[test]
fn outbound_packet_for_local_endpoint_is_delivered_locally() {
let mut endpoint = endpoint_at(ENDPOINT_B, vec![ENDPOINT_A, ENDPOINT_B]);
let hook_id = endpoint.get_hook_id();
endpoint
.add_outbound(echo_packet(vec![ENDPOINT_A, ENDPOINT_B], hook_id))
.unwrap();
let packet = single_inbound_packet(&endpoint, ENDPOINT_B);
assert!(!packet.end_hook);
assert_eq!(packet.data, "ABC123".as_bytes());
assert_hook_removed(&endpoint, hook_id);
assert!(endpoint.outbound.is_empty());
}
#[test]
fn inbound_downward_packet_routes_to_immediate_child() {
let mut endpoint = endpoint_at(ENDPOINT_B, vec![ENDPOINT_A, ENDPOINT_B]);
let hook_id = endpoint.get_hook_id();
endpoint.connections.insert((ENDPOINT_A, true));
endpoint.connections.insert((ENDPOINT_C, false));
endpoint
.add_inbound_from(
ENDPOINT_A,
echo_packet(vec![ENDPOINT_A, ENDPOINT_B, ENDPOINT_C], hook_id),
)
.unwrap();
let packet = single_outbound_packet(&endpoint, ENDPOINT_C);
assert!(!packet.end_hook);
assert_eq!(packet.path, vec![ENDPOINT_A, ENDPOINT_B, ENDPOINT_C]);
assert_hook_present(&endpoint, hook_id);
assert_eq!(endpoint.hook_peer(hook_id), Some(ENDPOINT_C));
assert!(!endpoint.outbound.contains_key(&ENDPOINT_A));
}
#[test]
fn outbound_downward_packet_routes_to_immediate_child() {
let mut endpoint = endpoint_at(ENDPOINT_A, vec![ENDPOINT_A]);
let hook_id = endpoint.get_hook_id();
endpoint.accept_hook(hook_id, ENDPOINT_B);
endpoint.connections.insert((ENDPOINT_B, false));
endpoint
.add_outbound(echo_packet_with_end(
vec![ENDPOINT_A, ENDPOINT_B, ENDPOINT_C],
hook_id,
true,
))
.unwrap();
let packet = single_outbound_packet(&endpoint, ENDPOINT_B);
assert!(packet.end_hook);
assert_eq!(packet.path, vec![ENDPOINT_A, ENDPOINT_B, ENDPOINT_C]);
assert_hook_removed(&endpoint, hook_id);
assert!(!endpoint.outbound.contains_key(&ENDPOINT_C));
}
#[test]
fn inbound_upward_packet_with_hook_routes_to_parent() {
let mut endpoint = endpoint_at(ENDPOINT_B, vec![ENDPOINT_A, ENDPOINT_B]);
let hook_id = endpoint.get_hook_id();
endpoint.accept_hook(hook_id, ENDPOINT_C);
endpoint.connections.insert((ENDPOINT_A, true));
endpoint.connections.insert((ENDPOINT_C, false));
endpoint
.add_inbound_from(
ENDPOINT_C,
echo_packet_with_end(vec![ENDPOINT_A], hook_id, true),
)
.unwrap();
let packet = single_outbound_packet(&endpoint, ENDPOINT_A);
assert!(packet.end_hook);
assert_eq!(packet.hook_id, hook_id);
assert_hook_removed(&endpoint, hook_id);
assert!(!endpoint.outbound.contains_key(&ENDPOINT_C));
}
#[test]
fn inbound_upward_packet_without_hook_is_rejected() {
let mut endpoint = endpoint_at(ENDPOINT_B, vec![ENDPOINT_A, ENDPOINT_B]);
let hook_id = endpoint.get_hook_id();
endpoint.connections.insert((ENDPOINT_A, true));
endpoint.connections.insert((ENDPOINT_C, false));
let error = endpoint
.add_inbound_from(
ENDPOINT_C,
echo_packet_with_end(vec![ENDPOINT_A], hook_id, true),
)
.unwrap_err();
assert!(matches!(
error,
EndpointError::UnknownHook { hook_id: observed_hook_id } if observed_hook_id == hook_id
));
assert!(endpoint.inbound.is_empty());
assert!(endpoint.outbound.is_empty());
}
#[test]
fn forged_upward_packet_with_unknown_hook_is_rejected() {
let mut endpoint = endpoint_at(ENDPOINT_B, vec![ENDPOINT_A, ENDPOINT_B]);
endpoint.accept_hook(7, ENDPOINT_C);
endpoint.connections.insert((ENDPOINT_A, true));
endpoint.connections.insert((ENDPOINT_C, false));
let error = endpoint
.add_inbound_from(ENDPOINT_C, echo_packet_with_end(vec![ENDPOINT_A], 99, true))
.unwrap_err();
assert!(matches!(error, EndpointError::UnknownHook { hook_id: 99 }));
assert_hook_present(&endpoint, 7);
assert!(endpoint.outbound.is_empty());
}
#[test]
fn forged_sideways_packet_is_rejected_as_incorrect_path() {
let mut endpoint = endpoint_at(ENDPOINT_B, vec![ENDPOINT_A, ENDPOINT_B]);
let hook_id = endpoint.get_hook_id();
endpoint.accept_hook(hook_id, ENDPOINT_A);
endpoint.connections.insert((ENDPOINT_A, true));
let error = endpoint
.add_inbound_from(
ENDPOINT_A,
echo_packet(vec![ENDPOINT_A, ENDPOINT_C], hook_id),
)
.unwrap_err();
assert!(matches!(error, EndpointError::DestinationOutsideLocalTree));
assert_hook_present(&endpoint, hook_id);
assert!(endpoint.inbound.is_empty());
assert!(endpoint.outbound.is_empty());
}
#[test]
fn malformed_frame_is_dropped_by_comms_leaf() {
let (tx_to_endpoint, rx_for_endpoint) = crossbeam_channel::unbounded();
let (tx_unused, _rx_unused) = crossbeam_channel::unbounded();
let mut endpoint = Endpoint::new(
ENDPOINT_B,
vec![Box::new(CommsLeaf {
tx: tx_unused,
rx: rx_for_endpoint,
remote_id: ENDPOINT_A,
is_authority: true,
started: false,
})],
);
endpoint.path = vec![ENDPOINT_A, ENDPOINT_B];
tx_to_endpoint.send(vec![0, 1, 2, 3]).unwrap();
endpoint.update();
assert!(endpoint.inbound.is_empty());
assert!(endpoint.outbound.is_empty());
}
#[test]
fn malformed_frame_does_not_block_following_valid_packet() {
let (tx_to_endpoint, rx_for_endpoint) = crossbeam_channel::unbounded();
let (tx_unused, _rx_unused) = crossbeam_channel::unbounded();
let hook_id = 42;
let mut endpoint = Endpoint::new(
ENDPOINT_B,
vec![Box::new(CommsLeaf {
tx: tx_unused,
rx: rx_for_endpoint,
remote_id: ENDPOINT_A,
is_authority: true,
started: false,
})],
);
endpoint.path = vec![ENDPOINT_A, ENDPOINT_B];
tx_to_endpoint.send(vec![0, 1, 2, 3]).unwrap();
tx_to_endpoint
.send(
echo_packet(vec![ENDPOINT_A, ENDPOINT_B], hook_id)
.serialize()
.unwrap(),
)
.unwrap();
endpoint.update();
let packet = single_inbound_packet(&endpoint, ENDPOINT_B);
assert!(!packet.end_hook);
assert_eq!(packet.hook_id, hook_id);
assert_hook_present(&endpoint, hook_id);
}
#[test]
fn forged_frame_without_required_hook_is_dropped_by_comms_leaf() {
let (tx_to_endpoint, rx_for_endpoint) = crossbeam_channel::unbounded();
let (tx_unused, _rx_unused) = crossbeam_channel::unbounded();
let mut endpoint = Endpoint::new(
ENDPOINT_B,
vec![Box::new(CommsLeaf {
tx: tx_unused,
rx: rx_for_endpoint,
remote_id: ENDPOINT_C,
is_authority: false,
started: false,
})],
);
endpoint.path = vec![ENDPOINT_A, ENDPOINT_B];
endpoint.accept_hook(7, ENDPOINT_C);
endpoint.connections.insert((ENDPOINT_A, true));
tx_to_endpoint
.send(
echo_packet_with_end(vec![ENDPOINT_A], 12, true)
.serialize()
.unwrap(),
)
.unwrap();
endpoint.update();
assert_hook_present(&endpoint, 7);
assert!(endpoint.inbound.is_empty());
assert!(endpoint.outbound.is_empty());
}
#[test]
fn upward_outbound_without_hook_is_rejected() {
let mut endpoint = endpoint_at(ENDPOINT_B, vec![ENDPOINT_A, ENDPOINT_B]);
endpoint.accept_hook(7, ENDPOINT_A);
endpoint.connections.insert((ENDPOINT_A, true));
let new_hook = endpoint.get_hook_id();
let error = endpoint
.add_outbound(echo_packet_with_end(vec![ENDPOINT_A], new_hook, true))
.unwrap_err();
assert!(matches!(
error,
EndpointError::UnknownHook { hook_id: observed_hook_id } if observed_hook_id == new_hook
));
assert_hook_present(&endpoint, 7);
assert!(endpoint.outbound.is_empty());
}
#[test]
fn downward_outbound_without_hook_is_allowed() {
let mut endpoint = endpoint_at(ENDPOINT_A, vec![ENDPOINT_A]);
endpoint.connections.insert((ENDPOINT_B, false));
let new_hook = endpoint.get_hook_id();
endpoint
.add_outbound(echo_packet(vec![ENDPOINT_A, ENDPOINT_B], new_hook))
.unwrap();
assert_eq!(endpoint.outbound.get(&ENDPOINT_B).unwrap().len(), 1);
assert_hook_present(&endpoint, new_hook);
assert_eq!(endpoint.hook_peer(new_hook), Some(ENDPOINT_B));
}
#[test]
fn deeper_upward_route_uses_parent_as_next_hop() {
let mut endpoint = endpoint_at(ENDPOINT_C, vec![ENDPOINT_A, ENDPOINT_B, ENDPOINT_C]);
let new_hook = endpoint.get_hook_id();
endpoint.accept_hook(new_hook, ENDPOINT_B);
endpoint.connections.insert((ENDPOINT_B, true));
endpoint
.add_outbound(echo_packet_with_end(vec![ENDPOINT_A], new_hook, true))
.unwrap();
assert!(endpoint.outbound.contains_key(&ENDPOINT_B));
assert!(!endpoint.outbound.contains_key(&ENDPOINT_A));
assert_hook_removed(&endpoint, new_hook);
}
#[test]
fn downward_route_without_connection_is_rejected() {
let mut endpoint = endpoint_at(ENDPOINT_A, vec![ENDPOINT_A]);
let hook_id = endpoint.get_hook_id();
let error = endpoint
.add_outbound(echo_packet(vec![ENDPOINT_A, ENDPOINT_B], hook_id))
.unwrap_err();
assert!(matches!(
error,
EndpointError::MissingConnection {
next_hop: ENDPOINT_B,
direction: RouteDirection::Downward,
}
));
assert_hook_removed(&endpoint, hook_id);
assert!(endpoint.outbound.is_empty());
}
#[test]
fn upward_route_without_connection_is_rejected_even_with_hook() {
let mut endpoint = endpoint_at(ENDPOINT_B, vec![ENDPOINT_A, ENDPOINT_B]);
let hook_id = endpoint.get_hook_id();
endpoint.accept_hook(hook_id, ENDPOINT_A);
let error = endpoint
.add_outbound(echo_packet_with_end(vec![ENDPOINT_A], hook_id, true))
.unwrap_err();
assert!(matches!(
error,
EndpointError::MissingConnection {
next_hop: ENDPOINT_A,
direction: RouteDirection::Upward,
}
));
assert_hook_present(&endpoint, hook_id);
assert!(endpoint.outbound.is_empty());
}
#[test]
fn end_hook_removes_hook_after_packet_is_queued() {
let mut endpoint = endpoint_at(ENDPOINT_B, vec![ENDPOINT_A, ENDPOINT_B]);
let hook_id = endpoint.get_hook_id();
endpoint.accept_hook(hook_id, ENDPOINT_A);
endpoint.connections.insert((ENDPOINT_A, true));
endpoint
.add_outbound(echo_packet_with_end(vec![ENDPOINT_A], hook_id, true))
.unwrap();
assert_hook_removed(&endpoint, hook_id);
assert_eq!(
single_outbound_packet(&endpoint, ENDPOINT_A).hook_id,
hook_id
);
}
#[test]
fn failed_end_hook_route_keeps_hook_state() {
let mut endpoint = endpoint_at(ENDPOINT_B, vec![ENDPOINT_A, ENDPOINT_B]);
let hook_id = endpoint.get_hook_id();
endpoint.accept_hook(hook_id, ENDPOINT_A);
let error = endpoint
.add_outbound(echo_packet_with_end(vec![ENDPOINT_A], hook_id, true))
.unwrap_err();
assert!(matches!(
error,
EndpointError::MissingConnection {
next_hop: ENDPOINT_A,
direction: RouteDirection::Upward,
}
));
assert_hook_present(&endpoint, hook_id);
assert!(endpoint.outbound.is_empty());
}
#[test]
fn inbound_without_absolute_path_is_rejected() {
let mut endpoint = Endpoint::new(ENDPOINT_A, vec![]);
let error = endpoint
.add_inbound(echo_packet(vec![ENDPOINT_A], 1))
.unwrap_err();
assert!(matches!(error, EndpointError::EndpointPathUnset));
assert!(endpoint.inbound.is_empty());
}
#[test]
fn outbound_without_absolute_path_is_rejected() {
let mut endpoint = Endpoint::new(ENDPOINT_A, vec![]);
let error = endpoint
.add_outbound(echo_packet(vec![ENDPOINT_A], 1))
.unwrap_err();
assert!(matches!(error, EndpointError::EndpointPathUnset));
assert!(endpoint.outbound.is_empty());
}
+335
View File
@@ -0,0 +1,335 @@
use crate::protocol::{Endpoint, Leaf, Packet};
use alloc::{boxed::Box, format, vec, vec::Vec};
use super::support::{CommsLeaf, ENDPOINT_A, ENDPOINT_B, assert_hook_present, assert_hook_removed};
const LEAF_STREAM_CALLER: u32 = 200;
const LEAF_STREAM_RESPONDENT: u32 = 201;
const STREAM_HOOK_ID: u16 = 0;
/// Builds the initial downwards packet that opens the stream on the respondent.
///
/// The request keeps `end_hook = false` because it expects a return stream. Downward
/// routing now paves that hook automatically at every endpoint that accepts or
/// forwards the request.
fn stream_open_packet(hook_id: u16) -> Packet {
Packet {
hook_id,
end_hook: false,
path: vec![ENDPOINT_A, ENDPOINT_B],
procedure_id: 2,
data: b"open".to_vec(),
}
}
/// Builds one upward stream frame for a previously opened hook.
///
/// `end_hook` is false for every intermediate frame and true only for the final
/// frame. This is the behavior the routing layer relies on to keep hook state until
/// the stream has actually finished sending upward.
fn stream_frame_packet(hook_id: u16, index: usize, end_hook: bool) -> Packet {
Packet {
hook_id,
end_hook,
path: vec![ENDPOINT_A],
procedure_id: 3,
data: format!("stream-{index}").into_bytes(),
}
}
/// Caller leaf that opens exactly one stream request.
///
/// The first allocated hook id is deterministic in these tests (`0`) because the
/// endpoint starts with no existing hooks. Keeping the caller this small makes the
/// per-loop stream assertions about respondent behavior rather than caller retries.
struct StreamCallerLeaf {
has_run: bool,
}
/// Respondent leaf that converts the first request into a one-way stream.
///
/// This mimics a leaf spawning stream state, not a new endpoint: once a request is
/// delivered locally, the leaf records the hook and emits at most one frame on each
/// later `update`. A failed route does not advance the stream, so retry behavior can
/// be tested by restoring the connection on a later loop.
struct StreamRespondentLeaf {
stream: Option<StreamState>,
total_packets: usize,
}
/// In-flight stream state owned by the respondent leaf.
///
/// The endpoint routing layer only knows hooks and packets. This leaf-level state is
/// the minimal application-side record needed to emit ordered frames one at a time.
struct StreamState {
hook_id: u16,
next_index: usize,
}
impl StreamRespondentLeaf {
/// Creates a respondent that will emit `total_packets` stream frames.
fn new(total_packets: usize) -> Self {
Self {
stream: None,
total_packets,
}
}
}
impl Leaf for StreamCallerLeaf {
fn get_id(&self) -> u32 {
LEAF_STREAM_CALLER
}
fn update(&mut self, endpoint: &mut Endpoint) {
if self.has_run {
return;
}
let hook_id = endpoint.get_hook_id();
let _ = endpoint.add_outbound(stream_open_packet(hook_id));
self.has_run = true;
}
}
impl Leaf for StreamRespondentLeaf {
fn get_id(&self) -> u32 {
LEAF_STREAM_RESPONDENT
}
fn update(&mut self, endpoint: &mut Endpoint) {
self.open_stream_from_pending_request(endpoint);
self.send_next_frame(endpoint);
}
}
impl StreamRespondentLeaf {
/// Opens stream state from the first locally delivered request packet.
///
/// Downward request routing has already paved the hook before the packet reaches
/// this leaf. The leaf only owns stream ordering; endpoint routing owns hook
/// authorization and cleanup.
fn open_stream_from_pending_request(&mut self, endpoint: &mut Endpoint) {
if self.stream.is_some() {
return;
}
let local_id = endpoint.path.last().cloned().unwrap_or(0);
let mut opened_hook = None;
endpoint.take_inbound_clear(local_id, |packet| {
if opened_hook.is_none() {
opened_hook = Some(packet.hook_id);
}
});
if let Some(hook_id) = opened_hook {
self.stream = Some(StreamState {
hook_id,
next_index: 0,
});
}
}
/// Emits at most one frame for the active stream.
///
/// The stream only advances after the routing layer accepts the packet. This is
/// important for final packets: a failed final route must leave hook state and
/// stream progress intact so the next loop can retry instead of silently losing
/// the end-of-stream marker.
fn send_next_frame(&mut self, endpoint: &mut Endpoint) {
let Some(stream) = self.stream.as_mut() else {
return;
};
if stream.next_index >= self.total_packets {
self.stream = None;
return;
}
let index = stream.next_index;
let end_hook = index + 1 == self.total_packets;
let packet = stream_frame_packet(stream.hook_id, index, end_hook);
if endpoint.add_outbound(packet).is_ok() {
stream.next_index += 1;
if end_hook {
self.stream = None;
}
}
}
}
/// Two endpoint, four leaf stream harness.
///
/// Each endpoint has exactly one application leaf and one mock connection leaf. The
/// channel leaves are intentionally the same `CommsLeaf` used by the oneshot tests
/// so stream behavior exercises the same serialization and routing boundary.
fn stream_endpoints(total_packets: usize) -> (Endpoint, Endpoint) {
let (tx_a, rx_a) = crossbeam_channel::unbounded();
let (tx_b, rx_b) = crossbeam_channel::unbounded();
let mut endpoint_a = Endpoint::new(
ENDPOINT_A,
vec![
Box::new(StreamCallerLeaf { has_run: false }),
Box::new(CommsLeaf {
tx: tx_b,
rx: rx_a,
remote_id: ENDPOINT_B,
is_authority: false,
started: false,
}),
],
);
endpoint_a.path = vec![ENDPOINT_A];
let mut endpoint_b = Endpoint::new(
ENDPOINT_B,
vec![
Box::new(StreamRespondentLeaf::new(total_packets)),
Box::new(CommsLeaf {
tx: tx_a,
rx: rx_b,
remote_id: ENDPOINT_A,
is_authority: true,
started: false,
}),
],
);
endpoint_b.path = vec![ENDPOINT_A, ENDPOINT_B];
// Register routes before the first application packet so leaf order is not a
// hidden prerequisite for the initial request leaving endpoint A.
endpoint_a.connections.insert((ENDPOINT_B, false));
endpoint_b.connections.insert((ENDPOINT_A, true));
(endpoint_a, endpoint_b)
}
/// Asserts the requested two-endpoint, four-leaf topology.
fn assert_four_leaf_topology(endpoint_a: &Endpoint, endpoint_b: &Endpoint) {
assert_eq!(
endpoint_a.leaves.len(),
2,
"caller endpoint should have two leaves"
);
assert_eq!(
endpoint_b.leaves.len(),
2,
"respondent endpoint should have two leaves"
);
}
/// Drives the initial request until it is queued locally on endpoint B.
fn deliver_stream_request(endpoint_a: &mut Endpoint, endpoint_b: &mut Endpoint) {
endpoint_a.update();
endpoint_b.update();
}
/// Drives one respondent stream loop and delivers any produced frame to endpoint A.
fn drive_stream_loop(endpoint_a: &mut Endpoint, endpoint_b: &mut Endpoint) {
endpoint_b.update();
endpoint_a.update();
}
/// Returns stream packets that endpoint A has received so far.
fn received_stream_packets(endpoint: &Endpoint) -> Vec<&Packet> {
endpoint
.inbound
.get(&ENDPOINT_A)
.map(|queue| queue.iter().collect())
.unwrap_or_default()
}
/// Verifies ordered stream payloads and final-frame markers.
fn assert_received_stream(endpoint: &Endpoint, expected_count: usize, final_seen: bool) {
let packets = received_stream_packets(endpoint);
assert_eq!(packets.len(), expected_count);
for (index, packet) in packets.iter().enumerate() {
assert_eq!(packet.hook_id, STREAM_HOOK_ID);
assert_eq!(packet.data, format!("stream-{index}").as_bytes());
assert_eq!(
packet.end_hook,
final_seen && index + 1 == expected_count,
"only the last received packet should close the stream"
);
}
}
#[test]
fn one_directional_stream_returns_one_packet_per_loop() {
let total_packets = 3;
let (mut endpoint_a, mut endpoint_b) = stream_endpoints(total_packets);
assert_four_leaf_topology(&endpoint_a, &endpoint_b);
deliver_stream_request(&mut endpoint_a, &mut endpoint_b);
assert_received_stream(&endpoint_a, 0, false);
assert_hook_present(&endpoint_a, STREAM_HOOK_ID);
assert_hook_present(&endpoint_b, STREAM_HOOK_ID);
for index in 0..total_packets {
drive_stream_loop(&mut endpoint_a, &mut endpoint_b);
let final_seen = index + 1 == total_packets;
assert_received_stream(&endpoint_a, index + 1, final_seen);
if final_seen {
assert_hook_removed(&endpoint_a, STREAM_HOOK_ID);
assert_hook_removed(&endpoint_b, STREAM_HOOK_ID);
} else {
assert_hook_present(&endpoint_a, STREAM_HOOK_ID);
assert_hook_present(&endpoint_b, STREAM_HOOK_ID);
}
}
}
#[test]
fn stream_does_not_emit_before_request_is_processed_by_respondent() {
let (mut endpoint_a, mut endpoint_b) = stream_endpoints(2);
deliver_stream_request(&mut endpoint_a, &mut endpoint_b);
assert_received_stream(&endpoint_a, 0, false);
assert!(endpoint_b.outbound.is_empty());
assert_hook_present(&endpoint_a, STREAM_HOOK_ID);
assert_hook_present(&endpoint_b, STREAM_HOOK_ID);
}
#[test]
fn stream_stops_after_final_packet() {
let total_packets = 2;
let (mut endpoint_a, mut endpoint_b) = stream_endpoints(total_packets);
deliver_stream_request(&mut endpoint_a, &mut endpoint_b);
drive_stream_loop(&mut endpoint_a, &mut endpoint_b);
drive_stream_loop(&mut endpoint_a, &mut endpoint_b);
assert_received_stream(&endpoint_a, total_packets, true);
assert_hook_removed(&endpoint_b, STREAM_HOOK_ID);
drive_stream_loop(&mut endpoint_a, &mut endpoint_b);
assert_received_stream(&endpoint_a, total_packets, true);
assert_hook_removed(&endpoint_b, STREAM_HOOK_ID);
}
#[test]
fn failed_final_stream_route_keeps_hook_and_retries() {
let (mut endpoint_a, mut endpoint_b) = stream_endpoints(1);
deliver_stream_request(&mut endpoint_a, &mut endpoint_b);
endpoint_b.connections.remove(&(ENDPOINT_A, true));
drive_stream_loop(&mut endpoint_a, &mut endpoint_b);
assert_received_stream(&endpoint_a, 0, false);
assert_hook_present(&endpoint_b, STREAM_HOOK_ID);
endpoint_b.connections.insert((ENDPOINT_A, true));
drive_stream_loop(&mut endpoint_a, &mut endpoint_b);
assert_received_stream(&endpoint_a, 1, true);
assert_hook_removed(&endpoint_b, STREAM_HOOK_ID);
}
+178
View File
@@ -0,0 +1,178 @@
use crate::protocol::{Endpoint, Leaf, Packet};
use alloc::{vec, vec::Vec};
use crossbeam_channel::{Receiver, Sender};
pub(super) const ENDPOINT_A: u32 = 0;
pub(super) const ENDPOINT_B: u32 = 1;
pub(super) const ENDPOINT_C: u32 = 2;
const LEAF_CONTROLLER: u32 = 100;
const LEAF_COMMS: u32 = 101;
const LEAF_RESPONDER: u32 = 102;
/// Builds a test packet whose route is the only field varied by routing tests.
///
/// Keeping the payload stable makes each assertion about endpoint behavior rather
/// than packet construction, which is important because forged and malformed cases
/// should fail before any leaf-level procedure handling would matter.
pub(super) fn echo_packet(path: Vec<u32>, hook_id: u16) -> Packet {
echo_packet_with_end(path, hook_id, false)
}
/// Builds a test packet with an explicit hook-lifetime marker.
pub(super) fn echo_packet_with_end(path: Vec<u32>, hook_id: u16, end_hook: bool) -> Packet {
Packet {
hook_id,
end_hook,
path,
procedure_id: 1,
data: "ABC123".as_bytes().to_vec(),
}
}
/// Creates a bare endpoint at a known absolute path.
///
/// Most routing tests do not need leaves; they only need the endpoint's local path,
/// connection table, and hook table. This helper keeps that setup explicit without
/// hiding the routing state that each test is validating.
pub(super) fn endpoint_at(id: u32, path: Vec<u32>) -> Endpoint {
let mut endpoint = Endpoint::new(id, vec![]);
endpoint.path = path;
endpoint
}
/// Returns the only outbound packet queued for `next_hop`.
///
/// Routing bugs often show up as packets being sent to the final destination rather
/// than the immediate neighbor. Tests use this helper to assert both that exactly one
/// packet exists and that it was queued for the expected adjacent endpoint.
pub(super) fn single_outbound_packet(endpoint: &Endpoint, next_hop: u32) -> &Packet {
let queue = endpoint
.outbound
.get(&next_hop)
.unwrap_or_else(|| panic!("expected one outbound queue for {next_hop}"));
assert_eq!(queue.len(), 1, "expected exactly one outbound packet");
queue.front().unwrap()
}
/// Returns the only inbound packet delivered to `local_id`.
///
/// Local delivery is intentionally separate from transit forwarding, so the tests
/// assert against the local inbound queue instead of only checking that routing did
/// not produce an error.
pub(super) fn single_inbound_packet(endpoint: &Endpoint, local_id: u32) -> &Packet {
let queue = endpoint
.inbound
.get(&local_id)
.unwrap_or_else(|| panic!("expected one inbound queue for {local_id}"));
assert_eq!(queue.len(), 1, "expected exactly one inbound packet");
queue.front().unwrap()
}
/// Asserts that local hook state still contains `hook_id`.
///
/// Tests use this instead of open-coded map checks so every lifecycle assertion
/// explains the intended routing invariant when it fails.
pub(super) fn assert_hook_present(endpoint: &Endpoint, hook_id: u16) {
assert!(
endpoint.has_hook(hook_id),
"expected hook {hook_id} to remain registered"
);
}
/// Asserts that local hook state no longer contains `hook_id`.
///
/// Upward `end_hook` packets are the only cases that should remove hook state;
/// downward and local packets with the same flag must leave hooks alone.
pub(super) fn assert_hook_removed(endpoint: &Endpoint, hook_id: u16) {
assert!(
!endpoint.has_hook(hook_id),
"expected hook {hook_id} to be cleaned up"
);
}
pub(super) struct ControllerLeaf {
pub(super) has_run: bool,
}
pub(super) struct CommsLeaf {
pub(super) tx: Sender<Vec<u8>>,
pub(super) rx: Receiver<Vec<u8>>,
pub(super) remote_id: u32,
pub(super) is_authority: bool,
pub(super) started: bool,
}
pub(super) struct ResponderLeaf;
impl Leaf for ControllerLeaf {
fn get_id(&self) -> u32 {
LEAF_CONTROLLER
}
fn update(&mut self, endpoint: &mut Endpoint) {
if !self.has_run {
// The controller starts exactly one request so the end-to-end test can
// assert deterministic routing without accumulating retries.
let hook_id = endpoint.get_hook_id();
let packet = echo_packet(vec![ENDPOINT_A, ENDPOINT_B], hook_id);
let _ = endpoint.add_outbound(packet);
self.has_run = true;
}
}
}
impl Leaf for CommsLeaf {
fn get_id(&self) -> u32 {
LEAF_COMMS
}
fn update(&mut self, endpoint: &mut Endpoint) {
if !self.started {
endpoint
.connections
.insert((self.remote_id, self.is_authority));
self.started = true;
}
while !self.rx.is_empty() {
let data = self.rx.recv().unwrap();
// Transport bytes are untrusted. Dropping malformed frames here keeps
// the oneshot harness faithful to a router boundary: invalid wire data
// must not panic or poison later valid packets on the same connection.
if let Ok(packet) = Packet::deserialize(&data) {
let _ = endpoint.add_inbound_from(self.remote_id, packet);
}
}
endpoint.take_outbound_clear(self.remote_id, |packet| {
let data = packet.serialize().unwrap();
let _ = self.tx.send(data);
});
}
}
impl Leaf for ResponderLeaf {
fn get_id(&self) -> u32 {
LEAF_RESPONDER
}
fn update(&mut self, endpoint: &mut Endpoint) {
let local_id = endpoint.path.last().cloned().unwrap_or(0);
let mut packets = Vec::new();
endpoint.take_inbound_clear(local_id, |packet| {
let mut response = echo_packet_with_end(vec![ENDPOINT_A], packet.hook_id, true);
response.hook_id = packet.hook_id;
response.data = packet.data.clone();
packets.push(response);
});
for packet in packets {
let _ = endpoint.add_outbound(packet);
}
}
}
+224
View File
@@ -0,0 +1,224 @@
use alloc::{vec, vec::Vec};
use crate::protocol::{DeserializeError, EndpointError, Packet, SerializeError};
// ── Helpers ───────────────────────────────────────────────────────────────
fn make_packet() -> Packet {
Packet {
hook_id: 42,
end_hook: false,
path: vec![1, 2, 3],
procedure_id: 0xAABB_CCDD,
data: vec![0xDE, 0xAD, 0xBE, 0xEF],
}
}
fn make_packet_flags(end_hook: bool) -> Packet {
Packet {
end_hook,
..make_packet()
}
}
fn body_len_offset(buf: &[u8]) -> usize {
let path_len = u32::from_le_bytes([buf[4], buf[5], buf[6], buf[7]]) as usize;
8 + (path_len * 4)
}
fn procedure_id_offset(buf: &[u8]) -> usize {
body_len_offset(buf) + 4
}
// ── Round-trip ────────────────────────────────────────────────────────────
#[test]
fn full_round_trip() {
let packet = make_packet();
let buf = packet.serialize().unwrap();
let result = Packet::deserialize(&buf).unwrap();
assert_eq!(result.hook_id, packet.hook_id);
assert_eq!(result.end_hook, packet.end_hook);
assert_eq!(result.path, packet.path);
assert_eq!(result.procedure_id, packet.procedure_id);
assert_eq!(result.data, packet.data);
}
#[test]
fn procedure_id_is_fixed_width_u32() {
let packet = make_packet();
let buf = packet.serialize().unwrap();
let proc_offset = procedure_id_offset(&buf);
assert_eq!(
&buf[proc_offset..proc_offset + 4],
&packet.procedure_id.to_le_bytes()
);
assert_eq!(&buf[proc_offset + 4..], packet.data.as_slice());
}
// ── Flags ─────────────────────────────────────────────────────────────────
#[test]
fn flags_end_hook_false() {
let packet = make_packet_flags(false);
let result = Packet::deserialize(&packet.serialize().unwrap()).unwrap();
assert!(!result.end_hook);
}
#[test]
fn flags_end_hook_true() {
let packet = make_packet_flags(true);
let result = Packet::deserialize(&packet.serialize().unwrap()).unwrap();
assert!(result.end_hook);
}
// ── Empty fields ──────────────────────────────────────────────────────────
#[test]
fn empty_path() {
let packet = Packet {
path: vec![],
..make_packet()
};
let result = Packet::deserialize(&packet.serialize().unwrap()).unwrap();
assert_eq!(result.path, &[] as &[u32]);
}
#[test]
fn zero_procedure_id() {
let packet = Packet {
procedure_id: 0,
..make_packet()
};
let result = Packet::deserialize(&packet.serialize().unwrap()).unwrap();
assert_eq!(result.procedure_id, 0);
}
#[test]
fn empty_data() {
let packet = Packet {
data: vec![],
..make_packet()
};
let result = Packet::deserialize(&packet.serialize().unwrap()).unwrap();
assert_eq!(result.data, &[] as &[u8]);
}
#[test]
fn all_fields_empty() {
let packet = Packet {
hook_id: 0,
end_hook: false,
path: vec![],
procedure_id: 0,
data: vec![],
};
let result = Packet::deserialize(&packet.serialize().unwrap()).unwrap();
assert_eq!(result.hook_id, 0);
assert_eq!(result.path, Vec::<u32>::new());
assert_eq!(result.procedure_id, 0);
assert_eq!(result.data, &[] as &[u8]);
}
// ── Truncation / corruption ───────────────────────────────────────────────
#[test]
fn truncated_in_fixed_prefix() {
let buf = make_packet().serialize().unwrap();
// Cut inside the fixed 8-byte prefix.
assert_eq!(
Packet::deserialize(&buf[..4]).unwrap_err(),
DeserializeError::BufferTooShort
);
}
#[test]
fn truncated_in_path() {
let buf = make_packet().serialize().unwrap();
// Cut to just past the fixed prefix, mid-path.
assert_eq!(
Packet::deserialize(&buf[..9]).unwrap_err(),
DeserializeError::BufferTooShort
);
}
#[test]
fn truncated_before_body_len() {
let buf = make_packet().serialize().unwrap();
let body_len_offset = body_len_offset(&buf);
assert_eq!(
Packet::deserialize(&buf[..body_len_offset + 2]).unwrap_err(),
DeserializeError::BufferTooShort
);
}
#[test]
fn truncated_in_body() {
let buf = make_packet().serialize().unwrap();
// Remove last byte — well into the body.
assert_eq!(
Packet::deserialize(&buf[..buf.len() - 1]).unwrap_err(),
DeserializeError::BodyLengthMismatch
);
}
#[test]
fn empty_buffer_rejected() {
assert_eq!(
Packet::deserialize(&[]).unwrap_err(),
DeserializeError::BufferTooShort
);
}
#[test]
fn body_length_mismatch_is_rejected() {
let mut buf = make_packet().serialize().unwrap();
let body_len_offset = body_len_offset(&buf);
let inflated_body_len = 999u32;
buf[body_len_offset..body_len_offset + 4].copy_from_slice(&inflated_body_len.to_le_bytes());
assert_eq!(
Packet::deserialize(&buf).unwrap_err(),
DeserializeError::BodyLengthMismatch
);
}
#[test]
fn body_too_short_for_procedure_id_is_rejected() {
let mut buf = make_packet().serialize().unwrap();
let body_len_offset = body_len_offset(&buf);
let short_body_len = 3u32;
buf[body_len_offset..body_len_offset + 4].copy_from_slice(&short_body_len.to_le_bytes());
assert_eq!(
Packet::deserialize(&buf).unwrap_err(),
DeserializeError::BufferTooShort
);
}
#[test]
fn serialize_error_wraps_into_endpoint_error() {
let error: EndpointError = SerializeError::BodyTooLarge.into();
assert_eq!(
error,
EndpointError::PacketSerialize {
source: SerializeError::BodyTooLarge,
}
);
}
#[test]
fn deserialize_error_wraps_into_endpoint_error() {
let error: EndpointError = DeserializeError::BufferTooShort.into();
assert_eq!(
error,
EndpointError::PacketDeserialize {
source: DeserializeError::BufferTooShort,
}
);
}