diff --git a/unshell-protocol/src/lib.rs b/unshell-protocol/src/lib.rs
index f8bfbd2..184bbad 100644
--- a/unshell-protocol/src/lib.rs
+++ b/unshell-protocol/src/lib.rs
@@ -34,6 +34,7 @@ type RouteMap = BTreeMap;
 
 #[cfg(test)]
 mod tests {
+    mod merkle_sync;
     mod oneshot;
     mod packet;
 }
diff --git a/unshell-protocol/src/tests/merkle_sync/codec.rs b/unshell-protocol/src/tests/merkle_sync/codec.rs
new file mode 100644
index 0000000..83c4047
--- /dev/null
+++ b/unshell-protocol/src/tests/merkle_sync/codec.rs
@@ -0,0 +1,79 @@
+use alloc::vec::Vec;
+
+use super::tree::{BlockChunk, ChildKind, ChildSummary};
+
+/// Encodes one `u32` request or response payload.
+///
+/// The payload is exactly four little-endian bytes.
+pub(super) fn encode_u32(value: u32) -> Vec<u8> {
+    value.to_le_bytes().to_vec()
+}
+
+/// Decodes one exact `u32` payload.
+///
+/// Returns `None` unless `data` is exactly four bytes long, so trailing or missing
+/// bytes are rejected instead of being silently ignored.
+pub(super) fn decode_u32(data: &[u8]) -> Option<u32> {
+    (data.len() == 4).then(|| read_u32(data, 0))
+}
+
+/// Encodes one streamed child hash entry.
+///
+/// Wire layout (12 bytes, every field a little-endian `u32`): `id`, kind
+/// discriminant, `hash`.
+pub(super) fn encode_child_summary(summary: ChildSummary) -> Vec<u8> {
+    let mut data = Vec::with_capacity(12);
+    data.extend_from_slice(&summary.id.to_le_bytes());
+    data.extend_from_slice(&summary.kind.discriminant().to_le_bytes());
+    data.extend_from_slice(&summary.hash.to_le_bytes());
+    data
+}
+
+/// Decodes one streamed child hash entry.
+///
+/// Returns `None` when `data` is not exactly 12 bytes or when the kind
+/// discriminant is not one `ChildKind::from_discriminant` accepts.
+pub(super) fn decode_child_summary(data: &[u8]) -> Option<ChildSummary> {
+    if data.len() != 12 {
+        return None;
+    }
+
+    Some(ChildSummary {
+        id: read_u32(data, 0),
+        kind: ChildKind::from_discriminant(read_u32(data, 4))?,
+        hash: read_u32(data, 8),
+    })
+}
+
+/// Encodes one streamed block chunk.
+///
+/// Wire layout: a 16-byte header of four little-endian `u32` values (`block_id`,
+/// `index`, `total`, payload length) followed by the payload bytes.
+///
+/// # Panics
+///
+/// Panics if the payload is longer than `u32::MAX` bytes. The length prefix could
+/// not represent it, and a truncated prefix would produce an undecodable frame.
+pub(super) fn encode_block_chunk(chunk: &BlockChunk) -> Vec<u8> {
+    let payload_len = u32::try_from(chunk.data.len())
+        .expect("block chunk payload exceeds the u32 length prefix");
+
+    let mut data = Vec::with_capacity(16 + chunk.data.len());
+    data.extend_from_slice(&chunk.block_id.to_le_bytes());
+    data.extend_from_slice(&chunk.index.to_le_bytes());
+    data.extend_from_slice(&chunk.total.to_le_bytes());
+    data.extend_from_slice(&payload_len.to_le_bytes());
+    data.extend_from_slice(&chunk.data);
+    data
+}
+
+/// Decodes one streamed block chunk.
+///
+/// Returns `None` when the header is incomplete or when the declared payload
+/// length does not match the number of bytes that follow the header.
+pub(super) fn decode_block_chunk(data: &[u8]) -> Option<BlockChunk> {
+    if data.len() < 16 {
+        return None;
+    }
+
+    // Compare the declared length with the real payload slice rather than
+    // computing `16 + len`, which can overflow `usize` on 32-bit targets.
+    let (header, payload) = data.split_at(16);
+    let declared_len = read_u32(header, 12) as usize;
+    if payload.len() != declared_len {
+        return None;
+    }
+
+    Some(BlockChunk {
+        block_id: read_u32(header, 0),
+        index: read_u32(header, 4),
+        total: read_u32(header, 8),
+        data: payload.to_vec(),
+    })
+}
+
+/// Reads a little-endian `u32` at a known-valid offset.
+///
+/// Callers must have checked that `offset + 4 <= data.len()`; the indexing panics
+/// otherwise.
+fn read_u32(data: &[u8], offset: usize) -> u32 {
+    u32::from_le_bytes([
+        data[offset],
+        data[offset + 1],
+        data[offset + 2],
+        data[offset + 3],
+    ])
+}
diff --git a/unshell-protocol/src/tests/merkle_sync/constants.rs b/unshell-protocol/src/tests/merkle_sync/constants.rs
new file mode 100644
index 0000000..72ccca7
--- /dev/null
+++ b/unshell-protocol/src/tests/merkle_sync/constants.rs
@@ -0,0 +1,27 @@
+//! Shared ids for the Merkle sync protocol test.
+//!
+//! Keeping ids in one file makes the manually managed leaf state easier to audit
+//! and mirrors the table a future leaf-state macro would generate from annotated
+//! RPC definitions.
+
+// Endpoint ids: the caller endpoint and the respondent endpoint it talks to.
+pub(super) const ENDPOINT_CALLER: u32 = 0;
+pub(super) const ENDPOINT_RESPONDENT: u32 = 1;
+
+// Leaf ids reported by each leaf's `get_id`.
+pub(super) const LEAF_MERKLE_CALLER: u32 = 300;
+pub(super) const LEAF_MERKLE_RESPONDENT: u32 = 301;
+pub(super) const LEAF_MOCK_CONNECTION: u32 = 302;
+
+// Procedure ids: 10-12 are requests issued by the caller, 20-22 are the response
+// frames the respondent streams back for them.
+pub(super) const PROC_GET_ROOT_HASH: u32 = 10;
+pub(super) const PROC_GET_CHILD_HASHES: u32 = 11;
+pub(super) const PROC_GET_BLOCK_STREAM: u32 = 12;
+pub(super) const PROC_ROOT_HASH: u32 = 20;
+pub(super) const PROC_CHILD_HASH_ENTRY: u32 = 21;
+pub(super) const PROC_BLOCK_CHUNK: u32 = 22;
+
+// Node and block ids of the fixture tree: one root, two branches, four blocks.
+pub(super) const ROOT_NODE: u32 = 0;
+pub(super) const BRANCH_LEFT: u32 = 1;
+pub(super) const BRANCH_RIGHT: u32 = 2;
+pub(super) const BLOCK_ALPHA: u32 = 10;
+pub(super) const BLOCK_BRAVO: u32 = 11;
+pub(super) const BLOCK_CHARLIE: u32 = 20;
+pub(super) const BLOCK_DELTA: u32 = 21;
diff --git a/unshell-protocol/src/tests/merkle_sync/harness.rs b/unshell-protocol/src/tests/merkle_sync/harness.rs
new file mode 100644
index 0000000..34d544c
--- /dev/null
+++ b/unshell-protocol/src/tests/merkle_sync/harness.rs
@@ -0,0 +1,119 @@
+use alloc::{boxed::Box, rc::Rc, vec};
+use core::cell::RefCell;
+
+use crate::Endpoint;
+
+use super::{
+    constants::{ENDPOINT_CALLER, ENDPOINT_RESPONDENT},
+    leaves::{MerkleCallerLeaf, MerkleRespondentLeaf, MockConnectionLeaf},
+    state::{CallerReport, RespondentReport},
+    tree::{MerkleStore, local_fixture, remote_fixture},
+};
+
+/// Complete two-endpoint Merkle sync test harness.
+///
+/// Endpoint A owns the caller leaf and one mock connection leaf. Endpoint B owns the
+/// respondent leaf and the opposite mock connection leaf. Reports are shared out of
+/// the boxed leaf objects so tests can assert state without downcasting trait
+/// objects.
+pub(super) struct MerkleHarness {
+    /// Caller-side endpoint (id `ENDPOINT_CALLER`).
+    pub(super) endpoint_a: Endpoint,
+    /// Respondent-side endpoint (id `ENDPOINT_RESPONDENT`).
+    pub(super) endpoint_b: Endpoint,
+    /// Shared view of the caller leaf's observations.
+    pub(super) caller_report: Rc<RefCell<CallerReport>>,
+    /// Shared view of the respondent leaf's observations.
+    pub(super) respondent_report: Rc<RefCell<RespondentReport>>,
+    /// Root hash of the remote store, captured before the store moves into its leaf.
+    pub(super) remote_root_hash: u32,
+}
+
+impl MerkleHarness {
+    /// Creates the divergent fixture used by the main sync test.
+    pub(super) fn divergent() -> Self {
+        Self::with_stores(local_fixture(), remote_fixture())
+    }
+
+    /// Creates a custom caller/respondent fixture.
+    ///
+    /// `local` is handed to the caller leaf on endpoint A and `remote` to the
+    /// respondent leaf on endpoint B. The two endpoints are joined by a pair of
+    /// unbounded channels, one per direction.
+    pub(super) fn with_stores(local: MerkleStore, remote: MerkleStore) -> Self {
+        let remote_root_hash = remote.root_hash();
+        let caller_report = Rc::new(RefCell::new(CallerReport::default()));
+        let respondent_report = Rc::new(RefCell::new(RespondentReport::default()));
+        // `tx_a`/`rx_a` carry bytes towards endpoint A, `tx_b`/`rx_b` towards B.
+        let (tx_a, rx_a) = crossbeam_channel::unbounded();
+        let (tx_b, rx_b) = crossbeam_channel::unbounded();
+
+        let mut endpoint_a = Endpoint::new(
+            ENDPOINT_CALLER,
+            vec![
+                Box::new(MerkleCallerLeaf::new(local, Rc::clone(&caller_report))),
+                Box::new(MockConnectionLeaf::new(
+                    tx_b,
+                    rx_a,
+                    ENDPOINT_RESPONDENT,
+                    false,
+                )),
+            ],
+        );
+        endpoint_a.path = vec![ENDPOINT_CALLER];
+
+        let mut endpoint_b = Endpoint::new(
+            ENDPOINT_RESPONDENT,
+            vec![
+                Box::new(MerkleRespondentLeaf::new(
+                    remote,
+                    Rc::clone(&respondent_report),
+                )),
+                Box::new(MockConnectionLeaf::new(tx_a, rx_b, ENDPOINT_CALLER, true)),
+            ],
+        );
+        endpoint_b.path = vec![ENDPOINT_CALLER, ENDPOINT_RESPONDENT];
+
+        // Register routes before the first caller update so initial packet delivery
+        // does not depend on leaf ordering.
+        endpoint_a.connections.insert((ENDPOINT_RESPONDENT, false));
+        endpoint_b.connections.insert((ENDPOINT_CALLER, true));
+
+        Self {
+            endpoint_a,
+            endpoint_b,
+            caller_report,
+            respondent_report,
+            remote_root_hash,
+        }
+    }
+
+    /// Drives one deterministic protocol loop.
+    ///
+    /// Endpoint A is always updated before endpoint B.
+    pub(super) fn tick(&mut self) {
+        self.endpoint_a.update();
+        self.endpoint_b.update();
+    }
+
+    /// Runs until the caller reports completion.
+    pub(super) fn run_until_done(&mut self, max_ticks: usize) -> usize {
+        // Tick first, then inspect the report, stopping at the first finished tick.
+        let finished_at = (1..=max_ticks).find(|_| {
+            self.tick();
+            self.caller_report.borrow().done
+        });
+
+        match finished_at {
+            Some(tick) => tick,
+            None => panic!("Merkle sync did not finish within {max_ticks} ticks"),
+        }
+    }
+
+    /// Runs until the respondent has sent at least `target_frames` frames.
+    pub(super) fn run_until_respondent_frames(
+        &mut self,
+        target_frames: usize,
+        max_ticks: usize,
+    ) -> usize {
+        let reached_at = (1..=max_ticks).find(|_| {
+            self.tick();
+            target_frames <= self.respondent_report.borrow().frames_sent
+        });
+
+        match reached_at {
+            Some(tick) => tick,
+            None => {
+                panic!("respondent did not send {target_frames} frames within {max_ticks} ticks")
+            }
+        }
+    }
+
+    /// Verifies the requested four-leaf topology.
+    pub(super) fn assert_four_leaf_topology(&self) {
+        // Two leaves on each of the two endpoints.
+        for endpoint in [&self.endpoint_a, &self.endpoint_b] {
+            assert_eq!(endpoint.leaves.len(), 2);
+        }
+    }
+}
diff --git a/unshell-protocol/src/tests/merkle_sync/leaves.rs b/unshell-protocol/src/tests/merkle_sync/leaves.rs
new file mode 100644
index 0000000..0803972
--- /dev/null
+++ b/unshell-protocol/src/tests/merkle_sync/leaves.rs
@@ -0,0 +1,404 @@
+use alloc::{collections::VecDeque, rc::Rc, vec, vec::Vec};
+use core::cell::RefCell;
+
+use crossbeam_channel::{Receiver, Sender};
+
+use crate::{Endpoint, Leaf, Packet};
+
+use super::{
+    codec::{decode_block_chunk, decode_child_summary, decode_u32},
+    constants::{
+        ENDPOINT_CALLER, ENDPOINT_RESPONDENT, LEAF_MERKLE_CALLER, LEAF_MERKLE_RESPONDENT,
+        LEAF_MOCK_CONNECTION, PROC_BLOCK_CHUNK, PROC_CHILD_HASH_ENTRY, PROC_GET_BLOCK_STREAM,
+        PROC_GET_CHILD_HASHES, PROC_GET_ROOT_HASH, PROC_ROOT_HASH, ROOT_NODE,
+    },
+    rpc::{
+        block_chunk_frame, block_stream_request, child_hash_frame, child_hashes_request,
+        root_hash_frame, root_hash_request,
+    },
+    state::{CallerPhase, CallerReport, RespondentReport, ResponseStream},
+    tree::{BlockChunk, ChildKind, MerkleStore},
+};
+
+/// Leaf that simulates a serialized transport connection with crossbeam channels.
+///
+/// This is intentionally tiny and reusable. Both endpoints in the Merkle test have
+/// exactly one of these leaves, giving the requested four-leaf topology: caller,
+/// respondent, and two mock connections.
+pub(super) struct MockConnectionLeaf {
+    /// Sends serialized packets to the peer endpoint.
+    pub(super) tx: Sender<Vec<u8>>,
+    /// Receives serialized packets from the peer endpoint.
+    pub(super) rx: Receiver<Vec<u8>>,
+    /// Endpoint id of the peer this connection leads to.
+    pub(super) remote_id: u32,
+    /// Flag stored next to `remote_id` in the endpoint's connection set.
+    pub(super) is_authority: bool,
+    /// Set once the connection has been registered with the endpoint.
+    pub(super) started: bool,
+}
+
+/// Caller leaf that drives the Merkle synchronization algorithm.
+pub(super) struct MerkleCallerLeaf {
+    local: MerkleStore,
+    phase: CallerPhase,
+    /// Branch nodes whose child hashes still need to be requested.
+    pending_nodes: VecDeque<u32>,
+    /// Mismatched blocks whose contents still need to be streamed.
+    pending_blocks: VecDeque<u32>,
+    report: Rc<RefCell<CallerReport>>,
+}
+
+/// Respondent leaf that serves Merkle hash and block streams.
+pub(super) struct MerkleRespondentLeaf {
+    remote: MerkleStore,
+    /// Stream currently being sent, one frame per update; `None` when idle.
+    active_stream: Option<ResponseStream>,
+    report: Rc<RefCell<RespondentReport>>,
+}
+
+impl MockConnectionLeaf {
+    /// Creates one side of a mock connection.
+    pub(super) fn new(
+        tx: Sender<Vec<u8>>,
+        rx: Receiver<Vec<u8>>,
+        remote_id: u32,
+        is_authority: bool,
+    ) -> Self {
+        Self {
+            tx,
+            rx,
+            remote_id,
+            is_authority,
+            started: false,
+        }
+    }
+}
+
+impl MerkleCallerLeaf {
+    /// Creates a caller with a local store and externally visible report.
+    pub(super) fn new(local: MerkleStore, report: Rc<RefCell<CallerReport>>) -> Self {
+        Self {
+            local,
+            phase: CallerPhase::NeedRoot,
+            pending_nodes: VecDeque::new(),
+            pending_blocks: VecDeque::new(),
+            report,
+        }
+    }
+}
+
+impl MerkleRespondentLeaf {
+    /// Creates a respondent backed by the authoritative remote store.
+    pub(super) fn new(remote: MerkleStore, report: Rc<RefCell<RespondentReport>>) -> Self {
+        Self {
+            remote,
+            active_stream: None,
+            report,
+        }
+    }
+}
+
+impl Leaf for MockConnectionLeaf {
+    fn get_id(&self) -> u32 {
+        LEAF_MOCK_CONNECTION
+    }
+
+    /// Registers the connection once, then moves bytes in both directions.
+    fn update(&mut self, endpoint: &mut Endpoint) {
+        if !self.started {
+            endpoint
+                .connections
+                .insert((self.remote_id, self.is_authority));
+            self.started = true;
+        }
+
+        // `try_iter` never blocks: it yields only what is already queued. The old
+        // `is_empty()` check followed by a blocking `recv().unwrap()` depended on
+        // nothing else draining the channel between the two calls.
+        for data in self.rx.try_iter() {
+            // Mock transports move untrusted bytes. Malformed frames are dropped so
+            // the sync state machine is tested only after packet parsing succeeds.
+            if let Ok(packet) = Packet::deserialize(&data) {
+                let _ = endpoint.add_inbound(packet);
+            }
+        }
+
+        endpoint.take_outbound_clear(self.remote_id, |packet| {
+            let data = packet
+                .serialize()
+                .expect("outbound packet must serialize");
+            // A send only fails once the peer's receiver is gone; the mock
+            // transport deliberately ignores that, like a dropped link.
+            let _ = self.tx.send(data);
+        });
+    }
+}
+
+impl Leaf for MerkleCallerLeaf {
+    fn get_id(&self) -> u32 {
+        LEAF_MERKLE_CALLER
+    }
+
+    /// Consumes pending responses first, then issues at most one new request.
+    fn update(&mut self, endpoint: &mut Endpoint) {
+        self.receive_responses(endpoint);
+        self.dispatch_next_request(endpoint);
+    }
+}
+
+impl Leaf for MerkleRespondentLeaf {
+    fn get_id(&self) -> u32 {
+        LEAF_MERKLE_RESPONDENT
+    }
+
+    /// Accepts a request when idle, then sends at most one response frame.
+    fn update(&mut self, endpoint: &mut Endpoint) {
+        self.open_stream_from_request(endpoint);
+        self.send_one_response_frame(endpoint);
+    }
+}
+
+impl MerkleCallerLeaf {
+    /// Consumes all response packets currently delivered to endpoint A.
+    ///
+    /// Every packet's procedure id is recorded in the report before the packet is
+    /// handled, so the report also captures a packet that then fails an assertion.
+    fn receive_responses(&mut self, endpoint: &mut Endpoint) {
+        endpoint.take_inbound_clear(ENDPOINT_CALLER, |packet| {
+            self.report
+                .borrow_mut()
+                .received_procedures
+                .push(packet.procedure_id);
+            self.handle_response_packet(packet);
+        });
+    }
+
+    /// Handles one response packet according to the current caller phase.
+    ///
+    /// # Panics
+    ///
+    /// Panics when the packet's hook id or procedure id does not match the request
+    /// in flight, when its payload fails to decode, or when it arrives while no
+    /// request is outstanding.
+    fn handle_response_packet(&mut self, packet: &Packet) {
+        match &mut self.phase {
+            CallerPhase::AwaitRoot { hook_id } => {
+                assert_eq!(packet.hook_id, *hook_id);
+                assert_eq!(packet.procedure_id, PROC_ROOT_HASH);
+                let remote_root = decode_u32(&packet.data).expect("root hash payload");
+
+                // The root hash is acted on only once the frame closes the hook.
+                if packet.end_hook {
+                    self.finish_root_response(remote_root);
+                }
+            }
+            CallerPhase::AwaitChildren {
+                hook_id,
+                node_id: _,
+                entries,
+            } => {
+                assert_eq!(packet.hook_id, *hook_id);
+                assert_eq!(packet.procedure_id, PROC_CHILD_HASH_ENTRY);
+                // Entries accumulate in the phase until the stream's final frame.
+                entries.push(decode_child_summary(&packet.data).expect("child summary payload"));
+
+                if packet.end_hook {
+                    self.finish_child_response();
+                }
+            }
+            CallerPhase::AwaitBlock {
+                hook_id,
+                block_id: _,
+                chunks,
+            } => {
+                assert_eq!(packet.hook_id, *hook_id);
+                assert_eq!(packet.procedure_id, PROC_BLOCK_CHUNK);
+                // Chunks accumulate in the phase until the stream's final frame.
+                chunks.push(decode_block_chunk(&packet.data).expect("block chunk payload"));
+
+                if packet.end_hook {
+                    self.finish_block_response();
+                }
+            }
+            // No request is in flight in these phases, so nothing should arrive.
+            CallerPhase::NeedRoot | CallerPhase::Ready | CallerPhase::Done => {
+                panic!("unexpected Merkle response in phase {:?}", self.phase);
+            }
+        }
+    }
+
+    /// Applies the completed root response and decides whether tree walking is needed.
+    fn finish_root_response(&mut self, remote_root: u32) {
+        if self.local.root_hash() == remote_root {
+            // Matching roots: the local tree already equals the remote one.
+            self.mark_done();
+        } else {
+            // Start the walk at the root; `Ready` lets the next update dispatch it.
+            self.pending_nodes.push_back(ROOT_NODE);
+            self.phase = CallerPhase::Ready;
+        }
+    }
+
+    /// Applies a completed child-hash stream.
+    ///
+    /// Children whose hash already matches the local store are skipped. Mismatched
+    /// branches are queued for another child-hash walk and mismatched blocks for a
+    /// block transfer. The phase returns to `Ready` so the next update can dispatch.
+    fn finish_child_response(&mut self) {
+        let CallerPhase::AwaitChildren {
+            hook_id: _,
+            node_id: _,
+            entries,
+        } = core::mem::replace(&mut self.phase, CallerPhase::Ready)
+        else {
+            unreachable!();
+        };
+
+        for entry in entries {
+            if self.local.hash_for(entry.kind, entry.id) == entry.hash {
+                continue;
+            }
+
+            match entry.kind {
+                ChildKind::Branch => self.pending_nodes.push_back(entry.id),
+                ChildKind::Block => self.pending_blocks.push_back(entry.id),
+            }
+        }
+    }
+
+    /// Applies a completed block stream to the local store.
+    ///
+    /// # Panics
+    ///
+    /// Panics when the collected chunks do not form a complete stream for the
+    /// requested block: a chunk for another block, a missing or duplicated index,
+    /// or a `total` that disagrees with the number of chunks received.
+    fn finish_block_response(&mut self) {
+        let CallerPhase::AwaitBlock {
+            hook_id: _,
+            block_id,
+            mut chunks,
+        } = core::mem::replace(&mut self.phase, CallerPhase::Ready)
+        else {
+            unreachable!();
+        };
+
+        chunks.sort_by_key(|chunk| chunk.index);
+
+        // Comparing only the chunk count with the first chunk's `total` would let
+        // duplicated, missing, or foreign chunks into the local store, so every
+        // chunk is checked against its sorted position.
+        let received = chunks.len();
+        for (position, chunk) in chunks.iter().enumerate() {
+            assert_eq!(chunk.block_id, block_id, "chunk belongs to another block");
+            assert_eq!(chunk.index as usize, position, "chunk index gap or duplicate");
+            assert_eq!(chunk.total as usize, received, "chunk total mismatch");
+        }
+
+        let new_chunks: Vec<Vec<u8>> = chunks.into_iter().map(|chunk| chunk.data).collect();
+        // The store and the report each keep their own copy of the block contents.
+        self.local.replace_block(block_id, new_chunks.clone());
+
+        let mut report = self.report.borrow_mut();
+        report.synchronized_blocks.push(block_id);
+        report.applied_block_chunks.push((block_id, new_chunks));
+    }
+
+    /// Sends the next request if the caller is not waiting on a response stream.
+    ///
+    /// Pending branch nodes are walked before pending blocks are fetched. When both
+    /// queues are empty in the `Ready` phase the synchronization is complete.
+    fn dispatch_next_request(&mut self, endpoint: &mut Endpoint) {
+        match self.phase {
+            CallerPhase::NeedRoot => {
+                let hook_id = self.send_request(endpoint, PROC_GET_ROOT_HASH, root_hash_request);
+                self.phase = CallerPhase::AwaitRoot { hook_id };
+            }
+            CallerPhase::Ready => {
+                if let Some(node_id) = self.pending_nodes.pop_front() {
+                    let hook_id = self.send_request(endpoint, PROC_GET_CHILD_HASHES, |hook_id| {
+                        child_hashes_request(hook_id, node_id)
+                    });
+                    self.phase = CallerPhase::AwaitChildren {
+                        hook_id,
+                        node_id,
+                        entries: Vec::new(),
+                    };
+                } else if let Some(block_id) = self.pending_blocks.pop_front() {
+                    let hook_id = self.send_request(endpoint, PROC_GET_BLOCK_STREAM, |hook_id| {
+                        block_stream_request(hook_id, block_id)
+                    });
+                    self.phase = CallerPhase::AwaitBlock {
+                        hook_id,
+                        block_id,
+                        chunks: Vec::new(),
+                    };
+                } else {
+                    self.mark_done();
+                }
+            }
+            // A request is already in flight, or the sync has finished.
+            CallerPhase::AwaitRoot { .. }
+            | CallerPhase::AwaitChildren { .. }
+            | CallerPhase::AwaitBlock { .. }
+            | CallerPhase::Done => {}
+        }
+    }
+
+    /// Reserves a hook id, records the logical RPC request, and queues its packet.
+    ///
+    /// `build_packet` receives the reserved hook id and returns the request packet.
+    /// Keeping the three steps together means a request cannot be recorded without
+    /// being queued, and removes the unused payload argument the helper used to take.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the endpoint refuses the outbound packet.
+    fn send_request(
+        &mut self,
+        endpoint: &mut Endpoint,
+        procedure_id: u32,
+        build_packet: impl FnOnce(u16) -> Packet,
+    ) -> u16 {
+        let hook_id = endpoint.get_hook_id();
+        self.report
+            .borrow_mut()
+            .requested_procedures
+            .push(procedure_id);
+        endpoint
+            .add_outbound(build_packet(hook_id))
+            .expect("caller request must be accepted by the endpoint");
+        hook_id
+    }
+
+    /// Marks the synchronization complete and records the final local root.
+    fn mark_done(&mut self) {
+        self.phase = CallerPhase::Done;
+        let mut report = self.report.borrow_mut();
+        report.done = true;
+        report.final_root_hash = Some(self.local.root_hash());
+    }
+}
+
+impl MerkleRespondentLeaf {
+    /// Opens one response stream from the first pending local request.
+    ///
+    /// Does nothing while a stream is still active. Only the first packet handed
+    /// over by `take_inbound_clear` is served; any further packets in the same pass
+    /// are ignored, which is sufficient because the caller keeps a single request
+    /// in flight.
+    fn open_stream_from_request(&mut self, endpoint: &mut Endpoint) {
+        if self.active_stream.is_some() {
+            return;
+        }
+
+        let mut request = None;
+        endpoint.take_inbound_clear(ENDPOINT_RESPONDENT, |packet| {
+            if request.is_none() {
+                request = Some((packet.hook_id, packet.procedure_id, packet.data.clone()));
+            }
+        });
+
+        let Some((hook_id, procedure_id, data)) = request else {
+            return;
+        };
+
+        let frames = self.frames_for_request(procedure_id, &data);
+        self.report.borrow_mut().requests_seen.push(procedure_id);
+
+        // A request that yields no frames never sends a final `end_hook` packet.
+        // Registering its hook anyway would leave an entry in `endpoint.hooks` that
+        // nothing ever removes, so the hook is recorded only for a real stream.
+        if frames.is_empty() {
+            return;
+        }
+
+        endpoint.hooks.insert(hook_id, ENDPOINT_CALLER);
+        self.report.borrow_mut().streams_started += 1;
+        self.active_stream = Some(ResponseStream::new(hook_id, frames));
+    }
+
+    /// Builds response frames for one request procedure.
+    ///
+    /// Unknown procedure ids produce no frames.
+    ///
+    /// # Panics
+    ///
+    /// Panics when a child-hash or block-stream request does not carry exactly one
+    /// `u32` id, or when a block has more chunks than a `u32` can count.
+    fn frames_for_request(
+        &self,
+        procedure_id: u32,
+        data: &[u8],
+    ) -> Vec<super::rpc::OutgoingFrame> {
+        match procedure_id {
+            PROC_GET_ROOT_HASH => vec![root_hash_frame(self.remote.root_hash())],
+            PROC_GET_CHILD_HASHES => {
+                let node_id = decode_u32(data).expect("child hash request node id");
+                self.remote
+                    .child_summaries(node_id)
+                    .into_iter()
+                    .map(child_hash_frame)
+                    .collect()
+            }
+            PROC_GET_BLOCK_STREAM => {
+                let block_id = decode_u32(data).expect("block stream request block id");
+                let chunks = self.remote.block_chunks(block_id);
+                let total =
+                    u32::try_from(chunks.len()).expect("block chunk count exceeds u32");
+                // Counting with a `u32` range avoids a truncating `usize as u32` cast.
+                (0u32..)
+                    .zip(chunks)
+                    .map(|(index, data)| {
+                        block_chunk_frame(BlockChunk {
+                            block_id,
+                            index,
+                            total,
+                            data,
+                        })
+                    })
+                    .collect()
+            }
+            _ => Vec::new(),
+        }
+    }
+
+    /// Sends at most one response frame per update loop.
+    ///
+    /// A frame that the endpoint refuses stays at the head of the stream and is
+    /// offered again on the next update.
+    fn send_one_response_frame(&mut self, endpoint: &mut Endpoint) {
+        let Some(stream) = self.active_stream.as_mut() else {
+            return;
+        };
+
+        // An empty or exhausted stream has nothing left to send. Drop it rather
+        // than panicking so the respondent can accept the next request.
+        let Some(packet) = stream.next_packet() else {
+            self.active_stream = None;
+            return;
+        };
+
+        // `next_packet` did not advance, so returning here retries the same frame.
+        if endpoint.add_outbound(packet).is_err() {
+            return;
+        }
+
+        self.report.borrow_mut().frames_sent += 1;
+        stream.advance();
+
+        if stream.is_complete() {
+            self.report.borrow_mut().streams_completed += 1;
+            self.active_stream = None;
+        }
+    }
+}
diff --git a/unshell-protocol/src/tests/merkle_sync/mod.rs b/unshell-protocol/src/tests/merkle_sync/mod.rs
new file mode 100644
index 0000000..7098fdf
--- /dev/null
+++ b/unshell-protocol/src/tests/merkle_sync/mod.rs
@@ -0,0 +1,8 @@
+mod codec;
+mod constants;
+mod harness;
+mod leaves;
+mod rpc;
+mod state;
+mod tests;
+mod tree;
diff --git a/unshell-protocol/src/tests/merkle_sync/rpc.rs b/unshell-protocol/src/tests/merkle_sync/rpc.rs
new file mode 100644
index 0000000..01440e0
--- /dev/null
+++ b/unshell-protocol/src/tests/merkle_sync/rpc.rs
@@ -0,0 +1,86 @@
+use alloc::{vec, vec::Vec};
+
+use crate::Packet;
+
+use super::{
+    codec::{encode_block_chunk, encode_child_summary, encode_u32},
+    constants::{
+        ENDPOINT_CALLER, ENDPOINT_RESPONDENT, PROC_BLOCK_CHUNK, PROC_CHILD_HASH_ENTRY,
+        PROC_GET_BLOCK_STREAM, PROC_GET_CHILD_HASHES, PROC_GET_ROOT_HASH, PROC_ROOT_HASH,
+    },
+    tree::{BlockChunk, ChildSummary},
+};
+
+/// One outbound response frame before it is wrapped in endpoint routing fields.
+///
+/// A response stream owns a list of these frames and asks each frame to become a
+/// packet only when the loop is ready to send it. That keeps retry behavior simple:
+/// a failed send does not consume the frame.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub(super) struct OutgoingFrame {
+    /// Response procedure id carried by the packet.
+    procedure_id: u32,
+    /// Already-encoded payload bytes.
+    data: Vec<u8>,
+}
+
+impl OutgoingFrame {
+    /// Wraps the frame in an upward packet for `hook_id`.
+    pub(super) fn to_packet(&self, hook_id: u16, end_hook: bool) -> Packet {
+        // The frame is only borrowed, so the payload is copied into the packet.
+        let procedure_id = self.procedure_id;
+        let data = self.data.clone();
+        let path = vec![ENDPOINT_CALLER];
+
+        Packet {
+            hook_id,
+            end_hook,
+            path,
+            procedure_id,
+            data,
+        }
+    }
+}
+
+/// Builds the initial root-hash request.
+pub(super) fn root_hash_request(hook_id: u16) -> Packet {
+    let payload = Vec::new();
+    request_packet(PROC_GET_ROOT_HASH, hook_id, payload)
+}
+
+/// Builds a request for one branch node's child hashes.
+pub(super) fn child_hashes_request(hook_id: u16, node_id: u32) -> Packet {
+    let payload = encode_u32(node_id);
+    request_packet(PROC_GET_CHILD_HASHES, hook_id, payload)
+}
+
+/// Builds a request for one mismatched block's data stream.
+pub(super) fn block_stream_request(hook_id: u16, block_id: u32) -> Packet {
+    let payload = encode_u32(block_id);
+    request_packet(PROC_GET_BLOCK_STREAM, hook_id, payload)
+}
+
+/// Builds a single root-hash response frame.
+pub(super) fn root_hash_frame(root_hash: u32) -> OutgoingFrame {
+    let data = encode_u32(root_hash);
+    let procedure_id = PROC_ROOT_HASH;
+    OutgoingFrame { procedure_id, data }
+}
+
+/// Builds one streamed child hash entry response frame.
+pub(super) fn child_hash_frame(summary: ChildSummary) -> OutgoingFrame {
+    let data = encode_child_summary(summary);
+    let procedure_id = PROC_CHILD_HASH_ENTRY;
+    OutgoingFrame { procedure_id, data }
+}
+
+/// Builds one streamed block chunk response frame.
+pub(super) fn block_chunk_frame(chunk: BlockChunk) -> OutgoingFrame {
+    let data = encode_block_chunk(&chunk);
+    let procedure_id = PROC_BLOCK_CHUNK;
+    OutgoingFrame { procedure_id, data }
+}
+
+/// Builds a downward request packet.
+///
+/// Every request is a single packet, so `end_hook` is always set. The path names
+/// the caller endpoint followed by the respondent endpoint.
+fn request_packet(procedure_id: u32, hook_id: u16, data: Vec<u8>) -> Packet {
+    Packet {
+        hook_id,
+        end_hook: true,
+        path: vec![ENDPOINT_CALLER, ENDPOINT_RESPONDENT],
+        procedure_id,
+        data,
+    }
+}
diff --git a/unshell-protocol/src/tests/merkle_sync/state.rs b/unshell-protocol/src/tests/merkle_sync/state.rs
new file mode 100644
index 0000000..92a8cea
--- /dev/null
+++ b/unshell-protocol/src/tests/merkle_sync/state.rs
@@ -0,0 +1,98 @@
+use alloc::vec::Vec;
+
+use super::{
+    rpc::OutgoingFrame,
+    tree::{BlockChunk, ChildSummary},
+};
+
+/// Caller-side synchronization phase.
+///
+/// This is the manual state machine a future macro should be able to derive from
+/// RPC declarations. Each awaiting state owns the partial stream it is collecting,
+/// making it clear which packets are legal at each step.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub(super) enum CallerPhase {
+    /// Nothing has been requested yet; the root hash request comes next.
+    NeedRoot,
+    /// Waiting for the root hash response on `hook_id`.
+    AwaitRoot {
+        hook_id: u16,
+    },
+    /// No request in flight; the next queued node or block can be requested.
+    Ready,
+    /// Collecting the child-hash stream for `node_id`.
+    AwaitChildren {
+        hook_id: u16,
+        node_id: u32,
+        entries: Vec<ChildSummary>,
+    },
+    /// Collecting the chunk stream for `block_id`.
+    AwaitBlock {
+        hook_id: u16,
+        block_id: u32,
+        chunks: Vec<BlockChunk>,
+    },
+    /// Synchronization finished; no further requests are sent.
+    Done,
+}
+
+/// Test-visible caller observations.
+///
+/// The leaf itself lives behind a boxed `Leaf` trait object, so the harness keeps
+/// a shared report handle for assertions without needing downcasts.
+#[derive(Debug, Default)]
+pub(super) struct CallerReport {
+    pub(super) done: bool,
+    /// Procedure ids of the requests issued, in order.
+    pub(super) requested_procedures: Vec<u32>,
+    /// Procedure ids of the response packets received, in order.
+    pub(super) received_procedures: Vec<u32>,
+    /// Ids of the blocks replaced in the local store, in order.
+    pub(super) synchronized_blocks: Vec<u32>,
+    /// Block id and ordered chunk payloads for every applied block stream.
+    pub(super) applied_block_chunks: Vec<(u32, Vec<Vec<u8>>)>,
+    /// Local root hash at the moment the caller marked itself done.
+    pub(super) final_root_hash: Option<u32>,
+}
+
+/// Test-visible respondent observations.
+#[derive(Debug, Default)]
+pub(super) struct RespondentReport {
+    /// Procedure ids of the requests accepted, in order.
+    pub(super) requests_seen: Vec<u32>,
+    pub(super) streams_started: usize,
+    pub(super) streams_completed: usize,
+    pub(super) frames_sent: usize,
+}
+
+/// Respondent-owned response stream.
+///
+/// It stores encoded frames and exposes packet construction one frame at a time.
+/// Since `next_packet` does not advance, a failed route can be retried by calling it
+/// again on the next loop.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub(super) struct ResponseStream {
+    hook_id: u16,
+    frames: Vec<OutgoingFrame>,
+    /// Index of the next frame to send; equals `frames.len()` once complete.
+    next_index: usize,
+}
+
+impl ResponseStream {
+    /// Creates a response stream for one request hook.
+    pub(super) fn new(hook_id: u16, frames: Vec<OutgoingFrame>) -> Self {
+        Self {
+            hook_id,
+            frames,
+            next_index: 0,
+        }
+    }
+
+    /// Builds the next packet without advancing the stream.
+    ///
+    /// Returns `None` once every frame has been sent or when the stream has no
+    /// frames. The packet for the last frame carries `end_hook`.
+    pub(super) fn next_packet(&self) -> Option<crate::Packet> {
+        let frame = self.frames.get(self.next_index)?;
+        let is_last = self.next_index + 1 == self.frames.len();
+        Some(frame.to_packet(self.hook_id, is_last))
+    }
+
+    /// Marks the current frame as successfully sent.
+    pub(super) fn advance(&mut self) {
+        self.next_index += 1;
+    }
+
+    /// Returns true once every frame has been sent.
+    pub(super) fn is_complete(&self) -> bool {
+        self.next_index >= self.frames.len()
+    }
+
+    /// Returns true when the request generated no frames.
+    pub(super) fn is_empty(&self) -> bool {
+        self.frames.is_empty()
+    }
+}
diff --git a/unshell-protocol/src/tests/merkle_sync/tests.rs b/unshell-protocol/src/tests/merkle_sync/tests.rs
new file mode 100644
index 0000000..ecd97bb
--- /dev/null
+++ b/unshell-protocol/src/tests/merkle_sync/tests.rs
@@ -0,0 +1,78 @@
+use super::{
+    constants::{
+        BLOCK_BRAVO, BLOCK_CHARLIE, PROC_GET_BLOCK_STREAM, PROC_GET_CHILD_HASHES,
+        PROC_GET_ROOT_HASH,
+    },
+    harness::MerkleHarness,
+    tree::remote_fixture,
+};
+
+#[test]
+fn merkle_sync_walks_hash_tree_and_streams_changed_blocks() {
+    let mut harness = MerkleHarness::divergent();
+    harness.assert_four_leaf_topology();
+
+    let ticks = harness.run_until_done(100);
+    assert!(
+        ticks > 20,
+        "sync should require many request/stream iterations"
+    );
+
+    let caller = harness.caller_report.borrow();
+    assert_eq!(caller.final_root_hash, Some(harness.remote_root_hash));
+    assert_eq!(caller.synchronized_blocks, [BLOCK_BRAVO, BLOCK_CHARLIE]);
+    // One root request, one child-hash walk each for the root and both branches,
+    // then one block stream per stale block.
+    assert_eq!(
+        caller.requested_procedures,
+        [
+            PROC_GET_ROOT_HASH,
+            PROC_GET_CHILD_HASHES,
+            PROC_GET_CHILD_HASHES,
+            PROC_GET_CHILD_HASHES,
+            PROC_GET_BLOCK_STREAM,
+            PROC_GET_BLOCK_STREAM,
+        ]
+    );
+
+    let respondent = harness.respondent_report.borrow();
+    assert_eq!(respondent.requests_seen, caller.requested_procedures);
+    assert_eq!(respondent.streams_started, 6);
+    assert_eq!(respondent.streams_completed, 6);
+    // 1 root hash + 3 walks of 2 child entries + 3 BRAVO chunks + 2 CHARLIE chunks.
+    assert_eq!(respondent.frames_sent, 12);
+    assert!(harness.endpoint_b.hooks.is_empty());
+}
+
+#[test]
+fn identical_tree_stops_after_root_hash() {
+    let remote = remote_fixture();
+    let mut harness = MerkleHarness::with_stores(remote.clone(), remote);
+
+    harness.run_until_done(20);
+
+    let caller = harness.caller_report.borrow();
+    assert_eq!(caller.final_root_hash, Some(harness.remote_root_hash));
+    assert_eq!(caller.requested_procedures, [PROC_GET_ROOT_HASH]);
+    assert!(caller.synchronized_blocks.is_empty());
+
+    let respondent = harness.respondent_report.borrow();
+    assert_eq!(respondent.frames_sent, 1);
+    assert_eq!(respondent.streams_started, 1);
+    assert_eq!(respondent.streams_completed, 1);
+}
+
+#[test]
+fn block_stream_hook_persists_until_final_frame() {
+    let mut harness = MerkleHarness::divergent();
+
+    // Frames 1-7 are the hash responses; frame 8 is the first of the three
+    // BLOCK_BRAVO chunks, so that stream is still open afterwards.
+    harness.run_until_respondent_frames(8, 100);
+    assert_eq!(
+        harness.endpoint_b.hooks.len(),
+        1,
+        "first block stream should keep its hook after a non-final chunk"
+    );
+
+    harness.run_until_done(100);
+    assert!(
+        harness.endpoint_b.hooks.is_empty(),
+        "final block stream packet should clean respondent hook state"
+    );
+}
diff --git a/unshell-protocol/src/tests/merkle_sync/tree.rs b/unshell-protocol/src/tests/merkle_sync/tree.rs
new file mode 100644
index 0000000..b0c8142
--- /dev/null
+++ b/unshell-protocol/src/tests/merkle_sync/tree.rs
@@ -0,0 +1,255 @@
+use alloc::{collections::BTreeMap, vec, vec::Vec};
+
+use super::constants::{
+    BLOCK_ALPHA, BLOCK_BRAVO, BLOCK_CHARLIE, BLOCK_DELTA, BRANCH_LEFT, BRANCH_RIGHT, ROOT_NODE,
+};
+
+/// Type of child referenced by a Merkle node summary.
+///
+/// The sync caller uses this to decide whether a mismatched child should recurse
+/// with `GET_CHILD_HASHES` or transfer data with `GET_BLOCK_STREAM`.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub(super) enum ChildKind {
+    Branch,
+    Block,
+}
+
+/// One child entry in a streamed Merkle summary response.
+///
+/// A respondent streams these one per loop. The caller compares each `hash` with
+/// its local store and queues either another node walk or a block transfer.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub(super) struct ChildSummary {
+    pub(super) id: u32,
+    pub(super) kind: ChildKind,
+    pub(super) hash: u32,
+}
+
+/// One chunk in a streamed block response.
+///
+/// Chunks carry their total so the caller can replace the local block only after
+/// the final stream packet arrives. This keeps partially received data out of the
+/// Merkle hash until the hook completes.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub(super) struct BlockChunk {
+    pub(super) block_id: u32,
+    /// Zero-based position of this chunk within the block.
+    pub(super) index: u32,
+    /// Number of chunks in the whole block.
+    pub(super) total: u32,
+    pub(super) data: Vec<u8>,
+}
+
+/// Static edge in the test Merkle tree.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+struct TreeChild {
+    id: u32,
+    kind: ChildKind,
+}
+
+/// In-memory Merkle store used by the caller and respondent leaves.
+///
+/// This is deliberately small but extensible: adding wider trees, extra branches,
+/// or different block chunking only changes this store, not the endpoint routing
+/// harness. The hash is not cryptographic; it is deterministic test content used to
+/// exercise the protocol state machine.
+#[derive(Debug, Clone)]
+pub(super) struct MerkleStore {
+    root_id: u32,
+    /// Ordered children of each branch node, keyed by node id.
+    children: BTreeMap<u32, Vec<TreeChild>>,
+    /// Ordered chunk payloads of each block, keyed by block id.
+    blocks: BTreeMap<u32, Vec<Vec<u8>>>,
+}
+
+impl MerkleStore {
+    /// Creates an empty store with the standard root id.
+    fn new() -> Self {
+        Self {
+            root_id: ROOT_NODE,
+            children: BTreeMap::new(),
+            blocks: BTreeMap::new(),
+        }
+    }
+
+    /// Returns the deterministic root hash for the current tree contents.
+    pub(super) fn root_hash(&self) -> u32 {
+        self.node_hash(self.root_id)
+    }
+
+    /// Returns child summaries for `node_id` in stable order.
+    ///
+    /// A node with no recorded children yields an empty list.
+    pub(super) fn child_summaries(&self, node_id: u32) -> Vec<ChildSummary> {
+        self.children
+            .get(&node_id)
+            .into_iter()
+            .flatten()
+            .map(|child| ChildSummary {
+                id: child.id,
+                kind: child.kind,
+                hash: self.hash_for(child.kind, child.id),
+            })
+            .collect()
+    }
+
+    /// Returns the local hash for a branch or block child.
+    pub(super) fn hash_for(&self, kind: ChildKind, id: u32) -> u32 {
+        match kind {
+            ChildKind::Branch => self.node_hash(id),
+            ChildKind::Block => self.block_hash(id),
+        }
+    }
+
+    /// Returns the stored chunks for a block, preserving stream order.
+    ///
+    /// An unknown block id yields an empty list.
+    pub(super) fn block_chunks(&self, block_id: u32) -> Vec<Vec<u8>> {
+        self.blocks.get(&block_id).cloned().unwrap_or_default()
+    }
+
+    /// Replaces one local block after a complete block stream arrives.
+    pub(super) fn replace_block(&mut self, block_id: u32, chunks: Vec<Vec<u8>>) {
+        self.blocks.insert(block_id, chunks);
+    }
+
+    /// Computes a deterministic hash for a branch node.
+    ///
+    /// The seed `0x4E4F_4445` spells `NODE` in ASCII. The node id is mixed in
+    /// first, then each child's id, kind and hash in stored order, so a node's
+    /// hash changes when anything beneath it changes. Branch children recurse, so
+    /// the stored tree must not contain cycles.
+    fn node_hash(&self, node_id: u32) -> u32 {
+        let mut hash = mix_u32(0x4E4F_4445, node_id);
+
+        if let Some(children) = self.children.get(&node_id) {
+            for child in children {
+                hash = mix_u32(hash, child.id);
+                hash = mix_u32(hash, child.kind.discriminant());
+                hash = mix_u32(hash, self.hash_for(child.kind, child.id));
+            }
+        }
+
+        hash
+    }
+
+    /// Computes a deterministic hash for a data block.
+    ///
+    /// The seed `0x424C_4F43` spells `BLOC` in ASCII. Each chunk's length is mixed
+    /// in before its bytes, so the same bytes split at different chunk boundaries
+    /// hash differently.
+    fn block_hash(&self, block_id: u32) -> u32 {
+        let mut hash = mix_u32(0x424C_4F43, block_id);
+
+        if let Some(chunks) = self.blocks.get(&block_id) {
+            for chunk in chunks {
+                // Test chunks are tiny, so narrowing the length to `u32` is lossless.
+                hash = mix_u32(hash, chunk.len() as u32);
+                hash = hash_bytes(hash, chunk);
+            }
+        }
+
+        hash
+    }
+}
+
+impl ChildKind {
+    /// Stable wire discriminant for streamed child summaries.
+    pub(super) fn discriminant(self) -> u32 {
+        match self {
+            ChildKind::Branch => 0,
+            ChildKind::Block => 1,
+        }
+    }
+
+    /// Decodes a stable wire discriminant.
+    ///
+    /// Returns `None` for any value other than the two `discriminant` produces.
+    pub(super) fn from_discriminant(value: u32) -> Option<Self> {
+        match value {
+            0 => Some(Self::Branch),
+            1 => Some(Self::Block),
+            _ => None,
+        }
+    }
+}
+
+/// Remote store containing the authoritative content.
+pub(super) fn remote_fixture() -> MerkleStore {
+    let mut store = base_tree();
+    store
+        .blocks
+        .insert(BLOCK_ALPHA, chunks(&["alpha-", "same"]));
+    store
+        .blocks
+        .insert(BLOCK_BRAVO, chunks(&["bravo-", "remote-", "v2"]));
+    store
+        .blocks
+        .insert(BLOCK_CHARLIE, chunks(&["charlie-", "remote"]));
+    store.blocks.insert(BLOCK_DELTA, chunks(&["delta-same"]));
+    store
+}
+
+/// Local store with two stale blocks and two already matching blocks.
+///
+/// `BLOCK_BRAVO` and `BLOCK_CHARLIE` differ from the remote fixture; `BLOCK_ALPHA`
+/// and `BLOCK_DELTA` are identical to it.
+pub(super) fn local_fixture() -> MerkleStore {
+    let mut store = base_tree();
+    store
+        .blocks
+        .insert(BLOCK_ALPHA, chunks(&["alpha-", "same"]));
+    store
+        .blocks
+        .insert(BLOCK_BRAVO, chunks(&["bravo-", "local-", "v1"]));
+    store
+        .blocks
+        .insert(BLOCK_CHARLIE, chunks(&["charlie-", "local"]));
+    store.blocks.insert(BLOCK_DELTA, chunks(&["delta-same"]));
+    store
+}
+
+/// Tree topology shared by the local and remote fixtures.
+///
+/// The root has two branches. The left branch holds `BLOCK_ALPHA` and
+/// `BLOCK_BRAVO`; the right branch holds `BLOCK_CHARLIE` and `BLOCK_DELTA`. No
+/// block contents are inserted here.
+fn base_tree() -> MerkleStore {
+    let mut store = MerkleStore::new();
+    store.children.insert(
+        ROOT_NODE,
+        vec![
+            TreeChild {
+                id: BRANCH_LEFT,
+                kind: ChildKind::Branch,
+            },
+            TreeChild {
+                id: BRANCH_RIGHT,
+                kind: ChildKind::Branch,
+            },
+        ],
+    );
+    store.children.insert(
+        BRANCH_LEFT,
+        vec![
+            TreeChild {
+                id: BLOCK_ALPHA,
+                kind: ChildKind::Block,
+            },
+            TreeChild {
+                id: BLOCK_BRAVO,
+                kind: ChildKind::Block,
+            },
+        ],
+    );
+    store.children.insert(
+        BRANCH_RIGHT,
+        vec![
+            TreeChild {
+                id: BLOCK_CHARLIE,
+                kind: ChildKind::Block,
+            },
+            TreeChild {
+                id: BLOCK_DELTA,
+                kind: ChildKind::Block,
+            },
+        ],
+    );
+    store
+}
+
+/// Converts string slices into owned byte chunks.
+fn chunks(parts: &[&str]) -> Vec<Vec<u8>> {
+    parts.iter().map(|part| part.as_bytes().to_vec()).collect()
+}
+
+/// FNV-like byte mixing used only for deterministic test hashes.
+///
+/// Each byte is XORed into the running hash, which is then multiplied by
+/// `16_777_619` (the 32-bit FNV prime) with wrapping arithmetic.
+fn hash_bytes(mut hash: u32, bytes: &[u8]) -> u32 {
+    for byte in bytes {
+        hash ^= u32::from(*byte);
+        hash = hash.wrapping_mul(16_777_619);
+    }
+
+    hash
+}
+
+/// Mixes one little-endian integer into the deterministic test hash.
+fn mix_u32(hash: u32, value: u32) -> u32 {
+    hash_bytes(hash, &value.to_le_bytes())
+}