Add fix for tests.

This commit is contained in:
Michael Mikovsky
2026-06-01 09:21:10 -06:00
parent 966f16008b
commit 08adf12361
2 changed files with 87 additions and 26 deletions
+78 -25
View File
@@ -9,7 +9,6 @@ use super::support::{CommsLeaf, ENDPOINT_A, ENDPOINT_B, assert_hook_present, ass
const LEAF_STREAM_CALLER: u32 = 200;
const LEAF_STREAM_RESPONDENT: u32 = 201;
const STREAM_HOOK_ID: u16 = 0;
/// Builds the initial downwards packet that opens the stream on the respondent.
///
@@ -43,9 +42,9 @@ fn stream_frame_packet(hook_id: u16, index: usize, end_hook: bool) -> Packet {
/// Caller leaf that opens exactly one stream request.
///
/// The first allocated hook id is deterministic in these tests (`0`) because the
/// endpoint starts with no existing hooks. Keeping the caller this small makes the
/// per-loop stream assertions about respondent behavior rather than caller retries.
/// Keeping the caller this small makes the per-loop stream assertions about
/// respondent behavior rather than caller retries. The allocated hook id is read
/// back from endpoint state because the counter may start at a randomized offset.
struct StreamCallerLeaf {
has_run: bool,
}
@@ -252,6 +251,51 @@ fn deliver_stream_request(endpoint_a: &mut Endpoint, endpoint_b: &mut Endpoint)
endpoint_b.update();
}
/// Returns the single hook opened by the stream request on both endpoints.
///
/// The production counter intentionally does not promise that the first hook is
/// zero. Stream tests still need to prove that both endpoints agree on one routed
/// return channel, so this helper validates the topology and returns the actual id
/// allocated by `StreamCallerLeaf`.
fn opened_stream_hook_id(endpoint_a: &Endpoint, endpoint_b: &Endpoint) -> u16 {
assert_eq!(
endpoint_a.hook_count(),
1,
"caller endpoint should have exactly one stream hook"
);
assert_eq!(
endpoint_b.hook_count(),
1,
"respondent endpoint should have exactly one stream hook"
);
let (&caller_hook, &caller_peer) = endpoint_a
.hooks
.iter()
.next()
.expect("caller endpoint should expose the opened hook");
let (&respondent_hook, &respondent_peer) = endpoint_b
.hooks
.iter()
.next()
.expect("respondent endpoint should expose the opened hook");
assert_eq!(
caller_hook, respondent_hook,
"stream endpoints should agree on the hook id"
);
assert_eq!(
caller_peer, ENDPOINT_B,
"caller hook should route stream frames through endpoint B"
);
assert_eq!(
respondent_peer, ENDPOINT_A,
"respondent hook should route stream frames back through endpoint A"
);
caller_hook
}
/// Drives one respondent stream loop and delivers any produced frame to endpoint A.
fn drive_stream_loop(endpoint_a: &mut Endpoint, endpoint_b: &mut Endpoint) {
endpoint_b.update();
@@ -268,12 +312,17 @@ fn received_stream_packets(endpoint: &Endpoint) -> Vec<&Packet> {
}
/// Verifies ordered stream payloads and final-frame markers.
fn assert_received_stream(endpoint: &Endpoint, expected_count: usize, final_seen: bool) {
fn assert_received_stream(
endpoint: &Endpoint,
expected_count: usize,
final_seen: bool,
expected_hook_id: u16,
) {
let packets = received_stream_packets(endpoint);
assert_eq!(packets.len(), expected_count);
for (index, packet) in packets.iter().enumerate() {
assert_eq!(packet.hook_id, STREAM_HOOK_ID);
assert_eq!(packet.hook_id, expected_hook_id);
assert_eq!(packet.data, format!("stream-{index}").as_bytes());
assert_eq!(
packet.end_hook,
@@ -290,23 +339,24 @@ fn one_directional_stream_returns_one_packet_per_loop() {
assert_four_leaf_topology(&endpoint_a, &endpoint_b);
deliver_stream_request(&mut endpoint_a, &mut endpoint_b);
let stream_hook_id = opened_stream_hook_id(&endpoint_a, &endpoint_b);
assert_received_stream(&endpoint_a, 0, false);
assert_hook_present(&endpoint_a, STREAM_HOOK_ID);
assert_hook_present(&endpoint_b, STREAM_HOOK_ID);
assert_received_stream(&endpoint_a, 0, false, stream_hook_id);
assert_hook_present(&endpoint_a, stream_hook_id);
assert_hook_present(&endpoint_b, stream_hook_id);
for index in 0..total_packets {
drive_stream_loop(&mut endpoint_a, &mut endpoint_b);
let final_seen = index + 1 == total_packets;
assert_received_stream(&endpoint_a, index + 1, final_seen);
assert_received_stream(&endpoint_a, index + 1, final_seen, stream_hook_id);
if final_seen {
assert_hook_removed(&endpoint_a, STREAM_HOOK_ID);
assert_hook_removed(&endpoint_b, STREAM_HOOK_ID);
assert_hook_removed(&endpoint_a, stream_hook_id);
assert_hook_removed(&endpoint_b, stream_hook_id);
} else {
assert_hook_present(&endpoint_a, STREAM_HOOK_ID);
assert_hook_present(&endpoint_b, STREAM_HOOK_ID);
assert_hook_present(&endpoint_a, stream_hook_id);
assert_hook_present(&endpoint_b, stream_hook_id);
}
}
}
@@ -316,11 +366,12 @@ fn stream_does_not_emit_before_request_is_processed_by_respondent() {
let (mut endpoint_a, mut endpoint_b) = stream_endpoints(2);
deliver_stream_request(&mut endpoint_a, &mut endpoint_b);
let stream_hook_id = opened_stream_hook_id(&endpoint_a, &endpoint_b);
assert_received_stream(&endpoint_a, 0, false);
assert_received_stream(&endpoint_a, 0, false, stream_hook_id);
assert!(endpoint_b.outbound.is_empty());
assert_hook_present(&endpoint_a, STREAM_HOOK_ID);
assert_hook_present(&endpoint_b, STREAM_HOOK_ID);
assert_hook_present(&endpoint_a, stream_hook_id);
assert_hook_present(&endpoint_b, stream_hook_id);
}
#[test]
@@ -329,14 +380,15 @@ fn stream_stops_after_final_packet() {
let (mut endpoint_a, mut endpoint_b) = stream_endpoints(total_packets);
deliver_stream_request(&mut endpoint_a, &mut endpoint_b);
let stream_hook_id = opened_stream_hook_id(&endpoint_a, &endpoint_b);
drive_stream_loop(&mut endpoint_a, &mut endpoint_b);
drive_stream_loop(&mut endpoint_a, &mut endpoint_b);
assert_received_stream(&endpoint_a, total_packets, true);
assert_hook_removed(&endpoint_b, STREAM_HOOK_ID);
assert_received_stream(&endpoint_a, total_packets, true, stream_hook_id);
assert_hook_removed(&endpoint_b, stream_hook_id);
drive_stream_loop(&mut endpoint_a, &mut endpoint_b);
assert_received_stream(&endpoint_a, total_packets, true);
assert_hook_removed(&endpoint_b, STREAM_HOOK_ID);
assert_received_stream(&endpoint_a, total_packets, true, stream_hook_id);
assert_hook_removed(&endpoint_b, stream_hook_id);
}
#[test]
@@ -344,15 +396,16 @@ fn failed_final_stream_route_keeps_hook_and_retries() {
let (mut endpoint_a, mut endpoint_b) = stream_endpoints(1);
deliver_stream_request(&mut endpoint_a, &mut endpoint_b);
let stream_hook_id = opened_stream_hook_id(&endpoint_a, &endpoint_b);
endpoint_b.connections.remove(&(ENDPOINT_A, true));
drive_stream_loop(&mut endpoint_a, &mut endpoint_b);
assert_received_stream(&endpoint_a, 0, false);
assert_hook_present(&endpoint_b, STREAM_HOOK_ID);
assert_received_stream(&endpoint_a, 0, false, stream_hook_id);
assert_hook_present(&endpoint_b, stream_hook_id);
endpoint_b.connections.insert((ENDPOINT_A, true));
drive_stream_loop(&mut endpoint_a, &mut endpoint_b);
assert_received_stream(&endpoint_a, 1, true);
assert_hook_removed(&endpoint_b, STREAM_HOOK_ID);
assert_received_stream(&endpoint_a, 1, true, stream_hook_id);
assert_hook_removed(&endpoint_b, stream_hook_id);
}