Redesign interface event ownership.

This commit is contained in:
Michael Mikovsky
2026-06-01 09:54:37 -06:00
parent 5597ca2fef
commit aa1e9be696
16 changed files with 882 additions and 368 deletions
+3 -3
View File
@@ -19,10 +19,10 @@ pub fn feistel_shuffle(index: u16, seed: u32) -> u16 {
.rotate_left(rot_amount)
.wrapping_add(round.wrapping_mul(0x9E3779B9));
// Round function F: Simple multiplicative hash mixing R and sub_key
// We cast to u32 for multiplication to avoid overflow, then mask back to 8 bits
// Round function F: Simple multiplicative hash mixing R and sub_key.
// Casting to u8 keeps the low byte, which is the half-block width here.
let r_u32 = r as u32;
let hash_val = ((r_u32.wrapping_mul(sub_key)) ^ (r_u32 >> 4)) as u8 & 0xFF;
let hash_val = ((r_u32.wrapping_mul(sub_key)) ^ (r_u32 >> 4)) as u8;
// Feistel step: New L = Old R, New R = Old L XOR F(R, key)
let temp = l;
+55 -2
View File
@@ -1,12 +1,38 @@
use crate::crypto::feistel_shuffle;
#[cfg(feature = "counter_shuffle_none")]
/// Counter implementation selected by feature flags.
///
/// Cargo's `--all-features` enables every counter strategy at once, so these cfgs are
/// intentionally priority-ordered instead of mutually exclusive aliases. The strongest
/// configured shuffle wins: Feistel+LCG, then Feistel, then the linear fallback.
#[cfg(all(
feature = "counter_shuffle_none",
not(any(
feature = "counter_shuffle_feistel",
feature = "counter_shuffle_feistel_lcg"
))
))]
pub type Counter = NoShuffle;
#[cfg(feature = "counter_shuffle_feistel")]
/// Counter implementation selected when Feistel is enabled without Feistel+LCG.
#[cfg(all(
feature = "counter_shuffle_feistel",
not(feature = "counter_shuffle_feistel_lcg")
))]
pub type Counter = FeistelShuffle;
/// Default and strongest counter implementation.
#[cfg(feature = "counter_shuffle_feistel_lcg")]
pub type Counter = FeistelLCGShuffle;
/// Fallback used only when all counter shuffle features are disabled.
#[cfg(not(any(
feature = "counter_shuffle_none",
feature = "counter_shuffle_feistel",
feature = "counter_shuffle_feistel_lcg"
)))]
pub type Counter = NoShuffle;
const NONCE16_1: u16 = const_random::const_random!(u16);
const NONCE16_2: u16 = const_random::const_random!(u16);
const NONCE32: u32 = const_random::const_random!(u32);
@@ -27,12 +53,21 @@ impl NoShuffle {
Self(NONCE16_1)
}
// This is an id generator API, not an iterator: callers need a bare `u16` and no
// exhaustion state because the counter intentionally wraps through the full space.
#[allow(clippy::should_implement_trait)]
pub fn next(&mut self) -> u16 {
self.0 = self.0.wrapping_add(1);
self.0
}
}
impl Default for NoShuffle {
fn default() -> Self {
Self::new()
}
}
/// Shuffle all 16 bit numbers, an actual shuffle
/// But this still stores local values in a linear format
pub struct FeistelShuffle(u16, u32);
@@ -42,12 +77,21 @@ impl FeistelShuffle {
Self(NONCE16_1, NONCE32)
}
// This is an id generator API, not an iterator: callers need a bare `u16` and no
// exhaustion state because the counter intentionally wraps through the full space.
#[allow(clippy::should_implement_trait)]
pub fn next(&mut self) -> u16 {
self.0 = self.0.wrapping_add(FEISTEL_STEP);
feistel_shuffle(self.0, self.1)
}
}
impl Default for FeistelShuffle {
fn default() -> Self {
Self::new()
}
}
/// Linear recursive shuffle,
/// feeds back into itself and doesn't store the actual state.
/// Harder to decompile
@@ -65,6 +109,9 @@ impl FeistelLCGShuffle {
Self { state: 0, a, c }
}
// This is an id generator API, not an iterator: callers need a bare `u16` and no
// exhaustion state because the counter intentionally wraps through the full space.
#[allow(clippy::should_implement_trait)]
pub fn next(&mut self) -> u16 {
// 1. Advance state using LCG (Guarantees single cycle of 65536)
self.state = self.state.wrapping_mul(self.a).wrapping_add(self.c);
@@ -73,3 +120,9 @@ impl FeistelLCGShuffle {
feistel_shuffle(self.state, self.a as u32)
}
}
impl Default for FeistelLCGShuffle {
fn default() -> Self {
Self::new()
}
}
+1 -1
View File
@@ -107,7 +107,7 @@ const fn compress(state: &mut [u32; 8], block: &[u8; 64]) {
/// Returns the SHA-256 digest of `input` as 32 raw bytes.
pub const fn sha256(input: &[u8]) -> [u8; 32] {
// Padded length is the next multiple of 64 that fits input + 1 (0x80) + 8 (length).
let padded_len = ((input.len() + 9 + 63) / 64) * 64;
let padded_len = (input.len() + 9).div_ceil(64) * 64;
let mut state = H;
let mut block_start = 0;
+3
View File
@@ -6,9 +6,12 @@
mod event;
mod key;
mod store;
mod target;
mod view;
pub use event::{InterfaceEvent, InterfaceEventKind};
pub use key::{ProcedureKey, SessionKey};
pub use store::InterfaceStore;
pub use view::{ProcedureView, SessionView, SessionViewStatus};
pub(crate) use target::InterfaceTarget;
+185 -86
View File
@@ -2,8 +2,8 @@ use alloc::{collections::BTreeMap, vec::Vec};
use crate::{
interface::{
InterfaceEvent, InterfaceEventKind, ProcedureKey, ProcedureView, SessionKey, SessionView,
SessionViewStatus,
InterfaceEvent, InterfaceEventKind, InterfaceTarget, ProcedureKey, ProcedureView,
SessionKey, SessionView, SessionViewStatus,
},
protocol::{EndpointError, HookID, Packet, SessionStatus},
};
@@ -15,7 +15,6 @@ use crate::{
/// itself stays with the renderer or application shell so protocol state remains
/// headless and reusable.
pub struct InterfaceStore {
next_sequence: u64,
now_ns: Option<u64>,
events: Vec<InterfaceEvent>,
sessions: BTreeMap<SessionKey, SessionView>,
@@ -26,7 +25,6 @@ impl InterfaceStore {
/// Creates an empty caller-owned interface store.
pub fn new() -> Self {
Self {
next_sequence: 0,
now_ns: None,
events: Vec::new(),
sessions: BTreeMap::new(),
@@ -70,30 +68,32 @@ impl InterfaceStore {
procedure_id: u32,
hook_id: HookID,
) -> &mut SessionView {
self.sessions
.entry(SessionKey {
leaf_id,
procedure_id,
hook_id,
})
.or_insert_with(SessionView::new)
self.session_view_for_key_mut(SessionKey {
leaf_id,
procedure_id,
hook_id,
})
}
/// Returns or creates the view for a one-shot procedure family.
pub fn procedure_view_mut(&mut self, leaf_id: u32, procedure_id: u32) -> &mut ProcedureView {
self.procedures
.entry(ProcedureKey {
leaf_id,
procedure_id,
})
.or_insert_with(ProcedureView::new)
self.procedure_view_for_key_mut(ProcedureKey {
leaf_id,
procedure_id,
})
}
/// Records a packet delivered to a generated leaf.
pub fn record_inbound(&mut self, leaf_id: u32, packet: &Packet) {
self.push_packet_event(
leaf_id,
packet,
let target = InterfaceTarget::session(leaf_id, packet.procedure_id, packet.hook_id);
self.record_inbound_for(target, packet);
}
/// Records a packet delivered to a target already known by generated runtime code.
pub(crate) fn record_inbound_for(&mut self, target: InterfaceTarget, packet: &Packet) {
self.record(
target,
None,
InterfaceEventKind::Inbound {
packet: packet.clone(),
},
@@ -107,14 +107,25 @@ impl InterfaceStore {
procedure_id: u32,
hook_id: HookID,
) {
self.push_session_event(
self.record_session_packet_queued_for(InterfaceTarget::session(
leaf_id,
procedure_id,
hook_id,
));
}
/// Records that a packet was queued for an existing session inbox.
pub(crate) fn record_session_packet_queued_for(&mut self, target: InterfaceTarget) {
let InterfaceTarget::Session(key) = target else {
return;
};
self.record(
target,
None,
InterfaceEventKind::SessionPacketQueued {
procedure_id,
hook_id,
procedure_id: key.procedure_id,
hook_id: key.hook_id,
},
);
}
@@ -127,14 +138,26 @@ impl InterfaceStore {
hook_id: HookID,
started_ns: Option<u64>,
) {
self.push_session_event(
leaf_id,
procedure_id,
hook_id,
let target = InterfaceTarget::session(leaf_id, procedure_id, hook_id);
self.record_session_created_for(target, started_ns);
}
/// Records successful creation of a new session state for an explicit target.
pub(crate) fn record_session_created_for(
&mut self,
target: InterfaceTarget,
started_ns: Option<u64>,
) {
let InterfaceTarget::Session(key) = target else {
return;
};
self.record(
target,
Some(SessionViewStatus::Running),
InterfaceEventKind::SessionCreated {
procedure_id,
hook_id,
procedure_id: key.procedure_id,
hook_id: key.hook_id,
started_ns,
finished_ns: self.now_ns,
},
@@ -149,14 +172,26 @@ impl InterfaceStore {
hook_id: HookID,
started_ns: Option<u64>,
) {
self.push_session_event(
leaf_id,
procedure_id,
hook_id,
let target = InterfaceTarget::session(leaf_id, procedure_id, hook_id);
self.record_session_rejected_for(target, started_ns);
}
/// Records rejection of a packet that could not create a session.
pub(crate) fn record_session_rejected_for(
&mut self,
target: InterfaceTarget,
started_ns: Option<u64>,
) {
let InterfaceTarget::Session(key) = target else {
return;
};
self.record(
target,
Some(SessionViewStatus::Rejected),
InterfaceEventKind::SessionRejected {
procedure_id,
hook_id,
procedure_id: key.procedure_id,
hook_id: key.hook_id,
started_ns,
finished_ns: self.now_ns,
},
@@ -172,14 +207,27 @@ impl InterfaceStore {
status: SessionStatus,
started_ns: Option<u64>,
) {
self.push_session_event(
leaf_id,
procedure_id,
hook_id,
let target = InterfaceTarget::session(leaf_id, procedure_id, hook_id);
self.record_session_update_for(target, status, started_ns);
}
/// Records one session update tick for an explicit session target.
pub(crate) fn record_session_update_for(
&mut self,
target: InterfaceTarget,
status: SessionStatus,
started_ns: Option<u64>,
) {
let InterfaceTarget::Session(key) = target else {
return;
};
self.record(
target,
Some(SessionViewStatus::from_session_status(status)),
InterfaceEventKind::SessionUpdated {
procedure_id,
hook_id,
procedure_id: key.procedure_id,
hook_id: key.hook_id,
status,
started_ns,
finished_ns: self.now_ns,
@@ -195,11 +243,29 @@ impl InterfaceStore {
hook_id: HookID,
started_ns: Option<u64>,
) {
self.push_procedure_event(
leaf_id,
procedure_id,
self.record_procedure_call_for(
InterfaceTarget::procedure(leaf_id, procedure_id),
hook_id,
started_ns,
);
}
/// Records one procedure call for an explicit procedure target.
pub(crate) fn record_procedure_call_for(
&mut self,
target: InterfaceTarget,
hook_id: HookID,
started_ns: Option<u64>,
) {
let InterfaceTarget::Procedure(key) = target else {
return;
};
self.record(
target,
None,
InterfaceEventKind::ProcedureCalled {
procedure_id,
procedure_id: key.procedure_id,
hook_id,
started_ns,
finished_ns: self.now_ns,
@@ -209,9 +275,15 @@ impl InterfaceStore {
/// Records a packet emitted by leaf logic before route retry handling.
pub fn record_outbound_queued(&mut self, leaf_id: u32, packet: &Packet) {
self.push_packet_event(
leaf_id,
packet,
let target = InterfaceTarget::session(leaf_id, packet.procedure_id, packet.hook_id);
self.record_outbound_queued_for(target, packet);
}
/// Records a packet emitted by leaf logic before route retry handling.
pub(crate) fn record_outbound_queued_for(&mut self, target: InterfaceTarget, packet: &Packet) {
self.record(
target,
None,
InterfaceEventKind::OutboundQueued {
packet: packet.clone(),
},
@@ -220,9 +292,15 @@ impl InterfaceStore {
/// Records a route attempt for a queued outbound packet.
pub fn record_route_attempt(&mut self, leaf_id: u32, packet: &Packet) {
self.push_packet_event(
leaf_id,
packet,
let target = InterfaceTarget::session(leaf_id, packet.procedure_id, packet.hook_id);
self.record_route_attempt_for(target, packet);
}
/// Records a route attempt for a queued outbound packet.
pub(crate) fn record_route_attempt_for(&mut self, target: InterfaceTarget, packet: &Packet) {
self.record(
target,
None,
InterfaceEventKind::RouteAttempt {
packet: packet.clone(),
},
@@ -231,9 +309,15 @@ impl InterfaceStore {
/// Records a successful route attempt.
pub fn record_route_success(&mut self, leaf_id: u32, packet: &Packet) {
self.push_packet_event(
leaf_id,
packet,
let target = InterfaceTarget::session(leaf_id, packet.procedure_id, packet.hook_id);
self.record_route_success_for(target, packet);
}
/// Records a successful route attempt.
pub(crate) fn record_route_success_for(&mut self, target: InterfaceTarget, packet: &Packet) {
self.record(
target,
None,
InterfaceEventKind::RouteSuccess {
packet: packet.clone(),
},
@@ -242,9 +326,20 @@ impl InterfaceStore {
/// Records a failed route attempt without removing the packet from retry state.
pub fn record_route_failure(&mut self, leaf_id: u32, packet: &Packet, error: EndpointError) {
self.push_packet_event(
leaf_id,
packet,
let target = InterfaceTarget::session(leaf_id, packet.procedure_id, packet.hook_id);
self.record_route_failure_for(target, packet, error);
}
/// Records a failed route attempt without removing the packet from retry state.
pub(crate) fn record_route_failure_for(
&mut self,
target: InterfaceTarget,
packet: &Packet,
error: EndpointError,
) {
self.record(
target,
None,
InterfaceEventKind::RouteFailure {
packet: packet.clone(),
error,
@@ -252,43 +347,43 @@ impl InterfaceStore {
);
}
fn push_packet_event(&mut self, leaf_id: u32, packet: &Packet, kind: InterfaceEventKind) {
let index = self.push_event(leaf_id, kind);
self.link_packet_event(leaf_id, packet, index);
}
fn push_session_event(
fn record(
&mut self,
leaf_id: u32,
procedure_id: u32,
hook_id: HookID,
target: InterfaceTarget,
status: Option<SessionViewStatus>,
kind: InterfaceEventKind,
) {
let index = self.push_event(leaf_id, kind);
let view = self.session_view_mut(leaf_id, procedure_id, hook_id);
if let Some(status) = status {
view.status = status;
}
view.events.push(index);
let index = self.push_event(target.leaf_id(), kind);
self.link_event(target, status, index);
}
fn push_procedure_event(&mut self, leaf_id: u32, procedure_id: u32, kind: InterfaceEventKind) {
let index = self.push_event(leaf_id, kind);
self.procedure_view_mut(leaf_id, procedure_id)
.events
.push(index);
fn link_event(
&mut self,
target: InterfaceTarget,
status: Option<SessionViewStatus>,
index: usize,
) {
match target {
InterfaceTarget::Session(key) => {
let view = self.session_view_for_key_mut(key);
if let Some(status) = status {
view.status = status;
}
view.events.push(index);
}
InterfaceTarget::Procedure(key) => {
self.procedure_view_for_key_mut(key).events.push(index);
}
}
}
fn push_event(&mut self, leaf_id: u32, kind: InterfaceEventKind) -> usize {
let sequence = self.next_sequence;
self.next_sequence = self.next_sequence.wrapping_add(1);
let index = self.events.len();
self.events.push(InterfaceEvent {
sequence,
sequence: index as u64,
time_ns: self.now_ns,
leaf_id,
kind,
@@ -297,10 +392,14 @@ impl InterfaceStore {
index
}
fn link_packet_event(&mut self, leaf_id: u32, packet: &Packet, index: usize) {
self.session_view_mut(leaf_id, packet.procedure_id, packet.hook_id)
.events
.push(index);
fn session_view_for_key_mut(&mut self, key: SessionKey) -> &mut SessionView {
self.sessions.entry(key).or_insert_with(SessionView::new)
}
fn procedure_view_for_key_mut(&mut self, key: ProcedureKey) -> &mut ProcedureView {
self.procedures
.entry(key)
.or_insert_with(ProcedureView::new)
}
}
+46
View File
@@ -0,0 +1,46 @@
use crate::{
interface::{ProcedureKey, SessionKey},
protocol::HookID,
};
/// Internal owner for one interface event.
///
/// The runtime already knows whether a packet belongs to a hook-backed session or a
/// one-shot procedure. Keeping that answer explicit avoids reconstructing ownership
/// from packet fields later, which is what made procedure packet flow look like fake
/// session activity in the previous store implementation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum InterfaceTarget {
/// Event belongs to one hook-backed session instance.
Session(SessionKey),
/// Event belongs to one one-shot procedure family.
Procedure(ProcedureKey),
}
impl InterfaceTarget {
/// Builds a session target from the same pieces exposed by [`SessionKey`].
pub(crate) fn session(leaf_id: u32, procedure_id: u32, hook_id: HookID) -> Self {
Self::Session(SessionKey {
leaf_id,
procedure_id,
hook_id,
})
}
/// Builds a procedure target from the same pieces exposed by [`ProcedureKey`].
pub(crate) fn procedure(leaf_id: u32, procedure_id: u32) -> Self {
Self::Procedure(ProcedureKey {
leaf_id,
procedure_id,
})
}
/// Returns the leaf id used on the append-only event record.
pub(crate) fn leaf_id(self) -> u32 {
match self {
Self::Session(key) => key.leaf_id,
Self::Procedure(key) => key.leaf_id,
}
}
}
+150 -50
View File
@@ -1,5 +1,7 @@
use alloc::collections::VecDeque;
use crate::{
interface::InterfaceStore,
interface::{InterfaceStore, InterfaceTarget},
protocol::{
Endpoint, Packet, PacketQueue, Procedure, ProcedureOut, Session, SessionCtx, SessionEntry,
SessionFamily, SessionInit, SessionInitResult, SessionStatus,
@@ -12,25 +14,49 @@ use crate::{
/// session initialization responses and one-shot procedures, both of which need the
/// same retry semantics as session output without becoming separate framework types.
pub struct LeafOutbox {
packets: PacketQueue,
packets: VecDeque<LeafOutboxEntry>,
}
/// One packet retained by a leaf-level retry queue.
///
/// Session entry outboxes have an obvious owner from their surrounding session entry.
/// Leaf-level outboxes are mixed: rejected session initialization packets and one-shot
/// procedure responses both land here. Storing the owner beside the packet keeps route
/// logging precise without exposing another public queue type.
#[derive(Clone)]
struct LeafOutboxEntry {
packet: Packet,
target: LeafOutboxTarget,
}
/// Interface owner attached to a leaf-level outbox entry.
#[derive(Clone, Copy)]
enum LeafOutboxTarget {
/// Compatibility path for packets queued through the public `push`/`extend` API.
InferFromPacket,
/// Runtime-known session or procedure target.
Explicit(InterfaceTarget),
}
impl LeafOutbox {
/// Creates an empty leaf-level outbox.
pub fn new() -> Self {
Self {
packets: PacketQueue::new(),
packets: VecDeque::new(),
}
}
/// Adds one packet to the retry queue.
pub fn push(&mut self, packet: Packet) {
self.packets.push_back(packet);
self.push_with_target(packet, LeafOutboxTarget::InferFromPacket);
}
/// Adds all packets from `packets` in FIFO order.
pub fn extend(&mut self, packets: PacketQueue) {
self.packets.extend(packets);
for packet in packets {
self.push(packet);
}
}
/// Returns the number of queued packets.
@@ -42,6 +68,22 @@ impl LeafOutbox {
pub fn is_empty(&self) -> bool {
self.packets.is_empty()
}
/// Adds one packet with a runtime-known interface target.
pub(crate) fn push_for_target(&mut self, packet: Packet, target: InterfaceTarget) {
self.push_with_target(packet, LeafOutboxTarget::Explicit(target));
}
/// Adds all packets with the same runtime-known interface target.
pub(crate) fn extend_for_target(&mut self, packets: PacketQueue, target: InterfaceTarget) {
for packet in packets {
self.push_for_target(packet, target);
}
}
fn push_with_target(&mut self, packet: Packet, target: LeafOutboxTarget) {
self.packets.push_back(LeafOutboxEntry { packet, target });
}
}
impl Default for LeafOutbox {
@@ -67,9 +109,10 @@ pub fn dispatch_session<L, S>(
{
let hook_id = packet.hook_id;
let procedure_id = S::PROCEDURE_ID;
let target = InterfaceTarget::session(leaf_id, procedure_id, hook_id);
if let Some(store) = interface.as_mut() {
store.record_inbound(leaf_id, &packet);
store.record_inbound_for(target, &packet);
}
if let Some(entry) = family
@@ -80,7 +123,7 @@ pub fn dispatch_session<L, S>(
entry.inbox.push_back(packet);
if let Some(store) = interface.as_mut() {
store.record_session_packet_queued(leaf_id, procedure_id, hook_id);
store.record_session_packet_queued_for(target);
}
return;
@@ -95,21 +138,21 @@ pub fn dispatch_session<L, S>(
family.entries.push(SessionEntry::new(hook_id, state));
if let Some(store) = interface.as_mut() {
store.record_session_created(leaf_id, procedure_id, hook_id, started_ns);
store.record_session_created_for(target, started_ns);
}
}
SessionInitResult::Rejected => {
if let Some(store) = interface.as_mut() {
store.record_session_rejected(leaf_id, procedure_id, hook_id, started_ns);
store.record_session_rejected_for(target, started_ns);
}
}
SessionInitResult::RejectedWith(packet) => {
if let Some(store) = interface.as_mut() {
store.record_session_rejected(leaf_id, procedure_id, hook_id, started_ns);
store.record_outbound_queued(leaf_id, &packet);
store.record_session_rejected_for(target, started_ns);
store.record_outbound_queued_for(target, &packet);
}
outbox.push(packet);
outbox.push_for_target(packet, target);
}
}
}
@@ -129,23 +172,26 @@ pub fn update_session_family<L, S>(
}
let started_ns = interface.as_ref().and_then(|store| store.now_ns());
let outbox_start = entry.outbox.len();
let reply_path = S::reply_path(&entry.state).to_vec();
let mut ctx = SessionCtx::new(
entry.hook_id,
reply_path,
S::PROCEDURE_ID,
&mut entry.outbox,
);
let status = S::update(leaf, &mut entry.state, &mut entry.inbox, &mut ctx);
let status = {
let mut ctx = SessionCtx::new(
entry.hook_id,
reply_path,
S::PROCEDURE_ID,
&mut entry.outbox,
);
S::update(leaf, &mut entry.state, &mut entry.inbox, &mut ctx)
};
let target = InterfaceTarget::session(leaf_id, S::PROCEDURE_ID, entry.hook_id);
if let Some(store) = interface.as_mut() {
store.record_session_update(
leaf_id,
S::PROCEDURE_ID,
entry.hook_id,
status,
started_ns,
);
store.record_session_update_for(target, status, started_ns);
for packet in entry.outbox.iter().skip(outbox_start) {
store.record_outbound_queued_for(target, packet);
}
}
if matches!(status, SessionStatus::Closed) {
@@ -166,9 +212,10 @@ pub fn dispatch_procedure<L, P>(
P: Procedure<L>,
{
let started_ns = interface.as_ref().and_then(|store| store.now_ns());
let target = InterfaceTarget::procedure(leaf_id, P::PROCEDURE_ID);
if let Some(store) = interface.as_mut() {
store.record_inbound(leaf_id, &packet);
store.record_inbound_for(target, &packet);
}
let hook_id = packet.hook_id;
@@ -180,14 +227,14 @@ pub fn dispatch_procedure<L, P>(
let packets = procedure_out.into_packets();
if let Some(store) = interface.as_mut() {
store.record_procedure_call(leaf_id, P::PROCEDURE_ID, hook_id, started_ns);
store.record_procedure_call_for(target, hook_id, started_ns);
for packet in &packets {
store.record_outbound_queued(leaf_id, packet);
store.record_outbound_queued_for(target, packet);
}
}
outbox.extend(packets);
outbox.extend_for_target(packets, target);
}
/// Flushes a generated leaf-level outbox through endpoint routing.
@@ -197,7 +244,17 @@ pub fn flush_leaf_outbox(
outbox: &mut LeafOutbox,
interface: &mut Option<&mut InterfaceStore>,
) -> bool {
flush_packet_queue_with_interface(endpoint, leaf_id, &mut outbox.packets, interface)
while let Some(entry) = outbox.packets.front().cloned() {
let target = resolve_leaf_outbox_target(leaf_id, &entry);
if !flush_packet_with_target(endpoint, target, &entry.packet, interface) {
return false;
}
outbox.packets.pop_front();
}
true
}
/// Flushes and retains one generated session family.
@@ -210,7 +267,8 @@ pub fn flush_session_family<L, S>(
S: Session<L>,
{
for entry in &mut family.entries {
flush_packet_queue_with_interface(endpoint, leaf_id, &mut entry.outbox, interface);
let target = InterfaceTarget::session(leaf_id, S::PROCEDURE_ID, entry.hook_id);
flush_packet_queue_with_target(endpoint, target, &mut entry.outbox, interface);
}
family
@@ -230,31 +288,73 @@ pub fn flush_packet_queue_with_interface(
interface: &mut Option<&mut InterfaceStore>,
) -> bool {
while let Some(packet) = outbox.front().cloned() {
if let Some(store) = interface.as_mut() {
store.record_route_attempt(leaf_id, &packet);
let target = InterfaceTarget::session(leaf_id, packet.procedure_id, packet.hook_id);
if !flush_packet_with_target(endpoint, target, &packet, interface) {
return false;
}
match endpoint.add_outbound(packet.clone()) {
Ok(()) => {
if let Some(store) = interface.as_mut() {
store.record_route_success(leaf_id, &packet);
}
outbox.pop_front();
}
Err(error) => {
if let Some(store) = interface.as_mut() {
store.record_route_failure(leaf_id, &packet, error);
}
return false;
}
}
outbox.pop_front();
}
true
}
/// Flushes a packet queue whose owner is already known by the generated runtime.
fn flush_packet_queue_with_target(
endpoint: &mut Endpoint,
target: InterfaceTarget,
outbox: &mut PacketQueue,
interface: &mut Option<&mut InterfaceStore>,
) -> bool {
while let Some(packet) = outbox.front().cloned() {
if !flush_packet_with_target(endpoint, target, &packet, interface) {
return false;
}
outbox.pop_front();
}
true
}
fn flush_packet_with_target(
endpoint: &mut Endpoint,
target: InterfaceTarget,
packet: &Packet,
interface: &mut Option<&mut InterfaceStore>,
) -> bool {
if let Some(store) = interface.as_mut() {
store.record_route_attempt_for(target, packet);
}
match endpoint.add_outbound(packet.clone()) {
Ok(()) => {
if let Some(store) = interface.as_mut() {
store.record_route_success_for(target, packet);
}
true
}
Err(error) => {
if let Some(store) = interface.as_mut() {
store.record_route_failure_for(target, packet, error);
}
false
}
}
}
fn resolve_leaf_outbox_target(leaf_id: u32, entry: &LeafOutboxEntry) -> InterfaceTarget {
match entry.target {
LeafOutboxTarget::InferFromPacket => {
InterfaceTarget::session(leaf_id, entry.packet.procedure_id, entry.packet.hook_id)
}
LeafOutboxTarget::Explicit(target) => target,
}
}
/// Returns the path used by generated procedure responses.
fn parent_reply_path(endpoint: &Endpoint) -> alloc::vec::Vec<u32> {
if endpoint.path.len() > 1 {