mirror of
https://github.com/Astatin3/unshell.git
synced 2026-06-08 22:38:01 -06:00
Add runtime API redesign scaffold
This commit is contained in:
@@ -0,0 +1,21 @@
|
||||
[package]
|
||||
name = "unshell-runtime"
|
||||
version.workspace = true
|
||||
edition.workspace = true
|
||||
description = "Transport-neutral runtime API types for UnShell"
|
||||
|
||||
[dependencies]
|
||||
unshell-protocol = { workspace = true }
|
||||
|
||||
[lints.rust]
|
||||
elided_lifetimes_in_paths = "warn"
|
||||
future_incompatible = { level = "warn", priority = -1 }
|
||||
nonstandard_style = { level = "warn", priority = -1 }
|
||||
rust_2018_idioms = { level = "warn", priority = -1 }
|
||||
rust_2021_prelude_collisions = "warn"
|
||||
semicolon_in_expressions_from_macros = "warn"
|
||||
unsafe_op_in_unsafe_fn = "warn"
|
||||
unused_import_braces = "warn"
|
||||
unused_lifetimes = "warn"
|
||||
trivial_casts = "allow"
|
||||
missing_docs = "warn"
|
||||
@@ -0,0 +1,291 @@
|
||||
//! Runtime connection admission and routing metadata.
|
||||
//!
|
||||
//! A connection is not routable just because a transport exists. Only
|
||||
//! [`ConnectionState::Registered`] connections are allowed to produce protocol
|
||||
//! ingress or receive forwarded frames.
|
||||
|
||||
use crate::alloc::string::String;
|
||||
use crate::alloc::vec::Vec;
|
||||
|
||||
/// Stable runtime handle for one transport connection slot.
|
||||
#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
|
||||
pub struct ConnectionId(u64);
|
||||
|
||||
impl ConnectionId {
|
||||
/// Creates a connection identifier from a raw value.
|
||||
#[must_use]
|
||||
pub const fn new(value: u64) -> Self {
|
||||
Self(value)
|
||||
}
|
||||
|
||||
/// Returns the raw identifier value.
|
||||
#[must_use]
|
||||
pub const fn get(self) -> u64 {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
|
||||
/// Monotonic incarnation number for one connection slot.
|
||||
#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
|
||||
pub struct ConnectionGeneration(u64);
|
||||
|
||||
impl ConnectionGeneration {
|
||||
/// First generation assigned to a new connection slot.
|
||||
pub const INITIAL: Self = Self(0);
|
||||
|
||||
/// Creates a generation from a raw value.
|
||||
#[must_use]
|
||||
pub const fn new(value: u64) -> Self {
|
||||
Self(value)
|
||||
}
|
||||
|
||||
/// Returns the raw generation value.
|
||||
#[must_use]
|
||||
pub const fn get(self) -> u64 {
|
||||
self.0
|
||||
}
|
||||
|
||||
/// Returns the next generation, saturating at `u64::MAX`.
|
||||
#[must_use]
|
||||
pub const fn next(self) -> Self {
|
||||
Self(self.0.saturating_add(1))
|
||||
}
|
||||
}
|
||||
|
||||
/// Local tree relationship for a registered connection.
|
||||
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
|
||||
pub enum ConnectionDirection {
|
||||
/// The peer is the direct parent of this endpoint.
|
||||
Parent,
|
||||
/// The peer is a direct child of this endpoint.
|
||||
Child,
|
||||
}
|
||||
|
||||
/// Metadata that makes a connection routable.
|
||||
#[derive(Clone, Debug, Eq, PartialEq)]
|
||||
pub struct RegisteredConnection {
|
||||
direction: ConnectionDirection,
|
||||
peer_path: Vec<String>,
|
||||
generation: ConnectionGeneration,
|
||||
}
|
||||
|
||||
impl RegisteredConnection {
|
||||
/// Creates registered routing metadata.
|
||||
#[must_use]
|
||||
pub const fn new(
|
||||
direction: ConnectionDirection,
|
||||
peer_path: Vec<String>,
|
||||
generation: ConnectionGeneration,
|
||||
) -> Self {
|
||||
Self {
|
||||
direction,
|
||||
peer_path,
|
||||
generation,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the local tree relationship.
|
||||
#[must_use]
|
||||
pub const fn direction(&self) -> ConnectionDirection {
|
||||
self.direction
|
||||
}
|
||||
|
||||
/// Returns the registered peer path.
|
||||
#[must_use]
|
||||
pub fn peer_path(&self) -> &[String] {
|
||||
&self.peer_path
|
||||
}
|
||||
|
||||
/// Returns the connection generation.
|
||||
#[must_use]
|
||||
pub const fn generation(&self) -> ConnectionGeneration {
|
||||
self.generation
|
||||
}
|
||||
}
|
||||
|
||||
/// Runtime lifecycle state for one connection slot.
|
||||
#[derive(Clone, Debug, Eq, PartialEq)]
|
||||
pub enum ConnectionState {
|
||||
/// The transport exists but has not started or completed admission.
|
||||
Connected {
|
||||
/// Connection generation for this transport incarnation.
|
||||
generation: ConnectionGeneration,
|
||||
},
|
||||
/// The runtime is evaluating whether this peer should become routable.
|
||||
Authenticating {
|
||||
/// Connection generation for this transport incarnation.
|
||||
generation: ConnectionGeneration,
|
||||
},
|
||||
/// The peer is admitted into protocol routing.
|
||||
Registered(RegisteredConnection),
|
||||
/// The runtime is tearing this connection down and should reject new work.
|
||||
Draining {
|
||||
/// Connection generation for this transport incarnation.
|
||||
generation: ConnectionGeneration,
|
||||
},
|
||||
/// The connection is closed and retained only as historical metadata.
|
||||
Closed {
|
||||
/// Connection generation for this transport incarnation.
|
||||
generation: ConnectionGeneration,
|
||||
},
|
||||
}
|
||||
|
||||
impl ConnectionState {
|
||||
/// Returns the generation associated with this state.
|
||||
#[must_use]
|
||||
pub const fn generation(&self) -> ConnectionGeneration {
|
||||
match self {
|
||||
Self::Connected { generation }
|
||||
| Self::Authenticating { generation }
|
||||
| Self::Draining { generation }
|
||||
| Self::Closed { generation } => *generation,
|
||||
Self::Registered(registered) => registered.generation(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns registered metadata when this connection is routable.
|
||||
#[must_use]
|
||||
pub const fn registered(&self) -> Option<&RegisteredConnection> {
|
||||
match self {
|
||||
Self::Registered(registered) => Some(registered),
|
||||
Self::Connected { .. }
|
||||
| Self::Authenticating { .. }
|
||||
| Self::Draining { .. }
|
||||
| Self::Closed { .. } => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// One runtime connection slot.
|
||||
#[derive(Clone, Debug, Eq, PartialEq)]
|
||||
pub struct Connection {
|
||||
id: ConnectionId,
|
||||
state: ConnectionState,
|
||||
}
|
||||
|
||||
impl Connection {
|
||||
/// Creates a connected but unroutable connection slot.
|
||||
#[must_use]
|
||||
pub const fn connected(id: ConnectionId, generation: ConnectionGeneration) -> Self {
|
||||
Self {
|
||||
id,
|
||||
state: ConnectionState::Connected { generation },
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a registered connection slot.
|
||||
#[must_use]
|
||||
pub const fn registered(
|
||||
id: ConnectionId,
|
||||
direction: ConnectionDirection,
|
||||
peer_path: Vec<String>,
|
||||
generation: ConnectionGeneration,
|
||||
) -> Self {
|
||||
Self {
|
||||
id,
|
||||
state: ConnectionState::Registered(RegisteredConnection::new(
|
||||
direction, peer_path, generation,
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the connection id.
|
||||
#[must_use]
|
||||
pub const fn id(&self) -> ConnectionId {
|
||||
self.id
|
||||
}
|
||||
|
||||
/// Returns the current connection state.
|
||||
#[must_use]
|
||||
pub const fn state(&self) -> &ConnectionState {
|
||||
&self.state
|
||||
}
|
||||
|
||||
/// Replaces the current connection state.
|
||||
pub fn set_state(&mut self, state: ConnectionState) {
|
||||
self.state = state;
|
||||
}
|
||||
}
|
||||
|
||||
/// Connection metadata table owned by the runtime.
|
||||
#[derive(Clone, Debug, Default, Eq, PartialEq)]
|
||||
pub struct Connections {
|
||||
entries: Vec<Connection>,
|
||||
}
|
||||
|
||||
impl Connections {
|
||||
/// Creates an empty table.
|
||||
#[must_use]
|
||||
pub const fn new() -> Self {
|
||||
Self {
|
||||
entries: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Inserts a connection descriptor.
|
||||
pub fn push(&mut self, connection: Connection) {
|
||||
self.entries.push(connection);
|
||||
}
|
||||
|
||||
/// Returns all connection descriptors.
|
||||
#[must_use]
|
||||
pub fn entries(&self) -> &[Connection] {
|
||||
&self.entries
|
||||
}
|
||||
|
||||
/// Finds a connection by id.
|
||||
#[must_use]
|
||||
pub fn get(&self, id: ConnectionId) -> Option<&Connection> {
|
||||
self.entries.iter().find(|entry| entry.id() == id)
|
||||
}
|
||||
|
||||
/// Finds a mutable connection by id.
|
||||
#[must_use]
|
||||
pub fn get_mut(&mut self, id: ConnectionId) -> Option<&mut Connection> {
|
||||
self.entries.iter_mut().find(|entry| entry.id() == id)
|
||||
}
|
||||
|
||||
/// Returns registered metadata for a routable connection.
|
||||
#[must_use]
|
||||
pub fn registered(&self, id: ConnectionId) -> Option<&RegisteredConnection> {
|
||||
self.get(id)
|
||||
.and_then(|connection| connection.state().registered())
|
||||
}
|
||||
|
||||
/// Finds a registered connection by direction.
|
||||
#[must_use]
|
||||
pub fn registered_by_direction(&self, direction: ConnectionDirection) -> Option<&Connection> {
|
||||
self.entries.iter().find(|entry| {
|
||||
entry
|
||||
.state()
|
||||
.registered()
|
||||
.is_some_and(|registered| registered.direction() == direction)
|
||||
})
|
||||
}
|
||||
|
||||
/// Finds a registered connection by direction and peer path.
|
||||
#[must_use]
|
||||
pub fn registered_by_path(
|
||||
&self,
|
||||
direction: ConnectionDirection,
|
||||
peer_path: &[String],
|
||||
) -> Option<&Connection> {
|
||||
self.entries.iter().find(|entry| {
|
||||
entry.state().registered().is_some_and(|registered| {
|
||||
registered.direction() == direction && registered.peer_path() == peer_path
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Read-only connection table view exposed to leaf contexts.
|
||||
pub trait ConnectionTable {
|
||||
/// Returns registered metadata for a routable connection.
|
||||
fn registered(&self, id: ConnectionId) -> Option<&RegisteredConnection>;
|
||||
}
|
||||
|
||||
impl ConnectionTable for Connections {
|
||||
fn registered(&self, id: ConnectionId) -> Option<&RegisteredConnection> {
|
||||
Self::registered(self, id)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,205 @@
|
||||
//! Request-only context exposed to leaf callbacks.
|
||||
//!
|
||||
//! Leaf code never receives direct access to route tables, hook state, endpoint
|
||||
//! internals, or transport handles. It can only enqueue [`LeafAction`] values.
|
||||
//! The runtime validates and applies those actions later.
|
||||
|
||||
use crate::alloc::string::String;
|
||||
use crate::alloc::vec::Vec;
|
||||
use crate::connections::{ConnectionDirection, ConnectionId, Connections};
|
||||
use crate::leaf::{LeafCapabilities, LeafId};
|
||||
use unshell_protocol::ProtocolFault;
|
||||
|
||||
/// Context handed to one leaf callback.
|
||||
#[derive(Debug)]
|
||||
pub struct LeafContext<'a> {
|
||||
local_path: &'a [String],
|
||||
leaf_id: &'a LeafId,
|
||||
capabilities: &'a LeafCapabilities,
|
||||
connections: &'a Connections,
|
||||
actions: Vec<LeafAction>,
|
||||
}
|
||||
|
||||
impl<'a> LeafContext<'a> {
|
||||
/// Creates a context for one leaf callback.
|
||||
#[must_use]
|
||||
pub const fn new(
|
||||
local_path: &'a [String],
|
||||
leaf_id: &'a LeafId,
|
||||
capabilities: &'a LeafCapabilities,
|
||||
connections: &'a Connections,
|
||||
) -> Self {
|
||||
Self {
|
||||
local_path,
|
||||
leaf_id,
|
||||
capabilities,
|
||||
connections,
|
||||
actions: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns this endpoint's absolute path.
|
||||
#[must_use]
|
||||
pub const fn local_path(&self) -> &[String] {
|
||||
self.local_path
|
||||
}
|
||||
|
||||
/// Returns the leaf currently using this context.
|
||||
#[must_use]
|
||||
pub const fn leaf_id(&self) -> &LeafId {
|
||||
self.leaf_id
|
||||
}
|
||||
|
||||
/// Returns the permissions granted to this leaf.
|
||||
#[must_use]
|
||||
pub const fn capabilities(&self) -> &LeafCapabilities {
|
||||
self.capabilities
|
||||
}
|
||||
|
||||
/// Returns read-only connection metadata.
|
||||
#[must_use]
|
||||
pub const fn connections(&self) -> &Connections {
|
||||
self.connections
|
||||
}
|
||||
|
||||
/// Returns queued leaf actions.
|
||||
#[must_use]
|
||||
pub fn actions(&self) -> &[LeafAction] {
|
||||
&self.actions
|
||||
}
|
||||
|
||||
/// Consumes the context and returns queued actions.
|
||||
#[must_use]
|
||||
pub fn into_actions(self) -> Vec<LeafAction> {
|
||||
self.actions
|
||||
}
|
||||
|
||||
/// Requests an outbound call.
|
||||
pub fn call(&mut self, call: OutboundCall) -> Result<(), RequestDenied> {
|
||||
if !self.capabilities.permissions.send_calls {
|
||||
return Err(RequestDenied::MissingCapability(
|
||||
RuntimeCapability::SendCalls,
|
||||
));
|
||||
}
|
||||
self.actions.push(LeafAction::SendCall(call));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Requests data on an existing hook.
|
||||
pub fn hook_data(&mut self, data: OutboundHookData) -> Result<(), RequestDenied> {
|
||||
if !self.capabilities.permissions.send_hook_data {
|
||||
return Err(RequestDenied::MissingCapability(
|
||||
RuntimeCapability::SendHookData,
|
||||
));
|
||||
}
|
||||
self.actions.push(LeafAction::SendHookData(data));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Requests hook termination with a protocol fault.
|
||||
pub fn fail_hook(&mut self, hook_id: u64, fault: ProtocolFault) -> Result<(), RequestDenied> {
|
||||
if !self.capabilities.permissions.send_hook_data {
|
||||
return Err(RequestDenied::MissingCapability(
|
||||
RuntimeCapability::SendHookData,
|
||||
));
|
||||
}
|
||||
self.actions.push(LeafAction::FailHook { hook_id, fault });
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Requests a connection admission or teardown action.
|
||||
pub fn connection(&mut self, request: ConnectionAction) -> Result<(), RequestDenied> {
|
||||
if !self.capabilities.permissions.manage_connections {
|
||||
return Err(RequestDenied::MissingCapability(
|
||||
RuntimeCapability::ManageConnections,
|
||||
));
|
||||
}
|
||||
self.actions.push(LeafAction::Connection(request));
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Runtime action requested by leaf code.
|
||||
#[derive(Clone, Debug, Eq, PartialEq)]
|
||||
pub enum LeafAction {
|
||||
/// Build and send one outbound call.
|
||||
SendCall(OutboundCall),
|
||||
/// Build and send one hook data packet.
|
||||
SendHookData(OutboundHookData),
|
||||
/// Terminate a hook with a protocol fault.
|
||||
FailHook {
|
||||
/// Hook identifier scoped by the hook host.
|
||||
hook_id: u64,
|
||||
/// Stable protocol fault code.
|
||||
fault: ProtocolFault,
|
||||
},
|
||||
/// Request a connection state change.
|
||||
Connection(ConnectionAction),
|
||||
}
|
||||
|
||||
/// Outbound call request before packet construction.
|
||||
#[derive(Clone, Debug, Eq, PartialEq)]
|
||||
pub struct OutboundCall {
|
||||
/// Destination endpoint path.
|
||||
pub dst_path: Vec<String>,
|
||||
/// Optional destination leaf name.
|
||||
pub dst_leaf: Option<String>,
|
||||
/// Canonical procedure id.
|
||||
pub procedure_id: String,
|
||||
/// Opaque request payload.
|
||||
pub payload: Vec<u8>,
|
||||
/// Whether the runtime should allocate a response hook.
|
||||
pub expects_response: bool,
|
||||
}
|
||||
|
||||
/// Hook data request before packet construction.
|
||||
#[derive(Clone, Debug, Eq, PartialEq)]
|
||||
pub struct OutboundHookData {
|
||||
/// Destination endpoint path for the hook packet.
|
||||
pub dst_path: Vec<String>,
|
||||
/// Hook identifier scoped by the receiving endpoint.
|
||||
pub hook_id: u64,
|
||||
/// Canonical procedure id associated with the hook stream.
|
||||
pub procedure_id: String,
|
||||
/// Opaque payload bytes.
|
||||
pub payload: Vec<u8>,
|
||||
/// Whether this packet closes the local side of the hook.
|
||||
pub end_hook: bool,
|
||||
}
|
||||
|
||||
/// Requested connection state change.
|
||||
#[derive(Clone, Debug, Eq, PartialEq)]
|
||||
pub enum ConnectionAction {
|
||||
/// Register an existing connection as a direct parent or child.
|
||||
Register {
|
||||
/// Runtime transport connection id.
|
||||
connection: ConnectionId,
|
||||
/// Requested tree direction.
|
||||
direction: ConnectionDirection,
|
||||
/// Peer path to register.
|
||||
peer_path: Vec<String>,
|
||||
},
|
||||
/// Remove a connection from runtime routing.
|
||||
Unregister {
|
||||
/// Runtime transport connection id.
|
||||
connection: ConnectionId,
|
||||
},
|
||||
}
|
||||
|
||||
/// Capability checked by [`LeafContext`] helpers.
|
||||
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
|
||||
pub enum RuntimeCapability {
|
||||
/// Permission to request outbound calls.
|
||||
SendCalls,
|
||||
/// Permission to request hook data or hook faults.
|
||||
SendHookData,
|
||||
/// Permission to request connection state changes.
|
||||
ManageConnections,
|
||||
}
|
||||
|
||||
/// Rejection reason for a leaf action request.
|
||||
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
|
||||
pub enum RequestDenied {
|
||||
/// The leaf does not have the required capability.
|
||||
MissingCapability(RuntimeCapability),
|
||||
}
|
||||
@@ -0,0 +1,56 @@
|
||||
//! Runtime effects produced by packet processing.
|
||||
|
||||
use crate::alloc::vec::Vec;
|
||||
use crate::connections::{ConnectionGeneration, ConnectionId};
|
||||
use unshell_protocol::FrameBytes;
|
||||
use unshell_protocol::tree::LocalEvent;
|
||||
|
||||
/// Side effect selected by endpoint packet processing.
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum RuntimeEffect {
|
||||
/// Send a frame to a registered connection.
|
||||
SendFrame {
|
||||
/// Destination connection id.
|
||||
connection: ConnectionId,
|
||||
/// Generation observed when the effect was queued.
|
||||
generation: ConnectionGeneration,
|
||||
/// Encoded protocol frame.
|
||||
frame: FrameBytes,
|
||||
},
|
||||
/// Deliver a local protocol event to the future leaf/session dispatcher.
|
||||
Local(LocalEvent),
|
||||
/// The frame was intentionally dropped by protocol state.
|
||||
Dropped,
|
||||
}
|
||||
|
||||
/// FIFO queue of runtime effects.
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub struct EffectQueue {
|
||||
entries: Vec<RuntimeEffect>,
|
||||
}
|
||||
|
||||
impl EffectQueue {
|
||||
/// Creates an empty effect queue.
|
||||
#[must_use]
|
||||
pub const fn new() -> Self {
|
||||
Self {
|
||||
entries: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Queues an effect.
|
||||
pub fn push(&mut self, effect: RuntimeEffect) {
|
||||
self.entries.push(effect);
|
||||
}
|
||||
|
||||
/// Returns queued effects.
|
||||
#[must_use]
|
||||
pub fn entries(&self) -> &[RuntimeEffect] {
|
||||
&self.entries
|
||||
}
|
||||
|
||||
/// Drains queued effects in FIFO order.
|
||||
pub fn drain(&mut self) -> impl Iterator<Item = RuntimeEffect> + '_ {
|
||||
self.entries.drain(..)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,110 @@
|
||||
//! Leaf-facing runtime types.
|
||||
|
||||
use crate::alloc::string::String;
|
||||
use crate::alloc::vec::Vec;
|
||||
use crate::context::LeafContext;
|
||||
use unshell_protocol::tree::{IncomingCall, IncomingData, IncomingFault};
|
||||
|
||||
/// Stable identifier for a locally hosted leaf binding.
|
||||
#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
|
||||
pub struct LeafId(String);
|
||||
|
||||
impl LeafId {
|
||||
/// Creates a leaf id from an owned string.
|
||||
#[must_use]
|
||||
pub const fn new(value: String) -> Self {
|
||||
Self(value)
|
||||
}
|
||||
|
||||
/// Returns the leaf id as a string slice.
|
||||
#[must_use]
|
||||
pub fn as_str(&self) -> &str {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
/// Runtime permissions granted to one leaf binding.
|
||||
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
|
||||
pub struct LeafPermissions {
|
||||
/// The leaf may request new outbound calls.
|
||||
pub send_calls: bool,
|
||||
/// The leaf may request data or faults on hook streams.
|
||||
pub send_hook_data: bool,
|
||||
/// The leaf may request connection registration or removal.
|
||||
pub manage_connections: bool,
|
||||
}
|
||||
|
||||
impl LeafPermissions {
|
||||
/// Grants no runtime-side effects.
|
||||
pub const NONE: Self = Self {
|
||||
send_calls: false,
|
||||
send_hook_data: false,
|
||||
manage_connections: false,
|
||||
};
|
||||
|
||||
/// Grants the common permission set for a passive responder leaf.
|
||||
pub const REPLY_ONLY: Self = Self {
|
||||
send_calls: false,
|
||||
send_hook_data: true,
|
||||
manage_connections: false,
|
||||
};
|
||||
|
||||
/// Grants all current permissions. Use sparingly.
|
||||
pub const ALL: Self = Self {
|
||||
send_calls: true,
|
||||
send_hook_data: true,
|
||||
manage_connections: true,
|
||||
};
|
||||
}
|
||||
|
||||
/// Protocol surface and runtime permissions for one leaf.
|
||||
#[derive(Clone, Debug, Eq, PartialEq)]
|
||||
pub struct LeafCapabilities {
|
||||
/// Canonical dotted leaf name.
|
||||
pub leaf_name: String,
|
||||
/// Canonical procedure ids supported by the leaf.
|
||||
pub procedures: Vec<String>,
|
||||
/// Runtime permissions granted to this leaf binding.
|
||||
pub permissions: LeafPermissions,
|
||||
}
|
||||
|
||||
/// One hosted leaf implementation.
|
||||
pub trait Leaf {
|
||||
/// Leaf-specific error type.
|
||||
type Error;
|
||||
|
||||
/// Returns static protocol and runtime capabilities.
|
||||
fn capabilities(&self) -> &LeafCapabilities;
|
||||
|
||||
/// Handles one opening call routed to this leaf.
|
||||
fn on_call(
|
||||
&mut self,
|
||||
_ctx: &mut LeafContext<'_>,
|
||||
_call: IncomingCall,
|
||||
) -> Result<(), Self::Error> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Handles hook data routed to this leaf or its session adapter.
|
||||
fn on_data(
|
||||
&mut self,
|
||||
_ctx: &mut LeafContext<'_>,
|
||||
_data: IncomingData,
|
||||
) -> Result<(), Self::Error> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Handles hook fault routed to this leaf or its session adapter.
|
||||
fn on_fault(
|
||||
&mut self,
|
||||
_ctx: &mut LeafContext<'_>,
|
||||
_fault: IncomingFault,
|
||||
) -> Result<(), Self::Error> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Gives the leaf one bounded opportunity to request local work.
|
||||
fn poll(&mut self, _ctx: &mut LeafContext<'_>) -> Result<(), Self::Error> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,122 @@
|
||||
//! # UnShell Runtime
|
||||
//!
|
||||
//! Single-threaded runtime scaffolding for hosting UnShell protocol nodes. This
|
||||
//! crate currently bridges the existing protocol endpoint state while defining
|
||||
//! the concrete transport, connection, and leaf-action APIs the redesign will use.
|
||||
|
||||
#![no_std]
|
||||
|
||||
pub extern crate alloc;
|
||||
|
||||
pub mod connections;
|
||||
pub mod context;
|
||||
pub mod effects;
|
||||
pub mod leaf;
|
||||
pub mod node;
|
||||
pub mod transport;
|
||||
|
||||
pub use connections::{
|
||||
Connection, ConnectionDirection, ConnectionGeneration, ConnectionId, ConnectionState,
|
||||
ConnectionTable, Connections, RegisteredConnection,
|
||||
};
|
||||
pub use context::{
|
||||
ConnectionAction, LeafAction, LeafContext, OutboundCall, OutboundHookData, RequestDenied,
|
||||
RuntimeCapability,
|
||||
};
|
||||
pub use effects::{EffectQueue, RuntimeEffect};
|
||||
pub use leaf::{Leaf, LeafCapabilities, LeafId, LeafPermissions};
|
||||
pub use node::{
|
||||
EndpointState, Node, NodeId, NodeRuntime, NodeRuntimeError, NodeState, TickBudget, TickOutcome,
|
||||
};
|
||||
pub use transport::Transport;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::alloc::string::String;
|
||||
use crate::alloc::vec;
|
||||
use crate::alloc::vec::Vec;
|
||||
|
||||
use super::{
|
||||
Connection, ConnectionDirection, ConnectionGeneration, ConnectionId, ConnectionState,
|
||||
Connections, LeafAction, LeafCapabilities, LeafContext, LeafId, LeafPermissions,
|
||||
OutboundCall, OutboundHookData, RequestDenied, RuntimeCapability,
|
||||
};
|
||||
|
||||
#[test]
|
||||
fn connection_generation_advances_without_wrapping() {
|
||||
assert_eq!(ConnectionGeneration::INITIAL.get(), 0);
|
||||
assert_eq!(ConnectionGeneration::new(41).next().get(), 42);
|
||||
assert_eq!(ConnectionGeneration::new(u64::MAX).next().get(), u64::MAX);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn connection_table_reports_registered_connection_metadata() {
|
||||
let id = ConnectionId::new(7);
|
||||
let mut connections = Connections::new();
|
||||
connections.push(Connection::registered(
|
||||
id,
|
||||
ConnectionDirection::Child,
|
||||
vec![String::from("root"), String::from("child")],
|
||||
ConnectionGeneration::new(3),
|
||||
));
|
||||
|
||||
let registered = connections
|
||||
.registered(id)
|
||||
.expect("connection is registered");
|
||||
assert_eq!(registered.direction(), ConnectionDirection::Child);
|
||||
assert_eq!(registered.generation().get(), 3);
|
||||
assert_eq!(registered.peer_path(), ["root", "child"]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn connected_connections_are_not_routable() {
|
||||
let id = ConnectionId::new(9);
|
||||
let mut connections = Connections::new();
|
||||
connections.push(Connection::connected(id, ConnectionGeneration::INITIAL));
|
||||
|
||||
assert!(connections.registered(id).is_none());
|
||||
assert!(matches!(
|
||||
connections.get(id).unwrap().state(),
|
||||
ConnectionState::Connected { .. }
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn leaf_context_queues_only_capability_checked_actions() {
|
||||
let id = LeafId::new(String::from("org.example.v1.echo"));
|
||||
let capabilities = LeafCapabilities {
|
||||
leaf_name: String::from("org.example.v1.echo"),
|
||||
procedures: vec![String::from("org.example.v1.echo.invoke")],
|
||||
permissions: LeafPermissions::REPLY_ONLY,
|
||||
};
|
||||
let connections = Connections::new();
|
||||
let local_path = vec![String::from("root")];
|
||||
let mut ctx = LeafContext::new(&local_path, &id, &capabilities, &connections);
|
||||
|
||||
ctx.hook_data(OutboundHookData {
|
||||
dst_path: vec![String::from("root")],
|
||||
hook_id: 7,
|
||||
procedure_id: String::from("org.example.v1.echo.invoke"),
|
||||
payload: vec![1, 2, 3],
|
||||
end_hook: true,
|
||||
})
|
||||
.expect("reply-only leaf can send hook data");
|
||||
|
||||
let denied = ctx.call(OutboundCall {
|
||||
dst_path: vec![String::from("root"), String::from("child")],
|
||||
dst_leaf: None,
|
||||
procedure_id: String::from("org.example.v1.echo.invoke"),
|
||||
payload: Vec::new(),
|
||||
expects_response: false,
|
||||
});
|
||||
|
||||
assert_eq!(ctx.local_path(), ["root"]);
|
||||
assert!(matches!(ctx.actions()[0], LeafAction::SendHookData(_)));
|
||||
assert_eq!(
|
||||
denied,
|
||||
Err(RequestDenied::MissingCapability(
|
||||
RuntimeCapability::SendCalls
|
||||
))
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,73 @@
|
||||
//! Node-level runtime identity types.
|
||||
//!
|
||||
//! A node is the local runtime owner for protocol state, leaf bindings, and
|
||||
//! transport connections. This module only models identity and lifecycle state.
|
||||
|
||||
pub mod packet;
|
||||
pub mod runtime;
|
||||
pub mod state;
|
||||
|
||||
pub use packet::{EndpointState, PacketProcessor};
|
||||
pub use runtime::{NodeRuntime, NodeRuntimeError, TickBudget, TickOutcome};
|
||||
pub use state::NodeState;
|
||||
|
||||
use crate::alloc::string::String;
|
||||
|
||||
/// Stable identifier for a runtime node.
|
||||
#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
|
||||
pub struct NodeId(String);
|
||||
|
||||
impl NodeId {
|
||||
/// Creates a node identifier from an owned string.
|
||||
#[must_use]
|
||||
pub const fn new(value: String) -> Self {
|
||||
Self(value)
|
||||
}
|
||||
|
||||
/// Returns the identifier as a string slice.
|
||||
#[must_use]
|
||||
pub fn as_str(&self) -> &str {
|
||||
&self.0
|
||||
}
|
||||
|
||||
/// Consumes the identifier and returns the owned string.
|
||||
#[must_use]
|
||||
pub fn into_string(self) -> String {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
|
||||
/// Minimal runtime node descriptor.
|
||||
#[derive(Clone, Debug, Eq, PartialEq)]
|
||||
pub struct Node {
|
||||
id: NodeId,
|
||||
state: NodeState,
|
||||
}
|
||||
|
||||
impl Node {
|
||||
/// Creates a new node descriptor in the default [`NodeState::Created`] state.
|
||||
#[must_use]
|
||||
pub const fn new(id: NodeId) -> Self {
|
||||
Self {
|
||||
id,
|
||||
state: NodeState::Created,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the node identifier.
|
||||
#[must_use]
|
||||
pub const fn id(&self) -> &NodeId {
|
||||
&self.id
|
||||
}
|
||||
|
||||
/// Returns the current node lifecycle state.
|
||||
#[must_use]
|
||||
pub const fn state(&self) -> NodeState {
|
||||
self.state
|
||||
}
|
||||
|
||||
/// Updates the current node lifecycle state.
|
||||
pub const fn set_state(&mut self, state: NodeState) {
|
||||
self.state = state;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,86 @@
|
||||
//! Transitional packet-processing wrapper around the current protocol endpoint.
|
||||
//!
|
||||
//! This module is intentionally small. It gives the new runtime crate a concrete
|
||||
//! bridge to the existing packet state machine while the protocol crate is split
|
||||
//! into packet-only and runtime-owned layers. The wrapper does not own transport
|
||||
//! handles, does not dispatch leaves, and does not make admission decisions.
|
||||
|
||||
use unshell_protocol::{FrameBytes, tree::Endpoint as ProtocolEndpointTrait};
|
||||
|
||||
pub use unshell_protocol::tree::{
|
||||
ChildRoute, EndpointError, EndpointOutcome, HookKey, Ingress, LeafSpec, LocalEvent,
|
||||
ProtocolEndpoint, RouteDecision,
|
||||
};
|
||||
|
||||
/// Minimal packet processor used by future single-threaded runtimes.
|
||||
///
|
||||
/// The processor receives one frame with an already-derived ingress side and
|
||||
/// returns the existing endpoint outcome. A full `NodeRuntime` should derive the
|
||||
/// ingress from registered connection metadata before calling this trait.
|
||||
pub trait PacketProcessor {
|
||||
/// Processes one serialized frame through protocol validation, routing, and
|
||||
/// hook-state transitions.
|
||||
fn process_frame(
|
||||
&mut self,
|
||||
ingress: &Ingress,
|
||||
frame: FrameBytes,
|
||||
) -> Result<EndpointOutcome, EndpointError>;
|
||||
}
|
||||
|
||||
/// Runtime-owned endpoint packet state.
|
||||
///
|
||||
/// This is a compatibility shell around [`ProtocolEndpoint`]. It exists so new
|
||||
/// runtime code can depend on `unshell_runtime::node::EndpointState` while the
|
||||
/// old protocol-tree endpoint remains the source of truth for packet invariants.
|
||||
#[derive(Debug, Default)]
|
||||
pub struct EndpointState {
|
||||
endpoint: ProtocolEndpoint,
|
||||
}
|
||||
|
||||
impl EndpointState {
|
||||
/// Creates a packet state wrapper from an existing protocol endpoint.
|
||||
#[must_use]
|
||||
pub const fn new(endpoint: ProtocolEndpoint) -> Self {
|
||||
Self { endpoint }
|
||||
}
|
||||
|
||||
/// Creates packet state for a root-assumed endpoint.
|
||||
#[must_use]
|
||||
pub fn root(
|
||||
local_id: impl Into<alloc::string::String>,
|
||||
leaves: alloc::vec::Vec<LeafSpec>,
|
||||
) -> Self {
|
||||
Self::new(ProtocolEndpoint::root(local_id, leaves))
|
||||
}
|
||||
|
||||
/// Returns the wrapped protocol endpoint.
|
||||
#[must_use]
|
||||
pub const fn endpoint(&self) -> &ProtocolEndpoint {
|
||||
&self.endpoint
|
||||
}
|
||||
|
||||
/// Returns mutable access to the wrapped protocol endpoint.
|
||||
///
|
||||
/// This is intentionally exposed only on the transitional wrapper. New runtime
|
||||
/// code should prefer smaller methods as the endpoint state is split apart.
|
||||
#[must_use]
|
||||
pub const fn endpoint_mut(&mut self) -> &mut ProtocolEndpoint {
|
||||
&mut self.endpoint
|
||||
}
|
||||
|
||||
/// Consumes the wrapper and returns the underlying protocol endpoint.
|
||||
#[must_use]
|
||||
pub fn into_endpoint(self) -> ProtocolEndpoint {
|
||||
self.endpoint
|
||||
}
|
||||
}
|
||||
|
||||
impl PacketProcessor for EndpointState {
|
||||
fn process_frame(
|
||||
&mut self,
|
||||
ingress: &Ingress,
|
||||
frame: FrameBytes,
|
||||
) -> Result<EndpointOutcome, EndpointError> {
|
||||
self.endpoint.receive(ingress, frame)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,439 @@
|
||||
//! Single-threaded runtime shell around endpoint packet state.
|
||||
//!
|
||||
//! This first slice owns transport and connection metadata, derives ingress from
|
||||
//! registered connections, delegates packet invariants to [`EndpointState`], and
|
||||
//! queues concrete runtime effects. Leaf dispatch and leaf-action application are
|
||||
//! intentionally not implemented in this slice.
|
||||
|
||||
use crate::connections::{ConnectionDirection, ConnectionId, Connections, RegisteredConnection};
|
||||
use crate::effects::{EffectQueue, RuntimeEffect};
|
||||
use crate::transport::Transport;
|
||||
use unshell_protocol::FrameBytes;
|
||||
use unshell_protocol::tree::{EndpointError, EndpointOutcome, Ingress, RouteDecision};
|
||||
|
||||
use super::{EndpointState, PacketProcessor};
|
||||
|
||||
/// Limits one runtime progress step.
|
||||
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
|
||||
pub struct TickBudget {
|
||||
/// Maximum inbound frames to poll from the transport.
|
||||
pub max_inbound_frames: usize,
|
||||
/// Whether queued outbound frame effects should be flushed through transport.
|
||||
pub flush_outbound: bool,
|
||||
}
|
||||
|
||||
impl Default for TickBudget {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
max_inbound_frames: 16,
|
||||
flush_outbound: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Summary returned after one runtime step.
|
||||
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
|
||||
pub struct TickOutcome {
|
||||
/// Number of inbound frames processed.
|
||||
pub inbound_frames: usize,
|
||||
/// Number of outbound frames sent.
|
||||
pub outbound_frames: usize,
|
||||
/// Number of frames intentionally dropped.
|
||||
pub dropped_frames: usize,
|
||||
/// Number of local endpoint events queued for later leaf dispatch.
|
||||
pub local_events: usize,
|
||||
}
|
||||
|
||||
/// Error surfaced by [`NodeRuntime`].
|
||||
#[derive(Debug)]
|
||||
pub enum NodeRuntimeError<TransportError> {
|
||||
/// The connection is unknown or not registered for protocol routing.
|
||||
UnregisteredConnection(ConnectionId),
|
||||
/// The endpoint selected a route with no matching registered connection.
|
||||
MissingRouteConnection,
|
||||
/// Packet processing failed inside endpoint state.
|
||||
Endpoint(EndpointError),
|
||||
/// Transport send, receive, or flush failed.
|
||||
Transport(TransportError),
|
||||
}
|
||||
|
||||
impl<TransportError> core::fmt::Display for NodeRuntimeError<TransportError>
|
||||
where
|
||||
TransportError: core::fmt::Display,
|
||||
{
|
||||
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
|
||||
match self {
|
||||
Self::UnregisteredConnection(connection) => {
|
||||
write!(f, "connection {} is not registered", connection.get())
|
||||
}
|
||||
Self::MissingRouteConnection => f.write_str("route has no registered connection"),
|
||||
Self::Endpoint(error) => write!(f, "{error}"),
|
||||
Self::Transport(error) => write!(f, "{error}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<TransportError> core::error::Error for NodeRuntimeError<TransportError> where
|
||||
TransportError: core::error::Error + 'static
|
||||
{
|
||||
}
|
||||
|
||||
/// Runtime owner for one endpoint, transport, and connection table.
|
||||
#[derive(Debug)]
|
||||
pub struct NodeRuntime<T> {
|
||||
endpoint: EndpointState,
|
||||
connections: Connections,
|
||||
transport: T,
|
||||
effects: EffectQueue,
|
||||
}
|
||||
|
||||
impl<T> NodeRuntime<T> {
|
||||
/// Creates a runtime from endpoint state, registered connection metadata, and
|
||||
/// one concrete transport.
|
||||
#[must_use]
|
||||
pub const fn new(endpoint: EndpointState, connections: Connections, transport: T) -> Self {
|
||||
Self {
|
||||
endpoint,
|
||||
connections,
|
||||
transport,
|
||||
effects: EffectQueue::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns endpoint packet state.
|
||||
#[must_use]
|
||||
pub const fn endpoint(&self) -> &EndpointState {
|
||||
&self.endpoint
|
||||
}
|
||||
|
||||
/// Returns mutable endpoint packet state.
|
||||
#[must_use]
|
||||
pub const fn endpoint_mut(&mut self) -> &mut EndpointState {
|
||||
&mut self.endpoint
|
||||
}
|
||||
|
||||
/// Returns connection metadata.
|
||||
#[must_use]
|
||||
pub const fn connections(&self) -> &Connections {
|
||||
&self.connections
|
||||
}
|
||||
|
||||
/// Returns mutable connection metadata.
|
||||
#[must_use]
|
||||
pub const fn connections_mut(&mut self) -> &mut Connections {
|
||||
&mut self.connections
|
||||
}
|
||||
|
||||
/// Returns the transport.
|
||||
#[must_use]
|
||||
pub const fn transport(&self) -> &T {
|
||||
&self.transport
|
||||
}
|
||||
|
||||
/// Returns the mutable transport.
|
||||
#[must_use]
|
||||
pub const fn transport_mut(&mut self) -> &mut T {
|
||||
&mut self.transport
|
||||
}
|
||||
|
||||
/// Returns currently queued effects.
|
||||
#[must_use]
|
||||
pub fn effects(&self) -> &[RuntimeEffect] {
|
||||
self.effects.entries()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> NodeRuntime<T>
|
||||
where
|
||||
T: Transport,
|
||||
{
|
||||
/// Processes one nonblocking runtime step.
|
||||
pub fn tick(&mut self, budget: TickBudget) -> Result<TickOutcome, NodeRuntimeError<T::Error>> {
|
||||
let mut outcome = TickOutcome::default();
|
||||
|
||||
for _ in 0..budget.max_inbound_frames {
|
||||
let Some((connection, frame)) = self
|
||||
.transport
|
||||
.poll_recv()
|
||||
.map_err(NodeRuntimeError::Transport)?
|
||||
else {
|
||||
break;
|
||||
};
|
||||
self.receive_frame(connection, frame)?;
|
||||
outcome.inbound_frames += 1;
|
||||
}
|
||||
|
||||
outcome.dropped_frames += self
|
||||
.effects
|
||||
.entries()
|
||||
.iter()
|
||||
.filter(|effect| matches!(effect, RuntimeEffect::Dropped))
|
||||
.count();
|
||||
outcome.local_events += self
|
||||
.effects
|
||||
.entries()
|
||||
.iter()
|
||||
.filter(|effect| matches!(effect, RuntimeEffect::Local(_)))
|
||||
.count();
|
||||
|
||||
if budget.flush_outbound {
|
||||
outcome.outbound_frames = self.flush_outbound()?;
|
||||
}
|
||||
Ok(outcome)
|
||||
}
|
||||
|
||||
/// Processes one frame from a known transport connection.
|
||||
pub fn receive_frame(
|
||||
&mut self,
|
||||
connection: ConnectionId,
|
||||
frame: FrameBytes,
|
||||
) -> Result<(), NodeRuntimeError<T::Error>> {
|
||||
let registered = self
|
||||
.connections
|
||||
.registered(connection)
|
||||
.ok_or(NodeRuntimeError::UnregisteredConnection(connection))?;
|
||||
let ingress = ingress_for(registered);
|
||||
let outcome = self
|
||||
.endpoint
|
||||
.process_frame(&ingress, frame)
|
||||
.map_err(NodeRuntimeError::Endpoint)?;
|
||||
self.apply_outcome(outcome)
|
||||
}
|
||||
|
||||
fn apply_outcome(
|
||||
&mut self,
|
||||
outcome: EndpointOutcome,
|
||||
) -> Result<(), NodeRuntimeError<T::Error>> {
|
||||
match outcome {
|
||||
EndpointOutcome::Forward { route, frame } => self.queue_forward(route, frame),
|
||||
EndpointOutcome::Local(event) => {
|
||||
self.effects.push(RuntimeEffect::Local(event));
|
||||
Ok(())
|
||||
}
|
||||
EndpointOutcome::Dropped => {
|
||||
self.effects.push(RuntimeEffect::Dropped);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn queue_forward(
|
||||
&mut self,
|
||||
route: RouteDecision,
|
||||
frame: FrameBytes,
|
||||
) -> Result<(), NodeRuntimeError<T::Error>> {
|
||||
let (connection, generation) = match route {
|
||||
RouteDecision::Parent => self
|
||||
.connections
|
||||
.registered_by_direction(ConnectionDirection::Parent)
|
||||
.and_then(|connection| {
|
||||
connection
|
||||
.state()
|
||||
.registered()
|
||||
.map(|registered| (connection.id(), registered.generation()))
|
||||
}),
|
||||
RouteDecision::Child(index) => self
|
||||
.endpoint
|
||||
.endpoint()
|
||||
.child_routes()
|
||||
.get(index)
|
||||
.and_then(|child| {
|
||||
self.connections
|
||||
.registered_by_path(ConnectionDirection::Child, &child.path)
|
||||
})
|
||||
.and_then(|connection| {
|
||||
connection
|
||||
.state()
|
||||
.registered()
|
||||
.map(|registered| (connection.id(), registered.generation()))
|
||||
}),
|
||||
RouteDecision::Local | RouteDecision::Drop => None,
|
||||
}
|
||||
.ok_or(NodeRuntimeError::MissingRouteConnection)?;
|
||||
|
||||
self.effects.push(RuntimeEffect::SendFrame {
|
||||
connection,
|
||||
generation,
|
||||
frame,
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn flush_outbound(&mut self) -> Result<usize, NodeRuntimeError<T::Error>> {
|
||||
let mut retained = EffectQueue::new();
|
||||
let mut sent = 0usize;
|
||||
for effect in self.effects.drain() {
|
||||
match effect {
|
||||
RuntimeEffect::SendFrame {
|
||||
connection,
|
||||
generation,
|
||||
frame,
|
||||
} if self
|
||||
.connections
|
||||
.registered(connection)
|
||||
.is_some_and(|registered| registered.generation() == generation) =>
|
||||
{
|
||||
self.transport
|
||||
.send_frame(connection, frame)
|
||||
.map_err(NodeRuntimeError::Transport)?;
|
||||
sent += 1;
|
||||
}
|
||||
RuntimeEffect::SendFrame { .. } => {}
|
||||
other => retained.push(other),
|
||||
}
|
||||
}
|
||||
self.effects = retained;
|
||||
self.transport
|
||||
.flush()
|
||||
.map_err(NodeRuntimeError::Transport)?;
|
||||
Ok(sent)
|
||||
}
|
||||
}
|
||||
|
||||
fn ingress_for(registered: &RegisteredConnection) -> Ingress {
|
||||
match registered.direction() {
|
||||
ConnectionDirection::Parent => Ingress::Parent,
|
||||
ConnectionDirection::Child => Ingress::Child(registered.peer_path().to_vec()),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::alloc::string::String;
|
||||
use crate::alloc::vec;
|
||||
use crate::alloc::vec::Vec;
|
||||
use crate::connections::{
|
||||
Connection, ConnectionDirection, ConnectionGeneration, ConnectionId, Connections,
|
||||
};
|
||||
use crate::effects::RuntimeEffect;
|
||||
use crate::transport::Transport;
|
||||
use unshell_protocol::tree::{ChildRoute, ProtocolEndpoint};
|
||||
use unshell_protocol::{CallMessage, FrameBytes, PacketHeader, PacketType, encode_packet};
|
||||
|
||||
use super::{EndpointState, NodeRuntime, TickBudget};
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
struct RecordingTransport {
|
||||
inbound: Option<(ConnectionId, FrameBytes)>,
|
||||
sent: Vec<(ConnectionId, FrameBytes)>,
|
||||
}
|
||||
|
||||
impl Transport for RecordingTransport {
|
||||
type Error = core::convert::Infallible;
|
||||
|
||||
fn poll_recv(&mut self) -> Result<Option<(ConnectionId, FrameBytes)>, Self::Error> {
|
||||
Ok(self.inbound.take())
|
||||
}
|
||||
|
||||
fn send_frame(
|
||||
&mut self,
|
||||
connection: ConnectionId,
|
||||
frame: FrameBytes,
|
||||
) -> Result<(), Self::Error> {
|
||||
self.sent.push((connection, frame));
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn tick_derives_ingress_and_sends_forwarded_child_frame() {
|
||||
let parent = ConnectionId::new(1);
|
||||
let child = ConnectionId::new(2);
|
||||
let mut connections = Connections::new();
|
||||
connections.push(Connection::registered(
|
||||
parent,
|
||||
ConnectionDirection::Parent,
|
||||
vec![],
|
||||
ConnectionGeneration::INITIAL,
|
||||
));
|
||||
connections.push(Connection::registered(
|
||||
child,
|
||||
ConnectionDirection::Child,
|
||||
vec![String::from("agent"), String::from("grand")],
|
||||
ConnectionGeneration::INITIAL,
|
||||
));
|
||||
|
||||
let endpoint = ProtocolEndpoint::new(
|
||||
vec![String::from("agent")],
|
||||
Some(vec![]),
|
||||
vec![ChildRoute::registered(vec![
|
||||
String::from("agent"),
|
||||
String::from("grand"),
|
||||
])],
|
||||
vec![],
|
||||
);
|
||||
|
||||
let frame = encode_packet(
|
||||
&PacketHeader {
|
||||
packet_type: PacketType::Call,
|
||||
src_path: vec![],
|
||||
dst_path: vec![String::from("agent"), String::from("grand")],
|
||||
dst_leaf: None,
|
||||
hook_id: None,
|
||||
},
|
||||
&CallMessage {
|
||||
procedure_id: String::from("org.example.v1.echo.invoke"),
|
||||
data: vec![],
|
||||
response_hook: None,
|
||||
},
|
||||
)
|
||||
.expect("frame encodes");
|
||||
|
||||
let transport = RecordingTransport {
|
||||
inbound: Some((parent, frame)),
|
||||
sent: Vec::new(),
|
||||
};
|
||||
let mut runtime = NodeRuntime::new(EndpointState::new(endpoint), connections, transport);
|
||||
|
||||
let outcome = runtime.tick(TickBudget::default()).expect("tick succeeds");
|
||||
|
||||
assert_eq!(outcome.inbound_frames, 1);
|
||||
assert_eq!(outcome.outbound_frames, 1);
|
||||
assert!(runtime.effects().is_empty());
|
||||
assert_eq!(runtime.transport().sent[0].0, child);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn receive_keeps_local_events_queued_for_leaf_dispatch() {
|
||||
let parent = ConnectionId::new(1);
|
||||
let mut connections = Connections::new();
|
||||
connections.push(Connection::registered(
|
||||
parent,
|
||||
ConnectionDirection::Parent,
|
||||
vec![],
|
||||
ConnectionGeneration::INITIAL,
|
||||
));
|
||||
|
||||
let mut endpoint =
|
||||
ProtocolEndpoint::new(vec![String::from("agent")], Some(vec![]), vec![], vec![]);
|
||||
endpoint
|
||||
.add_endpoint_procedure("org.example.v1.echo.invoke")
|
||||
.expect("procedure registers");
|
||||
let frame = encode_packet(
|
||||
&PacketHeader {
|
||||
packet_type: PacketType::Call,
|
||||
src_path: vec![],
|
||||
dst_path: vec![String::from("agent")],
|
||||
dst_leaf: None,
|
||||
hook_id: None,
|
||||
},
|
||||
&CallMessage {
|
||||
procedure_id: String::from("org.example.v1.echo.invoke"),
|
||||
data: vec![],
|
||||
response_hook: None,
|
||||
},
|
||||
)
|
||||
.expect("frame encodes");
|
||||
|
||||
let mut runtime = NodeRuntime::new(
|
||||
EndpointState::new(endpoint),
|
||||
connections,
|
||||
RecordingTransport::default(),
|
||||
);
|
||||
|
||||
runtime
|
||||
.receive_frame(parent, frame)
|
||||
.expect("frame processes");
|
||||
assert!(matches!(runtime.effects()[0], RuntimeEffect::Local(_)));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,15 @@
|
||||
//! Node lifecycle state.
|
||||
|
||||
/// Lifecycle state for a runtime node.
|
||||
#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
|
||||
pub enum NodeState {
|
||||
/// The node has been constructed but has not started transport activity.
|
||||
#[default]
|
||||
Created,
|
||||
/// The node is accepting local work and transport events.
|
||||
Running,
|
||||
/// The node is draining work before shutdown.
|
||||
Stopping,
|
||||
/// The node has stopped and should not accept new work.
|
||||
Stopped,
|
||||
}
|
||||
@@ -0,0 +1,31 @@
|
||||
//! Nonblocking transport contract for the single-threaded runtime.
|
||||
//!
|
||||
//! Transports move already-framed protocol packets. They do not know tree paths,
|
||||
//! leaf names, hook state, admission policy, or route decisions.
|
||||
|
||||
use crate::connections::ConnectionId;
|
||||
use unshell_protocol::FrameBytes;
|
||||
|
||||
/// Nonblocking frame transport used by [`crate::node::NodeRuntime`].
|
||||
pub trait Transport {
|
||||
/// Transport-specific error.
|
||||
type Error;
|
||||
|
||||
/// Polls for one inbound frame.
|
||||
///
|
||||
/// `Ok(None)` means no frame is currently ready. Implementations must not
|
||||
/// block inside this method; callers drive progress by calling `tick` again.
|
||||
fn poll_recv(&mut self) -> Result<Option<(ConnectionId, FrameBytes)>, Self::Error>;
|
||||
|
||||
/// Sends one framed packet on a registered connection.
|
||||
fn send_frame(
|
||||
&mut self,
|
||||
connection: ConnectionId,
|
||||
frame: FrameBytes,
|
||||
) -> Result<(), Self::Error>;
|
||||
|
||||
/// Flushes buffered outbound transport data, if the transport has any.
|
||||
fn flush(&mut self) -> Result<(), Self::Error> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user