diff --git a/src/interface/mod.rs b/src/interface/mod.rs index 513bcf9..65bce89 100644 --- a/src/interface/mod.rs +++ b/src/interface/mod.rs @@ -6,7 +6,6 @@ mod event; mod key; mod store; -mod target; mod view; pub use event::{InterfaceEvent, InterfaceEventKind}; @@ -14,4 +13,4 @@ pub use key::{ProcedureKey, SessionKey}; pub use store::InterfaceStore; pub use view::{ProcedureView, SessionView, SessionViewStatus}; -pub(crate) use target::InterfaceTarget; +pub(crate) use store::InterfaceTarget; diff --git a/src/interface/store.rs b/src/interface/store.rs index d979648..e210d23 100644 --- a/src/interface/store.rs +++ b/src/interface/store.rs @@ -2,12 +2,54 @@ use alloc::{collections::BTreeMap, vec::Vec}; use crate::{ interface::{ - InterfaceEvent, InterfaceEventKind, InterfaceTarget, ProcedureKey, ProcedureView, - SessionKey, SessionView, SessionViewStatus, + InterfaceEvent, InterfaceEventKind, ProcedureKey, ProcedureView, SessionKey, SessionView, + SessionViewStatus, }, protocol::{EndpointError, HookID, Packet, SessionStatus}, }; +/// Internal owner for one interface event. +/// +/// The runtime already knows whether a packet belongs to a hook-backed session or a +/// one-shot procedure. Keeping that answer explicit avoids reconstructing ownership +/// from packet fields later, which is what made procedure packet flow look like fake +/// session activity in the previous store implementation. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum InterfaceTarget { + /// Event belongs to one hook-backed session instance. + Session(SessionKey), + + /// Event belongs to one one-shot procedure family. + Procedure(ProcedureKey), +} + +impl InterfaceTarget { + /// Builds a session target from the same pieces exposed by [`SessionKey`]. + pub(crate) fn session(leaf_id: u32, procedure_id: u32, hook_id: HookID) -> Self { + Self::Session(SessionKey { + leaf_id, + procedure_id, + hook_id, + }) + } + + /// Builds a procedure target from the same pieces exposed by [`ProcedureKey`]. + pub(crate) fn procedure(leaf_id: u32, procedure_id: u32) -> Self { + Self::Procedure(ProcedureKey { + leaf_id, + procedure_id, + }) + } + + /// Returns the leaf id used on the append-only event record. + pub(crate) fn leaf_id(self) -> u32 { + match self { + Self::Session(key) => key.leaf_id, + Self::Procedure(key) => key.leaf_id, + } + } +} + /// Caller-owned view and packet-flow store for interface frontends. /// /// Generated leaves receive a mutable reference to this store during interface-aware @@ -86,14 +128,8 @@ impl InterfaceStore { /// Records a packet delivered to a generated leaf. pub fn record_inbound(&mut self, leaf_id: u32, packet: &Packet) { let target = InterfaceTarget::session(leaf_id, packet.procedure_id, packet.hook_id); - self.record_inbound_for(target, packet); - } - - /// Records a packet delivered to a target already known by generated runtime code. - pub(crate) fn record_inbound_for(&mut self, target: InterfaceTarget, packet: &Packet) { - self.record( + self.record_for( target, - None, InterfaceEventKind::Inbound { packet: packet.clone(), }, @@ -107,25 +143,11 @@ impl InterfaceStore { procedure_id: u32, hook_id: HookID, ) { - self.record_session_packet_queued_for(InterfaceTarget::session( - leaf_id, - procedure_id, - hook_id, - )); - } - - /// Records that a packet was queued for an existing session inbox. - pub(crate) fn record_session_packet_queued_for(&mut self, target: InterfaceTarget) { - let InterfaceTarget::Session(key) = target else { - return; - }; - - self.record( - target, - None, + self.record_for( + InterfaceTarget::session(leaf_id, procedure_id, hook_id), InterfaceEventKind::SessionPacketQueued { - procedure_id: key.procedure_id, - hook_id: key.hook_id, + procedure_id, + hook_id, }, ); } @@ -138,26 +160,11 @@ impl InterfaceStore { hook_id: HookID, started_ns: Option, ) { - let target = InterfaceTarget::session(leaf_id, procedure_id, hook_id); - self.record_session_created_for(target, started_ns); - } - - /// Records successful creation of a new session state for an explicit target. - pub(crate) fn record_session_created_for( - &mut self, - target: InterfaceTarget, - started_ns: Option, - ) { - let InterfaceTarget::Session(key) = target else { - return; - }; - - self.record( - target, - Some(SessionViewStatus::Running), + self.record_for( + InterfaceTarget::session(leaf_id, procedure_id, hook_id), InterfaceEventKind::SessionCreated { - procedure_id: key.procedure_id, - hook_id: key.hook_id, + procedure_id, + hook_id, started_ns, finished_ns: self.now_ns, }, @@ -172,26 +179,11 @@ impl InterfaceStore { hook_id: HookID, started_ns: Option, ) { - let target = InterfaceTarget::session(leaf_id, procedure_id, hook_id); - self.record_session_rejected_for(target, started_ns); - } - - /// Records rejection of a packet that could not create a session. - pub(crate) fn record_session_rejected_for( - &mut self, - target: InterfaceTarget, - started_ns: Option, - ) { - let InterfaceTarget::Session(key) = target else { - return; - }; - - self.record( - target, - Some(SessionViewStatus::Rejected), + self.record_for( + InterfaceTarget::session(leaf_id, procedure_id, hook_id), InterfaceEventKind::SessionRejected { - procedure_id: key.procedure_id, - hook_id: key.hook_id, + procedure_id, + hook_id, started_ns, finished_ns: self.now_ns, }, @@ -207,27 +199,11 @@ impl InterfaceStore { status: SessionStatus, started_ns: Option, ) { - let target = InterfaceTarget::session(leaf_id, procedure_id, hook_id); - self.record_session_update_for(target, status, started_ns); - } - - /// Records one session update tick for an explicit session target. - pub(crate) fn record_session_update_for( - &mut self, - target: InterfaceTarget, - status: SessionStatus, - started_ns: Option, - ) { - let InterfaceTarget::Session(key) = target else { - return; - }; - - self.record( - target, - Some(SessionViewStatus::from_session_status(status)), + self.record_for( + InterfaceTarget::session(leaf_id, procedure_id, hook_id), InterfaceEventKind::SessionUpdated { - procedure_id: key.procedure_id, - hook_id: key.hook_id, + procedure_id, + hook_id, status, started_ns, finished_ns: self.now_ns, @@ -243,29 +219,10 @@ impl InterfaceStore { hook_id: HookID, started_ns: Option, ) { - self.record_procedure_call_for( + self.record_for( InterfaceTarget::procedure(leaf_id, procedure_id), - hook_id, - started_ns, - ); - } - - /// Records one procedure call for an explicit procedure target. - pub(crate) fn record_procedure_call_for( - &mut self, - target: InterfaceTarget, - hook_id: HookID, - started_ns: Option, - ) { - let InterfaceTarget::Procedure(key) = target else { - return; - }; - - self.record( - target, - None, InterfaceEventKind::ProcedureCalled { - procedure_id: key.procedure_id, + procedure_id, hook_id, started_ns, finished_ns: self.now_ns, @@ -276,14 +233,8 @@ impl InterfaceStore { /// Records a packet emitted by leaf logic before route retry handling. pub fn record_outbound_queued(&mut self, leaf_id: u32, packet: &Packet) { let target = InterfaceTarget::session(leaf_id, packet.procedure_id, packet.hook_id); - self.record_outbound_queued_for(target, packet); - } - - /// Records a packet emitted by leaf logic before route retry handling. - pub(crate) fn record_outbound_queued_for(&mut self, target: InterfaceTarget, packet: &Packet) { - self.record( + self.record_for( target, - None, InterfaceEventKind::OutboundQueued { packet: packet.clone(), }, @@ -293,14 +244,8 @@ impl InterfaceStore { /// Records a route attempt for a queued outbound packet. pub fn record_route_attempt(&mut self, leaf_id: u32, packet: &Packet) { let target = InterfaceTarget::session(leaf_id, packet.procedure_id, packet.hook_id); - self.record_route_attempt_for(target, packet); - } - - /// Records a route attempt for a queued outbound packet. - pub(crate) fn record_route_attempt_for(&mut self, target: InterfaceTarget, packet: &Packet) { - self.record( + self.record_for( target, - None, InterfaceEventKind::RouteAttempt { packet: packet.clone(), }, @@ -310,14 +255,8 @@ impl InterfaceStore { /// Records a successful route attempt. pub fn record_route_success(&mut self, leaf_id: u32, packet: &Packet) { let target = InterfaceTarget::session(leaf_id, packet.procedure_id, packet.hook_id); - self.record_route_success_for(target, packet); - } - - /// Records a successful route attempt. - pub(crate) fn record_route_success_for(&mut self, target: InterfaceTarget, packet: &Packet) { - self.record( + self.record_for( target, - None, InterfaceEventKind::RouteSuccess { packet: packet.clone(), }, @@ -327,19 +266,8 @@ impl InterfaceStore { /// Records a failed route attempt without removing the packet from retry state. pub fn record_route_failure(&mut self, leaf_id: u32, packet: &Packet, error: EndpointError) { let target = InterfaceTarget::session(leaf_id, packet.procedure_id, packet.hook_id); - self.record_route_failure_for(target, packet, error); - } - - /// Records a failed route attempt without removing the packet from retry state. - pub(crate) fn record_route_failure_for( - &mut self, - target: InterfaceTarget, - packet: &Packet, - error: EndpointError, - ) { - self.record( + self.record_for( target, - None, InterfaceEventKind::RouteFailure { packet: packet.clone(), error, @@ -347,22 +275,14 @@ impl InterfaceStore { ); } - fn record( - &mut self, - target: InterfaceTarget, - status: Option, - kind: InterfaceEventKind, - ) { + pub(crate) fn record_for(&mut self, target: InterfaceTarget, kind: InterfaceEventKind) { let index = self.push_event(target.leaf_id(), kind); - self.link_event(target, status, index); + self.link_event(target, index); } - fn link_event( - &mut self, - target: InterfaceTarget, - status: Option, - index: usize, - ) { + fn link_event(&mut self, target: InterfaceTarget, index: usize) { + let status = Self::status_for_event(&self.events[index].kind); + match target { InterfaceTarget::Session(key) => { let view = self.session_view_for_key_mut(key); @@ -379,6 +299,17 @@ impl InterfaceStore { } } + fn status_for_event(kind: &InterfaceEventKind) -> Option { + match kind { + InterfaceEventKind::SessionCreated { .. } => Some(SessionViewStatus::Running), + InterfaceEventKind::SessionRejected { .. } => Some(SessionViewStatus::Rejected), + InterfaceEventKind::SessionUpdated { status, .. } => { + Some(SessionViewStatus::from_session_status(*status)) + } + _ => None, + } + } + fn push_event(&mut self, leaf_id: u32, kind: InterfaceEventKind) -> usize { let index = self.events.len(); diff --git a/src/interface/target.rs b/src/interface/target.rs deleted file mode 100644 index bcedf82..0000000 --- a/src/interface/target.rs +++ /dev/null @@ -1,46 +0,0 @@ -use crate::{ - interface::{ProcedureKey, SessionKey}, - protocol::HookID, -}; - -/// Internal owner for one interface event. -/// -/// The runtime already knows whether a packet belongs to a hook-backed session or a -/// one-shot procedure. Keeping that answer explicit avoids reconstructing ownership -/// from packet fields later, which is what made procedure packet flow look like fake -/// session activity in the previous store implementation. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub(crate) enum InterfaceTarget { - /// Event belongs to one hook-backed session instance. - Session(SessionKey), - - /// Event belongs to one one-shot procedure family. - Procedure(ProcedureKey), -} - -impl InterfaceTarget { - /// Builds a session target from the same pieces exposed by [`SessionKey`]. - pub(crate) fn session(leaf_id: u32, procedure_id: u32, hook_id: HookID) -> Self { - Self::Session(SessionKey { - leaf_id, - procedure_id, - hook_id, - }) - } - - /// Builds a procedure target from the same pieces exposed by [`ProcedureKey`]. - pub(crate) fn procedure(leaf_id: u32, procedure_id: u32) -> Self { - Self::Procedure(ProcedureKey { - leaf_id, - procedure_id, - }) - } - - /// Returns the leaf id used on the append-only event record. - pub(crate) fn leaf_id(self) -> u32 { - match self { - Self::Session(key) => key.leaf_id, - Self::Procedure(key) => key.leaf_id, - } - } -} diff --git a/src/protocol/runtime.rs b/src/protocol/runtime.rs index 9f684d5..ecf1347 100644 --- a/src/protocol/runtime.rs +++ b/src/protocol/runtime.rs @@ -1,7 +1,7 @@ use alloc::collections::VecDeque; use crate::{ - interface::{InterfaceStore, InterfaceTarget}, + interface::{InterfaceEventKind, InterfaceStore, InterfaceTarget}, protocol::{ Endpoint, Packet, PacketQueue, Procedure, ProcedureOut, Session, SessionCtx, SessionEntry, SessionFamily, SessionInit, SessionInitResult, SessionStatus, @@ -26,17 +26,7 @@ pub struct LeafOutbox { #[derive(Clone)] struct LeafOutboxEntry { packet: Packet, - target: LeafOutboxTarget, -} - -/// Interface owner attached to a leaf-level outbox entry. -#[derive(Clone, Copy)] -enum LeafOutboxTarget { - /// Compatibility path for packets queued through the public `push`/`extend` API. - InferFromPacket, - - /// Runtime-known session or procedure target. - Explicit(InterfaceTarget), + target: Option, } impl LeafOutbox { @@ -49,7 +39,7 @@ impl LeafOutbox { /// Adds one packet to the retry queue. pub fn push(&mut self, packet: Packet) { - self.push_with_target(packet, LeafOutboxTarget::InferFromPacket); + self.push_with_target(packet, None); } /// Adds all packets from `packets` in FIFO order. @@ -71,7 +61,11 @@ impl LeafOutbox { /// Adds one packet with a runtime-known interface target. pub(crate) fn push_for_target(&mut self, packet: Packet, target: InterfaceTarget) { - self.push_with_target(packet, LeafOutboxTarget::Explicit(target)); + self.push_with_target(packet, Some(target)); + } + + fn push_with_target(&mut self, packet: Packet, target: Option) { + self.packets.push_back(LeafOutboxEntry { packet, target }); } /// Adds all packets with the same runtime-known interface target. @@ -80,10 +74,6 @@ impl LeafOutbox { self.push_for_target(packet, target); } } - - fn push_with_target(&mut self, packet: Packet, target: LeafOutboxTarget) { - self.packets.push_back(LeafOutboxEntry { packet, target }); - } } impl Default for LeafOutbox { @@ -112,7 +102,12 @@ pub fn dispatch_session( let target = InterfaceTarget::session(leaf_id, procedure_id, hook_id); if let Some(store) = interface.as_mut() { - store.record_inbound_for(target, &packet); + store.record_for( + target, + InterfaceEventKind::Inbound { + packet: packet.clone(), + }, + ); } if let Some(entry) = family @@ -123,7 +118,13 @@ pub fn dispatch_session( entry.inbox.push_back(packet); if let Some(store) = interface.as_mut() { - store.record_session_packet_queued_for(target); + store.record_for( + target, + InterfaceEventKind::SessionPacketQueued { + procedure_id, + hook_id, + }, + ); } return; @@ -138,18 +139,47 @@ pub fn dispatch_session( family.entries.push(SessionEntry::new(hook_id, state)); if let Some(store) = interface.as_mut() { - store.record_session_created_for(target, started_ns); + store.record_for( + target, + InterfaceEventKind::SessionCreated { + procedure_id, + hook_id, + started_ns, + finished_ns: store.now_ns(), + }, + ); } } SessionInitResult::Rejected => { if let Some(store) = interface.as_mut() { - store.record_session_rejected_for(target, started_ns); + store.record_for( + target, + InterfaceEventKind::SessionRejected { + procedure_id, + hook_id, + started_ns, + finished_ns: store.now_ns(), + }, + ); } } SessionInitResult::RejectedWith(packet) => { if let Some(store) = interface.as_mut() { - store.record_session_rejected_for(target, started_ns); - store.record_outbound_queued_for(target, &packet); + store.record_for( + target, + InterfaceEventKind::SessionRejected { + procedure_id, + hook_id, + started_ns, + finished_ns: store.now_ns(), + }, + ); + store.record_for( + target, + InterfaceEventKind::OutboundQueued { + packet: packet.clone(), + }, + ); } outbox.push_for_target(packet, target); @@ -187,10 +217,24 @@ pub fn update_session_family( let target = InterfaceTarget::session(leaf_id, S::PROCEDURE_ID, entry.hook_id); if let Some(store) = interface.as_mut() { - store.record_session_update_for(target, status, started_ns); + store.record_for( + target, + InterfaceEventKind::SessionUpdated { + procedure_id: S::PROCEDURE_ID, + hook_id: entry.hook_id, + status, + started_ns, + finished_ns: store.now_ns(), + }, + ); for packet in entry.outbox.iter().skip(outbox_start) { - store.record_outbound_queued_for(target, packet); + store.record_for( + target, + InterfaceEventKind::OutboundQueued { + packet: packet.clone(), + }, + ); } } @@ -215,7 +259,12 @@ pub fn dispatch_procedure( let target = InterfaceTarget::procedure(leaf_id, P::PROCEDURE_ID); if let Some(store) = interface.as_mut() { - store.record_inbound_for(target, &packet); + store.record_for( + target, + InterfaceEventKind::Inbound { + packet: packet.clone(), + }, + ); } let hook_id = packet.hook_id; @@ -227,10 +276,23 @@ pub fn dispatch_procedure( let packets = procedure_out.into_packets(); if let Some(store) = interface.as_mut() { - store.record_procedure_call_for(target, hook_id, started_ns); + store.record_for( + target, + InterfaceEventKind::ProcedureCalled { + procedure_id: P::PROCEDURE_ID, + hook_id, + started_ns, + finished_ns: store.now_ns(), + }, + ); for packet in &packets { - store.record_outbound_queued_for(target, packet); + store.record_for( + target, + InterfaceEventKind::OutboundQueued { + packet: packet.clone(), + }, + ); } } @@ -244,17 +306,13 @@ pub fn flush_leaf_outbox( outbox: &mut LeafOutbox, interface: &mut Option<&mut InterfaceStore>, ) -> bool { - while let Some(entry) = outbox.packets.front().cloned() { - let target = resolve_leaf_outbox_target(leaf_id, &entry); + flush_outbox(endpoint, &mut outbox.packets, interface, |entry| { + let target = entry.target.unwrap_or_else(|| { + InterfaceTarget::session(leaf_id, entry.packet.procedure_id, entry.packet.hook_id) + }); - if !flush_packet_with_target(endpoint, target, &entry.packet, interface) { - return false; - } - - outbox.packets.pop_front(); - } - - true + (target, entry.packet.clone()) + }) } /// Flushes and retains one generated session family. @@ -287,17 +345,12 @@ pub fn flush_packet_queue_with_interface( outbox: &mut PacketQueue, interface: &mut Option<&mut InterfaceStore>, ) -> bool { - while let Some(packet) = outbox.front().cloned() { - let target = InterfaceTarget::session(leaf_id, packet.procedure_id, packet.hook_id); - - if !flush_packet_with_target(endpoint, target, &packet, interface) { - return false; - } - - outbox.pop_front(); - } - - true + flush_outbox(endpoint, outbox, interface, |packet| { + ( + InterfaceTarget::session(leaf_id, packet.procedure_id, packet.hook_id), + packet.clone(), + ) + }) } /// Flushes a packet queue whose owner is already known by the generated runtime. @@ -307,7 +360,20 @@ fn flush_packet_queue_with_target( outbox: &mut PacketQueue, interface: &mut Option<&mut InterfaceStore>, ) -> bool { - while let Some(packet) = outbox.front().cloned() { + flush_outbox(endpoint, outbox, interface, |packet| { + (target, packet.clone()) + }) +} + +fn flush_outbox( + endpoint: &mut Endpoint, + outbox: &mut VecDeque, + interface: &mut Option<&mut InterfaceStore>, + mut packet_for: impl FnMut(&T) -> (InterfaceTarget, Packet), +) -> bool { + while let Some(item) = outbox.front() { + let (target, packet) = packet_for(item); + if !flush_packet_with_target(endpoint, target, &packet, interface) { return false; } @@ -325,20 +391,36 @@ fn flush_packet_with_target( interface: &mut Option<&mut InterfaceStore>, ) -> bool { if let Some(store) = interface.as_mut() { - store.record_route_attempt_for(target, packet); + store.record_for( + target, + InterfaceEventKind::RouteAttempt { + packet: packet.clone(), + }, + ); } match endpoint.add_outbound(packet.clone()) { Ok(()) => { if let Some(store) = interface.as_mut() { - store.record_route_success_for(target, packet); + store.record_for( + target, + InterfaceEventKind::RouteSuccess { + packet: packet.clone(), + }, + ); } true } Err(error) => { if let Some(store) = interface.as_mut() { - store.record_route_failure_for(target, packet, error); + store.record_for( + target, + InterfaceEventKind::RouteFailure { + packet: packet.clone(), + error, + }, + ); } false @@ -346,15 +428,6 @@ fn flush_packet_with_target( } } -fn resolve_leaf_outbox_target(leaf_id: u32, entry: &LeafOutboxEntry) -> InterfaceTarget { - match entry.target { - LeafOutboxTarget::InferFromPacket => { - InterfaceTarget::session(leaf_id, entry.packet.procedure_id, entry.packet.hook_id) - } - LeafOutboxTarget::Explicit(target) => target, - } -} - /// Returns the path used by generated procedure responses. fn parent_reply_path(endpoint: &Endpoint) -> alloc::vec::Vec { if endpoint.path.len() > 1 {