Simplify session routing path

This commit is contained in:
Michael Mikovsky
2026-06-01 12:11:01 -06:00
parent 64e53c8cfe
commit 4cd496ed2b
12 changed files with 166 additions and 396 deletions
+7 -7
View File
@@ -88,11 +88,11 @@ The macro delegates behavior to small helpers:
- `update_session_family` - `update_session_family`
- `dispatch_procedure` - `dispatch_procedure`
- `flush_leaf_outbox` - `flush_leaf_outbox`
- `flush_session_family`
- `flush_packet_queue_with_interface`
This keeps the macro readable. The helper functions own the mechanics of session This keeps the macro readable. The helper functions own the mechanics of session
lookup, initialization, retry-safe flushing, and optional interface logging. lookup, initialization, procedure response flushing, and optional interface logging.
Sessions route their own output immediately through `Endpoint` helpers to avoid a
per-session output context and retry queue in small implant builds.
## Interface Store ## Interface Store
@@ -108,7 +108,7 @@ InterfaceStore
Generated leaves receive an optional mutable store during `update_interface`. The Generated leaves receive an optional mutable store during `update_interface`. The
helpers create and update the appropriate session/procedure views when packets are helpers create and update the appropriate session/procedure views when packets are
dispatched, sessions update, and outbound routes succeed or fail. dispatched, sessions update, and queued procedure outbound routes succeed or fail.
Internally, interface events are target-driven: Internally, interface events are target-driven:
@@ -132,9 +132,9 @@ procedure packet both have `procedure_id` and `hook_id`, but they should not bot
create session views. The runtime already knows which dispatch branch handled the create session views. The runtime already knows which dispatch branch handled the
packet, so that answer is carried into the store. packet, so that answer is carried into the store.
Leaf-level retry queues also carry the same owner metadata. That matters because the Leaf-level retry queues carry the same owner metadata for procedure responses.
shared leaf outbox contains both rejected session-init responses and procedure Session responses bypass this queue and use `Endpoint::send_hook_raw` or
responses. Session-entry outboxes use their surrounding session key directly. `Endpoint::send_hook_frame` directly.
Time remains caller-supplied: Time remains caller-supplied:
+47 -1
View File
@@ -1,6 +1,6 @@
use alloc::vec::Vec; use alloc::vec::Vec;
use crate::protocol::{Endpoint, EndpointError, EndpointName}; use crate::protocol::{Endpoint, EndpointError, EndpointName, Packet};
/// Compact identifier for one routed return channel. /// Compact identifier for one routed return channel.
/// ///
@@ -105,6 +105,52 @@ impl Endpoint {
} }
} }
/// Routes raw response data over an existing hook immediately.
///
/// This is the compact session-output path: it avoids an intermediate context and
/// retry queue. If a final packet cannot route, the local hook is still removed so
/// an implant does not retain dead hook state forever.
pub fn send_hook_raw(
&mut self,
hook_id: HookID,
procedure_id: u32,
data: Vec<u8>,
end_hook: bool,
) -> Result<(), EndpointError> {
let path = self.hook_path(hook_id)?;
let packet = Packet {
hook_id,
end_hook,
path,
procedure_id,
data,
};
let result = self.add_outbound(packet);
if result.is_err() && end_hook {
self.close_hook(hook_id);
}
result
}
/// Routes a one-byte-opcode response frame over an existing hook immediately.
pub fn send_hook_frame(
&mut self,
hook_id: HookID,
procedure_id: u32,
opcode: u8,
payload: &[u8],
end_hook: bool,
) -> Result<(), EndpointError> {
let mut data = Vec::with_capacity(payload.len() + 1);
data.push(opcode);
data.extend_from_slice(payload);
self.send_hook_raw(hook_id, procedure_id, data, end_hook)
}
/// Validates that `actual_peer` is the peer allowed to use `hook_id`. /// Validates that `actual_peer` is the peer allowed to use `hook_id`.
pub(crate) fn ensure_hook_peer( pub(crate) fn ensure_hook_peer(
&self, &self,
+1 -10
View File
@@ -98,6 +98,7 @@ macro_rules! unshell_leaf {
$( $(
$crate::protocol::update_session_family::<$State, $Session>( $crate::protocol::update_session_family::<$State, $Session>(
endpoint,
leaf_id, leaf_id,
&mut self.state, &mut self.state,
&mut self.$session_field, &mut self.$session_field,
@@ -126,7 +127,6 @@ macro_rules! unshell_leaf {
&mut self.state, &mut self.state,
&mut self.$session_field, &mut self.$session_field,
packet, packet,
&mut self.outbox,
interface, interface,
); );
return; return;
@@ -167,15 +167,6 @@ macro_rules! unshell_leaf {
&mut self.outbox, &mut self.outbox,
interface, interface,
); );
$(
$crate::protocol::flush_session_family::<$State, $Session>(
endpoint,
leaf_id,
&mut self.$session_field,
interface,
);
)*
} }
} }
+17 -85
View File
@@ -3,26 +3,27 @@ use alloc::collections::VecDeque;
use crate::{ use crate::{
interface::{InterfaceEventKind, InterfaceStore, InterfaceTarget}, interface::{InterfaceEventKind, InterfaceStore, InterfaceTarget},
protocol::{ protocol::{
Endpoint, Packet, PacketQueue, Procedure, ProcedureOut, Session, SessionCtx, SessionEntry, Endpoint, Packet, PacketQueue, Procedure, ProcedureOut, Session, SessionEntry,
SessionFamily, SessionInitError, SessionStatus, SessionFamily, SessionInitError, SessionStatus,
}, },
}; };
/// Retry queue shared by generated leaves. /// Retry queue shared by generated leaves.
/// ///
/// Sessions already own per-hook outboxes. This leaf-level queue is for rejected /// Leaf-level retry queue shared by generated leaves.
/// session initialization responses and one-shot procedures, both of which need the ///
/// same retry semantics as session output without becoming separate framework types. /// Sessions route directly through `Endpoint` to keep their runtime shape small. This
/// queue remains only for one-shot procedures, whose handlers still use `ProcedureOut`
/// and should not route while the procedure is borrowing leaf state.
pub struct LeafOutbox { pub struct LeafOutbox {
packets: VecDeque<LeafOutboxEntry>, packets: VecDeque<LeafOutboxEntry>,
} }
/// One packet retained by a leaf-level retry queue. /// One packet retained by a leaf-level retry queue.
/// ///
/// Session entry outboxes have an obvious owner from their surrounding session entry. /// Procedure responses from different generated branches share one queue. Storing the
/// Leaf-level outboxes are mixed: rejected session initialization packets and one-shot /// owner beside the packet keeps route logging precise without exposing another public
/// procedure responses both land here. Storing the owner beside the packet keeps route /// queue type.
/// logging precise without exposing another public queue type.
#[derive(Clone)] #[derive(Clone)]
struct LeafOutboxEntry { struct LeafOutboxEntry {
packet: Packet, packet: Packet,
@@ -85,15 +86,14 @@ impl Default for LeafOutbox {
/// Dispatches one packet into a generated session family. /// Dispatches one packet into a generated session family.
/// ///
/// The macro picks `S` and the family field. This helper owns the boring details: /// The macro picks `S` and the family field. This helper owns the boring details:
/// find the hook, initialize missing sessions, queue rejected responses, and update /// find the hook, initialize missing sessions, route rejected responses, and update
/// interface state when a caller supplied one. /// interface state when a caller supplied one.
pub fn dispatch_session<L, S>( pub fn dispatch_session<L, S>(
endpoint: &Endpoint, endpoint: &mut Endpoint,
leaf_id: u32, leaf_id: u32,
leaf: &mut L, leaf: &mut L,
family: &mut SessionFamily<S>, family: &mut SessionFamily<S>,
packet: Packet, packet: Packet,
outbox: &mut LeafOutbox,
interface: &mut Option<&mut InterfaceStore>, interface: &mut Option<&mut InterfaceStore>,
) where ) where
S: Session<L>, S: Session<L>,
@@ -149,7 +149,7 @@ pub fn dispatch_session<L, S>(
}; };
match S::init(leaf, packet) { match S::init(leaf, packet) {
Ok(state) => { Ok(state) => {
family.entries.push(SessionEntry::new(hook_id, path, state)); family.entries.push(SessionEntry::new(hook_id, state));
if let Some(store) = interface.as_mut() { if let Some(store) = interface.as_mut() {
store.record_for( store.record_for(
@@ -195,21 +195,16 @@ pub fn dispatch_session<L, S>(
finished_ns: store.now_ns(), finished_ns: store.now_ns(),
}, },
); );
store.record_for(
target,
InterfaceEventKind::OutboundQueued {
packet: packet.clone(),
},
);
} }
outbox.push_for_target(packet, target); let _ = flush_packet_with_target(endpoint, target, &packet, interface);
} }
} }
} }
/// Updates every live session in one generated session family. /// Updates every live session in one generated session family.
pub fn update_session_family<L, S>( pub fn update_session_family<L, S>(
endpoint: &mut Endpoint,
leaf_id: u32, leaf_id: u32,
leaf: &mut L, leaf: &mut L,
family: &mut SessionFamily<S>, family: &mut SessionFamily<S>,
@@ -223,13 +218,7 @@ pub fn update_session_family<L, S>(
} }
let started_ns = interface.as_ref().and_then(|store| store.now_ns()); let started_ns = interface.as_ref().and_then(|store| store.now_ns());
let outbox_start = entry.outbox.len(); let status = S::update(leaf, &mut entry.state, &mut entry.inbox, endpoint);
let path = entry.path.clone();
let status = {
let mut ctx = SessionCtx::new(entry.hook_id, path, S::PROCEDURE_ID, &mut entry.outbox);
S::update(leaf, &mut entry.state, &mut entry.inbox, &mut ctx)
};
let target = InterfaceTarget::session(leaf_id, S::PROCEDURE_ID, entry.hook_id); let target = InterfaceTarget::session(leaf_id, S::PROCEDURE_ID, entry.hook_id);
if let Some(store) = interface.as_mut() { if let Some(store) = interface.as_mut() {
@@ -243,21 +232,14 @@ pub fn update_session_family<L, S>(
finished_ns: store.now_ns(), finished_ns: store.now_ns(),
}, },
); );
for packet in entry.outbox.iter().skip(outbox_start) {
store.record_for(
target,
InterfaceEventKind::OutboundQueued {
packet: packet.clone(),
},
);
}
} }
if matches!(status, SessionStatus::Closed) { if matches!(status, SessionStatus::Closed) {
entry.closed = true; entry.closed = true;
} }
} }
family.entries.retain(|entry| !entry.closed);
} }
/// Dispatches one packet into a generated one-shot procedure. /// Dispatches one packet into a generated one-shot procedure.
@@ -331,56 +313,6 @@ pub fn flush_leaf_outbox(
}) })
} }
/// Flushes and retains one generated session family.
pub fn flush_session_family<L, S>(
endpoint: &mut Endpoint,
leaf_id: u32,
family: &mut SessionFamily<S>,
interface: &mut Option<&mut InterfaceStore>,
) where
S: Session<L>,
{
for entry in &mut family.entries {
let target = InterfaceTarget::session(leaf_id, S::PROCEDURE_ID, entry.hook_id);
flush_packet_queue_with_target(endpoint, target, &mut entry.outbox, interface);
}
family
.entries
.retain(|entry| !entry.closed || !entry.outbox.is_empty());
}
/// Flushes a retry queue through [`Endpoint::add_outbound`].
///
/// This is the interface-aware version of [`crate::protocol::flush_packet_queue`]. It
/// logs route attempts before trying them, then logs either success or the route error
/// without dropping the packet on failure.
pub fn flush_packet_queue_with_interface(
endpoint: &mut Endpoint,
leaf_id: u32,
outbox: &mut PacketQueue,
interface: &mut Option<&mut InterfaceStore>,
) -> bool {
flush_outbox(endpoint, outbox, interface, |packet| {
(
InterfaceTarget::session(leaf_id, packet.procedure_id, packet.hook_id),
packet.clone(),
)
})
}
/// Flushes a packet queue whose owner is already known by the generated runtime.
fn flush_packet_queue_with_target(
endpoint: &mut Endpoint,
target: InterfaceTarget,
outbox: &mut PacketQueue,
interface: &mut Option<&mut InterfaceStore>,
) -> bool {
flush_outbox(endpoint, outbox, interface, |packet| {
(target, packet.clone())
})
}
fn flush_outbox<T>( fn flush_outbox<T>(
endpoint: &mut Endpoint, endpoint: &mut Endpoint,
outbox: &mut VecDeque<T>, outbox: &mut VecDeque<T>,
+12 -128
View File
@@ -28,10 +28,10 @@ use crate::interface::SessionView;
/// leaf: &mut MyLeafState, /// leaf: &mut MyLeafState,
/// session: &mut Self, /// session: &mut Self,
/// incoming: &mut PacketQueue, /// incoming: &mut PacketQueue,
/// ctx: &mut SessionCtx<'_>, /// endpoint: &mut Endpoint,
/// ) -> SessionStatus { /// ) -> SessionStatus {
/// while let Some(packet) = incoming.pop_front() { /// while let Some(packet) = incoming.pop_front() {
/// session.apply(leaf, packet, ctx); /// session.apply(leaf, packet, endpoint);
/// } /// }
/// SessionStatus::Running /// SessionStatus::Running
/// } /// }
@@ -51,14 +51,15 @@ pub trait Session<L>: Sized {
/// Advances one active hook session. /// Advances one active hook session.
/// ///
/// The generated leaf calls this for every live session on each update tick so /// The generated leaf calls this for every live session on each update tick so
/// sessions can poll external workers even when no new packet arrived. Outbound /// sessions can poll external workers even when no new packet arrived. Session
/// packets must be queued through `ctx`; direct endpoint routing would bypass the /// output is routed immediately through `endpoint`; callers that need retry
/// generated retry rules. /// semantics should keep their own compact application state and retry on a later
/// tick.
fn update( fn update(
leaf: &mut L, leaf: &mut L,
session: &mut Self, session: &mut Self,
incoming: &mut PacketQueue, incoming: &mut PacketQueue,
ctx: &mut SessionCtx<'_>, endpoint: &mut Endpoint,
) -> SessionStatus; ) -> SessionStatus;
#[cfg(feature = "interface_ratatui")] #[cfg(feature = "interface_ratatui")]
@@ -121,99 +122,11 @@ pub enum SessionStatus {
/// The session has finished application work. /// The session has finished application work.
/// ///
/// The generated leaf still retains the entry until every queued packet routes /// The generated leaf removes the entry after the update tick. Final packets are
/// successfully, which prevents a failed final frame from losing session cleanup. /// routed immediately by the session before returning this status.
Closed, Closed,
} }
/// Mutable output context passed to [`Session::update`].
///
/// The context queues packets only; it never routes them immediately. Centralizing
/// routing in generated code is what makes final-frame retries reliable.
pub struct SessionCtx<'a> {
hook_id: HookID,
path: Vec<u32>,
procedure_id: u32,
outbox: &'a mut PacketQueue,
}
impl<'a> SessionCtx<'a> {
/// Creates a context for one session update call.
pub fn new(
hook_id: HookID,
path: Vec<u32>,
procedure_id: u32,
outbox: &'a mut PacketQueue,
) -> Self {
Self {
hook_id,
path,
procedure_id,
outbox,
}
}
/// Returns the hook id used for packets emitted through this context.
pub fn hook_id(&self) -> HookID {
self.hook_id
}
/// Queues a one-byte-opcode frame without closing the hook.
pub fn send(&mut self, opcode: u8, data: &[u8]) {
self.send_frame(opcode, data, false);
}
/// Queues a one-byte-opcode frame that closes the hook after successful routing.
pub fn send_final(&mut self, opcode: u8, data: &[u8]) {
self.send_frame(opcode, data, true);
}
/// Queues a protocol-specific error frame without closing the hook.
///
/// The `code` is used as the frame opcode because the protocol layer does not
/// reserve a universal error opcode. Leaves that have a dedicated error opcode can
/// pass that value here or call [`Self::send`] directly.
pub fn error(&mut self, code: u8, data: &[u8]) {
self.send(code, data);
}
/// Queues a protocol-specific error frame that closes the hook after routing.
pub fn error_final(&mut self, code: u8, data: &[u8]) {
self.send_final(code, data);
}
/// Queues raw packet data without adding an opcode byte.
pub fn send_raw(&mut self, data: &[u8]) {
self.send_raw_with_end(data, false);
}
/// Queues raw packet data and closes the hook after successful routing.
pub fn send_raw_final(&mut self, data: &[u8]) {
self.send_raw_with_end(data, true);
}
fn send_frame(&mut self, opcode: u8, data: &[u8], end_hook: bool) {
let mut frame = Vec::with_capacity(data.len() + 1);
frame.push(opcode);
frame.extend_from_slice(data);
self.enqueue_data(frame, end_hook);
}
fn send_raw_with_end(&mut self, data: &[u8], end_hook: bool) {
self.enqueue_data(data.to_vec(), end_hook);
}
fn enqueue_data(&mut self, data: Vec<u8>, end_hook: bool) {
self.outbox.push_back(Packet {
hook_id: self.hook_id,
end_hook,
path: self.path.clone(),
procedure_id: self.procedure_id,
data,
});
}
}
/// Storage entry used by macro-generated session stores. /// Storage entry used by macro-generated session stores.
/// ///
/// The fields are public so generated code in downstream crates can keep the update /// The fields are public so generated code in downstream crates can keep the update
@@ -223,23 +136,13 @@ pub struct SessionEntry<S> {
/// Hook id associated with this live session. /// Hook id associated with this live session.
pub hook_id: HookID, pub hook_id: HookID,
/// Destination path for packets emitted on this hook.
///
/// This is generated runtime state, not user session state. It is captured from
/// endpoint hook routing when the session is created so leaf sessions never have
/// to carry or understand a reply path.
pub path: Vec<u32>,
/// Application-owned session state. /// Application-owned session state.
pub state: S, pub state: S,
/// Packets delivered for this hook but not yet consumed by the session. /// Packets delivered for this hook but not yet consumed by the session.
pub inbox: PacketQueue, pub inbox: PacketQueue,
/// Packets emitted by the session but not yet accepted by endpoint routing. /// Whether application logic has finished and should be removed after update.
pub outbox: PacketQueue,
/// Whether application logic has finished and only retry flushing may remain.
pub closed: bool, pub closed: bool,
} }
@@ -266,7 +169,7 @@ impl<S> SessionFamily<S> {
let mut count = 0usize; let mut count = 0usize;
for entry in &self.entries { for entry in &self.entries {
count += entry.inbox.len() + entry.outbox.len(); count += entry.inbox.len();
} }
count count
@@ -281,31 +184,12 @@ impl<S> Default for SessionFamily<S> {
impl<S> SessionEntry<S> { impl<S> SessionEntry<S> {
/// Creates one active session entry for `hook_id`. /// Creates one active session entry for `hook_id`.
pub fn new(hook_id: HookID, path: Vec<u32>, state: S) -> Self { pub fn new(hook_id: HookID, state: S) -> Self {
Self { Self {
hook_id, hook_id,
path,
state, state,
inbox: PacketQueue::new(), inbox: PacketQueue::new(),
outbox: PacketQueue::new(),
closed: false, closed: false,
} }
} }
} }
/// Flushes a retry queue through [`Endpoint::add_outbound`].
///
/// The packet at the front is cloned for each attempt and removed only after routing
/// succeeds. This preserves final frames when a route is temporarily unavailable.
/// The return value is true when the queue was fully drained.
pub fn flush_packet_queue(endpoint: &mut Endpoint, outbox: &mut PacketQueue) -> bool {
while let Some(packet) = outbox.front().cloned() {
if endpoint.add_outbound(packet).is_err() {
return false;
}
outbox.pop_front();
}
true
}
+7 -53
View File
@@ -2,7 +2,7 @@ use alloc::vec::Vec;
use unshell::protocol::{HookID, Packet}; use unshell::protocol::{HookID, Packet};
use crate::{OP_ERROR, OP_OPEN, PROC_PTY}; use crate::{OP_OPEN, PROC_PTY};
/// Encodes a tiny PTY frame into `Packet::data`. /// Encodes a tiny PTY frame into `Packet::data`.
pub fn encode_frame(opcode: u8, payload: &[u8]) -> Vec<u8> { pub fn encode_frame(opcode: u8, payload: &[u8]) -> Vec<u8> {
@@ -12,35 +12,9 @@ pub fn encode_frame(opcode: u8, payload: &[u8]) -> Vec<u8> {
data data
} }
/// Encodes an `Open` payload with the caller's reply path. /// Encodes an `Open` frame.
pub fn encode_open(reply_path: &[u32]) -> Vec<u8> { pub fn encode_open() -> Vec<u8> {
let mut data = Vec::with_capacity(2 + reply_path.len() * 4); alloc::vec![OP_OPEN]
data.push(OP_OPEN);
data.push(reply_path.len() as u8);
for segment in reply_path {
data.extend_from_slice(&segment.to_le_bytes());
}
data
}
/// Decodes the reply path embedded in an `Open` payload after the opcode byte.
pub fn decode_open_reply_path(payload: &[u8]) -> Option<Vec<u32>> {
let path_len = usize::from(*payload.first()?);
let path_bytes = path_len.checked_mul(4)?;
let expected_len = 1usize.checked_add(path_bytes)?;
if payload.len() != expected_len {
return None;
}
let mut path = Vec::with_capacity(path_len);
for chunk in payload[1..].chunks_exact(4) {
path.push(u32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]));
}
Some(path)
} }
/// Returns the opcode byte from a PTY packet, if present. /// Returns the opcode byte from a PTY packet, if present.
@@ -74,33 +48,13 @@ pub fn pty_packet(
} }
} }
/// Builds an outer PTY open packet with the specialized open payload shape. /// Builds an outer PTY open packet.
pub fn pty_open_packet(path: Vec<u32>, hook_id: HookID, reply_path: &[u32]) -> Packet { pub fn pty_open_packet(path: Vec<u32>, hook_id: HookID) -> Packet {
Packet { Packet {
hook_id, hook_id,
end_hook: false, end_hook: false,
path, path,
procedure_id: PROC_PTY, procedure_id: PROC_PTY,
data: encode_open(reply_path), data: encode_open(),
}
}
/// Builds a final error packet for session initialization failures.
pub(crate) fn error_packet(hook_id: HookID, reply_path: Vec<u32>, payload: &[u8]) -> Packet {
Packet {
hook_id,
end_hook: true,
path: reply_path,
procedure_id: PROC_PTY,
data: encode_frame(OP_ERROR, payload),
}
}
/// Infers the caller reply path from a locally delivered destination path.
pub(crate) fn reply_path_from_destination(destination: &[u32]) -> Vec<u32> {
if destination.len() > 1 {
destination[..destination.len() - 1].to_vec()
} else {
destination.to_vec()
} }
} }
+2 -3
View File
@@ -16,11 +16,10 @@ mod session;
mod state; mod state;
pub use codec::{ pub use codec::{
decode_open_reply_path, encode_frame, encode_open, frame_opcode, frame_payload, encode_frame, encode_open, frame_opcode, frame_payload, pty_open_packet, pty_packet,
pty_open_packet, pty_packet,
}; };
pub use constants::*; pub use constants::*;
pub use session::{PtySession, PtySessionState}; pub use session::PtySessionState;
pub use state::{FakePtyLeaf, FakePtyState}; pub use state::{FakePtyLeaf, FakePtyState};
#[cfg(test)] #[cfg(test)]
+50 -42
View File
@@ -1,14 +1,9 @@
use alloc::vec::Vec;
use unshell::protocol::{ use unshell::protocol::{
HookID, Packet, PacketQueue, Session, SessionCtx, SessionInit, SessionInitResult, SessionStatus, Endpoint, HookID, Packet, PacketQueue, Session, SessionInitError, SessionStatus,
}; };
use crate::{ use crate::{
codec::{ codec::{encode_frame, frame_opcode, frame_payload},
decode_open_reply_path, error_packet, frame_opcode, frame_payload,
reply_path_from_destination,
},
constants::{ constants::{
OP_ABORT, OP_ERROR, OP_EXIT, OP_INPUT, OP_OPEN, OP_OPENED, OP_OUTPUT, OP_STDIN_EOF, OP_ABORT, OP_ERROR, OP_EXIT, OP_INPUT, OP_OPEN, OP_OPENED, OP_OUTPUT, OP_STDIN_EOF,
OP_TERMINATE, PROC_PTY, OP_TERMINATE, PROC_PTY,
@@ -16,51 +11,32 @@ use crate::{
state::FakePtyState, state::FakePtyState,
}; };
/// Session contract for one hook-backed fake PTY.
pub struct PtySession;
/// Per-hook fake PTY session state. /// Per-hook fake PTY session state.
/// ///
/// A real PTY leaf will replace the pending flags with a worker handle. The reply path /// A real PTY leaf will replace the pending flags with a worker handle. Hook routing
/// and hook lifecycle behavior should stay the same. /// is owned by the generated runtime, so this state only tracks PTY behavior.
pub struct PtySessionState { pub struct PtySessionState {
hook_id: HookID, hook_id: HookID,
reply_path: Vec<u32>,
opened_pending: bool, opened_pending: bool,
stdin_closed: bool, stdin_closed: bool,
} }
impl Session<FakePtyState> for PtySession { impl Session<FakePtyState> for PtySessionState {
const PROCEDURE_ID: u32 = PROC_PTY; const PROCEDURE_ID: u32 = PROC_PTY;
type State = PtySessionState; fn init(leaf: &mut FakePtyState, packet: Packet) -> Result<Self, SessionInitError> {
fn reply_path(session: &Self::State) -> &[u32] {
&session.reply_path
}
fn init(
leaf: &mut FakePtyState,
packet: Packet,
ctx: &mut SessionInit,
) -> SessionInitResult<Self::State> {
if frame_opcode(&packet) != Some(OP_OPEN) { if frame_opcode(&packet) != Some(OP_OPEN) {
return SessionInitResult::RejectedWith(error_packet( return Err(SessionInitError::response_final(encode_frame(
ctx.hook_id(), OP_ERROR,
reply_path_from_destination(ctx.packet_path()),
b"unknown-session", b"unknown-session",
)); )));
} }
let reply_path = decode_open_reply_path(frame_payload(&packet))
.unwrap_or_else(|| reply_path_from_destination(ctx.packet_path()));
leaf.active_count += 1; leaf.active_count += 1;
leaf.total_opened += 1; leaf.total_opened += 1;
SessionInitResult::Created(PtySessionState { Ok(Self {
hook_id: ctx.hook_id(), hook_id: packet.hook_id,
reply_path,
opened_pending: true, opened_pending: true,
stdin_closed: false, stdin_closed: false,
}) })
@@ -68,24 +44,44 @@ impl Session<FakePtyState> for PtySession {
fn update( fn update(
leaf: &mut FakePtyState, leaf: &mut FakePtyState,
session: &mut Self::State, session: &mut Self,
incoming: &mut PacketQueue, incoming: &mut PacketQueue,
ctx: &mut SessionCtx<'_>, endpoint: &mut Endpoint,
) -> SessionStatus { ) -> SessionStatus {
if session.opened_pending { if session.opened_pending {
ctx.send(OP_OPENED, &[]); let _ = endpoint.send_hook_frame(
session.hook_id,
Self::PROCEDURE_ID,
OP_OPENED,
&[],
false,
);
session.opened_pending = false; session.opened_pending = false;
} }
while let Some(packet) = incoming.pop_front() { while let Some(packet) = incoming.pop_front() {
match frame_opcode(&packet) { match frame_opcode(&packet) {
Some(OP_INPUT) => ctx.send(OP_OUTPUT, frame_payload(&packet)), Some(OP_INPUT) => {
let _ = endpoint.send_hook_frame(
session.hook_id,
Self::PROCEDURE_ID,
OP_OUTPUT,
frame_payload(&packet),
false,
);
}
Some(OP_STDIN_EOF) => { Some(OP_STDIN_EOF) => {
session.stdin_closed = true; session.stdin_closed = true;
leaf.last_stdin_eof_hook = Some(session.hook_id); leaf.last_stdin_eof_hook = Some(session.hook_id);
} }
Some(OP_TERMINATE) => { Some(OP_TERMINATE) => {
ctx.send_final(OP_EXIT, &[0]); let _ = endpoint.send_hook_frame(
session.hook_id,
Self::PROCEDURE_ID,
OP_EXIT,
&[0],
true,
);
close_session(leaf); close_session(leaf);
return SessionStatus::Closed; return SessionStatus::Closed;
} }
@@ -94,12 +90,24 @@ impl Session<FakePtyState> for PtySession {
return SessionStatus::Closed; return SessionStatus::Closed;
} }
Some(OP_OPEN) => { Some(OP_OPEN) => {
ctx.send_final(OP_ERROR, b"duplicate-open"); let _ = endpoint.send_hook_frame(
session.hook_id,
Self::PROCEDURE_ID,
OP_ERROR,
b"duplicate-open",
true,
);
close_session(leaf); close_session(leaf);
return SessionStatus::Closed; return SessionStatus::Closed;
} }
_ => { _ => {
ctx.send_final(OP_ERROR, b"unknown-opcode"); let _ = endpoint.send_hook_frame(
session.hook_id,
Self::PROCEDURE_ID,
OP_ERROR,
b"unknown-opcode",
true,
);
close_session(leaf); close_session(leaf);
return SessionStatus::Closed; return SessionStatus::Closed;
} }
+2 -2
View File
@@ -1,6 +1,6 @@
use unshell::protocol::{HookID, unshell_leaf}; use unshell::protocol::{HookID, unshell_leaf};
use crate::{constants::LEAF_FAKE_PTY, procedure::PingProcedure, session::PtySession}; use crate::{constants::LEAF_FAKE_PTY, procedure::PingProcedure, session::PtySessionState};
/// User-owned state for the generated fake PTY leaf. /// User-owned state for the generated fake PTY leaf.
/// ///
@@ -45,7 +45,7 @@ unshell_leaf! {
authors: unshell::alloc::vec!["ASTATIN3"], authors: unshell::alloc::vec!["ASTATIN3"],
}, },
sessions { sessions {
pty: PtySession, pty: PtySessionState,
} }
procedures { procedures {
ping: PingProcedure, ping: PingProcedure,
+13 -49
View File
@@ -2,17 +2,16 @@ use alloc::vec;
use unshell::{ use unshell::{
interface::{InterfaceEventKind, InterfaceStore, ProcedureKey, SessionKey, SessionViewStatus}, interface::{InterfaceEventKind, InterfaceStore, ProcedureKey, SessionKey, SessionViewStatus},
protocol::{Leaf, Packet}, protocol::{Leaf, Packet, SessionStatus},
}; };
use crate::{ use crate::{
FakePtyLeaf, FakePtyState, OP_EXIT, OP_OPENED, OP_TERMINATE, PROC_PTY, constants::PROC_PING, FakePtyLeaf, FakePtyState, OP_TERMINATE, PROC_PTY, constants::PROC_PING, pty_open_packet,
frame_opcode, pty_open_packet,
}; };
use super::support::{ use super::support::{
ENDPOINT_A, ENDPOINT_B, assert_frame, drain_parent_packets, drain_parent_pty_packets, ENDPOINT_A, ENDPOINT_B, drain_parent_packets, drain_parent_pty_packets, pty_endpoints,
pty_endpoints, send_downward_frame, transfer_packets, send_downward_frame, transfer_packets,
}; };
fn view_has_event<F>(interface: &InterfaceStore, event_indexes: &[usize], mut predicate: F) -> bool fn view_has_event<F>(interface: &InterfaceStore, event_indexes: &[usize], mut predicate: F) -> bool
@@ -51,11 +50,7 @@ fn interface_update_records_session_flow() {
let hook_id = endpoint_a.get_hook_id(); let hook_id = endpoint_a.get_hook_id();
endpoint_a endpoint_a
.add_outbound(pty_open_packet( .add_outbound(pty_open_packet(vec![ENDPOINT_A, ENDPOINT_B], hook_id))
vec![ENDPOINT_A, ENDPOINT_B],
hook_id,
&[ENDPOINT_A],
))
.unwrap(); .unwrap();
transfer_packets(&mut endpoint_a, &mut endpoint_b, ENDPOINT_B, ENDPOINT_A); transfer_packets(&mut endpoint_a, &mut endpoint_b, ENDPOINT_B, ENDPOINT_A);
@@ -83,34 +78,21 @@ fn interface_update_records_session_flow() {
&session_view.events, &session_view.events,
|event| matches!( |event| matches!(
event, event,
InterfaceEventKind::OutboundQueued { packet } InterfaceEventKind::SessionUpdated { hook_id: recorded_hook, status, .. }
if packet.hook_id == hook_id && frame_opcode(packet) == Some(OP_OPENED) if *recorded_hook == hook_id && *status == SessionStatus::Running
),
));
assert!(view_has_event(
&interface,
&session_view.events,
|event| matches!(
event,
InterfaceEventKind::RouteSuccess { packet }
if packet.hook_id == hook_id && frame_opcode(packet) == Some(OP_OPENED)
), ),
)); ));
} }
#[test] #[test]
fn interface_update_records_failed_final_route_without_dropping_session() { fn interface_update_records_failed_direct_route_without_retry() {
let (mut endpoint_a, mut endpoint_b) = pty_endpoints(); let (mut endpoint_a, mut endpoint_b) = pty_endpoints();
let mut leaf = FakePtyLeaf::new(FakePtyState::new()); let mut leaf = FakePtyLeaf::new(FakePtyState::new());
let mut interface = InterfaceStore::new(); let mut interface = InterfaceStore::new();
let hook_id = endpoint_a.get_hook_id(); let hook_id = endpoint_a.get_hook_id();
endpoint_a endpoint_a
.add_outbound(pty_open_packet( .add_outbound(pty_open_packet(vec![ENDPOINT_A, ENDPOINT_B], hook_id))
vec![ENDPOINT_A, ENDPOINT_B],
hook_id,
&[ENDPOINT_A],
))
.unwrap(); .unwrap();
transfer_packets(&mut endpoint_a, &mut endpoint_b, ENDPOINT_B, ENDPOINT_A); transfer_packets(&mut endpoint_a, &mut endpoint_b, ENDPOINT_B, ENDPOINT_A);
leaf.update_interface(&mut endpoint_b, &mut interface); leaf.update_interface(&mut endpoint_b, &mut interface);
@@ -135,18 +117,9 @@ fn interface_update_records_failed_final_route_without_dropping_session() {
}; };
let session_view = interface.session_views().get(&session_key).unwrap(); let session_view = interface.session_views().get(&session_key).unwrap();
assert_eq!(leaf.active_session_count(), 1); assert_eq!(leaf.active_session_count(), 0);
assert_eq!(leaf.pending_packet_count(), 1); assert_eq!(leaf.pending_packet_count(), 0);
assert_eq!(session_view.status, SessionViewStatus::Closed); assert_eq!(session_view.status, SessionViewStatus::Closed);
assert!(view_has_event(
&interface,
&session_view.events,
|event| matches!(
event,
InterfaceEventKind::RouteFailure { packet, .. }
if packet.hook_id == hook_id && frame_opcode(packet) == Some(OP_EXIT)
),
));
endpoint_b.connections.insert((ENDPOINT_A, true)); endpoint_b.connections.insert((ENDPOINT_A, true));
leaf.update_interface(&mut endpoint_b, &mut interface); leaf.update_interface(&mut endpoint_b, &mut interface);
@@ -156,17 +129,8 @@ fn interface_update_records_failed_final_route_without_dropping_session() {
let session_view = interface.session_views().get(&session_key).unwrap(); let session_view = interface.session_views().get(&session_key).unwrap();
assert_eq!(leaf.active_session_count(), 0); assert_eq!(leaf.active_session_count(), 0);
assert_eq!(packets.len(), 1); assert!(packets.is_empty());
assert_frame(&packets[0], hook_id, OP_EXIT, true, &[0]); assert_eq!(session_view.status, SessionViewStatus::Closed);
assert!(view_has_event(
&interface,
&session_view.events,
|event| matches!(
event,
InterfaceEventKind::RouteSuccess { packet }
if packet.hook_id == hook_id && frame_opcode(packet) == Some(OP_EXIT)
),
));
} }
#[test] #[test]
+7 -11
View File
@@ -124,7 +124,7 @@ fn exit_end_hook_cleans_route_and_session() {
} }
#[test] #[test]
fn failed_final_exit_route_retries_without_losing_session() { fn failed_final_exit_route_closes_session_without_retry() {
let (mut endpoint_a, mut endpoint_b) = pty_endpoints(); let (mut endpoint_a, mut endpoint_b) = pty_endpoints();
let mut leaf = FakePtyLeaf::new(FakePtyState::new()); let mut leaf = FakePtyLeaf::new(FakePtyState::new());
let hook_id = open_pty_session(&mut endpoint_a, &mut endpoint_b, &mut leaf); let hook_id = open_pty_session(&mut endpoint_a, &mut endpoint_b, &mut leaf);
@@ -141,19 +141,18 @@ fn failed_final_exit_route_retries_without_losing_session() {
endpoint_b.connections.remove(&(ENDPOINT_A, true)); endpoint_b.connections.remove(&(ENDPOINT_A, true));
leaf.update(&mut endpoint_b); leaf.update(&mut endpoint_b);
assert_eq!(leaf.active_session_count(), 1); assert_eq!(leaf.active_session_count(), 0);
assert_eq!(leaf.pending_packet_count(), 1); assert_eq!(leaf.pending_packet_count(), 0);
assert_hook_present(&endpoint_b, hook_id); assert_hook_removed(&endpoint_b, hook_id);
endpoint_b.connections.insert((ENDPOINT_A, true)); endpoint_b.connections.insert((ENDPOINT_A, true));
leaf.update(&mut endpoint_b); leaf.update(&mut endpoint_b);
transfer_packets(&mut endpoint_b, &mut endpoint_a, ENDPOINT_A, ENDPOINT_B); transfer_packets(&mut endpoint_b, &mut endpoint_a, ENDPOINT_A, ENDPOINT_B);
let packets = drain_parent_pty_packets(&mut endpoint_a); let packets = drain_parent_pty_packets(&mut endpoint_a);
assert_eq!(packets.len(), 1); assert!(packets.is_empty());
assert_frame(&packets[0], hook_id, OP_EXIT, true, &[0]);
assert_eq!(leaf.active_session_count(), 0); assert_eq!(leaf.active_session_count(), 0);
assert_hook_removed(&endpoint_a, hook_id); assert_hook_present(&endpoint_a, hook_id);
assert_hook_removed(&endpoint_b, hook_id); assert_hook_removed(&endpoint_b, hook_id);
} }
@@ -252,10 +251,7 @@ fn pty_leaf_does_not_consume_other_leaf_packets() {
endpoint.connections.insert((ENDPOINT_A, true)); endpoint.connections.insert((ENDPOINT_A, true));
endpoint endpoint
.add_inbound_from( .add_inbound_from(ENDPOINT_A, pty_open_packet(vec![ENDPOINT_A, ENDPOINT_B], 7))
ENDPOINT_A,
pty_open_packet(vec![ENDPOINT_A, ENDPOINT_B], 7, &[ENDPOINT_A]),
)
.unwrap(); .unwrap();
endpoint endpoint
.add_inbound_from( .add_inbound_from(
+1 -5
View File
@@ -72,11 +72,7 @@ pub(super) fn open_pty_session(
) -> u16 { ) -> u16 {
let hook_id = endpoint_a.get_hook_id(); let hook_id = endpoint_a.get_hook_id();
endpoint_a endpoint_a
.add_outbound(pty_open_packet( .add_outbound(pty_open_packet(vec![ENDPOINT_A, ENDPOINT_B], hook_id))
vec![ENDPOINT_A, ENDPOINT_B],
hook_id,
&[ENDPOINT_A],
))
.unwrap(); .unwrap();
transfer_packets(endpoint_a, endpoint_b, ENDPOINT_B, ENDPOINT_A); transfer_packets(endpoint_a, endpoint_b, ENDPOINT_B, ENDPOINT_A);