Shrink endpoint runtime footprint

This commit is contained in:
Michael Mikovsky
2026-06-01 13:08:26 -06:00
parent 4cd496ed2b
commit 7749f62629
25 changed files with 1245 additions and 489 deletions
+26 -28
View File
@@ -1,10 +1,13 @@
use alloc::{boxed::Box, rc::Rc, vec};
use alloc::{rc::Rc, vec};
use core::cell::RefCell;
use crate::protocol::Endpoint;
use crate::protocol::{Endpoint, Leaf};
use super::{
constants::{ENDPOINT_CALLER, ENDPOINT_RESPONDENT},
constants::{
ENDPOINT_CALLER, ENDPOINT_RESPONDENT, LEAF_MERKLE_CALLER, LEAF_MERKLE_RESPONDENT,
LEAF_MOCK_CONNECTION,
},
leaves::{MerkleCallerLeaf, MerkleRespondentLeaf, MockConnectionLeaf},
state::{CallerReport, RespondentReport},
tree::{MerkleStore, local_fixture, remote_fixture},
@@ -19,6 +22,10 @@ use super::{
pub(super) struct MerkleHarness {
pub(super) endpoint_a: Endpoint,
pub(super) endpoint_b: Endpoint,
caller_leaf: MerkleCallerLeaf,
caller_connection: MockConnectionLeaf,
respondent_leaf: MerkleRespondentLeaf,
respondent_connection: MockConnectionLeaf,
pub(super) caller_report: Rc<RefCell<CallerReport>>,
pub(super) respondent_report: Rc<RefCell<RespondentReport>>,
pub(super) remote_root_hash: u32,
@@ -38,37 +45,24 @@ impl MerkleHarness {
let (tx_a, rx_a) = crossbeam_channel::unbounded();
let (tx_b, rx_b) = crossbeam_channel::unbounded();
let mut endpoint_a = Endpoint::new(
ENDPOINT_CALLER,
vec![
Box::new(MerkleCallerLeaf::new(local, caller_report.clone())),
Box::new(MockConnectionLeaf::new(
tx_b,
rx_a,
ENDPOINT_RESPONDENT,
false,
)),
],
);
let mut endpoint_a = Endpoint::new(ENDPOINT_CALLER);
endpoint_a.path = vec![ENDPOINT_CALLER];
let mut endpoint_b = Endpoint::new(
ENDPOINT_RESPONDENT,
vec![
Box::new(MerkleRespondentLeaf::new(remote, respondent_report.clone())),
Box::new(MockConnectionLeaf::new(tx_a, rx_b, ENDPOINT_CALLER, true)),
],
);
let mut endpoint_b = Endpoint::new(ENDPOINT_RESPONDENT);
endpoint_b.path = vec![ENDPOINT_CALLER, ENDPOINT_RESPONDENT];
// Register routes before the first caller update so initial packet delivery
// does not depend on leaf ordering.
endpoint_a.connections.insert((ENDPOINT_RESPONDENT, false));
endpoint_b.connections.insert((ENDPOINT_CALLER, true));
endpoint_a.add_connection(ENDPOINT_RESPONDENT, false);
endpoint_b.add_connection(ENDPOINT_CALLER, true);
Self {
endpoint_a,
endpoint_b,
caller_leaf: MerkleCallerLeaf::new(local, caller_report.clone()),
caller_connection: MockConnectionLeaf::new(tx_b, rx_a, ENDPOINT_RESPONDENT, false),
respondent_leaf: MerkleRespondentLeaf::new(remote, respondent_report.clone()),
respondent_connection: MockConnectionLeaf::new(tx_a, rx_b, ENDPOINT_CALLER, true),
caller_report,
respondent_report,
remote_root_hash,
@@ -77,8 +71,10 @@ impl MerkleHarness {
/// Drives one deterministic protocol loop.
pub(super) fn tick(&mut self) {
self.endpoint_a.update();
self.endpoint_b.update();
self.caller_leaf.update(&mut self.endpoint_a);
self.caller_connection.update(&mut self.endpoint_a);
self.respondent_leaf.update(&mut self.endpoint_b);
self.respondent_connection.update(&mut self.endpoint_b);
}
/// Runs until the caller reports completion.
@@ -113,7 +109,9 @@ impl MerkleHarness {
/// Verifies the requested four-leaf topology.
pub(super) fn assert_four_leaf_topology(&self) {
assert_eq!(self.endpoint_a.leaves.len(), 2);
assert_eq!(self.endpoint_b.leaves.len(), 2);
assert_eq!(self.caller_leaf.get_id(), LEAF_MERKLE_CALLER);
assert_eq!(self.caller_connection.get_id(), LEAF_MOCK_CONNECTION);
assert_eq!(self.respondent_leaf.get_id(), LEAF_MERKLE_RESPONDENT);
assert_eq!(self.respondent_connection.get_id(), LEAF_MOCK_CONNECTION);
}
}
+1 -3
View File
@@ -111,9 +111,7 @@ impl Leaf for MockConnectionLeaf {
fn update(&mut self, endpoint: &mut Endpoint) {
if !self.started {
endpoint
.connections
.insert((self.remote_id, self.is_authority));
endpoint.add_connection(self.remote_id, self.is_authority);
self.started = true;
}
+2 -2
View File
@@ -34,8 +34,8 @@ pub(super) enum CallerPhase {
/// Test-visible caller observations.
///
/// The leaf itself lives behind `Box<dyn Leaf>`, so the harness keeps a shared
/// report handle for assertions without needing downcasts.
/// The harness keeps a shared report handle so assertions can inspect caller
/// behavior without borrowing the concrete leaf for the duration of a protocol run.
#[derive(Debug, Default)]
pub(super) struct CallerReport {
pub(super) done: bool,
+110 -114
View File
@@ -1,9 +1,9 @@
mod streams;
mod support;
use crate::protocol::{Endpoint, EndpointError, RouteDirection};
use crate::protocol::{Endpoint, EndpointError, Leaf, RouteDirection};
use alloc::{boxed::Box, vec};
use alloc::vec;
use support::{
CommsLeaf, ControllerLeaf, ENDPOINT_A, ENDPOINT_B, ENDPOINT_C, ResponderLeaf,
@@ -16,66 +16,63 @@ fn test_oneshot() {
let (tx_a, rx_a) = crossbeam_channel::unbounded();
let (tx_b, rx_b) = crossbeam_channel::unbounded();
let mut endpoint_a = Endpoint::new(
ENDPOINT_A,
vec![
Box::new(ControllerLeaf { has_run: false }),
Box::new(CommsLeaf {
tx: tx_b,
rx: rx_a,
remote_id: ENDPOINT_B,
is_authority: false,
started: false,
}),
],
);
let mut endpoint_a = Endpoint::new(ENDPOINT_A);
let mut controller_a = ControllerLeaf { has_run: false };
let mut comms_a = CommsLeaf {
tx: tx_b,
rx: rx_a,
remote_id: ENDPOINT_B,
is_authority: false,
started: false,
};
endpoint_a.path = vec![ENDPOINT_A];
let mut endpoint_b = Endpoint::new(
ENDPOINT_B,
vec![
Box::new(ResponderLeaf),
Box::new(CommsLeaf {
tx: tx_a,
rx: rx_b,
remote_id: ENDPOINT_A,
is_authority: true,
started: false,
}),
],
);
let mut endpoint_b = Endpoint::new(ENDPOINT_B);
let mut responder_b = ResponderLeaf;
let mut comms_b = CommsLeaf {
tx: tx_a,
rx: rx_b,
remote_id: ENDPOINT_A,
is_authority: true,
started: false,
};
endpoint_b.path = vec![ENDPOINT_A, ENDPOINT_B];
// Connections are registered routing state. The comms leaves also insert them
// during updates, but the first application packet should not depend on leaf order.
endpoint_a.connections.insert((ENDPOINT_B, false));
endpoint_b.connections.insert((ENDPOINT_A, true));
endpoint_a.add_connection(ENDPOINT_B, false);
endpoint_b.add_connection(ENDPOINT_A, true);
// Cycle 1: A sends request to B
endpoint_a.update();
endpoint_b.update();
controller_a.update(&mut endpoint_a);
comms_a.update(&mut endpoint_a);
responder_b.update(&mut endpoint_b);
comms_b.update(&mut endpoint_b);
// Cycle 2: B receives request and sends response to A
endpoint_b.update();
endpoint_a.update();
responder_b.update(&mut endpoint_b);
comms_b.update(&mut endpoint_b);
controller_a.update(&mut endpoint_a);
comms_a.update(&mut endpoint_a);
// Cycle 3: A's CommsLeaf needs one more update to pull the packet from the channel
// and put it into the inbound queue.
endpoint_a.update();
controller_a.update(&mut endpoint_a);
comms_a.update(&mut endpoint_a);
// Assertions on state
assert!(
endpoint_a.inbound.contains_key(&ENDPOINT_A),
Endpoint::route_contains(ENDPOINT_A, &endpoint_a.inbound),
"Endpoint A should have received response"
);
assert_eq!(
endpoint_a.inbound.get(&ENDPOINT_A).unwrap().len(),
Endpoint::route_get(ENDPOINT_A, &endpoint_a.inbound)
.unwrap()
.len(),
1,
"Endpoint A should have exactly one packet"
);
let response = &endpoint_a
.inbound
.get(&ENDPOINT_A)
let response = &Endpoint::route_get(ENDPOINT_A, &endpoint_a.inbound)
.unwrap()
.front()
.unwrap();
@@ -92,7 +89,7 @@ fn test_oneshot() {
fn inbound_downward_packet_for_local_endpoint_opens_hook() {
let mut endpoint = endpoint_at(ENDPOINT_B, vec![ENDPOINT_A, ENDPOINT_B]);
let hook_id = endpoint.get_hook_id();
endpoint.connections.insert((ENDPOINT_A, true));
endpoint.add_connection(ENDPOINT_A, true);
endpoint
.add_inbound_from(
@@ -106,7 +103,7 @@ fn inbound_downward_packet_for_local_endpoint_opens_hook() {
assert_eq!(packet.path, vec![ENDPOINT_A, ENDPOINT_B]);
assert_hook_present(&endpoint, hook_id);
assert_eq!(endpoint.hook_peer(hook_id), Some(ENDPOINT_A));
assert!(endpoint.outbound.is_empty());
assert!(Endpoint::routes_is_empty(&endpoint.outbound));
}
#[test]
@@ -122,15 +119,15 @@ fn outbound_packet_for_local_endpoint_is_delivered_locally() {
assert!(!packet.end_hook);
assert_eq!(packet.data, "ABC123".as_bytes());
assert_hook_removed(&endpoint, hook_id);
assert!(endpoint.outbound.is_empty());
assert!(Endpoint::routes_is_empty(&endpoint.outbound));
}
#[test]
fn inbound_downward_packet_routes_to_immediate_child() {
let mut endpoint = endpoint_at(ENDPOINT_B, vec![ENDPOINT_A, ENDPOINT_B]);
let hook_id = endpoint.get_hook_id();
endpoint.connections.insert((ENDPOINT_A, true));
endpoint.connections.insert((ENDPOINT_C, false));
endpoint.add_connection(ENDPOINT_A, true);
endpoint.add_connection(ENDPOINT_C, false);
endpoint
.add_inbound_from(
@@ -144,7 +141,7 @@ fn inbound_downward_packet_routes_to_immediate_child() {
assert_eq!(packet.path, vec![ENDPOINT_A, ENDPOINT_B, ENDPOINT_C]);
assert_hook_present(&endpoint, hook_id);
assert_eq!(endpoint.hook_peer(hook_id), Some(ENDPOINT_C));
assert!(!endpoint.outbound.contains_key(&ENDPOINT_A));
assert!(!Endpoint::route_contains(ENDPOINT_A, &endpoint.outbound));
}
#[test]
@@ -152,7 +149,7 @@ fn outbound_downward_packet_routes_to_immediate_child() {
let mut endpoint = endpoint_at(ENDPOINT_A, vec![ENDPOINT_A]);
let hook_id = endpoint.get_hook_id();
endpoint.accept_hook(hook_id, ENDPOINT_B);
endpoint.connections.insert((ENDPOINT_B, false));
endpoint.add_connection(ENDPOINT_B, false);
endpoint
.add_outbound(echo_packet_with_end(
@@ -166,7 +163,7 @@ fn outbound_downward_packet_routes_to_immediate_child() {
assert!(packet.end_hook);
assert_eq!(packet.path, vec![ENDPOINT_A, ENDPOINT_B, ENDPOINT_C]);
assert_hook_removed(&endpoint, hook_id);
assert!(!endpoint.outbound.contains_key(&ENDPOINT_C));
assert!(!Endpoint::route_contains(ENDPOINT_C, &endpoint.outbound));
}
#[test]
@@ -174,8 +171,8 @@ fn inbound_upward_packet_with_hook_routes_to_parent() {
let mut endpoint = endpoint_at(ENDPOINT_B, vec![ENDPOINT_A, ENDPOINT_B]);
let hook_id = endpoint.get_hook_id();
endpoint.accept_hook(hook_id, ENDPOINT_C);
endpoint.connections.insert((ENDPOINT_A, true));
endpoint.connections.insert((ENDPOINT_C, false));
endpoint.add_connection(ENDPOINT_A, true);
endpoint.add_connection(ENDPOINT_C, false);
endpoint
.add_inbound_from(
@@ -188,15 +185,15 @@ fn inbound_upward_packet_with_hook_routes_to_parent() {
assert!(packet.end_hook);
assert_eq!(packet.hook_id, hook_id);
assert_hook_removed(&endpoint, hook_id);
assert!(!endpoint.outbound.contains_key(&ENDPOINT_C));
assert!(!Endpoint::route_contains(ENDPOINT_C, &endpoint.outbound));
}
#[test]
fn inbound_upward_packet_without_hook_is_rejected() {
let mut endpoint = endpoint_at(ENDPOINT_B, vec![ENDPOINT_A, ENDPOINT_B]);
let hook_id = endpoint.get_hook_id();
endpoint.connections.insert((ENDPOINT_A, true));
endpoint.connections.insert((ENDPOINT_C, false));
endpoint.add_connection(ENDPOINT_A, true);
endpoint.add_connection(ENDPOINT_C, false);
let error = endpoint
.add_inbound_from(
@@ -209,16 +206,16 @@ fn inbound_upward_packet_without_hook_is_rejected() {
error,
EndpointError::UnknownHook { hook_id: observed_hook_id } if observed_hook_id == hook_id
));
assert!(endpoint.inbound.is_empty());
assert!(endpoint.outbound.is_empty());
assert!(Endpoint::routes_is_empty(&endpoint.inbound));
assert!(Endpoint::routes_is_empty(&endpoint.outbound));
}
#[test]
fn forged_upward_packet_with_unknown_hook_is_rejected() {
let mut endpoint = endpoint_at(ENDPOINT_B, vec![ENDPOINT_A, ENDPOINT_B]);
endpoint.accept_hook(7, ENDPOINT_C);
endpoint.connections.insert((ENDPOINT_A, true));
endpoint.connections.insert((ENDPOINT_C, false));
endpoint.add_connection(ENDPOINT_A, true);
endpoint.add_connection(ENDPOINT_C, false);
let error = endpoint
.add_inbound_from(ENDPOINT_C, echo_packet_with_end(vec![ENDPOINT_A], 99, true))
@@ -226,7 +223,7 @@ fn forged_upward_packet_with_unknown_hook_is_rejected() {
assert!(matches!(error, EndpointError::UnknownHook { hook_id: 99 }));
assert_hook_present(&endpoint, 7);
assert!(endpoint.outbound.is_empty());
assert!(Endpoint::routes_is_empty(&endpoint.outbound));
}
#[test]
@@ -234,7 +231,7 @@ fn forged_sideways_packet_is_rejected_as_incorrect_path() {
let mut endpoint = endpoint_at(ENDPOINT_B, vec![ENDPOINT_A, ENDPOINT_B]);
let hook_id = endpoint.get_hook_id();
endpoint.accept_hook(hook_id, ENDPOINT_A);
endpoint.connections.insert((ENDPOINT_A, true));
endpoint.add_connection(ENDPOINT_A, true);
let error = endpoint
.add_inbound_from(
@@ -245,31 +242,29 @@ fn forged_sideways_packet_is_rejected_as_incorrect_path() {
assert!(matches!(error, EndpointError::DestinationOutsideLocalTree));
assert_hook_present(&endpoint, hook_id);
assert!(endpoint.inbound.is_empty());
assert!(endpoint.outbound.is_empty());
assert!(Endpoint::routes_is_empty(&endpoint.inbound));
assert!(Endpoint::routes_is_empty(&endpoint.outbound));
}
#[test]
fn malformed_frame_is_dropped_by_comms_leaf() {
let (tx_to_endpoint, rx_for_endpoint) = crossbeam_channel::unbounded();
let (tx_unused, _rx_unused) = crossbeam_channel::unbounded();
let mut endpoint = Endpoint::new(
ENDPOINT_B,
vec![Box::new(CommsLeaf {
tx: tx_unused,
rx: rx_for_endpoint,
remote_id: ENDPOINT_A,
is_authority: true,
started: false,
})],
);
let mut endpoint = Endpoint::new(ENDPOINT_B);
let mut comms = CommsLeaf {
tx: tx_unused,
rx: rx_for_endpoint,
remote_id: ENDPOINT_A,
is_authority: true,
started: false,
};
endpoint.path = vec![ENDPOINT_A, ENDPOINT_B];
tx_to_endpoint.send(vec![0, 1, 2, 3]).unwrap();
endpoint.update();
comms.update(&mut endpoint);
assert!(endpoint.inbound.is_empty());
assert!(endpoint.outbound.is_empty());
assert!(Endpoint::routes_is_empty(&endpoint.inbound));
assert!(Endpoint::routes_is_empty(&endpoint.outbound));
}
#[test]
@@ -277,16 +272,14 @@ fn malformed_frame_does_not_block_following_valid_packet() {
let (tx_to_endpoint, rx_for_endpoint) = crossbeam_channel::unbounded();
let (tx_unused, _rx_unused) = crossbeam_channel::unbounded();
let hook_id = 42;
let mut endpoint = Endpoint::new(
ENDPOINT_B,
vec![Box::new(CommsLeaf {
tx: tx_unused,
rx: rx_for_endpoint,
remote_id: ENDPOINT_A,
is_authority: true,
started: false,
})],
);
let mut endpoint = Endpoint::new(ENDPOINT_B);
let mut comms = CommsLeaf {
tx: tx_unused,
rx: rx_for_endpoint,
remote_id: ENDPOINT_A,
is_authority: true,
started: false,
};
endpoint.path = vec![ENDPOINT_A, ENDPOINT_B];
tx_to_endpoint.send(vec![0, 1, 2, 3]).unwrap();
@@ -297,7 +290,7 @@ fn malformed_frame_does_not_block_following_valid_packet() {
.unwrap(),
)
.unwrap();
endpoint.update();
comms.update(&mut endpoint);
let packet = single_inbound_packet(&endpoint, ENDPOINT_B);
assert!(!packet.end_hook);
@@ -309,19 +302,17 @@ fn malformed_frame_does_not_block_following_valid_packet() {
fn forged_frame_without_required_hook_is_dropped_by_comms_leaf() {
let (tx_to_endpoint, rx_for_endpoint) = crossbeam_channel::unbounded();
let (tx_unused, _rx_unused) = crossbeam_channel::unbounded();
let mut endpoint = Endpoint::new(
ENDPOINT_B,
vec![Box::new(CommsLeaf {
tx: tx_unused,
rx: rx_for_endpoint,
remote_id: ENDPOINT_C,
is_authority: false,
started: false,
})],
);
let mut endpoint = Endpoint::new(ENDPOINT_B);
let mut comms = CommsLeaf {
tx: tx_unused,
rx: rx_for_endpoint,
remote_id: ENDPOINT_C,
is_authority: false,
started: false,
};
endpoint.path = vec![ENDPOINT_A, ENDPOINT_B];
endpoint.accept_hook(7, ENDPOINT_C);
endpoint.connections.insert((ENDPOINT_A, true));
endpoint.add_connection(ENDPOINT_A, true);
tx_to_endpoint
.send(
@@ -330,18 +321,18 @@ fn forged_frame_without_required_hook_is_dropped_by_comms_leaf() {
.unwrap(),
)
.unwrap();
endpoint.update();
comms.update(&mut endpoint);
assert_hook_present(&endpoint, 7);
assert!(endpoint.inbound.is_empty());
assert!(endpoint.outbound.is_empty());
assert!(Endpoint::routes_is_empty(&endpoint.inbound));
assert!(Endpoint::routes_is_empty(&endpoint.outbound));
}
#[test]
fn upward_outbound_without_hook_is_rejected() {
let mut endpoint = endpoint_at(ENDPOINT_B, vec![ENDPOINT_A, ENDPOINT_B]);
endpoint.accept_hook(7, ENDPOINT_A);
endpoint.connections.insert((ENDPOINT_A, true));
endpoint.add_connection(ENDPOINT_A, true);
let new_hook = endpoint.get_hook_id();
@@ -354,13 +345,13 @@ fn upward_outbound_without_hook_is_rejected() {
EndpointError::UnknownHook { hook_id: observed_hook_id } if observed_hook_id == new_hook
));
assert_hook_present(&endpoint, 7);
assert!(endpoint.outbound.is_empty());
assert!(Endpoint::routes_is_empty(&endpoint.outbound));
}
#[test]
fn downward_outbound_without_hook_is_allowed() {
let mut endpoint = endpoint_at(ENDPOINT_A, vec![ENDPOINT_A]);
endpoint.connections.insert((ENDPOINT_B, false));
endpoint.add_connection(ENDPOINT_B, false);
let new_hook = endpoint.get_hook_id();
@@ -368,7 +359,12 @@ fn downward_outbound_without_hook_is_allowed() {
.add_outbound(echo_packet(vec![ENDPOINT_A, ENDPOINT_B], new_hook))
.unwrap();
assert_eq!(endpoint.outbound.get(&ENDPOINT_B).unwrap().len(), 1);
assert_eq!(
Endpoint::route_get(ENDPOINT_B, &endpoint.outbound)
.unwrap()
.len(),
1
);
assert_hook_present(&endpoint, new_hook);
assert_eq!(endpoint.hook_peer(new_hook), Some(ENDPOINT_B));
}
@@ -379,14 +375,14 @@ fn deeper_upward_route_uses_parent_as_next_hop() {
let new_hook = endpoint.get_hook_id();
endpoint.accept_hook(new_hook, ENDPOINT_B);
endpoint.connections.insert((ENDPOINT_B, true));
endpoint.add_connection(ENDPOINT_B, true);
endpoint
.add_outbound(echo_packet_with_end(vec![ENDPOINT_A], new_hook, true))
.unwrap();
assert!(endpoint.outbound.contains_key(&ENDPOINT_B));
assert!(!endpoint.outbound.contains_key(&ENDPOINT_A));
assert!(Endpoint::route_contains(ENDPOINT_B, &endpoint.outbound));
assert!(!Endpoint::route_contains(ENDPOINT_A, &endpoint.outbound));
assert_hook_removed(&endpoint, new_hook);
}
@@ -407,7 +403,7 @@ fn downward_route_without_connection_is_rejected() {
}
));
assert_hook_removed(&endpoint, hook_id);
assert!(endpoint.outbound.is_empty());
assert!(Endpoint::routes_is_empty(&endpoint.outbound));
}
#[test]
@@ -428,7 +424,7 @@ fn upward_route_without_connection_is_rejected_even_with_hook() {
}
));
assert_hook_present(&endpoint, hook_id);
assert!(endpoint.outbound.is_empty());
assert!(Endpoint::routes_is_empty(&endpoint.outbound));
}
#[test]
@@ -436,7 +432,7 @@ fn end_hook_removes_hook_after_packet_is_queued() {
let mut endpoint = endpoint_at(ENDPOINT_B, vec![ENDPOINT_A, ENDPOINT_B]);
let hook_id = endpoint.get_hook_id();
endpoint.accept_hook(hook_id, ENDPOINT_A);
endpoint.connections.insert((ENDPOINT_A, true));
endpoint.add_connection(ENDPOINT_A, true);
endpoint
.add_outbound(echo_packet_with_end(vec![ENDPOINT_A], hook_id, true))
@@ -467,29 +463,29 @@ fn failed_end_hook_route_keeps_hook_state() {
}
));
assert_hook_present(&endpoint, hook_id);
assert!(endpoint.outbound.is_empty());
assert!(Endpoint::routes_is_empty(&endpoint.outbound));
}
#[test]
fn inbound_without_absolute_path_is_rejected() {
let mut endpoint = Endpoint::new(ENDPOINT_A, vec![]);
let mut endpoint = Endpoint::new(ENDPOINT_A);
let error = endpoint
.add_inbound(echo_packet(vec![ENDPOINT_A], 1))
.unwrap_err();
assert!(matches!(error, EndpointError::EndpointPathUnset));
assert!(endpoint.inbound.is_empty());
assert!(Endpoint::routes_is_empty(&endpoint.inbound));
}
#[test]
fn outbound_without_absolute_path_is_rejected() {
let mut endpoint = Endpoint::new(ENDPOINT_A, vec![]);
let mut endpoint = Endpoint::new(ENDPOINT_A);
let error = endpoint
.add_outbound(echo_packet(vec![ENDPOINT_A], 1))
.unwrap_err();
assert!(matches!(error, EndpointError::EndpointPathUnset));
assert!(endpoint.outbound.is_empty());
assert!(Endpoint::routes_is_empty(&endpoint.outbound));
}
+101 -98
View File
@@ -3,7 +3,7 @@ use crate::protocol::{Endpoint, Leaf, Packet};
#[cfg(feature = "interface")]
use crate::protocol::LeafMeta;
use alloc::{boxed::Box, format, vec, vec::Vec};
use alloc::{format, vec, vec::Vec};
use super::support::{CommsLeaf, ENDPOINT_A, ENDPOINT_B, assert_hook_present, assert_hook_removed};
@@ -69,6 +69,20 @@ struct StreamState {
next_index: usize,
}
/// Concrete stream test harness that keeps leaves outside endpoint routing state.
///
/// This mirrors firmware-style ownership: the endpoint only routes packets while the
/// caller, respondent, and connection leaves are updated explicitly in the same
/// order the old boxed endpoint dispatcher used.
struct StreamHarness {
endpoint_a: Endpoint,
endpoint_b: Endpoint,
caller_a: StreamCallerLeaf,
comms_a: CommsLeaf,
respondent_b: StreamRespondentLeaf,
comms_b: CommsLeaf,
}
impl StreamRespondentLeaf {
/// Creates a respondent that will emit `total_packets` stream frames.
fn new(total_packets: usize) -> Self {
@@ -189,66 +203,57 @@ impl StreamRespondentLeaf {
/// Each endpoint has exactly one application leaf and one mock connection leaf. The
/// channel leaves are intentionally the same `CommsLeaf` used by the oneshot tests
/// so stream behavior exercises the same serialization and routing boundary.
fn stream_endpoints(total_packets: usize) -> (Endpoint, Endpoint) {
fn stream_endpoints(total_packets: usize) -> StreamHarness {
let (tx_a, rx_a) = crossbeam_channel::unbounded();
let (tx_b, rx_b) = crossbeam_channel::unbounded();
let mut endpoint_a = Endpoint::new(
ENDPOINT_A,
vec![
Box::new(StreamCallerLeaf { has_run: false }),
Box::new(CommsLeaf {
tx: tx_b,
rx: rx_a,
remote_id: ENDPOINT_B,
is_authority: false,
started: false,
}),
],
);
let mut endpoint_a = Endpoint::new(ENDPOINT_A);
endpoint_a.path = vec![ENDPOINT_A];
let mut endpoint_b = Endpoint::new(
ENDPOINT_B,
vec![
Box::new(StreamRespondentLeaf::new(total_packets)),
Box::new(CommsLeaf {
tx: tx_a,
rx: rx_b,
remote_id: ENDPOINT_A,
is_authority: true,
started: false,
}),
],
);
let mut endpoint_b = Endpoint::new(ENDPOINT_B);
endpoint_b.path = vec![ENDPOINT_A, ENDPOINT_B];
// Register routes before the first application packet so leaf order is not a
// hidden prerequisite for the initial request leaving endpoint A.
endpoint_a.connections.insert((ENDPOINT_B, false));
endpoint_b.connections.insert((ENDPOINT_A, true));
endpoint_a.add_connection(ENDPOINT_B, false);
endpoint_b.add_connection(ENDPOINT_A, true);
(endpoint_a, endpoint_b)
StreamHarness {
endpoint_a,
endpoint_b,
caller_a: StreamCallerLeaf { has_run: false },
comms_a: CommsLeaf {
tx: tx_b,
rx: rx_a,
remote_id: ENDPOINT_B,
is_authority: false,
started: false,
},
respondent_b: StreamRespondentLeaf::new(total_packets),
comms_b: CommsLeaf {
tx: tx_a,
rx: rx_b,
remote_id: ENDPOINT_A,
is_authority: true,
started: false,
},
}
}
/// Asserts the requested two-endpoint, four-leaf topology.
fn assert_four_leaf_topology(endpoint_a: &Endpoint, endpoint_b: &Endpoint) {
assert_eq!(
endpoint_a.leaves.len(),
2,
"caller endpoint should have two leaves"
);
assert_eq!(
endpoint_b.leaves.len(),
2,
"respondent endpoint should have two leaves"
);
fn assert_four_leaf_topology(harness: &StreamHarness) {
assert_eq!(harness.caller_a.get_id(), LEAF_STREAM_CALLER);
assert_eq!(harness.comms_a.get_id(), 101);
assert_eq!(harness.respondent_b.get_id(), LEAF_STREAM_RESPONDENT);
assert_eq!(harness.comms_b.get_id(), 101);
}
/// Drives the initial request until it is queued locally on endpoint B.
fn deliver_stream_request(endpoint_a: &mut Endpoint, endpoint_b: &mut Endpoint) {
endpoint_a.update();
endpoint_b.update();
fn deliver_stream_request(harness: &mut StreamHarness) {
harness.caller_a.update(&mut harness.endpoint_a);
harness.comms_a.update(&mut harness.endpoint_a);
harness.respondent_b.update(&mut harness.endpoint_b);
harness.comms_b.update(&mut harness.endpoint_b);
}
/// Returns the single hook opened by the stream request on both endpoints.
@@ -269,15 +274,13 @@ fn opened_stream_hook_id(endpoint_a: &Endpoint, endpoint_b: &Endpoint) -> u16 {
"respondent endpoint should have exactly one stream hook"
);
let (&caller_hook, &caller_peer) = endpoint_a
let &(caller_hook, caller_peer) = endpoint_a
.hooks
.iter()
.next()
.first()
.expect("caller endpoint should expose the opened hook");
let (&respondent_hook, &respondent_peer) = endpoint_b
let &(respondent_hook, respondent_peer) = endpoint_b
.hooks
.iter()
.next()
.first()
.expect("respondent endpoint should expose the opened hook");
assert_eq!(
@@ -297,16 +300,16 @@ fn opened_stream_hook_id(endpoint_a: &Endpoint, endpoint_b: &Endpoint) -> u16 {
}
/// Drives one respondent stream loop and delivers any produced frame to endpoint A.
fn drive_stream_loop(endpoint_a: &mut Endpoint, endpoint_b: &mut Endpoint) {
endpoint_b.update();
endpoint_a.update();
fn drive_stream_loop(harness: &mut StreamHarness) {
harness.respondent_b.update(&mut harness.endpoint_b);
harness.comms_b.update(&mut harness.endpoint_b);
harness.caller_a.update(&mut harness.endpoint_a);
harness.comms_a.update(&mut harness.endpoint_a);
}
/// Returns stream packets that endpoint A has received so far.
fn received_stream_packets(endpoint: &Endpoint) -> Vec<&Packet> {
endpoint
.inbound
.get(&ENDPOINT_A)
Endpoint::route_get(ENDPOINT_A, &endpoint.inbound)
.map(|queue| queue.iter().collect())
.unwrap_or_default()
}
@@ -335,77 +338,77 @@ fn assert_received_stream(
#[test]
fn one_directional_stream_returns_one_packet_per_loop() {
let total_packets = 3;
let (mut endpoint_a, mut endpoint_b) = stream_endpoints(total_packets);
assert_four_leaf_topology(&endpoint_a, &endpoint_b);
let mut harness = stream_endpoints(total_packets);
assert_four_leaf_topology(&harness);
deliver_stream_request(&mut endpoint_a, &mut endpoint_b);
let stream_hook_id = opened_stream_hook_id(&endpoint_a, &endpoint_b);
deliver_stream_request(&mut harness);
let stream_hook_id = opened_stream_hook_id(&harness.endpoint_a, &harness.endpoint_b);
assert_received_stream(&endpoint_a, 0, false, stream_hook_id);
assert_hook_present(&endpoint_a, stream_hook_id);
assert_hook_present(&endpoint_b, stream_hook_id);
assert_received_stream(&harness.endpoint_a, 0, false, stream_hook_id);
assert_hook_present(&harness.endpoint_a, stream_hook_id);
assert_hook_present(&harness.endpoint_b, stream_hook_id);
for index in 0..total_packets {
drive_stream_loop(&mut endpoint_a, &mut endpoint_b);
drive_stream_loop(&mut harness);
let final_seen = index + 1 == total_packets;
assert_received_stream(&endpoint_a, index + 1, final_seen, stream_hook_id);
assert_received_stream(&harness.endpoint_a, index + 1, final_seen, stream_hook_id);
if final_seen {
assert_hook_removed(&endpoint_a, stream_hook_id);
assert_hook_removed(&endpoint_b, stream_hook_id);
assert_hook_removed(&harness.endpoint_a, stream_hook_id);
assert_hook_removed(&harness.endpoint_b, stream_hook_id);
} else {
assert_hook_present(&endpoint_a, stream_hook_id);
assert_hook_present(&endpoint_b, stream_hook_id);
assert_hook_present(&harness.endpoint_a, stream_hook_id);
assert_hook_present(&harness.endpoint_b, stream_hook_id);
}
}
}
#[test]
fn stream_does_not_emit_before_request_is_processed_by_respondent() {
let (mut endpoint_a, mut endpoint_b) = stream_endpoints(2);
let mut harness = stream_endpoints(2);
deliver_stream_request(&mut endpoint_a, &mut endpoint_b);
let stream_hook_id = opened_stream_hook_id(&endpoint_a, &endpoint_b);
deliver_stream_request(&mut harness);
let stream_hook_id = opened_stream_hook_id(&harness.endpoint_a, &harness.endpoint_b);
assert_received_stream(&endpoint_a, 0, false, stream_hook_id);
assert!(endpoint_b.outbound.is_empty());
assert_hook_present(&endpoint_a, stream_hook_id);
assert_hook_present(&endpoint_b, stream_hook_id);
assert_received_stream(&harness.endpoint_a, 0, false, stream_hook_id);
assert!(Endpoint::routes_is_empty(&harness.endpoint_b.outbound));
assert_hook_present(&harness.endpoint_a, stream_hook_id);
assert_hook_present(&harness.endpoint_b, stream_hook_id);
}
#[test]
fn stream_stops_after_final_packet() {
let total_packets = 2;
let (mut endpoint_a, mut endpoint_b) = stream_endpoints(total_packets);
let mut harness = stream_endpoints(total_packets);
deliver_stream_request(&mut endpoint_a, &mut endpoint_b);
let stream_hook_id = opened_stream_hook_id(&endpoint_a, &endpoint_b);
drive_stream_loop(&mut endpoint_a, &mut endpoint_b);
drive_stream_loop(&mut endpoint_a, &mut endpoint_b);
assert_received_stream(&endpoint_a, total_packets, true, stream_hook_id);
assert_hook_removed(&endpoint_b, stream_hook_id);
deliver_stream_request(&mut harness);
let stream_hook_id = opened_stream_hook_id(&harness.endpoint_a, &harness.endpoint_b);
drive_stream_loop(&mut harness);
drive_stream_loop(&mut harness);
assert_received_stream(&harness.endpoint_a, total_packets, true, stream_hook_id);
assert_hook_removed(&harness.endpoint_b, stream_hook_id);
drive_stream_loop(&mut endpoint_a, &mut endpoint_b);
assert_received_stream(&endpoint_a, total_packets, true, stream_hook_id);
assert_hook_removed(&endpoint_b, stream_hook_id);
drive_stream_loop(&mut harness);
assert_received_stream(&harness.endpoint_a, total_packets, true, stream_hook_id);
assert_hook_removed(&harness.endpoint_b, stream_hook_id);
}
#[test]
fn failed_final_stream_route_keeps_hook_and_retries() {
let (mut endpoint_a, mut endpoint_b) = stream_endpoints(1);
let mut harness = stream_endpoints(1);
deliver_stream_request(&mut endpoint_a, &mut endpoint_b);
let stream_hook_id = opened_stream_hook_id(&endpoint_a, &endpoint_b);
endpoint_b.connections.remove(&(ENDPOINT_A, true));
deliver_stream_request(&mut harness);
let stream_hook_id = opened_stream_hook_id(&harness.endpoint_a, &harness.endpoint_b);
harness.endpoint_b.remove_connection(ENDPOINT_A, true);
drive_stream_loop(&mut endpoint_a, &mut endpoint_b);
assert_received_stream(&endpoint_a, 0, false, stream_hook_id);
assert_hook_present(&endpoint_b, stream_hook_id);
drive_stream_loop(&mut harness);
assert_received_stream(&harness.endpoint_a, 0, false, stream_hook_id);
assert_hook_present(&harness.endpoint_b, stream_hook_id);
endpoint_b.connections.insert((ENDPOINT_A, true));
drive_stream_loop(&mut endpoint_a, &mut endpoint_b);
harness.endpoint_b.add_connection(ENDPOINT_A, true);
drive_stream_loop(&mut harness);
assert_received_stream(&endpoint_a, 1, true, stream_hook_id);
assert_hook_removed(&endpoint_b, stream_hook_id);
assert_received_stream(&harness.endpoint_a, 1, true, stream_hook_id);
assert_hook_removed(&harness.endpoint_b, stream_hook_id);
}
+4 -10
View File
@@ -40,7 +40,7 @@ pub(super) fn echo_packet_with_end(path: Vec<u32>, hook_id: u16, end_hook: bool)
/// connection table, and hook table. This helper keeps that setup explicit without
/// hiding the routing state that each test is validating.
pub(super) fn endpoint_at(id: u32, path: Vec<u32>) -> Endpoint {
let mut endpoint = Endpoint::new(id, vec![]);
let mut endpoint = Endpoint::new(id);
endpoint.path = path;
endpoint
}
@@ -51,9 +51,7 @@ pub(super) fn endpoint_at(id: u32, path: Vec<u32>) -> Endpoint {
/// than the immediate neighbor. Tests use this helper to assert both that exactly one
/// packet exists and that it was queued for the expected adjacent endpoint.
pub(super) fn single_outbound_packet(endpoint: &Endpoint, next_hop: u32) -> &Packet {
let queue = endpoint
.outbound
.get(&next_hop)
let queue = Endpoint::route_get(next_hop, &endpoint.outbound)
.unwrap_or_else(|| panic!("expected one outbound queue for {next_hop}"));
assert_eq!(queue.len(), 1, "expected exactly one outbound packet");
queue.front().unwrap()
@@ -65,9 +63,7 @@ pub(super) fn single_outbound_packet(endpoint: &Endpoint, next_hop: u32) -> &Pac
/// assert against the local inbound queue instead of only checking that routing did
/// not produce an error.
pub(super) fn single_inbound_packet(endpoint: &Endpoint, local_id: u32) -> &Packet {
let queue = endpoint
.inbound
.get(&local_id)
let queue = Endpoint::route_get(local_id, &endpoint.inbound)
.unwrap_or_else(|| panic!("expected one inbound queue for {local_id}"));
assert_eq!(queue.len(), 1, "expected exactly one inbound packet");
queue.front().unwrap()
@@ -154,9 +150,7 @@ impl Leaf for CommsLeaf {
fn update(&mut self, endpoint: &mut Endpoint) {
if !self.started {
endpoint
.connections
.insert((self.remote_id, self.is_authority));
endpoint.add_connection(self.remote_id, self.is_authority);
self.started = true;
}