mirror of
https://github.com/Astatin3/unshell.git
synced 2026-06-08 22:38:01 -06:00
Add merkle_sync test
This commit is contained in:
@@ -34,6 +34,7 @@ type RouteMap = BTreeMap<EndpointName, PacketQueue>;
|
|||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
|
mod merkle_sync;
|
||||||
mod oneshot;
|
mod oneshot;
|
||||||
mod packet;
|
mod packet;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,79 @@
|
|||||||
|
use alloc::vec::Vec;
|
||||||
|
|
||||||
|
use super::tree::{BlockChunk, ChildKind, ChildSummary};
|
||||||
|
|
||||||
|
/// Encodes one `u32` request or response payload.
|
||||||
|
pub(super) fn encode_u32(value: u32) -> Vec<u8> {
|
||||||
|
value.to_le_bytes().to_vec()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Decodes one exact `u32` payload.
|
||||||
|
pub(super) fn decode_u32(data: &[u8]) -> Option<u32> {
|
||||||
|
if data.len() == 4 {
|
||||||
|
Some(read_u32(data, 0))
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Encodes one streamed child hash entry.
|
||||||
|
pub(super) fn encode_child_summary(summary: ChildSummary) -> Vec<u8> {
|
||||||
|
let mut data = Vec::with_capacity(12);
|
||||||
|
data.extend_from_slice(&summary.id.to_le_bytes());
|
||||||
|
data.extend_from_slice(&summary.kind.discriminant().to_le_bytes());
|
||||||
|
data.extend_from_slice(&summary.hash.to_le_bytes());
|
||||||
|
data
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Decodes one streamed child hash entry.
|
||||||
|
pub(super) fn decode_child_summary(data: &[u8]) -> Option<ChildSummary> {
|
||||||
|
if data.len() != 12 {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
Some(ChildSummary {
|
||||||
|
id: read_u32(data, 0),
|
||||||
|
kind: ChildKind::from_discriminant(read_u32(data, 4))?,
|
||||||
|
hash: read_u32(data, 8),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Encodes one streamed block chunk.
|
||||||
|
pub(super) fn encode_block_chunk(chunk: &BlockChunk) -> Vec<u8> {
|
||||||
|
let mut data = Vec::with_capacity(16 + chunk.data.len());
|
||||||
|
data.extend_from_slice(&chunk.block_id.to_le_bytes());
|
||||||
|
data.extend_from_slice(&chunk.index.to_le_bytes());
|
||||||
|
data.extend_from_slice(&chunk.total.to_le_bytes());
|
||||||
|
data.extend_from_slice(&(chunk.data.len() as u32).to_le_bytes());
|
||||||
|
data.extend_from_slice(&chunk.data);
|
||||||
|
data
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Decodes one streamed block chunk.
|
||||||
|
pub(super) fn decode_block_chunk(data: &[u8]) -> Option<BlockChunk> {
|
||||||
|
if data.len() < 16 {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
let len = read_u32(data, 12) as usize;
|
||||||
|
if data.len() != 16 + len {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
Some(BlockChunk {
|
||||||
|
block_id: read_u32(data, 0),
|
||||||
|
index: read_u32(data, 4),
|
||||||
|
total: read_u32(data, 8),
|
||||||
|
data: data[16..].to_vec(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Reads a little-endian `u32` at a known-valid offset.
|
||||||
|
fn read_u32(data: &[u8], offset: usize) -> u32 {
|
||||||
|
u32::from_le_bytes([
|
||||||
|
data[offset],
|
||||||
|
data[offset + 1],
|
||||||
|
data[offset + 2],
|
||||||
|
data[offset + 3],
|
||||||
|
])
|
||||||
|
}
|
||||||
@@ -0,0 +1,27 @@
|
|||||||
|
//! Shared ids for the Merkle sync protocol test.
|
||||||
|
//!
|
||||||
|
//! Keeping ids in one file makes the manually managed leaf state easier to audit
|
||||||
|
//! and mirrors the table a future leaf-state macro would generate from annotated
|
||||||
|
//! RPC definitions.
|
||||||
|
|
||||||
|
pub(super) const ENDPOINT_CALLER: u32 = 0;
|
||||||
|
pub(super) const ENDPOINT_RESPONDENT: u32 = 1;
|
||||||
|
|
||||||
|
pub(super) const LEAF_MERKLE_CALLER: u32 = 300;
|
||||||
|
pub(super) const LEAF_MERKLE_RESPONDENT: u32 = 301;
|
||||||
|
pub(super) const LEAF_MOCK_CONNECTION: u32 = 302;
|
||||||
|
|
||||||
|
pub(super) const PROC_GET_ROOT_HASH: u32 = 10;
|
||||||
|
pub(super) const PROC_GET_CHILD_HASHES: u32 = 11;
|
||||||
|
pub(super) const PROC_GET_BLOCK_STREAM: u32 = 12;
|
||||||
|
pub(super) const PROC_ROOT_HASH: u32 = 20;
|
||||||
|
pub(super) const PROC_CHILD_HASH_ENTRY: u32 = 21;
|
||||||
|
pub(super) const PROC_BLOCK_CHUNK: u32 = 22;
|
||||||
|
|
||||||
|
pub(super) const ROOT_NODE: u32 = 0;
|
||||||
|
pub(super) const BRANCH_LEFT: u32 = 1;
|
||||||
|
pub(super) const BRANCH_RIGHT: u32 = 2;
|
||||||
|
pub(super) const BLOCK_ALPHA: u32 = 10;
|
||||||
|
pub(super) const BLOCK_BRAVO: u32 = 11;
|
||||||
|
pub(super) const BLOCK_CHARLIE: u32 = 20;
|
||||||
|
pub(super) const BLOCK_DELTA: u32 = 21;
|
||||||
@@ -0,0 +1,119 @@
|
|||||||
|
use alloc::{boxed::Box, rc::Rc, vec};
|
||||||
|
use core::cell::RefCell;
|
||||||
|
|
||||||
|
use crate::Endpoint;
|
||||||
|
|
||||||
|
use super::{
|
||||||
|
constants::{ENDPOINT_CALLER, ENDPOINT_RESPONDENT},
|
||||||
|
leaves::{MerkleCallerLeaf, MerkleRespondentLeaf, MockConnectionLeaf},
|
||||||
|
state::{CallerReport, RespondentReport},
|
||||||
|
tree::{MerkleStore, local_fixture, remote_fixture},
|
||||||
|
};
|
||||||
|
|
||||||
|
/// Complete two-endpoint Merkle sync test harness.
|
||||||
|
///
|
||||||
|
/// Endpoint A owns the caller leaf and one mock connection leaf. Endpoint B owns the
|
||||||
|
/// respondent leaf and the opposite mock connection leaf. Reports are shared out of
|
||||||
|
/// the boxed leaf objects so tests can assert state without downcasting trait
|
||||||
|
/// objects.
|
||||||
|
pub(super) struct MerkleHarness {
|
||||||
|
pub(super) endpoint_a: Endpoint,
|
||||||
|
pub(super) endpoint_b: Endpoint,
|
||||||
|
pub(super) caller_report: Rc<RefCell<CallerReport>>,
|
||||||
|
pub(super) respondent_report: Rc<RefCell<RespondentReport>>,
|
||||||
|
pub(super) remote_root_hash: u32,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MerkleHarness {
|
||||||
|
/// Creates the divergent fixture used by the main sync test.
|
||||||
|
pub(super) fn divergent() -> Self {
|
||||||
|
Self::with_stores(local_fixture(), remote_fixture())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Creates a custom caller/respondent fixture.
|
||||||
|
pub(super) fn with_stores(local: MerkleStore, remote: MerkleStore) -> Self {
|
||||||
|
let remote_root_hash = remote.root_hash();
|
||||||
|
let caller_report = Rc::new(RefCell::new(CallerReport::default()));
|
||||||
|
let respondent_report = Rc::new(RefCell::new(RespondentReport::default()));
|
||||||
|
let (tx_a, rx_a) = crossbeam_channel::unbounded();
|
||||||
|
let (tx_b, rx_b) = crossbeam_channel::unbounded();
|
||||||
|
|
||||||
|
let mut endpoint_a = Endpoint::new(
|
||||||
|
ENDPOINT_CALLER,
|
||||||
|
vec![
|
||||||
|
Box::new(MerkleCallerLeaf::new(local, caller_report.clone())),
|
||||||
|
Box::new(MockConnectionLeaf::new(
|
||||||
|
tx_b,
|
||||||
|
rx_a,
|
||||||
|
ENDPOINT_RESPONDENT,
|
||||||
|
false,
|
||||||
|
)),
|
||||||
|
],
|
||||||
|
);
|
||||||
|
endpoint_a.path = vec![ENDPOINT_CALLER];
|
||||||
|
|
||||||
|
let mut endpoint_b = Endpoint::new(
|
||||||
|
ENDPOINT_RESPONDENT,
|
||||||
|
vec![
|
||||||
|
Box::new(MerkleRespondentLeaf::new(remote, respondent_report.clone())),
|
||||||
|
Box::new(MockConnectionLeaf::new(tx_a, rx_b, ENDPOINT_CALLER, true)),
|
||||||
|
],
|
||||||
|
);
|
||||||
|
endpoint_b.path = vec![ENDPOINT_CALLER, ENDPOINT_RESPONDENT];
|
||||||
|
|
||||||
|
// Register routes before the first caller update so initial packet delivery
|
||||||
|
// does not depend on leaf ordering.
|
||||||
|
endpoint_a.connections.insert((ENDPOINT_RESPONDENT, false));
|
||||||
|
endpoint_b.connections.insert((ENDPOINT_CALLER, true));
|
||||||
|
|
||||||
|
Self {
|
||||||
|
endpoint_a,
|
||||||
|
endpoint_b,
|
||||||
|
caller_report,
|
||||||
|
respondent_report,
|
||||||
|
remote_root_hash,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Drives one deterministic protocol loop.
|
||||||
|
pub(super) fn tick(&mut self) {
|
||||||
|
self.endpoint_a.update();
|
||||||
|
self.endpoint_b.update();
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Runs until the caller reports completion.
|
||||||
|
pub(super) fn run_until_done(&mut self, max_ticks: usize) -> usize {
|
||||||
|
for tick in 1..=max_ticks {
|
||||||
|
self.tick();
|
||||||
|
|
||||||
|
if self.caller_report.borrow().done {
|
||||||
|
return tick;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
panic!("Merkle sync did not finish within {max_ticks} ticks");
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Runs until the respondent has sent at least `target_frames` frames.
|
||||||
|
pub(super) fn run_until_respondent_frames(
|
||||||
|
&mut self,
|
||||||
|
target_frames: usize,
|
||||||
|
max_ticks: usize,
|
||||||
|
) -> usize {
|
||||||
|
for tick in 1..=max_ticks {
|
||||||
|
self.tick();
|
||||||
|
|
||||||
|
if self.respondent_report.borrow().frames_sent >= target_frames {
|
||||||
|
return tick;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
panic!("respondent did not send {target_frames} frames within {max_ticks} ticks");
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Verifies the requested four-leaf topology.
|
||||||
|
pub(super) fn assert_four_leaf_topology(&self) {
|
||||||
|
assert_eq!(self.endpoint_a.leaves.len(), 2);
|
||||||
|
assert_eq!(self.endpoint_b.leaves.len(), 2);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,404 @@
|
|||||||
|
use alloc::{collections::VecDeque, rc::Rc, vec, vec::Vec};
|
||||||
|
use core::cell::RefCell;
|
||||||
|
|
||||||
|
use crossbeam_channel::{Receiver, Sender};
|
||||||
|
|
||||||
|
use crate::{Endpoint, Leaf, Packet};
|
||||||
|
|
||||||
|
use super::{
|
||||||
|
codec::{decode_block_chunk, decode_child_summary, decode_u32},
|
||||||
|
constants::{
|
||||||
|
ENDPOINT_CALLER, ENDPOINT_RESPONDENT, LEAF_MERKLE_CALLER, LEAF_MERKLE_RESPONDENT,
|
||||||
|
LEAF_MOCK_CONNECTION, PROC_BLOCK_CHUNK, PROC_CHILD_HASH_ENTRY, PROC_GET_BLOCK_STREAM,
|
||||||
|
PROC_GET_CHILD_HASHES, PROC_GET_ROOT_HASH, PROC_ROOT_HASH, ROOT_NODE,
|
||||||
|
},
|
||||||
|
rpc::{
|
||||||
|
block_chunk_frame, block_stream_request, child_hash_frame, child_hashes_request,
|
||||||
|
root_hash_frame, root_hash_request,
|
||||||
|
},
|
||||||
|
state::{CallerPhase, CallerReport, RespondentReport, ResponseStream},
|
||||||
|
tree::{BlockChunk, ChildKind, MerkleStore},
|
||||||
|
};
|
||||||
|
|
||||||
|
/// Leaf that simulates a serialized transport connection with crossbeam channels.
|
||||||
|
///
|
||||||
|
/// This is intentionally tiny and reusable. Both endpoints in the Merkle test have
|
||||||
|
/// exactly one of these leaves, giving the requested four-leaf topology: caller,
|
||||||
|
/// respondent, and two mock connections.
|
||||||
|
pub(super) struct MockConnectionLeaf {
|
||||||
|
pub(super) tx: Sender<Vec<u8>>,
|
||||||
|
pub(super) rx: Receiver<Vec<u8>>,
|
||||||
|
pub(super) remote_id: u32,
|
||||||
|
pub(super) is_authority: bool,
|
||||||
|
pub(super) started: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Caller leaf that drives the Merkle synchronization algorithm.
|
||||||
|
pub(super) struct MerkleCallerLeaf {
|
||||||
|
local: MerkleStore,
|
||||||
|
phase: CallerPhase,
|
||||||
|
pending_nodes: VecDeque<u32>,
|
||||||
|
pending_blocks: VecDeque<u32>,
|
||||||
|
report: Rc<RefCell<CallerReport>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Respondent leaf that serves Merkle hash and block streams.
|
||||||
|
pub(super) struct MerkleRespondentLeaf {
|
||||||
|
remote: MerkleStore,
|
||||||
|
active_stream: Option<ResponseStream>,
|
||||||
|
report: Rc<RefCell<RespondentReport>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MockConnectionLeaf {
|
||||||
|
/// Creates one side of a mock connection.
|
||||||
|
pub(super) fn new(
|
||||||
|
tx: Sender<Vec<u8>>,
|
||||||
|
rx: Receiver<Vec<u8>>,
|
||||||
|
remote_id: u32,
|
||||||
|
is_authority: bool,
|
||||||
|
) -> Self {
|
||||||
|
Self {
|
||||||
|
tx,
|
||||||
|
rx,
|
||||||
|
remote_id,
|
||||||
|
is_authority,
|
||||||
|
started: false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MerkleCallerLeaf {
|
||||||
|
/// Creates a caller with a local store and externally visible report.
|
||||||
|
pub(super) fn new(local: MerkleStore, report: Rc<RefCell<CallerReport>>) -> Self {
|
||||||
|
Self {
|
||||||
|
local,
|
||||||
|
phase: CallerPhase::NeedRoot,
|
||||||
|
pending_nodes: VecDeque::new(),
|
||||||
|
pending_blocks: VecDeque::new(),
|
||||||
|
report,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MerkleRespondentLeaf {
|
||||||
|
/// Creates a respondent backed by the authoritative remote store.
|
||||||
|
pub(super) fn new(remote: MerkleStore, report: Rc<RefCell<RespondentReport>>) -> Self {
|
||||||
|
Self {
|
||||||
|
remote,
|
||||||
|
active_stream: None,
|
||||||
|
report,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Leaf for MockConnectionLeaf {
|
||||||
|
fn get_id(&self) -> u32 {
|
||||||
|
LEAF_MOCK_CONNECTION
|
||||||
|
}
|
||||||
|
|
||||||
|
fn update(&mut self, endpoint: &mut Endpoint) {
|
||||||
|
if !self.started {
|
||||||
|
endpoint
|
||||||
|
.connections
|
||||||
|
.insert((self.remote_id, self.is_authority));
|
||||||
|
self.started = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
while !self.rx.is_empty() {
|
||||||
|
let data = self.rx.recv().unwrap();
|
||||||
|
|
||||||
|
// Mock transports move untrusted bytes. Malformed frames are dropped so
|
||||||
|
// the sync state machine is tested only after packet parsing succeeds.
|
||||||
|
if let Ok(packet) = Packet::deserialize(&data) {
|
||||||
|
let _ = endpoint.add_inbound(packet);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
endpoint.take_outbound_clear(self.remote_id, |packet| {
|
||||||
|
let data = packet.serialize().unwrap();
|
||||||
|
let _ = self.tx.send(data);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Leaf for MerkleCallerLeaf {
|
||||||
|
fn get_id(&self) -> u32 {
|
||||||
|
LEAF_MERKLE_CALLER
|
||||||
|
}
|
||||||
|
|
||||||
|
fn update(&mut self, endpoint: &mut Endpoint) {
|
||||||
|
self.receive_responses(endpoint);
|
||||||
|
self.dispatch_next_request(endpoint);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Leaf for MerkleRespondentLeaf {
|
||||||
|
fn get_id(&self) -> u32 {
|
||||||
|
LEAF_MERKLE_RESPONDENT
|
||||||
|
}
|
||||||
|
|
||||||
|
fn update(&mut self, endpoint: &mut Endpoint) {
|
||||||
|
self.open_stream_from_request(endpoint);
|
||||||
|
self.send_one_response_frame(endpoint);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MerkleCallerLeaf {
|
||||||
|
/// Consumes all response packets currently delivered to endpoint A.
|
||||||
|
fn receive_responses(&mut self, endpoint: &mut Endpoint) {
|
||||||
|
endpoint.take_inbound_clear(ENDPOINT_CALLER, |packet| {
|
||||||
|
self.report
|
||||||
|
.borrow_mut()
|
||||||
|
.received_procedures
|
||||||
|
.push(packet.procedure_id);
|
||||||
|
self.handle_response_packet(packet);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Handles one response packet according to the current caller phase.
|
||||||
|
fn handle_response_packet(&mut self, packet: &Packet) {
|
||||||
|
match &mut self.phase {
|
||||||
|
CallerPhase::AwaitRoot { hook_id } => {
|
||||||
|
assert_eq!(packet.hook_id, *hook_id);
|
||||||
|
assert_eq!(packet.procedure_id, PROC_ROOT_HASH);
|
||||||
|
let remote_root = decode_u32(&packet.data).expect("root hash payload");
|
||||||
|
|
||||||
|
if packet.end_hook {
|
||||||
|
self.finish_root_response(remote_root);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
CallerPhase::AwaitChildren {
|
||||||
|
hook_id,
|
||||||
|
node_id: _,
|
||||||
|
entries,
|
||||||
|
} => {
|
||||||
|
assert_eq!(packet.hook_id, *hook_id);
|
||||||
|
assert_eq!(packet.procedure_id, PROC_CHILD_HASH_ENTRY);
|
||||||
|
entries.push(decode_child_summary(&packet.data).expect("child summary payload"));
|
||||||
|
|
||||||
|
if packet.end_hook {
|
||||||
|
self.finish_child_response();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
CallerPhase::AwaitBlock {
|
||||||
|
hook_id,
|
||||||
|
block_id: _,
|
||||||
|
chunks,
|
||||||
|
} => {
|
||||||
|
assert_eq!(packet.hook_id, *hook_id);
|
||||||
|
assert_eq!(packet.procedure_id, PROC_BLOCK_CHUNK);
|
||||||
|
chunks.push(decode_block_chunk(&packet.data).expect("block chunk payload"));
|
||||||
|
|
||||||
|
if packet.end_hook {
|
||||||
|
self.finish_block_response();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
CallerPhase::NeedRoot | CallerPhase::Ready | CallerPhase::Done => {
|
||||||
|
panic!("unexpected Merkle response in phase {:?}", self.phase);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Applies the completed root response and decides whether tree walking is needed.
|
||||||
|
fn finish_root_response(&mut self, remote_root: u32) {
|
||||||
|
if self.local.root_hash() == remote_root {
|
||||||
|
self.mark_done();
|
||||||
|
} else {
|
||||||
|
self.pending_nodes.push_back(ROOT_NODE);
|
||||||
|
self.phase = CallerPhase::Ready;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Applies a completed child-hash stream.
|
||||||
|
fn finish_child_response(&mut self) {
|
||||||
|
let CallerPhase::AwaitChildren {
|
||||||
|
hook_id: _,
|
||||||
|
node_id: _,
|
||||||
|
entries,
|
||||||
|
} = core::mem::replace(&mut self.phase, CallerPhase::Ready)
|
||||||
|
else {
|
||||||
|
unreachable!();
|
||||||
|
};
|
||||||
|
|
||||||
|
for entry in entries {
|
||||||
|
if self.local.hash_for(entry.kind, entry.id) == entry.hash {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
match entry.kind {
|
||||||
|
ChildKind::Branch => self.pending_nodes.push_back(entry.id),
|
||||||
|
ChildKind::Block => self.pending_blocks.push_back(entry.id),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Applies a completed block stream to the local store.
|
||||||
|
fn finish_block_response(&mut self) {
|
||||||
|
let CallerPhase::AwaitBlock {
|
||||||
|
hook_id: _,
|
||||||
|
block_id,
|
||||||
|
mut chunks,
|
||||||
|
} = core::mem::replace(&mut self.phase, CallerPhase::Ready)
|
||||||
|
else {
|
||||||
|
unreachable!();
|
||||||
|
};
|
||||||
|
|
||||||
|
chunks.sort_by_key(|chunk| chunk.index);
|
||||||
|
assert_eq!(
|
||||||
|
chunks.len(),
|
||||||
|
chunks.first().map(|chunk| chunk.total).unwrap_or(0) as usize
|
||||||
|
);
|
||||||
|
|
||||||
|
let new_chunks: Vec<Vec<u8>> = chunks.into_iter().map(|chunk| chunk.data).collect();
|
||||||
|
self.local.replace_block(block_id, new_chunks.clone());
|
||||||
|
|
||||||
|
let mut report = self.report.borrow_mut();
|
||||||
|
report.synchronized_blocks.push(block_id);
|
||||||
|
report.applied_block_chunks.push((block_id, new_chunks));
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Sends the next request if the caller is not waiting on a response stream.
|
||||||
|
fn dispatch_next_request(&mut self, endpoint: &mut Endpoint) {
|
||||||
|
match self.phase {
|
||||||
|
CallerPhase::NeedRoot => {
|
||||||
|
let hook_id = self.send_request(endpoint, PROC_GET_ROOT_HASH, Vec::new());
|
||||||
|
endpoint.add_outbound(root_hash_request(hook_id)).unwrap();
|
||||||
|
self.phase = CallerPhase::AwaitRoot { hook_id };
|
||||||
|
}
|
||||||
|
CallerPhase::Ready => {
|
||||||
|
if let Some(node_id) = self.pending_nodes.pop_front() {
|
||||||
|
let hook_id = self.send_request(endpoint, PROC_GET_CHILD_HASHES, Vec::new());
|
||||||
|
endpoint
|
||||||
|
.add_outbound(child_hashes_request(hook_id, node_id))
|
||||||
|
.unwrap();
|
||||||
|
self.phase = CallerPhase::AwaitChildren {
|
||||||
|
hook_id,
|
||||||
|
node_id,
|
||||||
|
entries: Vec::new(),
|
||||||
|
};
|
||||||
|
} else if let Some(block_id) = self.pending_blocks.pop_front() {
|
||||||
|
let hook_id = self.send_request(endpoint, PROC_GET_BLOCK_STREAM, Vec::new());
|
||||||
|
endpoint
|
||||||
|
.add_outbound(block_stream_request(hook_id, block_id))
|
||||||
|
.unwrap();
|
||||||
|
self.phase = CallerPhase::AwaitBlock {
|
||||||
|
hook_id,
|
||||||
|
block_id,
|
||||||
|
chunks: Vec::new(),
|
||||||
|
};
|
||||||
|
} else {
|
||||||
|
self.mark_done();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
CallerPhase::AwaitRoot { .. }
|
||||||
|
| CallerPhase::AwaitChildren { .. }
|
||||||
|
| CallerPhase::AwaitBlock { .. }
|
||||||
|
| CallerPhase::Done => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Reserves a hook id and records the logical RPC request.
|
||||||
|
fn send_request(&mut self, endpoint: &mut Endpoint, procedure_id: u32, _data: Vec<u8>) -> u16 {
|
||||||
|
let hook_id = endpoint.get_hook_id();
|
||||||
|
self.report
|
||||||
|
.borrow_mut()
|
||||||
|
.requested_procedures
|
||||||
|
.push(procedure_id);
|
||||||
|
hook_id
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Marks the synchronization complete and records the final local root.
|
||||||
|
fn mark_done(&mut self) {
|
||||||
|
self.phase = CallerPhase::Done;
|
||||||
|
let mut report = self.report.borrow_mut();
|
||||||
|
report.done = true;
|
||||||
|
report.final_root_hash = Some(self.local.root_hash());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MerkleRespondentLeaf {
|
||||||
|
/// Opens one response stream from the first pending local request.
|
||||||
|
fn open_stream_from_request(&mut self, endpoint: &mut Endpoint) {
|
||||||
|
if self.active_stream.is_some() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut request = None;
|
||||||
|
endpoint.take_inbound_clear(ENDPOINT_RESPONDENT, |packet| {
|
||||||
|
if request.is_none() {
|
||||||
|
request = Some((packet.hook_id, packet.procedure_id, packet.data.clone()));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let Some((hook_id, procedure_id, data)) = request else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
let frames = self.frames_for_request(procedure_id, &data);
|
||||||
|
endpoint.hooks.insert(hook_id, ENDPOINT_CALLER);
|
||||||
|
|
||||||
|
self.report.borrow_mut().requests_seen.push(procedure_id);
|
||||||
|
if !frames.is_empty() {
|
||||||
|
self.report.borrow_mut().streams_started += 1;
|
||||||
|
self.active_stream = Some(ResponseStream::new(hook_id, frames));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Builds response frames for one request procedure.
|
||||||
|
fn frames_for_request(&self, procedure_id: u32, data: &[u8]) -> Vec<super::rpc::OutgoingFrame> {
|
||||||
|
match procedure_id {
|
||||||
|
PROC_GET_ROOT_HASH => vec![root_hash_frame(self.remote.root_hash())],
|
||||||
|
PROC_GET_CHILD_HASHES => {
|
||||||
|
let node_id = decode_u32(data).expect("child hash request node id");
|
||||||
|
self.remote
|
||||||
|
.child_summaries(node_id)
|
||||||
|
.into_iter()
|
||||||
|
.map(child_hash_frame)
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
PROC_GET_BLOCK_STREAM => {
|
||||||
|
let block_id = decode_u32(data).expect("block stream request block id");
|
||||||
|
let chunks = self.remote.block_chunks(block_id);
|
||||||
|
let total = chunks.len() as u32;
|
||||||
|
chunks
|
||||||
|
.into_iter()
|
||||||
|
.enumerate()
|
||||||
|
.map(|(index, data)| {
|
||||||
|
block_chunk_frame(BlockChunk {
|
||||||
|
block_id,
|
||||||
|
index: index as u32,
|
||||||
|
total,
|
||||||
|
data,
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
_ => Vec::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Sends at most one response frame per update loop.
|
||||||
|
fn send_one_response_frame(&mut self, endpoint: &mut Endpoint) {
|
||||||
|
let Some(stream) = self.active_stream.as_mut() else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
if stream.is_empty() {
|
||||||
|
self.active_stream = None;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let packet = stream.next_packet().expect("active stream frame");
|
||||||
|
if endpoint.add_outbound(packet).is_err() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
self.report.borrow_mut().frames_sent += 1;
|
||||||
|
stream.advance();
|
||||||
|
|
||||||
|
if stream.is_complete() {
|
||||||
|
self.report.borrow_mut().streams_completed += 1;
|
||||||
|
self.active_stream = None;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,8 @@
|
|||||||
|
mod codec;
|
||||||
|
mod constants;
|
||||||
|
mod harness;
|
||||||
|
mod leaves;
|
||||||
|
mod rpc;
|
||||||
|
mod state;
|
||||||
|
mod tests;
|
||||||
|
mod tree;
|
||||||
@@ -0,0 +1,86 @@
|
|||||||
|
use alloc::{vec, vec::Vec};
|
||||||
|
|
||||||
|
use crate::Packet;
|
||||||
|
|
||||||
|
use super::{
|
||||||
|
codec::{encode_block_chunk, encode_child_summary, encode_u32},
|
||||||
|
constants::{
|
||||||
|
ENDPOINT_CALLER, ENDPOINT_RESPONDENT, PROC_BLOCK_CHUNK, PROC_CHILD_HASH_ENTRY,
|
||||||
|
PROC_GET_BLOCK_STREAM, PROC_GET_CHILD_HASHES, PROC_GET_ROOT_HASH, PROC_ROOT_HASH,
|
||||||
|
},
|
||||||
|
tree::{BlockChunk, ChildSummary},
|
||||||
|
};
|
||||||
|
|
||||||
|
/// One outbound response frame before it is wrapped in endpoint routing fields.
|
||||||
|
///
|
||||||
|
/// A response stream owns a list of these frames and asks each frame to become a
|
||||||
|
/// packet only when the loop is ready to send it. That keeps retry behavior simple:
|
||||||
|
/// a failed send does not consume the frame.
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
|
pub(super) struct OutgoingFrame {
|
||||||
|
procedure_id: u32,
|
||||||
|
data: Vec<u8>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl OutgoingFrame {
|
||||||
|
/// Wraps the frame in an upward packet for `hook_id`.
|
||||||
|
pub(super) fn to_packet(&self, hook_id: u16, end_hook: bool) -> Packet {
|
||||||
|
Packet {
|
||||||
|
hook_id,
|
||||||
|
end_hook,
|
||||||
|
path: vec![ENDPOINT_CALLER],
|
||||||
|
procedure_id: self.procedure_id,
|
||||||
|
data: self.data.clone(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Builds the initial root-hash request.
|
||||||
|
pub(super) fn root_hash_request(hook_id: u16) -> Packet {
|
||||||
|
request_packet(PROC_GET_ROOT_HASH, hook_id, Vec::new())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Builds a request for one branch node's child hashes.
|
||||||
|
pub(super) fn child_hashes_request(hook_id: u16, node_id: u32) -> Packet {
|
||||||
|
request_packet(PROC_GET_CHILD_HASHES, hook_id, encode_u32(node_id))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Builds a request for one mismatched block's data stream.
|
||||||
|
pub(super) fn block_stream_request(hook_id: u16, block_id: u32) -> Packet {
|
||||||
|
request_packet(PROC_GET_BLOCK_STREAM, hook_id, encode_u32(block_id))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Builds a single root-hash response frame.
|
||||||
|
pub(super) fn root_hash_frame(root_hash: u32) -> OutgoingFrame {
|
||||||
|
OutgoingFrame {
|
||||||
|
procedure_id: PROC_ROOT_HASH,
|
||||||
|
data: encode_u32(root_hash),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Builds one streamed child hash entry response frame.
|
||||||
|
pub(super) fn child_hash_frame(summary: ChildSummary) -> OutgoingFrame {
|
||||||
|
OutgoingFrame {
|
||||||
|
procedure_id: PROC_CHILD_HASH_ENTRY,
|
||||||
|
data: encode_child_summary(summary),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Builds one streamed block chunk response frame.
|
||||||
|
pub(super) fn block_chunk_frame(chunk: BlockChunk) -> OutgoingFrame {
|
||||||
|
OutgoingFrame {
|
||||||
|
procedure_id: PROC_BLOCK_CHUNK,
|
||||||
|
data: encode_block_chunk(&chunk),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Builds a downward request packet.
|
||||||
|
fn request_packet(procedure_id: u32, hook_id: u16, data: Vec<u8>) -> Packet {
|
||||||
|
Packet {
|
||||||
|
hook_id,
|
||||||
|
end_hook: true,
|
||||||
|
path: vec![ENDPOINT_CALLER, ENDPOINT_RESPONDENT],
|
||||||
|
procedure_id,
|
||||||
|
data,
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,98 @@
|
|||||||
|
use alloc::vec::Vec;
|
||||||
|
|
||||||
|
use super::{
|
||||||
|
rpc::OutgoingFrame,
|
||||||
|
tree::{BlockChunk, ChildSummary},
|
||||||
|
};
|
||||||
|
|
||||||
|
/// Caller-side synchronization phase.
|
||||||
|
///
|
||||||
|
/// This is the manual state machine a future macro should be able to derive from
|
||||||
|
/// RPC declarations. Each awaiting state owns the partial stream it is collecting,
|
||||||
|
/// making it clear which packets are legal at each step.
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
|
pub(super) enum CallerPhase {
|
||||||
|
NeedRoot,
|
||||||
|
AwaitRoot {
|
||||||
|
hook_id: u16,
|
||||||
|
},
|
||||||
|
Ready,
|
||||||
|
AwaitChildren {
|
||||||
|
hook_id: u16,
|
||||||
|
node_id: u32,
|
||||||
|
entries: Vec<ChildSummary>,
|
||||||
|
},
|
||||||
|
AwaitBlock {
|
||||||
|
hook_id: u16,
|
||||||
|
block_id: u32,
|
||||||
|
chunks: Vec<BlockChunk>,
|
||||||
|
},
|
||||||
|
Done,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Test-visible caller observations.
|
||||||
|
///
|
||||||
|
/// The leaf itself lives behind `Box<dyn Leaf>`, so the harness keeps a shared
|
||||||
|
/// report handle for assertions without needing downcasts.
|
||||||
|
#[derive(Debug, Default)]
|
||||||
|
pub(super) struct CallerReport {
|
||||||
|
pub(super) done: bool,
|
||||||
|
pub(super) requested_procedures: Vec<u32>,
|
||||||
|
pub(super) received_procedures: Vec<u32>,
|
||||||
|
pub(super) synchronized_blocks: Vec<u32>,
|
||||||
|
pub(super) applied_block_chunks: Vec<(u32, Vec<Vec<u8>>)>,
|
||||||
|
pub(super) final_root_hash: Option<u32>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Test-visible respondent observations.
|
||||||
|
#[derive(Debug, Default)]
|
||||||
|
pub(super) struct RespondentReport {
|
||||||
|
pub(super) requests_seen: Vec<u32>,
|
||||||
|
pub(super) streams_started: usize,
|
||||||
|
pub(super) streams_completed: usize,
|
||||||
|
pub(super) frames_sent: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Respondent-owned response stream.
|
||||||
|
///
|
||||||
|
/// It stores encoded frames and exposes packet construction one frame at a time.
|
||||||
|
/// Since `next_packet` does not advance, a failed route can be retried by calling it
|
||||||
|
/// again on the next loop.
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
|
pub(super) struct ResponseStream {
|
||||||
|
hook_id: u16,
|
||||||
|
frames: Vec<OutgoingFrame>,
|
||||||
|
next_index: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ResponseStream {
|
||||||
|
/// Creates a response stream for one request hook.
|
||||||
|
pub(super) fn new(hook_id: u16, frames: Vec<OutgoingFrame>) -> Self {
|
||||||
|
Self {
|
||||||
|
hook_id,
|
||||||
|
frames,
|
||||||
|
next_index: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Builds the next packet without advancing the stream.
|
||||||
|
pub(super) fn next_packet(&self) -> Option<crate::Packet> {
|
||||||
|
let frame = self.frames.get(self.next_index)?;
|
||||||
|
Some(frame.to_packet(self.hook_id, self.next_index + 1 == self.frames.len()))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Marks the current frame as successfully sent.
|
||||||
|
pub(super) fn advance(&mut self) {
|
||||||
|
self.next_index += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns true once every frame has been sent.
|
||||||
|
pub(super) fn is_complete(&self) -> bool {
|
||||||
|
self.next_index >= self.frames.len()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns true when the request generated no frames.
|
||||||
|
pub(super) fn is_empty(&self) -> bool {
|
||||||
|
self.frames.is_empty()
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,78 @@
|
|||||||
|
use super::{
|
||||||
|
constants::{
|
||||||
|
BLOCK_BRAVO, BLOCK_CHARLIE, PROC_GET_BLOCK_STREAM, PROC_GET_CHILD_HASHES,
|
||||||
|
PROC_GET_ROOT_HASH,
|
||||||
|
},
|
||||||
|
harness::MerkleHarness,
|
||||||
|
tree::remote_fixture,
|
||||||
|
};
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn merkle_sync_walks_hash_tree_and_streams_changed_blocks() {
|
||||||
|
let mut harness = MerkleHarness::divergent();
|
||||||
|
harness.assert_four_leaf_topology();
|
||||||
|
|
||||||
|
let ticks = harness.run_until_done(100);
|
||||||
|
assert!(
|
||||||
|
ticks > 20,
|
||||||
|
"sync should require many request/stream iterations"
|
||||||
|
);
|
||||||
|
|
||||||
|
let caller = harness.caller_report.borrow();
|
||||||
|
assert_eq!(caller.final_root_hash, Some(harness.remote_root_hash));
|
||||||
|
assert_eq!(caller.synchronized_blocks, [BLOCK_BRAVO, BLOCK_CHARLIE]);
|
||||||
|
assert_eq!(
|
||||||
|
caller.requested_procedures,
|
||||||
|
[
|
||||||
|
PROC_GET_ROOT_HASH,
|
||||||
|
PROC_GET_CHILD_HASHES,
|
||||||
|
PROC_GET_CHILD_HASHES,
|
||||||
|
PROC_GET_CHILD_HASHES,
|
||||||
|
PROC_GET_BLOCK_STREAM,
|
||||||
|
PROC_GET_BLOCK_STREAM,
|
||||||
|
]
|
||||||
|
);
|
||||||
|
|
||||||
|
let respondent = harness.respondent_report.borrow();
|
||||||
|
assert_eq!(respondent.requests_seen, caller.requested_procedures);
|
||||||
|
assert_eq!(respondent.streams_started, 6);
|
||||||
|
assert_eq!(respondent.streams_completed, 6);
|
||||||
|
assert_eq!(respondent.frames_sent, 12);
|
||||||
|
assert!(harness.endpoint_b.hooks.is_empty());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn identical_tree_stops_after_root_hash() {
|
||||||
|
let remote = remote_fixture();
|
||||||
|
let mut harness = MerkleHarness::with_stores(remote.clone(), remote);
|
||||||
|
|
||||||
|
harness.run_until_done(20);
|
||||||
|
|
||||||
|
let caller = harness.caller_report.borrow();
|
||||||
|
assert_eq!(caller.final_root_hash, Some(harness.remote_root_hash));
|
||||||
|
assert_eq!(caller.requested_procedures, [PROC_GET_ROOT_HASH]);
|
||||||
|
assert!(caller.synchronized_blocks.is_empty());
|
||||||
|
|
||||||
|
let respondent = harness.respondent_report.borrow();
|
||||||
|
assert_eq!(respondent.frames_sent, 1);
|
||||||
|
assert_eq!(respondent.streams_started, 1);
|
||||||
|
assert_eq!(respondent.streams_completed, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn block_stream_hook_persists_until_final_frame() {
|
||||||
|
let mut harness = MerkleHarness::divergent();
|
||||||
|
|
||||||
|
harness.run_until_respondent_frames(8, 100);
|
||||||
|
assert_eq!(
|
||||||
|
harness.endpoint_b.hooks.len(),
|
||||||
|
1,
|
||||||
|
"first block stream should keep its hook after a non-final chunk"
|
||||||
|
);
|
||||||
|
|
||||||
|
harness.run_until_done(100);
|
||||||
|
assert!(
|
||||||
|
harness.endpoint_b.hooks.is_empty(),
|
||||||
|
"final block stream packet should clean respondent hook state"
|
||||||
|
);
|
||||||
|
}
|
||||||
@@ -0,0 +1,255 @@
|
|||||||
|
use alloc::{collections::BTreeMap, vec, vec::Vec};
|
||||||
|
|
||||||
|
use super::constants::{
|
||||||
|
BLOCK_ALPHA, BLOCK_BRAVO, BLOCK_CHARLIE, BLOCK_DELTA, BRANCH_LEFT, BRANCH_RIGHT, ROOT_NODE,
|
||||||
|
};
|
||||||
|
|
||||||
|
/// Type of child referenced by a Merkle node summary.
|
||||||
|
///
|
||||||
|
/// The sync caller uses this to decide whether a mismatched child should recurse
|
||||||
|
/// with `GET_CHILD_HASHES` or transfer data with `GET_BLOCK_STREAM`.
|
||||||
|
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||||
|
pub(super) enum ChildKind {
|
||||||
|
Branch,
|
||||||
|
Block,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// One child entry in a streamed Merkle summary response.
|
||||||
|
///
|
||||||
|
/// A respondent streams these one per loop. The caller compares each `hash` with
|
||||||
|
/// its local store and queues either another node walk or a block transfer.
|
||||||
|
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||||
|
pub(super) struct ChildSummary {
|
||||||
|
pub(super) id: u32,
|
||||||
|
pub(super) kind: ChildKind,
|
||||||
|
pub(super) hash: u32,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// One chunk in a streamed block response.
|
||||||
|
///
|
||||||
|
/// Chunks carry their total so the caller can replace the local block only after
|
||||||
|
/// the final stream packet arrives. This keeps partially received data out of the
|
||||||
|
/// Merkle hash until the hook completes.
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
|
pub(super) struct BlockChunk {
|
||||||
|
pub(super) block_id: u32,
|
||||||
|
pub(super) index: u32,
|
||||||
|
pub(super) total: u32,
|
||||||
|
pub(super) data: Vec<u8>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Static edge in the test Merkle tree.
|
||||||
|
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||||
|
struct TreeChild {
|
||||||
|
id: u32,
|
||||||
|
kind: ChildKind,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// In-memory Merkle store used by the caller and respondent leaves.
|
||||||
|
///
|
||||||
|
/// This is deliberately small but extensible: adding wider trees, extra branches,
|
||||||
|
/// or different block chunking only changes this store, not the endpoint routing
|
||||||
|
/// harness. The hash is not cryptographic; it is deterministic test content used to
|
||||||
|
/// exercise the protocol state machine.
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub(super) struct MerkleStore {
|
||||||
|
root_id: u32,
|
||||||
|
children: BTreeMap<u32, Vec<TreeChild>>,
|
||||||
|
blocks: BTreeMap<u32, Vec<Vec<u8>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MerkleStore {
|
||||||
|
/// Creates an empty store with the standard root id.
|
||||||
|
fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
root_id: ROOT_NODE,
|
||||||
|
children: BTreeMap::new(),
|
||||||
|
blocks: BTreeMap::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns the deterministic root hash for the current tree contents.
|
||||||
|
pub(super) fn root_hash(&self) -> u32 {
|
||||||
|
self.node_hash(self.root_id)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns child summaries for `node_id` in stable order.
|
||||||
|
pub(super) fn child_summaries(&self, node_id: u32) -> Vec<ChildSummary> {
|
||||||
|
self.children
|
||||||
|
.get(&node_id)
|
||||||
|
.map(|children| {
|
||||||
|
children
|
||||||
|
.iter()
|
||||||
|
.map(|child| ChildSummary {
|
||||||
|
id: child.id,
|
||||||
|
kind: child.kind,
|
||||||
|
hash: self.hash_for(child.kind, child.id),
|
||||||
|
})
|
||||||
|
.collect()
|
||||||
|
})
|
||||||
|
.unwrap_or_default()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns the local hash for a branch or block child.
|
||||||
|
pub(super) fn hash_for(&self, kind: ChildKind, id: u32) -> u32 {
|
||||||
|
match kind {
|
||||||
|
ChildKind::Branch => self.node_hash(id),
|
||||||
|
ChildKind::Block => self.block_hash(id),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns the stored chunks for a block, preserving stream order.
|
||||||
|
pub(super) fn block_chunks(&self, block_id: u32) -> Vec<Vec<u8>> {
|
||||||
|
self.blocks.get(&block_id).cloned().unwrap_or_default()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Replaces one local block after a complete block stream arrives.
|
||||||
|
pub(super) fn replace_block(&mut self, block_id: u32, chunks: Vec<Vec<u8>>) {
|
||||||
|
self.blocks.insert(block_id, chunks);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Computes a deterministic hash for a branch node.
|
||||||
|
fn node_hash(&self, node_id: u32) -> u32 {
|
||||||
|
let mut hash = mix_u32(0x4E4F_4445, node_id);
|
||||||
|
|
||||||
|
if let Some(children) = self.children.get(&node_id) {
|
||||||
|
for child in children {
|
||||||
|
hash = mix_u32(hash, child.id);
|
||||||
|
hash = mix_u32(hash, child.kind.discriminant());
|
||||||
|
hash = mix_u32(hash, self.hash_for(child.kind, child.id));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
hash
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Computes a deterministic hash for a data block.
|
||||||
|
fn block_hash(&self, block_id: u32) -> u32 {
|
||||||
|
let mut hash = mix_u32(0x424C_4F43, block_id);
|
||||||
|
|
||||||
|
if let Some(chunks) = self.blocks.get(&block_id) {
|
||||||
|
for chunk in chunks {
|
||||||
|
hash = mix_u32(hash, chunk.len() as u32);
|
||||||
|
hash = hash_bytes(hash, chunk);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
hash
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ChildKind {
|
||||||
|
/// Stable wire discriminant for streamed child summaries.
|
||||||
|
pub(super) fn discriminant(self) -> u32 {
|
||||||
|
match self {
|
||||||
|
ChildKind::Branch => 0,
|
||||||
|
ChildKind::Block => 1,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Decodes a stable wire discriminant.
|
||||||
|
pub(super) fn from_discriminant(value: u32) -> Option<Self> {
|
||||||
|
match value {
|
||||||
|
0 => Some(Self::Branch),
|
||||||
|
1 => Some(Self::Block),
|
||||||
|
_ => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Remote store containing the authoritative content.
|
||||||
|
pub(super) fn remote_fixture() -> MerkleStore {
|
||||||
|
let mut store = base_tree();
|
||||||
|
store
|
||||||
|
.blocks
|
||||||
|
.insert(BLOCK_ALPHA, chunks(&["alpha-", "same"]));
|
||||||
|
store
|
||||||
|
.blocks
|
||||||
|
.insert(BLOCK_BRAVO, chunks(&["bravo-", "remote-", "v2"]));
|
||||||
|
store
|
||||||
|
.blocks
|
||||||
|
.insert(BLOCK_CHARLIE, chunks(&["charlie-", "remote"]));
|
||||||
|
store.blocks.insert(BLOCK_DELTA, chunks(&["delta-same"]));
|
||||||
|
store
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Local store with two stale blocks and two already matching blocks.
|
||||||
|
pub(super) fn local_fixture() -> MerkleStore {
|
||||||
|
let mut store = base_tree();
|
||||||
|
store
|
||||||
|
.blocks
|
||||||
|
.insert(BLOCK_ALPHA, chunks(&["alpha-", "same"]));
|
||||||
|
store
|
||||||
|
.blocks
|
||||||
|
.insert(BLOCK_BRAVO, chunks(&["bravo-", "local-", "v1"]));
|
||||||
|
store
|
||||||
|
.blocks
|
||||||
|
.insert(BLOCK_CHARLIE, chunks(&["charlie-", "local"]));
|
||||||
|
store.blocks.insert(BLOCK_DELTA, chunks(&["delta-same"]));
|
||||||
|
store
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Tree topology shared by the local and remote fixtures.
|
||||||
|
fn base_tree() -> MerkleStore {
|
||||||
|
let mut store = MerkleStore::new();
|
||||||
|
store.children.insert(
|
||||||
|
ROOT_NODE,
|
||||||
|
vec![
|
||||||
|
TreeChild {
|
||||||
|
id: BRANCH_LEFT,
|
||||||
|
kind: ChildKind::Branch,
|
||||||
|
},
|
||||||
|
TreeChild {
|
||||||
|
id: BRANCH_RIGHT,
|
||||||
|
kind: ChildKind::Branch,
|
||||||
|
},
|
||||||
|
],
|
||||||
|
);
|
||||||
|
store.children.insert(
|
||||||
|
BRANCH_LEFT,
|
||||||
|
vec![
|
||||||
|
TreeChild {
|
||||||
|
id: BLOCK_ALPHA,
|
||||||
|
kind: ChildKind::Block,
|
||||||
|
},
|
||||||
|
TreeChild {
|
||||||
|
id: BLOCK_BRAVO,
|
||||||
|
kind: ChildKind::Block,
|
||||||
|
},
|
||||||
|
],
|
||||||
|
);
|
||||||
|
store.children.insert(
|
||||||
|
BRANCH_RIGHT,
|
||||||
|
vec![
|
||||||
|
TreeChild {
|
||||||
|
id: BLOCK_CHARLIE,
|
||||||
|
kind: ChildKind::Block,
|
||||||
|
},
|
||||||
|
TreeChild {
|
||||||
|
id: BLOCK_DELTA,
|
||||||
|
kind: ChildKind::Block,
|
||||||
|
},
|
||||||
|
],
|
||||||
|
);
|
||||||
|
store
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Converts string slices into owned byte chunks.
|
||||||
|
fn chunks(parts: &[&str]) -> Vec<Vec<u8>> {
|
||||||
|
parts.iter().map(|part| part.as_bytes().to_vec()).collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// FNV-like byte mixing used only for deterministic test hashes.
|
||||||
|
fn hash_bytes(mut hash: u32, bytes: &[u8]) -> u32 {
|
||||||
|
for byte in bytes {
|
||||||
|
hash ^= u32::from(*byte);
|
||||||
|
hash = hash.wrapping_mul(16_777_619);
|
||||||
|
}
|
||||||
|
|
||||||
|
hash
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Mixes one little-endian integer into the deterministic test hash.
|
||||||
|
fn mix_u32(hash: u32, value: u32) -> u32 {
|
||||||
|
hash_bytes(hash, &value.to_le_bytes())
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user