Simplify interface event recording.

This commit is contained in:
Michael Mikovsky
2026-06-01 10:27:59 -06:00
parent aa1e9be696
commit 8a817cb5eb
4 changed files with 220 additions and 263 deletions
+1 -2
View File
@@ -6,7 +6,6 @@
mod event; mod event;
mod key; mod key;
mod store; mod store;
mod target;
mod view; mod view;
pub use event::{InterfaceEvent, InterfaceEventKind}; pub use event::{InterfaceEvent, InterfaceEventKind};
@@ -14,4 +13,4 @@ pub use key::{ProcedureKey, SessionKey};
pub use store::InterfaceStore; pub use store::InterfaceStore;
pub use view::{ProcedureView, SessionView, SessionViewStatus}; pub use view::{ProcedureView, SessionView, SessionViewStatus};
pub(crate) use target::InterfaceTarget; pub(crate) use store::InterfaceTarget;
+83 -152
View File
@@ -2,12 +2,54 @@ use alloc::{collections::BTreeMap, vec::Vec};
use crate::{ use crate::{
interface::{ interface::{
InterfaceEvent, InterfaceEventKind, InterfaceTarget, ProcedureKey, ProcedureView, InterfaceEvent, InterfaceEventKind, ProcedureKey, ProcedureView, SessionKey, SessionView,
SessionKey, SessionView, SessionViewStatus, SessionViewStatus,
}, },
protocol::{EndpointError, HookID, Packet, SessionStatus}, protocol::{EndpointError, HookID, Packet, SessionStatus},
}; };
/// Internal owner for one interface event.
///
/// The runtime already knows whether a packet belongs to a hook-backed session or a
/// one-shot procedure. Keeping that answer explicit avoids reconstructing ownership
/// from packet fields later, which is what made procedure packet flow look like fake
/// session activity in the previous store implementation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum InterfaceTarget {
/// Event belongs to one hook-backed session instance.
Session(SessionKey),
/// Event belongs to one one-shot procedure family.
Procedure(ProcedureKey),
}
impl InterfaceTarget {
/// Builds a session target from the same pieces exposed by [`SessionKey`].
pub(crate) fn session(leaf_id: u32, procedure_id: u32, hook_id: HookID) -> Self {
Self::Session(SessionKey {
leaf_id,
procedure_id,
hook_id,
})
}
/// Builds a procedure target from the same pieces exposed by [`ProcedureKey`].
pub(crate) fn procedure(leaf_id: u32, procedure_id: u32) -> Self {
Self::Procedure(ProcedureKey {
leaf_id,
procedure_id,
})
}
/// Returns the leaf id used on the append-only event record.
pub(crate) fn leaf_id(self) -> u32 {
match self {
Self::Session(key) => key.leaf_id,
Self::Procedure(key) => key.leaf_id,
}
}
}
/// Caller-owned view and packet-flow store for interface frontends. /// Caller-owned view and packet-flow store for interface frontends.
/// ///
/// Generated leaves receive a mutable reference to this store during interface-aware /// Generated leaves receive a mutable reference to this store during interface-aware
@@ -86,14 +128,8 @@ impl InterfaceStore {
/// Records a packet delivered to a generated leaf. /// Records a packet delivered to a generated leaf.
pub fn record_inbound(&mut self, leaf_id: u32, packet: &Packet) { pub fn record_inbound(&mut self, leaf_id: u32, packet: &Packet) {
let target = InterfaceTarget::session(leaf_id, packet.procedure_id, packet.hook_id); let target = InterfaceTarget::session(leaf_id, packet.procedure_id, packet.hook_id);
self.record_inbound_for(target, packet); self.record_for(
}
/// Records a packet delivered to a target already known by generated runtime code.
pub(crate) fn record_inbound_for(&mut self, target: InterfaceTarget, packet: &Packet) {
self.record(
target, target,
None,
InterfaceEventKind::Inbound { InterfaceEventKind::Inbound {
packet: packet.clone(), packet: packet.clone(),
}, },
@@ -107,25 +143,11 @@ impl InterfaceStore {
procedure_id: u32, procedure_id: u32,
hook_id: HookID, hook_id: HookID,
) { ) {
self.record_session_packet_queued_for(InterfaceTarget::session( self.record_for(
leaf_id, InterfaceTarget::session(leaf_id, procedure_id, hook_id),
procedure_id,
hook_id,
));
}
/// Records that a packet was queued for an existing session inbox.
pub(crate) fn record_session_packet_queued_for(&mut self, target: InterfaceTarget) {
let InterfaceTarget::Session(key) = target else {
return;
};
self.record(
target,
None,
InterfaceEventKind::SessionPacketQueued { InterfaceEventKind::SessionPacketQueued {
procedure_id: key.procedure_id, procedure_id,
hook_id: key.hook_id, hook_id,
}, },
); );
} }
@@ -138,26 +160,11 @@ impl InterfaceStore {
hook_id: HookID, hook_id: HookID,
started_ns: Option<u64>, started_ns: Option<u64>,
) { ) {
let target = InterfaceTarget::session(leaf_id, procedure_id, hook_id); self.record_for(
self.record_session_created_for(target, started_ns); InterfaceTarget::session(leaf_id, procedure_id, hook_id),
}
/// Records successful creation of a new session state for an explicit target.
pub(crate) fn record_session_created_for(
&mut self,
target: InterfaceTarget,
started_ns: Option<u64>,
) {
let InterfaceTarget::Session(key) = target else {
return;
};
self.record(
target,
Some(SessionViewStatus::Running),
InterfaceEventKind::SessionCreated { InterfaceEventKind::SessionCreated {
procedure_id: key.procedure_id, procedure_id,
hook_id: key.hook_id, hook_id,
started_ns, started_ns,
finished_ns: self.now_ns, finished_ns: self.now_ns,
}, },
@@ -172,26 +179,11 @@ impl InterfaceStore {
hook_id: HookID, hook_id: HookID,
started_ns: Option<u64>, started_ns: Option<u64>,
) { ) {
let target = InterfaceTarget::session(leaf_id, procedure_id, hook_id); self.record_for(
self.record_session_rejected_for(target, started_ns); InterfaceTarget::session(leaf_id, procedure_id, hook_id),
}
/// Records rejection of a packet that could not create a session.
pub(crate) fn record_session_rejected_for(
&mut self,
target: InterfaceTarget,
started_ns: Option<u64>,
) {
let InterfaceTarget::Session(key) = target else {
return;
};
self.record(
target,
Some(SessionViewStatus::Rejected),
InterfaceEventKind::SessionRejected { InterfaceEventKind::SessionRejected {
procedure_id: key.procedure_id, procedure_id,
hook_id: key.hook_id, hook_id,
started_ns, started_ns,
finished_ns: self.now_ns, finished_ns: self.now_ns,
}, },
@@ -207,27 +199,11 @@ impl InterfaceStore {
status: SessionStatus, status: SessionStatus,
started_ns: Option<u64>, started_ns: Option<u64>,
) { ) {
let target = InterfaceTarget::session(leaf_id, procedure_id, hook_id); self.record_for(
self.record_session_update_for(target, status, started_ns); InterfaceTarget::session(leaf_id, procedure_id, hook_id),
}
/// Records one session update tick for an explicit session target.
pub(crate) fn record_session_update_for(
&mut self,
target: InterfaceTarget,
status: SessionStatus,
started_ns: Option<u64>,
) {
let InterfaceTarget::Session(key) = target else {
return;
};
self.record(
target,
Some(SessionViewStatus::from_session_status(status)),
InterfaceEventKind::SessionUpdated { InterfaceEventKind::SessionUpdated {
procedure_id: key.procedure_id, procedure_id,
hook_id: key.hook_id, hook_id,
status, status,
started_ns, started_ns,
finished_ns: self.now_ns, finished_ns: self.now_ns,
@@ -243,29 +219,10 @@ impl InterfaceStore {
hook_id: HookID, hook_id: HookID,
started_ns: Option<u64>, started_ns: Option<u64>,
) { ) {
self.record_procedure_call_for( self.record_for(
InterfaceTarget::procedure(leaf_id, procedure_id), InterfaceTarget::procedure(leaf_id, procedure_id),
hook_id,
started_ns,
);
}
/// Records one procedure call for an explicit procedure target.
pub(crate) fn record_procedure_call_for(
&mut self,
target: InterfaceTarget,
hook_id: HookID,
started_ns: Option<u64>,
) {
let InterfaceTarget::Procedure(key) = target else {
return;
};
self.record(
target,
None,
InterfaceEventKind::ProcedureCalled { InterfaceEventKind::ProcedureCalled {
procedure_id: key.procedure_id, procedure_id,
hook_id, hook_id,
started_ns, started_ns,
finished_ns: self.now_ns, finished_ns: self.now_ns,
@@ -276,14 +233,8 @@ impl InterfaceStore {
/// Records a packet emitted by leaf logic before route retry handling. /// Records a packet emitted by leaf logic before route retry handling.
pub fn record_outbound_queued(&mut self, leaf_id: u32, packet: &Packet) { pub fn record_outbound_queued(&mut self, leaf_id: u32, packet: &Packet) {
let target = InterfaceTarget::session(leaf_id, packet.procedure_id, packet.hook_id); let target = InterfaceTarget::session(leaf_id, packet.procedure_id, packet.hook_id);
self.record_outbound_queued_for(target, packet); self.record_for(
}
/// Records a packet emitted by leaf logic before route retry handling.
pub(crate) fn record_outbound_queued_for(&mut self, target: InterfaceTarget, packet: &Packet) {
self.record(
target, target,
None,
InterfaceEventKind::OutboundQueued { InterfaceEventKind::OutboundQueued {
packet: packet.clone(), packet: packet.clone(),
}, },
@@ -293,14 +244,8 @@ impl InterfaceStore {
/// Records a route attempt for a queued outbound packet. /// Records a route attempt for a queued outbound packet.
pub fn record_route_attempt(&mut self, leaf_id: u32, packet: &Packet) { pub fn record_route_attempt(&mut self, leaf_id: u32, packet: &Packet) {
let target = InterfaceTarget::session(leaf_id, packet.procedure_id, packet.hook_id); let target = InterfaceTarget::session(leaf_id, packet.procedure_id, packet.hook_id);
self.record_route_attempt_for(target, packet); self.record_for(
}
/// Records a route attempt for a queued outbound packet.
pub(crate) fn record_route_attempt_for(&mut self, target: InterfaceTarget, packet: &Packet) {
self.record(
target, target,
None,
InterfaceEventKind::RouteAttempt { InterfaceEventKind::RouteAttempt {
packet: packet.clone(), packet: packet.clone(),
}, },
@@ -310,14 +255,8 @@ impl InterfaceStore {
/// Records a successful route attempt. /// Records a successful route attempt.
pub fn record_route_success(&mut self, leaf_id: u32, packet: &Packet) { pub fn record_route_success(&mut self, leaf_id: u32, packet: &Packet) {
let target = InterfaceTarget::session(leaf_id, packet.procedure_id, packet.hook_id); let target = InterfaceTarget::session(leaf_id, packet.procedure_id, packet.hook_id);
self.record_route_success_for(target, packet); self.record_for(
}
/// Records a successful route attempt.
pub(crate) fn record_route_success_for(&mut self, target: InterfaceTarget, packet: &Packet) {
self.record(
target, target,
None,
InterfaceEventKind::RouteSuccess { InterfaceEventKind::RouteSuccess {
packet: packet.clone(), packet: packet.clone(),
}, },
@@ -327,19 +266,8 @@ impl InterfaceStore {
/// Records a failed route attempt without removing the packet from retry state. /// Records a failed route attempt without removing the packet from retry state.
pub fn record_route_failure(&mut self, leaf_id: u32, packet: &Packet, error: EndpointError) { pub fn record_route_failure(&mut self, leaf_id: u32, packet: &Packet, error: EndpointError) {
let target = InterfaceTarget::session(leaf_id, packet.procedure_id, packet.hook_id); let target = InterfaceTarget::session(leaf_id, packet.procedure_id, packet.hook_id);
self.record_route_failure_for(target, packet, error); self.record_for(
}
/// Records a failed route attempt without removing the packet from retry state.
pub(crate) fn record_route_failure_for(
&mut self,
target: InterfaceTarget,
packet: &Packet,
error: EndpointError,
) {
self.record(
target, target,
None,
InterfaceEventKind::RouteFailure { InterfaceEventKind::RouteFailure {
packet: packet.clone(), packet: packet.clone(),
error, error,
@@ -347,22 +275,14 @@ impl InterfaceStore {
); );
} }
fn record( pub(crate) fn record_for(&mut self, target: InterfaceTarget, kind: InterfaceEventKind) {
&mut self,
target: InterfaceTarget,
status: Option<SessionViewStatus>,
kind: InterfaceEventKind,
) {
let index = self.push_event(target.leaf_id(), kind); let index = self.push_event(target.leaf_id(), kind);
self.link_event(target, status, index); self.link_event(target, index);
} }
fn link_event( fn link_event(&mut self, target: InterfaceTarget, index: usize) {
&mut self, let status = Self::status_for_event(&self.events[index].kind);
target: InterfaceTarget,
status: Option<SessionViewStatus>,
index: usize,
) {
match target { match target {
InterfaceTarget::Session(key) => { InterfaceTarget::Session(key) => {
let view = self.session_view_for_key_mut(key); let view = self.session_view_for_key_mut(key);
@@ -379,6 +299,17 @@ impl InterfaceStore {
} }
} }
fn status_for_event(kind: &InterfaceEventKind) -> Option<SessionViewStatus> {
match kind {
InterfaceEventKind::SessionCreated { .. } => Some(SessionViewStatus::Running),
InterfaceEventKind::SessionRejected { .. } => Some(SessionViewStatus::Rejected),
InterfaceEventKind::SessionUpdated { status, .. } => {
Some(SessionViewStatus::from_session_status(*status))
}
_ => None,
}
}
fn push_event(&mut self, leaf_id: u32, kind: InterfaceEventKind) -> usize { fn push_event(&mut self, leaf_id: u32, kind: InterfaceEventKind) -> usize {
let index = self.events.len(); let index = self.events.len();
-46
View File
@@ -1,46 +0,0 @@
use crate::{
interface::{ProcedureKey, SessionKey},
protocol::HookID,
};
/// Internal owner for one interface event.
///
/// The runtime already knows whether a packet belongs to a hook-backed session or a
/// one-shot procedure. Keeping that answer explicit avoids reconstructing ownership
/// from packet fields later, which is what made procedure packet flow look like fake
/// session activity in the previous store implementation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum InterfaceTarget {
/// Event belongs to one hook-backed session instance.
Session(SessionKey),
/// Event belongs to one one-shot procedure family.
Procedure(ProcedureKey),
}
impl InterfaceTarget {
/// Builds a session target from the same pieces exposed by [`SessionKey`].
pub(crate) fn session(leaf_id: u32, procedure_id: u32, hook_id: HookID) -> Self {
Self::Session(SessionKey {
leaf_id,
procedure_id,
hook_id,
})
}
/// Builds a procedure target from the same pieces exposed by [`ProcedureKey`].
pub(crate) fn procedure(leaf_id: u32, procedure_id: u32) -> Self {
Self::Procedure(ProcedureKey {
leaf_id,
procedure_id,
})
}
/// Returns the leaf id used on the append-only event record.
pub(crate) fn leaf_id(self) -> u32 {
match self {
Self::Session(key) => key.leaf_id,
Self::Procedure(key) => key.leaf_id,
}
}
}
+136 -63
View File
@@ -1,7 +1,7 @@
use alloc::collections::VecDeque; use alloc::collections::VecDeque;
use crate::{ use crate::{
interface::{InterfaceStore, InterfaceTarget}, interface::{InterfaceEventKind, InterfaceStore, InterfaceTarget},
protocol::{ protocol::{
Endpoint, Packet, PacketQueue, Procedure, ProcedureOut, Session, SessionCtx, SessionEntry, Endpoint, Packet, PacketQueue, Procedure, ProcedureOut, Session, SessionCtx, SessionEntry,
SessionFamily, SessionInit, SessionInitResult, SessionStatus, SessionFamily, SessionInit, SessionInitResult, SessionStatus,
@@ -26,17 +26,7 @@ pub struct LeafOutbox {
#[derive(Clone)] #[derive(Clone)]
struct LeafOutboxEntry { struct LeafOutboxEntry {
packet: Packet, packet: Packet,
target: LeafOutboxTarget, target: Option<InterfaceTarget>,
}
/// Interface owner attached to a leaf-level outbox entry.
#[derive(Clone, Copy)]
enum LeafOutboxTarget {
/// Compatibility path for packets queued through the public `push`/`extend` API.
InferFromPacket,
/// Runtime-known session or procedure target.
Explicit(InterfaceTarget),
} }
impl LeafOutbox { impl LeafOutbox {
@@ -49,7 +39,7 @@ impl LeafOutbox {
/// Adds one packet to the retry queue. /// Adds one packet to the retry queue.
pub fn push(&mut self, packet: Packet) { pub fn push(&mut self, packet: Packet) {
self.push_with_target(packet, LeafOutboxTarget::InferFromPacket); self.push_with_target(packet, None);
} }
/// Adds all packets from `packets` in FIFO order. /// Adds all packets from `packets` in FIFO order.
@@ -71,7 +61,11 @@ impl LeafOutbox {
/// Adds one packet with a runtime-known interface target. /// Adds one packet with a runtime-known interface target.
pub(crate) fn push_for_target(&mut self, packet: Packet, target: InterfaceTarget) { pub(crate) fn push_for_target(&mut self, packet: Packet, target: InterfaceTarget) {
self.push_with_target(packet, LeafOutboxTarget::Explicit(target)); self.push_with_target(packet, Some(target));
}
fn push_with_target(&mut self, packet: Packet, target: Option<InterfaceTarget>) {
self.packets.push_back(LeafOutboxEntry { packet, target });
} }
/// Adds all packets with the same runtime-known interface target. /// Adds all packets with the same runtime-known interface target.
@@ -80,10 +74,6 @@ impl LeafOutbox {
self.push_for_target(packet, target); self.push_for_target(packet, target);
} }
} }
fn push_with_target(&mut self, packet: Packet, target: LeafOutboxTarget) {
self.packets.push_back(LeafOutboxEntry { packet, target });
}
} }
impl Default for LeafOutbox { impl Default for LeafOutbox {
@@ -112,7 +102,12 @@ pub fn dispatch_session<L, S>(
let target = InterfaceTarget::session(leaf_id, procedure_id, hook_id); let target = InterfaceTarget::session(leaf_id, procedure_id, hook_id);
if let Some(store) = interface.as_mut() { if let Some(store) = interface.as_mut() {
store.record_inbound_for(target, &packet); store.record_for(
target,
InterfaceEventKind::Inbound {
packet: packet.clone(),
},
);
} }
if let Some(entry) = family if let Some(entry) = family
@@ -123,7 +118,13 @@ pub fn dispatch_session<L, S>(
entry.inbox.push_back(packet); entry.inbox.push_back(packet);
if let Some(store) = interface.as_mut() { if let Some(store) = interface.as_mut() {
store.record_session_packet_queued_for(target); store.record_for(
target,
InterfaceEventKind::SessionPacketQueued {
procedure_id,
hook_id,
},
);
} }
return; return;
@@ -138,18 +139,47 @@ pub fn dispatch_session<L, S>(
family.entries.push(SessionEntry::new(hook_id, state)); family.entries.push(SessionEntry::new(hook_id, state));
if let Some(store) = interface.as_mut() { if let Some(store) = interface.as_mut() {
store.record_session_created_for(target, started_ns); store.record_for(
target,
InterfaceEventKind::SessionCreated {
procedure_id,
hook_id,
started_ns,
finished_ns: store.now_ns(),
},
);
} }
} }
SessionInitResult::Rejected => { SessionInitResult::Rejected => {
if let Some(store) = interface.as_mut() { if let Some(store) = interface.as_mut() {
store.record_session_rejected_for(target, started_ns); store.record_for(
target,
InterfaceEventKind::SessionRejected {
procedure_id,
hook_id,
started_ns,
finished_ns: store.now_ns(),
},
);
} }
} }
SessionInitResult::RejectedWith(packet) => { SessionInitResult::RejectedWith(packet) => {
if let Some(store) = interface.as_mut() { if let Some(store) = interface.as_mut() {
store.record_session_rejected_for(target, started_ns); store.record_for(
store.record_outbound_queued_for(target, &packet); target,
InterfaceEventKind::SessionRejected {
procedure_id,
hook_id,
started_ns,
finished_ns: store.now_ns(),
},
);
store.record_for(
target,
InterfaceEventKind::OutboundQueued {
packet: packet.clone(),
},
);
} }
outbox.push_for_target(packet, target); outbox.push_for_target(packet, target);
@@ -187,10 +217,24 @@ pub fn update_session_family<L, S>(
let target = InterfaceTarget::session(leaf_id, S::PROCEDURE_ID, entry.hook_id); let target = InterfaceTarget::session(leaf_id, S::PROCEDURE_ID, entry.hook_id);
if let Some(store) = interface.as_mut() { if let Some(store) = interface.as_mut() {
store.record_session_update_for(target, status, started_ns); store.record_for(
target,
InterfaceEventKind::SessionUpdated {
procedure_id: S::PROCEDURE_ID,
hook_id: entry.hook_id,
status,
started_ns,
finished_ns: store.now_ns(),
},
);
for packet in entry.outbox.iter().skip(outbox_start) { for packet in entry.outbox.iter().skip(outbox_start) {
store.record_outbound_queued_for(target, packet); store.record_for(
target,
InterfaceEventKind::OutboundQueued {
packet: packet.clone(),
},
);
} }
} }
@@ -215,7 +259,12 @@ pub fn dispatch_procedure<L, P>(
let target = InterfaceTarget::procedure(leaf_id, P::PROCEDURE_ID); let target = InterfaceTarget::procedure(leaf_id, P::PROCEDURE_ID);
if let Some(store) = interface.as_mut() { if let Some(store) = interface.as_mut() {
store.record_inbound_for(target, &packet); store.record_for(
target,
InterfaceEventKind::Inbound {
packet: packet.clone(),
},
);
} }
let hook_id = packet.hook_id; let hook_id = packet.hook_id;
@@ -227,10 +276,23 @@ pub fn dispatch_procedure<L, P>(
let packets = procedure_out.into_packets(); let packets = procedure_out.into_packets();
if let Some(store) = interface.as_mut() { if let Some(store) = interface.as_mut() {
store.record_procedure_call_for(target, hook_id, started_ns); store.record_for(
target,
InterfaceEventKind::ProcedureCalled {
procedure_id: P::PROCEDURE_ID,
hook_id,
started_ns,
finished_ns: store.now_ns(),
},
);
for packet in &packets { for packet in &packets {
store.record_outbound_queued_for(target, packet); store.record_for(
target,
InterfaceEventKind::OutboundQueued {
packet: packet.clone(),
},
);
} }
} }
@@ -244,17 +306,13 @@ pub fn flush_leaf_outbox(
outbox: &mut LeafOutbox, outbox: &mut LeafOutbox,
interface: &mut Option<&mut InterfaceStore>, interface: &mut Option<&mut InterfaceStore>,
) -> bool { ) -> bool {
while let Some(entry) = outbox.packets.front().cloned() { flush_outbox(endpoint, &mut outbox.packets, interface, |entry| {
let target = resolve_leaf_outbox_target(leaf_id, &entry); let target = entry.target.unwrap_or_else(|| {
InterfaceTarget::session(leaf_id, entry.packet.procedure_id, entry.packet.hook_id)
});
if !flush_packet_with_target(endpoint, target, &entry.packet, interface) { (target, entry.packet.clone())
return false; })
}
outbox.packets.pop_front();
}
true
} }
/// Flushes and retains one generated session family. /// Flushes and retains one generated session family.
@@ -287,17 +345,12 @@ pub fn flush_packet_queue_with_interface(
outbox: &mut PacketQueue, outbox: &mut PacketQueue,
interface: &mut Option<&mut InterfaceStore>, interface: &mut Option<&mut InterfaceStore>,
) -> bool { ) -> bool {
while let Some(packet) = outbox.front().cloned() { flush_outbox(endpoint, outbox, interface, |packet| {
let target = InterfaceTarget::session(leaf_id, packet.procedure_id, packet.hook_id); (
InterfaceTarget::session(leaf_id, packet.procedure_id, packet.hook_id),
if !flush_packet_with_target(endpoint, target, &packet, interface) { packet.clone(),
return false; )
} })
outbox.pop_front();
}
true
} }
/// Flushes a packet queue whose owner is already known by the generated runtime. /// Flushes a packet queue whose owner is already known by the generated runtime.
@@ -307,7 +360,20 @@ fn flush_packet_queue_with_target(
outbox: &mut PacketQueue, outbox: &mut PacketQueue,
interface: &mut Option<&mut InterfaceStore>, interface: &mut Option<&mut InterfaceStore>,
) -> bool { ) -> bool {
while let Some(packet) = outbox.front().cloned() { flush_outbox(endpoint, outbox, interface, |packet| {
(target, packet.clone())
})
}
fn flush_outbox<T>(
endpoint: &mut Endpoint,
outbox: &mut VecDeque<T>,
interface: &mut Option<&mut InterfaceStore>,
mut packet_for: impl FnMut(&T) -> (InterfaceTarget, Packet),
) -> bool {
while let Some(item) = outbox.front() {
let (target, packet) = packet_for(item);
if !flush_packet_with_target(endpoint, target, &packet, interface) { if !flush_packet_with_target(endpoint, target, &packet, interface) {
return false; return false;
} }
@@ -325,20 +391,36 @@ fn flush_packet_with_target(
interface: &mut Option<&mut InterfaceStore>, interface: &mut Option<&mut InterfaceStore>,
) -> bool { ) -> bool {
if let Some(store) = interface.as_mut() { if let Some(store) = interface.as_mut() {
store.record_route_attempt_for(target, packet); store.record_for(
target,
InterfaceEventKind::RouteAttempt {
packet: packet.clone(),
},
);
} }
match endpoint.add_outbound(packet.clone()) { match endpoint.add_outbound(packet.clone()) {
Ok(()) => { Ok(()) => {
if let Some(store) = interface.as_mut() { if let Some(store) = interface.as_mut() {
store.record_route_success_for(target, packet); store.record_for(
target,
InterfaceEventKind::RouteSuccess {
packet: packet.clone(),
},
);
} }
true true
} }
Err(error) => { Err(error) => {
if let Some(store) = interface.as_mut() { if let Some(store) = interface.as_mut() {
store.record_route_failure_for(target, packet, error); store.record_for(
target,
InterfaceEventKind::RouteFailure {
packet: packet.clone(),
error,
},
);
} }
false false
@@ -346,15 +428,6 @@ fn flush_packet_with_target(
} }
} }
fn resolve_leaf_outbox_target(leaf_id: u32, entry: &LeafOutboxEntry) -> InterfaceTarget {
match entry.target {
LeafOutboxTarget::InferFromPacket => {
InterfaceTarget::session(leaf_id, entry.packet.procedure_id, entry.packet.hook_id)
}
LeafOutboxTarget::Explicit(target) => target,
}
}
/// Returns the path used by generated procedure responses. /// Returns the path used by generated procedure responses.
fn parent_reply_path(endpoint: &Endpoint) -> alloc::vec::Vec<u32> { fn parent_reply_path(endpoint: &Endpoint) -> alloc::vec::Vec<u32> {
if endpoint.path.len() > 1 { if endpoint.path.len() > 1 {