From a61c0ce72de6e5424bd2bec60bc2463f1366d2e2 Mon Sep 17 00:00:00 2001 From: Michael Mikovsky <77305074+Astatin3@users.noreply.github.com> Date: Sat, 9 May 2026 12:45:14 -0600 Subject: [PATCH] Add runtime API redesign scaffold --- API.md | 305 +++++++++++++++++++ Cargo.lock | 8 + Cargo.toml | 3 + src/lib.rs | 3 + unshell-runtime/Cargo.toml | 21 ++ unshell-runtime/src/connections.rs | 291 ++++++++++++++++++ unshell-runtime/src/context.rs | 205 +++++++++++++ unshell-runtime/src/effects.rs | 56 ++++ unshell-runtime/src/leaf.rs | 110 +++++++ unshell-runtime/src/lib.rs | 122 ++++++++ unshell-runtime/src/node/mod.rs | 73 +++++ unshell-runtime/src/node/packet.rs | 86 ++++++ unshell-runtime/src/node/runtime.rs | 439 ++++++++++++++++++++++++++++ unshell-runtime/src/node/state.rs | 15 + unshell-runtime/src/transport.rs | 31 ++ ush-obfuscate/src/lib.rs | 1 - 16 files changed, 1768 insertions(+), 1 deletion(-) create mode 100644 API.md create mode 100644 unshell-runtime/Cargo.toml create mode 100644 unshell-runtime/src/connections.rs create mode 100644 unshell-runtime/src/context.rs create mode 100644 unshell-runtime/src/effects.rs create mode 100644 unshell-runtime/src/leaf.rs create mode 100644 unshell-runtime/src/lib.rs create mode 100644 unshell-runtime/src/node/mod.rs create mode 100644 unshell-runtime/src/node/packet.rs create mode 100644 unshell-runtime/src/node/runtime.rs create mode 100644 unshell-runtime/src/node/state.rs create mode 100644 unshell-runtime/src/transport.rs diff --git a/API.md b/API.md new file mode 100644 index 0000000..9fab9d4 --- /dev/null +++ b/API.md @@ -0,0 +1,305 @@ +# UnShell Runtime API Proposal + +This document records the proposed public API direction for the runtime redesign. +The goal is to split packet processing from node orchestration while keeping the +implant-facing runtime single-threaded, explicit, and hard to misuse. + +## Goals + +- Keep `unshell-protocol` focused on packet types, framing, encoding, decoding, + and static validation. +- Move endpoint state, routing state, hook state, connection admission, transport + ownership, leaf dispatch, and scheduling into `unshell-runtime`. +- Run without internal threads. Progress happens only when the caller drives the + runtime with `tick` or explicit local actions. +- Let every leaf request calls, hook data, faults, and connection changes without + giving leaves direct access to routes, hooks, endpoint internals, or transports. +- Preserve protocol authority rules by deriving ingress from registered connection + metadata, never from caller-provided values. +- Keep hot packet paths allocation-aware and move toward borrowed packet/event + views where the current protocol API permits it. + +## Crate Boundary + +```text +unshell-protocol + PacketHeader, CallMessage, DataMessage, FaultMessage + encode_packet, decode_frame + validate_header, validate_call, validate_procedure_id + introspection payload schemas + +unshell-runtime + EndpointState + NodeRuntime + Connections + Transport + Leaf, LeafContext, LeafAction + runtime effects and scheduling + +unshell + facade re-exports: protocol, runtime, leaves, macros +``` + +`EndpointState` is transitional. Today it wraps the existing +`ProtocolEndpoint`. Long term, the endpoint state machine should live in +`unshell-runtime`, while `unshell-protocol` becomes packet-only. + +## Transport API + +Transports move already-framed protocol packets. They do not know paths, leaves, +hooks, routing, or admission policy. + +```rust +pub trait Transport { + type Error; + + fn poll_recv(&mut self) -> Result, Self::Error>; + + fn send_frame( + &mut self, + connection: ConnectionId, + frame: FrameBytes, + ) -> Result<(), Self::Error>; + + fn flush(&mut self) -> Result<(), Self::Error> { + Ok(()) + } +} +``` + +Rules: + +- `poll_recv` must not block. +- `ConnectionId` is a runtime handle, not a protocol path. +- The runtime maps `ConnectionId` to protocol ingress. + +## Connection API + +Connections are not routable until registered. + +```rust +pub struct ConnectionId(u64); +pub struct ConnectionGeneration(u64); + +pub enum ConnectionDirection { + Parent, + Child, +} + +pub struct RegisteredConnection { + direction: ConnectionDirection, + peer_path: Vec, + generation: ConnectionGeneration, +} + +pub enum ConnectionState { + Connected { generation: ConnectionGeneration }, + Authenticating { generation: ConnectionGeneration }, + Registered(RegisteredConnection), + Draining { generation: ConnectionGeneration }, + Closed { generation: ConnectionGeneration }, +} +``` + +Rules: + +- Only `Registered` connections can produce protocol ingress or receive routed + frames. +- Parent registration must be exactly the direct parent path. +- Child registration must be exactly one segment below the local path. +- Registering or unregistering a connection must update connection state, + endpoint routes, hook cleanup, and queued generation checks atomically. +- Queued outbound frames carry `ConnectionGeneration`; stale sends are dropped + when a connection slot is reused. + +## Runtime API + +`NodeRuntime` owns endpoint packet state, connections, transport, and queued +effects. + +```rust +pub struct NodeRuntime { + endpoint: EndpointState, + connections: Connections, + transport: T, + effects: EffectQueue, +} + +pub struct TickBudget { + pub max_inbound_frames: usize, + pub flush_outbound: bool, +} + +pub struct TickOutcome { + pub inbound_frames: usize, + pub outbound_frames: usize, + pub dropped_frames: usize, + pub local_events: usize, +} +``` + +Primary operations: + +```rust +impl NodeRuntime { + pub fn tick(&mut self, budget: TickBudget) -> Result>; + + pub fn receive_frame( + &mut self, + connection: ConnectionId, + frame: FrameBytes, + ) -> Result<(), NodeRuntimeError>; +} +``` + +Runtime flow: + +```text +transport poll -> (ConnectionId, FrameBytes) + -> look up registered connection + -> derive Ingress from registered direction/path + -> EndpointState::process_frame + -> RuntimeEffect::SendFrame | RuntimeEffect::Local | RuntimeEffect::Dropped + -> flush SendFrame effects through Transport +``` + +Rules: + +- Callers never pass `Ingress` into `NodeRuntime`. +- Runtime counts per-tick progress, not retained backlog. +- Local events should be dispatched to leaves, not retained forever. +- Send failures must not drop unrelated queued effects. + +## Leaf API + +Leaves are request-only. They can ask the runtime to do work, but cannot mutate +endpoint state, hooks, route tables, connection maps, or transports. + +```rust +pub trait Leaf { + type Error; + + fn capabilities(&self) -> &LeafCapabilities; + + fn on_call(&mut self, ctx: &mut LeafContext<'_>, call: IncomingCall) + -> Result<(), Self::Error>; + + fn on_data(&mut self, ctx: &mut LeafContext<'_>, data: IncomingData) + -> Result<(), Self::Error>; + + fn on_fault(&mut self, ctx: &mut LeafContext<'_>, fault: IncomingFault) + -> Result<(), Self::Error>; + + fn poll(&mut self, ctx: &mut LeafContext<'_>) -> Result<(), Self::Error>; +} +``` + +Leaf permissions: + +```rust +pub struct LeafPermissions { + pub send_calls: bool, + pub send_hook_data: bool, + pub manage_connections: bool, +} +``` + +Leaf actions: + +```rust +pub enum LeafAction { + SendCall(OutboundCall), + SendHookData(OutboundHookData), + FailHook { hook_id: u64, fault: ProtocolFault }, + Connection(ConnectionAction), +} + +pub enum ConnectionAction { + Register { + connection: ConnectionId, + direction: ConnectionDirection, + peer_path: Vec, + }, + Unregister { connection: ConnectionId }, +} +``` + +Rules: + +- A leaf may queue only actions allowed by its `LeafPermissions`. +- Runtime policy still validates every action. Permission is not authority. +- Connection actions request runtime changes. They do not mutate state directly. +- Leaf callbacks must be bounded and nonblocking. +- No nested leaf dispatch. Leaf actions are applied after the callback returns. + +## Required Runtime Semantics + +### Inbound Forwarding + +```text +parent frame for /agent/grand + -> NodeRuntime derives Ingress::Parent + -> EndpointState routes to child /agent/grand + -> RuntimeEffect::SendFrame { connection: grandchild, generation, frame } + -> Transport::send_frame(grandchild, frame) +``` + +### Local Call Delivery + +```text +parent frame for local endpoint + -> NodeRuntime derives ingress + -> EndpointState validates and returns Local(Call) + -> NodeRuntime dispatches to matching Leaf::on_call + -> leaf queues LeafAction values + -> runtime validates and applies actions +``` + +### Outbound Leaf Call + +```text +leaf queues LeafAction::SendCall + -> runtime validates permission and target + -> EndpointState builds/routes call + -> pending hook is reserved if needed + -> RuntimeEffect::SendFrame or RuntimeEffect::Local +``` + +### Disconnect + +```text +connection closes or unregisters + -> mark connection Draining/Closed and advance generation + -> remove matching route entries + -> remove pending hooks associated with peer/subtree + -> remove active hooks associated with peer/subtree + -> notify or close leaf sessions + -> drop queued SendFrame effects with stale generation +``` + +## Known Gaps In The Current Branch + +- `Leaf` is defined but not yet registered or dispatched by `NodeRuntime`. +- `LeafAction` values are queued by `LeafContext` but not yet applied by + `NodeRuntime`. +- Local outbound calls through the runtime are not implemented. +- Connection registration does not yet atomically update endpoint routes. +- Disconnect does not yet clean hooks, sessions, route state, and queued effects. +- `RouteDecision::Child(index)` still depends on index compatibility with the + existing `ProtocolEndpoint` route table. +- Child ingress still allocates because the existing `Ingress::Child` owns a + `Vec`. + +## Next Implementation Slice + +Implement one narrow end-to-end path: + +1. Add a leaf registry to `NodeRuntime`. +2. Dispatch `RuntimeEffect::Local(Call)` into `Leaf::on_call`. +3. Apply `LeafAction::SendHookData` through endpoint packet state. +4. Route the produced frame through `Transport`. +5. Add tests proving a local call reaches a leaf and the leaf reply is framed and + sent through a registered connection. + +That slice forces the real architecture to work without overbuilding the rest of +the migration. diff --git a/Cargo.lock b/Cargo.lock index 86042f1..8ca27a4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1447,6 +1447,7 @@ dependencies = [ "unshell-leaves", "unshell-macros", "unshell-protocol", + "unshell-runtime", ] [[package]] @@ -1477,6 +1478,13 @@ dependencies = [ "unshell-macros", ] +[[package]] +name = "unshell-runtime" +version = "0.1.0" +dependencies = [ + "unshell-protocol", +] + [[package]] name = "ush-obfuscate" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 7313486..aa75033 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ members = [ "base62", "unshell-macros", "unshell-protocol", + "unshell-runtime", "unshell-leaves", "treetest", ] @@ -31,6 +32,7 @@ portable-pty = "0.9.0" crossbeam-channel = "0.5.15" unshell = { path = "." } unshell-protocol = { path = "./unshell-protocol" } +unshell-runtime = { path = "./unshell-runtime" } unshell-leaves = { path = "./unshell-leaves" } unshell-macros = { path = "./unshell-macros" } @@ -63,6 +65,7 @@ chrono = { workspace = true, optional = true } static_init = { workspace = true } unshell-macros = { workspace = true } unshell-protocol = { workspace = true } +unshell-runtime = { workspace = true } unshell-leaves = { workspace = true } [dev-dependencies] diff --git a/src/lib.rs b/src/lib.rs index f964ab6..8554e0e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,6 +26,9 @@ pub use unshell_protocol as protocol; /// Re-export the leaf library crate behind the historical `unshell::leaves` path pub use unshell_leaves as leaves; +/// Re-export the runtime crate behind the `unshell::runtime` path. +pub use unshell_runtime as runtime; + pub use unshell_macros::{Procedure, leaf, procedures}; /// Creates a root-assumed endpoint from one local identifier plus any number of leaf hosts. diff --git a/unshell-runtime/Cargo.toml b/unshell-runtime/Cargo.toml new file mode 100644 index 0000000..2c98898 --- /dev/null +++ b/unshell-runtime/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "unshell-runtime" +version.workspace = true +edition.workspace = true +description = "Transport-neutral runtime API types for UnShell" + +[dependencies] +unshell-protocol = { workspace = true } + +[lints.rust] +elided_lifetimes_in_paths = "warn" +future_incompatible = { level = "warn", priority = -1 } +nonstandard_style = { level = "warn", priority = -1 } +rust_2018_idioms = { level = "warn", priority = -1 } +rust_2021_prelude_collisions = "warn" +semicolon_in_expressions_from_macros = "warn" +unsafe_op_in_unsafe_fn = "warn" +unused_import_braces = "warn" +unused_lifetimes = "warn" +trivial_casts = "allow" +missing_docs = "warn" diff --git a/unshell-runtime/src/connections.rs b/unshell-runtime/src/connections.rs new file mode 100644 index 0000000..b4b6fe2 --- /dev/null +++ b/unshell-runtime/src/connections.rs @@ -0,0 +1,291 @@ +//! Runtime connection admission and routing metadata. +//! +//! A connection is not routable just because a transport exists. Only +//! [`ConnectionState::Registered`] connections are allowed to produce protocol +//! ingress or receive forwarded frames. + +use crate::alloc::string::String; +use crate::alloc::vec::Vec; + +/// Stable runtime handle for one transport connection slot. +#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)] +pub struct ConnectionId(u64); + +impl ConnectionId { + /// Creates a connection identifier from a raw value. + #[must_use] + pub const fn new(value: u64) -> Self { + Self(value) + } + + /// Returns the raw identifier value. + #[must_use] + pub const fn get(self) -> u64 { + self.0 + } +} + +/// Monotonic incarnation number for one connection slot. +#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)] +pub struct ConnectionGeneration(u64); + +impl ConnectionGeneration { + /// First generation assigned to a new connection slot. + pub const INITIAL: Self = Self(0); + + /// Creates a generation from a raw value. + #[must_use] + pub const fn new(value: u64) -> Self { + Self(value) + } + + /// Returns the raw generation value. + #[must_use] + pub const fn get(self) -> u64 { + self.0 + } + + /// Returns the next generation, saturating at `u64::MAX`. + #[must_use] + pub const fn next(self) -> Self { + Self(self.0.saturating_add(1)) + } +} + +/// Local tree relationship for a registered connection. +#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +pub enum ConnectionDirection { + /// The peer is the direct parent of this endpoint. + Parent, + /// The peer is a direct child of this endpoint. + Child, +} + +/// Metadata that makes a connection routable. +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct RegisteredConnection { + direction: ConnectionDirection, + peer_path: Vec, + generation: ConnectionGeneration, +} + +impl RegisteredConnection { + /// Creates registered routing metadata. + #[must_use] + pub const fn new( + direction: ConnectionDirection, + peer_path: Vec, + generation: ConnectionGeneration, + ) -> Self { + Self { + direction, + peer_path, + generation, + } + } + + /// Returns the local tree relationship. + #[must_use] + pub const fn direction(&self) -> ConnectionDirection { + self.direction + } + + /// Returns the registered peer path. + #[must_use] + pub fn peer_path(&self) -> &[String] { + &self.peer_path + } + + /// Returns the connection generation. + #[must_use] + pub const fn generation(&self) -> ConnectionGeneration { + self.generation + } +} + +/// Runtime lifecycle state for one connection slot. +#[derive(Clone, Debug, Eq, PartialEq)] +pub enum ConnectionState { + /// The transport exists but has not started or completed admission. + Connected { + /// Connection generation for this transport incarnation. + generation: ConnectionGeneration, + }, + /// The runtime is evaluating whether this peer should become routable. + Authenticating { + /// Connection generation for this transport incarnation. + generation: ConnectionGeneration, + }, + /// The peer is admitted into protocol routing. + Registered(RegisteredConnection), + /// The runtime is tearing this connection down and should reject new work. + Draining { + /// Connection generation for this transport incarnation. + generation: ConnectionGeneration, + }, + /// The connection is closed and retained only as historical metadata. + Closed { + /// Connection generation for this transport incarnation. + generation: ConnectionGeneration, + }, +} + +impl ConnectionState { + /// Returns the generation associated with this state. + #[must_use] + pub const fn generation(&self) -> ConnectionGeneration { + match self { + Self::Connected { generation } + | Self::Authenticating { generation } + | Self::Draining { generation } + | Self::Closed { generation } => *generation, + Self::Registered(registered) => registered.generation(), + } + } + + /// Returns registered metadata when this connection is routable. + #[must_use] + pub const fn registered(&self) -> Option<&RegisteredConnection> { + match self { + Self::Registered(registered) => Some(registered), + Self::Connected { .. } + | Self::Authenticating { .. } + | Self::Draining { .. } + | Self::Closed { .. } => None, + } + } +} + +/// One runtime connection slot. +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct Connection { + id: ConnectionId, + state: ConnectionState, +} + +impl Connection { + /// Creates a connected but unroutable connection slot. + #[must_use] + pub const fn connected(id: ConnectionId, generation: ConnectionGeneration) -> Self { + Self { + id, + state: ConnectionState::Connected { generation }, + } + } + + /// Creates a registered connection slot. + #[must_use] + pub const fn registered( + id: ConnectionId, + direction: ConnectionDirection, + peer_path: Vec, + generation: ConnectionGeneration, + ) -> Self { + Self { + id, + state: ConnectionState::Registered(RegisteredConnection::new( + direction, peer_path, generation, + )), + } + } + + /// Returns the connection id. + #[must_use] + pub const fn id(&self) -> ConnectionId { + self.id + } + + /// Returns the current connection state. + #[must_use] + pub const fn state(&self) -> &ConnectionState { + &self.state + } + + /// Replaces the current connection state. + pub fn set_state(&mut self, state: ConnectionState) { + self.state = state; + } +} + +/// Connection metadata table owned by the runtime. +#[derive(Clone, Debug, Default, Eq, PartialEq)] +pub struct Connections { + entries: Vec, +} + +impl Connections { + /// Creates an empty table. + #[must_use] + pub const fn new() -> Self { + Self { + entries: Vec::new(), + } + } + + /// Inserts a connection descriptor. + pub fn push(&mut self, connection: Connection) { + self.entries.push(connection); + } + + /// Returns all connection descriptors. + #[must_use] + pub fn entries(&self) -> &[Connection] { + &self.entries + } + + /// Finds a connection by id. + #[must_use] + pub fn get(&self, id: ConnectionId) -> Option<&Connection> { + self.entries.iter().find(|entry| entry.id() == id) + } + + /// Finds a mutable connection by id. + #[must_use] + pub fn get_mut(&mut self, id: ConnectionId) -> Option<&mut Connection> { + self.entries.iter_mut().find(|entry| entry.id() == id) + } + + /// Returns registered metadata for a routable connection. + #[must_use] + pub fn registered(&self, id: ConnectionId) -> Option<&RegisteredConnection> { + self.get(id) + .and_then(|connection| connection.state().registered()) + } + + /// Finds a registered connection by direction. + #[must_use] + pub fn registered_by_direction(&self, direction: ConnectionDirection) -> Option<&Connection> { + self.entries.iter().find(|entry| { + entry + .state() + .registered() + .is_some_and(|registered| registered.direction() == direction) + }) + } + + /// Finds a registered connection by direction and peer path. + #[must_use] + pub fn registered_by_path( + &self, + direction: ConnectionDirection, + peer_path: &[String], + ) -> Option<&Connection> { + self.entries.iter().find(|entry| { + entry.state().registered().is_some_and(|registered| { + registered.direction() == direction && registered.peer_path() == peer_path + }) + }) + } +} + +/// Read-only connection table view exposed to leaf contexts. +pub trait ConnectionTable { + /// Returns registered metadata for a routable connection. + fn registered(&self, id: ConnectionId) -> Option<&RegisteredConnection>; +} + +impl ConnectionTable for Connections { + fn registered(&self, id: ConnectionId) -> Option<&RegisteredConnection> { + Self::registered(self, id) + } +} diff --git a/unshell-runtime/src/context.rs b/unshell-runtime/src/context.rs new file mode 100644 index 0000000..c51b168 --- /dev/null +++ b/unshell-runtime/src/context.rs @@ -0,0 +1,205 @@ +//! Request-only context exposed to leaf callbacks. +//! +//! Leaf code never receives direct access to route tables, hook state, endpoint +//! internals, or transport handles. It can only enqueue [`LeafAction`] values. +//! The runtime validates and applies those actions later. + +use crate::alloc::string::String; +use crate::alloc::vec::Vec; +use crate::connections::{ConnectionDirection, ConnectionId, Connections}; +use crate::leaf::{LeafCapabilities, LeafId}; +use unshell_protocol::ProtocolFault; + +/// Context handed to one leaf callback. +#[derive(Debug)] +pub struct LeafContext<'a> { + local_path: &'a [String], + leaf_id: &'a LeafId, + capabilities: &'a LeafCapabilities, + connections: &'a Connections, + actions: Vec, +} + +impl<'a> LeafContext<'a> { + /// Creates a context for one leaf callback. + #[must_use] + pub const fn new( + local_path: &'a [String], + leaf_id: &'a LeafId, + capabilities: &'a LeafCapabilities, + connections: &'a Connections, + ) -> Self { + Self { + local_path, + leaf_id, + capabilities, + connections, + actions: Vec::new(), + } + } + + /// Returns this endpoint's absolute path. + #[must_use] + pub const fn local_path(&self) -> &[String] { + self.local_path + } + + /// Returns the leaf currently using this context. + #[must_use] + pub const fn leaf_id(&self) -> &LeafId { + self.leaf_id + } + + /// Returns the permissions granted to this leaf. + #[must_use] + pub const fn capabilities(&self) -> &LeafCapabilities { + self.capabilities + } + + /// Returns read-only connection metadata. + #[must_use] + pub const fn connections(&self) -> &Connections { + self.connections + } + + /// Returns queued leaf actions. + #[must_use] + pub fn actions(&self) -> &[LeafAction] { + &self.actions + } + + /// Consumes the context and returns queued actions. + #[must_use] + pub fn into_actions(self) -> Vec { + self.actions + } + + /// Requests an outbound call. + pub fn call(&mut self, call: OutboundCall) -> Result<(), RequestDenied> { + if !self.capabilities.permissions.send_calls { + return Err(RequestDenied::MissingCapability( + RuntimeCapability::SendCalls, + )); + } + self.actions.push(LeafAction::SendCall(call)); + Ok(()) + } + + /// Requests data on an existing hook. + pub fn hook_data(&mut self, data: OutboundHookData) -> Result<(), RequestDenied> { + if !self.capabilities.permissions.send_hook_data { + return Err(RequestDenied::MissingCapability( + RuntimeCapability::SendHookData, + )); + } + self.actions.push(LeafAction::SendHookData(data)); + Ok(()) + } + + /// Requests hook termination with a protocol fault. + pub fn fail_hook(&mut self, hook_id: u64, fault: ProtocolFault) -> Result<(), RequestDenied> { + if !self.capabilities.permissions.send_hook_data { + return Err(RequestDenied::MissingCapability( + RuntimeCapability::SendHookData, + )); + } + self.actions.push(LeafAction::FailHook { hook_id, fault }); + Ok(()) + } + + /// Requests a connection admission or teardown action. + pub fn connection(&mut self, request: ConnectionAction) -> Result<(), RequestDenied> { + if !self.capabilities.permissions.manage_connections { + return Err(RequestDenied::MissingCapability( + RuntimeCapability::ManageConnections, + )); + } + self.actions.push(LeafAction::Connection(request)); + Ok(()) + } +} + +/// Runtime action requested by leaf code. +#[derive(Clone, Debug, Eq, PartialEq)] +pub enum LeafAction { + /// Build and send one outbound call. + SendCall(OutboundCall), + /// Build and send one hook data packet. + SendHookData(OutboundHookData), + /// Terminate a hook with a protocol fault. + FailHook { + /// Hook identifier scoped by the hook host. + hook_id: u64, + /// Stable protocol fault code. + fault: ProtocolFault, + }, + /// Request a connection state change. + Connection(ConnectionAction), +} + +/// Outbound call request before packet construction. +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct OutboundCall { + /// Destination endpoint path. + pub dst_path: Vec, + /// Optional destination leaf name. + pub dst_leaf: Option, + /// Canonical procedure id. + pub procedure_id: String, + /// Opaque request payload. + pub payload: Vec, + /// Whether the runtime should allocate a response hook. + pub expects_response: bool, +} + +/// Hook data request before packet construction. +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct OutboundHookData { + /// Destination endpoint path for the hook packet. + pub dst_path: Vec, + /// Hook identifier scoped by the receiving endpoint. + pub hook_id: u64, + /// Canonical procedure id associated with the hook stream. + pub procedure_id: String, + /// Opaque payload bytes. + pub payload: Vec, + /// Whether this packet closes the local side of the hook. + pub end_hook: bool, +} + +/// Requested connection state change. +#[derive(Clone, Debug, Eq, PartialEq)] +pub enum ConnectionAction { + /// Register an existing connection as a direct parent or child. + Register { + /// Runtime transport connection id. + connection: ConnectionId, + /// Requested tree direction. + direction: ConnectionDirection, + /// Peer path to register. + peer_path: Vec, + }, + /// Remove a connection from runtime routing. + Unregister { + /// Runtime transport connection id. + connection: ConnectionId, + }, +} + +/// Capability checked by [`LeafContext`] helpers. +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum RuntimeCapability { + /// Permission to request outbound calls. + SendCalls, + /// Permission to request hook data or hook faults. + SendHookData, + /// Permission to request connection state changes. + ManageConnections, +} + +/// Rejection reason for a leaf action request. +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum RequestDenied { + /// The leaf does not have the required capability. + MissingCapability(RuntimeCapability), +} diff --git a/unshell-runtime/src/effects.rs b/unshell-runtime/src/effects.rs new file mode 100644 index 0000000..cd39455 --- /dev/null +++ b/unshell-runtime/src/effects.rs @@ -0,0 +1,56 @@ +//! Runtime effects produced by packet processing. + +use crate::alloc::vec::Vec; +use crate::connections::{ConnectionGeneration, ConnectionId}; +use unshell_protocol::FrameBytes; +use unshell_protocol::tree::LocalEvent; + +/// Side effect selected by endpoint packet processing. +#[derive(Clone, Debug)] +pub enum RuntimeEffect { + /// Send a frame to a registered connection. + SendFrame { + /// Destination connection id. + connection: ConnectionId, + /// Generation observed when the effect was queued. + generation: ConnectionGeneration, + /// Encoded protocol frame. + frame: FrameBytes, + }, + /// Deliver a local protocol event to the future leaf/session dispatcher. + Local(LocalEvent), + /// The frame was intentionally dropped by protocol state. + Dropped, +} + +/// FIFO queue of runtime effects. +#[derive(Clone, Debug, Default)] +pub struct EffectQueue { + entries: Vec, +} + +impl EffectQueue { + /// Creates an empty effect queue. + #[must_use] + pub const fn new() -> Self { + Self { + entries: Vec::new(), + } + } + + /// Queues an effect. + pub fn push(&mut self, effect: RuntimeEffect) { + self.entries.push(effect); + } + + /// Returns queued effects. + #[must_use] + pub fn entries(&self) -> &[RuntimeEffect] { + &self.entries + } + + /// Drains queued effects in FIFO order. + pub fn drain(&mut self) -> impl Iterator + '_ { + self.entries.drain(..) + } +} diff --git a/unshell-runtime/src/leaf.rs b/unshell-runtime/src/leaf.rs new file mode 100644 index 0000000..c637b73 --- /dev/null +++ b/unshell-runtime/src/leaf.rs @@ -0,0 +1,110 @@ +//! Leaf-facing runtime types. + +use crate::alloc::string::String; +use crate::alloc::vec::Vec; +use crate::context::LeafContext; +use unshell_protocol::tree::{IncomingCall, IncomingData, IncomingFault}; + +/// Stable identifier for a locally hosted leaf binding. +#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +pub struct LeafId(String); + +impl LeafId { + /// Creates a leaf id from an owned string. + #[must_use] + pub const fn new(value: String) -> Self { + Self(value) + } + + /// Returns the leaf id as a string slice. + #[must_use] + pub fn as_str(&self) -> &str { + &self.0 + } +} + +/// Runtime permissions granted to one leaf binding. +#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] +pub struct LeafPermissions { + /// The leaf may request new outbound calls. + pub send_calls: bool, + /// The leaf may request data or faults on hook streams. + pub send_hook_data: bool, + /// The leaf may request connection registration or removal. + pub manage_connections: bool, +} + +impl LeafPermissions { + /// Grants no runtime-side effects. + pub const NONE: Self = Self { + send_calls: false, + send_hook_data: false, + manage_connections: false, + }; + + /// Grants the common permission set for a passive responder leaf. + pub const REPLY_ONLY: Self = Self { + send_calls: false, + send_hook_data: true, + manage_connections: false, + }; + + /// Grants all current permissions. Use sparingly. + pub const ALL: Self = Self { + send_calls: true, + send_hook_data: true, + manage_connections: true, + }; +} + +/// Protocol surface and runtime permissions for one leaf. +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct LeafCapabilities { + /// Canonical dotted leaf name. + pub leaf_name: String, + /// Canonical procedure ids supported by the leaf. + pub procedures: Vec, + /// Runtime permissions granted to this leaf binding. + pub permissions: LeafPermissions, +} + +/// One hosted leaf implementation. +pub trait Leaf { + /// Leaf-specific error type. + type Error; + + /// Returns static protocol and runtime capabilities. + fn capabilities(&self) -> &LeafCapabilities; + + /// Handles one opening call routed to this leaf. + fn on_call( + &mut self, + _ctx: &mut LeafContext<'_>, + _call: IncomingCall, + ) -> Result<(), Self::Error> { + Ok(()) + } + + /// Handles hook data routed to this leaf or its session adapter. + fn on_data( + &mut self, + _ctx: &mut LeafContext<'_>, + _data: IncomingData, + ) -> Result<(), Self::Error> { + Ok(()) + } + + /// Handles hook fault routed to this leaf or its session adapter. + fn on_fault( + &mut self, + _ctx: &mut LeafContext<'_>, + _fault: IncomingFault, + ) -> Result<(), Self::Error> { + Ok(()) + } + + /// Gives the leaf one bounded opportunity to request local work. + fn poll(&mut self, _ctx: &mut LeafContext<'_>) -> Result<(), Self::Error> { + Ok(()) + } +} diff --git a/unshell-runtime/src/lib.rs b/unshell-runtime/src/lib.rs new file mode 100644 index 0000000..94e6893 --- /dev/null +++ b/unshell-runtime/src/lib.rs @@ -0,0 +1,122 @@ +//! # UnShell Runtime +//! +//! Single-threaded runtime scaffolding for hosting UnShell protocol nodes. This +//! crate currently bridges the existing protocol endpoint state while defining +//! the concrete transport, connection, and leaf-action APIs the redesign will use. + +#![no_std] + +pub extern crate alloc; + +pub mod connections; +pub mod context; +pub mod effects; +pub mod leaf; +pub mod node; +pub mod transport; + +pub use connections::{ + Connection, ConnectionDirection, ConnectionGeneration, ConnectionId, ConnectionState, + ConnectionTable, Connections, RegisteredConnection, +}; +pub use context::{ + ConnectionAction, LeafAction, LeafContext, OutboundCall, OutboundHookData, RequestDenied, + RuntimeCapability, +}; +pub use effects::{EffectQueue, RuntimeEffect}; +pub use leaf::{Leaf, LeafCapabilities, LeafId, LeafPermissions}; +pub use node::{ + EndpointState, Node, NodeId, NodeRuntime, NodeRuntimeError, NodeState, TickBudget, TickOutcome, +}; +pub use transport::Transport; + +#[cfg(test)] +mod tests { + use crate::alloc::string::String; + use crate::alloc::vec; + use crate::alloc::vec::Vec; + + use super::{ + Connection, ConnectionDirection, ConnectionGeneration, ConnectionId, ConnectionState, + Connections, LeafAction, LeafCapabilities, LeafContext, LeafId, LeafPermissions, + OutboundCall, OutboundHookData, RequestDenied, RuntimeCapability, + }; + + #[test] + fn connection_generation_advances_without_wrapping() { + assert_eq!(ConnectionGeneration::INITIAL.get(), 0); + assert_eq!(ConnectionGeneration::new(41).next().get(), 42); + assert_eq!(ConnectionGeneration::new(u64::MAX).next().get(), u64::MAX); + } + + #[test] + fn connection_table_reports_registered_connection_metadata() { + let id = ConnectionId::new(7); + let mut connections = Connections::new(); + connections.push(Connection::registered( + id, + ConnectionDirection::Child, + vec![String::from("root"), String::from("child")], + ConnectionGeneration::new(3), + )); + + let registered = connections + .registered(id) + .expect("connection is registered"); + assert_eq!(registered.direction(), ConnectionDirection::Child); + assert_eq!(registered.generation().get(), 3); + assert_eq!(registered.peer_path(), ["root", "child"]); + } + + #[test] + fn connected_connections_are_not_routable() { + let id = ConnectionId::new(9); + let mut connections = Connections::new(); + connections.push(Connection::connected(id, ConnectionGeneration::INITIAL)); + + assert!(connections.registered(id).is_none()); + assert!(matches!( + connections.get(id).unwrap().state(), + ConnectionState::Connected { .. } + )); + } + + #[test] + fn leaf_context_queues_only_capability_checked_actions() { + let id = LeafId::new(String::from("org.example.v1.echo")); + let capabilities = LeafCapabilities { + leaf_name: String::from("org.example.v1.echo"), + procedures: vec![String::from("org.example.v1.echo.invoke")], + permissions: LeafPermissions::REPLY_ONLY, + }; + let connections = Connections::new(); + let local_path = vec![String::from("root")]; + let mut ctx = LeafContext::new(&local_path, &id, &capabilities, &connections); + + ctx.hook_data(OutboundHookData { + dst_path: vec![String::from("root")], + hook_id: 7, + procedure_id: String::from("org.example.v1.echo.invoke"), + payload: vec![1, 2, 3], + end_hook: true, + }) + .expect("reply-only leaf can send hook data"); + + let denied = ctx.call(OutboundCall { + dst_path: vec![String::from("root"), String::from("child")], + dst_leaf: None, + procedure_id: String::from("org.example.v1.echo.invoke"), + payload: Vec::new(), + expects_response: false, + }); + + assert_eq!(ctx.local_path(), ["root"]); + assert!(matches!(ctx.actions()[0], LeafAction::SendHookData(_))); + assert_eq!( + denied, + Err(RequestDenied::MissingCapability( + RuntimeCapability::SendCalls + )) + ); + } +} diff --git a/unshell-runtime/src/node/mod.rs b/unshell-runtime/src/node/mod.rs new file mode 100644 index 0000000..4bfd5bf --- /dev/null +++ b/unshell-runtime/src/node/mod.rs @@ -0,0 +1,73 @@ +//! Node-level runtime identity types. +//! +//! A node is the local runtime owner for protocol state, leaf bindings, and +//! transport connections. This module only models identity and lifecycle state. + +pub mod packet; +pub mod runtime; +pub mod state; + +pub use packet::{EndpointState, PacketProcessor}; +pub use runtime::{NodeRuntime, NodeRuntimeError, TickBudget, TickOutcome}; +pub use state::NodeState; + +use crate::alloc::string::String; + +/// Stable identifier for a runtime node. +#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +pub struct NodeId(String); + +impl NodeId { + /// Creates a node identifier from an owned string. + #[must_use] + pub const fn new(value: String) -> Self { + Self(value) + } + + /// Returns the identifier as a string slice. + #[must_use] + pub fn as_str(&self) -> &str { + &self.0 + } + + /// Consumes the identifier and returns the owned string. + #[must_use] + pub fn into_string(self) -> String { + self.0 + } +} + +/// Minimal runtime node descriptor. +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct Node { + id: NodeId, + state: NodeState, +} + +impl Node { + /// Creates a new node descriptor in the default [`NodeState::Created`] state. + #[must_use] + pub const fn new(id: NodeId) -> Self { + Self { + id, + state: NodeState::Created, + } + } + + /// Returns the node identifier. + #[must_use] + pub const fn id(&self) -> &NodeId { + &self.id + } + + /// Returns the current node lifecycle state. + #[must_use] + pub const fn state(&self) -> NodeState { + self.state + } + + /// Updates the current node lifecycle state. + pub const fn set_state(&mut self, state: NodeState) { + self.state = state; + } +} diff --git a/unshell-runtime/src/node/packet.rs b/unshell-runtime/src/node/packet.rs new file mode 100644 index 0000000..dcaacc6 --- /dev/null +++ b/unshell-runtime/src/node/packet.rs @@ -0,0 +1,86 @@ +//! Transitional packet-processing wrapper around the current protocol endpoint. +//! +//! This module is intentionally small. It gives the new runtime crate a concrete +//! bridge to the existing packet state machine while the protocol crate is split +//! into packet-only and runtime-owned layers. The wrapper does not own transport +//! handles, does not dispatch leaves, and does not make admission decisions. + +use unshell_protocol::{FrameBytes, tree::Endpoint as ProtocolEndpointTrait}; + +pub use unshell_protocol::tree::{ + ChildRoute, EndpointError, EndpointOutcome, HookKey, Ingress, LeafSpec, LocalEvent, + ProtocolEndpoint, RouteDecision, +}; + +/// Minimal packet processor used by future single-threaded runtimes. +/// +/// The processor receives one frame with an already-derived ingress side and +/// returns the existing endpoint outcome. A full `NodeRuntime` should derive the +/// ingress from registered connection metadata before calling this trait. +pub trait PacketProcessor { + /// Processes one serialized frame through protocol validation, routing, and + /// hook-state transitions. + fn process_frame( + &mut self, + ingress: &Ingress, + frame: FrameBytes, + ) -> Result; +} + +/// Runtime-owned endpoint packet state. +/// +/// This is a compatibility shell around [`ProtocolEndpoint`]. It exists so new +/// runtime code can depend on `unshell_runtime::node::EndpointState` while the +/// old protocol-tree endpoint remains the source of truth for packet invariants. +#[derive(Debug, Default)] +pub struct EndpointState { + endpoint: ProtocolEndpoint, +} + +impl EndpointState { + /// Creates a packet state wrapper from an existing protocol endpoint. + #[must_use] + pub const fn new(endpoint: ProtocolEndpoint) -> Self { + Self { endpoint } + } + + /// Creates packet state for a root-assumed endpoint. + #[must_use] + pub fn root( + local_id: impl Into, + leaves: alloc::vec::Vec, + ) -> Self { + Self::new(ProtocolEndpoint::root(local_id, leaves)) + } + + /// Returns the wrapped protocol endpoint. + #[must_use] + pub const fn endpoint(&self) -> &ProtocolEndpoint { + &self.endpoint + } + + /// Returns mutable access to the wrapped protocol endpoint. + /// + /// This is intentionally exposed only on the transitional wrapper. New runtime + /// code should prefer smaller methods as the endpoint state is split apart. + #[must_use] + pub const fn endpoint_mut(&mut self) -> &mut ProtocolEndpoint { + &mut self.endpoint + } + + /// Consumes the wrapper and returns the underlying protocol endpoint. + #[must_use] + pub fn into_endpoint(self) -> ProtocolEndpoint { + self.endpoint + } +} + +impl PacketProcessor for EndpointState { + fn process_frame( + &mut self, + ingress: &Ingress, + frame: FrameBytes, + ) -> Result { + self.endpoint.receive(ingress, frame) + } +} diff --git a/unshell-runtime/src/node/runtime.rs b/unshell-runtime/src/node/runtime.rs new file mode 100644 index 0000000..636fe94 --- /dev/null +++ b/unshell-runtime/src/node/runtime.rs @@ -0,0 +1,439 @@ +//! Single-threaded runtime shell around endpoint packet state. +//! +//! This first slice owns transport and connection metadata, derives ingress from +//! registered connections, delegates packet invariants to [`EndpointState`], and +//! queues concrete runtime effects. Leaf dispatch and leaf-action application are +//! intentionally not implemented in this slice. + +use crate::connections::{ConnectionDirection, ConnectionId, Connections, RegisteredConnection}; +use crate::effects::{EffectQueue, RuntimeEffect}; +use crate::transport::Transport; +use unshell_protocol::FrameBytes; +use unshell_protocol::tree::{EndpointError, EndpointOutcome, Ingress, RouteDecision}; + +use super::{EndpointState, PacketProcessor}; + +/// Limits one runtime progress step. +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub struct TickBudget { + /// Maximum inbound frames to poll from the transport. + pub max_inbound_frames: usize, + /// Whether queued outbound frame effects should be flushed through transport. + pub flush_outbound: bool, +} + +impl Default for TickBudget { + fn default() -> Self { + Self { + max_inbound_frames: 16, + flush_outbound: true, + } + } +} + +/// Summary returned after one runtime step. +#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] +pub struct TickOutcome { + /// Number of inbound frames processed. + pub inbound_frames: usize, + /// Number of outbound frames sent. + pub outbound_frames: usize, + /// Number of frames intentionally dropped. + pub dropped_frames: usize, + /// Number of local endpoint events queued for later leaf dispatch. + pub local_events: usize, +} + +/// Error surfaced by [`NodeRuntime`]. +#[derive(Debug)] +pub enum NodeRuntimeError { + /// The connection is unknown or not registered for protocol routing. + UnregisteredConnection(ConnectionId), + /// The endpoint selected a route with no matching registered connection. + MissingRouteConnection, + /// Packet processing failed inside endpoint state. + Endpoint(EndpointError), + /// Transport send, receive, or flush failed. + Transport(TransportError), +} + +impl core::fmt::Display for NodeRuntimeError +where + TransportError: core::fmt::Display, +{ + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + match self { + Self::UnregisteredConnection(connection) => { + write!(f, "connection {} is not registered", connection.get()) + } + Self::MissingRouteConnection => f.write_str("route has no registered connection"), + Self::Endpoint(error) => write!(f, "{error}"), + Self::Transport(error) => write!(f, "{error}"), + } + } +} + +impl core::error::Error for NodeRuntimeError where + TransportError: core::error::Error + 'static +{ +} + +/// Runtime owner for one endpoint, transport, and connection table. +#[derive(Debug)] +pub struct NodeRuntime { + endpoint: EndpointState, + connections: Connections, + transport: T, + effects: EffectQueue, +} + +impl NodeRuntime { + /// Creates a runtime from endpoint state, registered connection metadata, and + /// one concrete transport. + #[must_use] + pub const fn new(endpoint: EndpointState, connections: Connections, transport: T) -> Self { + Self { + endpoint, + connections, + transport, + effects: EffectQueue::new(), + } + } + + /// Returns endpoint packet state. + #[must_use] + pub const fn endpoint(&self) -> &EndpointState { + &self.endpoint + } + + /// Returns mutable endpoint packet state. + #[must_use] + pub const fn endpoint_mut(&mut self) -> &mut EndpointState { + &mut self.endpoint + } + + /// Returns connection metadata. + #[must_use] + pub const fn connections(&self) -> &Connections { + &self.connections + } + + /// Returns mutable connection metadata. + #[must_use] + pub const fn connections_mut(&mut self) -> &mut Connections { + &mut self.connections + } + + /// Returns the transport. + #[must_use] + pub const fn transport(&self) -> &T { + &self.transport + } + + /// Returns the mutable transport. + #[must_use] + pub const fn transport_mut(&mut self) -> &mut T { + &mut self.transport + } + + /// Returns currently queued effects. + #[must_use] + pub fn effects(&self) -> &[RuntimeEffect] { + self.effects.entries() + } +} + +impl NodeRuntime +where + T: Transport, +{ + /// Processes one nonblocking runtime step. + pub fn tick(&mut self, budget: TickBudget) -> Result> { + let mut outcome = TickOutcome::default(); + + for _ in 0..budget.max_inbound_frames { + let Some((connection, frame)) = self + .transport + .poll_recv() + .map_err(NodeRuntimeError::Transport)? + else { + break; + }; + self.receive_frame(connection, frame)?; + outcome.inbound_frames += 1; + } + + outcome.dropped_frames += self + .effects + .entries() + .iter() + .filter(|effect| matches!(effect, RuntimeEffect::Dropped)) + .count(); + outcome.local_events += self + .effects + .entries() + .iter() + .filter(|effect| matches!(effect, RuntimeEffect::Local(_))) + .count(); + + if budget.flush_outbound { + outcome.outbound_frames = self.flush_outbound()?; + } + Ok(outcome) + } + + /// Processes one frame from a known transport connection. + pub fn receive_frame( + &mut self, + connection: ConnectionId, + frame: FrameBytes, + ) -> Result<(), NodeRuntimeError> { + let registered = self + .connections + .registered(connection) + .ok_or(NodeRuntimeError::UnregisteredConnection(connection))?; + let ingress = ingress_for(registered); + let outcome = self + .endpoint + .process_frame(&ingress, frame) + .map_err(NodeRuntimeError::Endpoint)?; + self.apply_outcome(outcome) + } + + fn apply_outcome( + &mut self, + outcome: EndpointOutcome, + ) -> Result<(), NodeRuntimeError> { + match outcome { + EndpointOutcome::Forward { route, frame } => self.queue_forward(route, frame), + EndpointOutcome::Local(event) => { + self.effects.push(RuntimeEffect::Local(event)); + Ok(()) + } + EndpointOutcome::Dropped => { + self.effects.push(RuntimeEffect::Dropped); + Ok(()) + } + } + } + + fn queue_forward( + &mut self, + route: RouteDecision, + frame: FrameBytes, + ) -> Result<(), NodeRuntimeError> { + let (connection, generation) = match route { + RouteDecision::Parent => self + .connections + .registered_by_direction(ConnectionDirection::Parent) + .and_then(|connection| { + connection + .state() + .registered() + .map(|registered| (connection.id(), registered.generation())) + }), + RouteDecision::Child(index) => self + .endpoint + .endpoint() + .child_routes() + .get(index) + .and_then(|child| { + self.connections + .registered_by_path(ConnectionDirection::Child, &child.path) + }) + .and_then(|connection| { + connection + .state() + .registered() + .map(|registered| (connection.id(), registered.generation())) + }), + RouteDecision::Local | RouteDecision::Drop => None, + } + .ok_or(NodeRuntimeError::MissingRouteConnection)?; + + self.effects.push(RuntimeEffect::SendFrame { + connection, + generation, + frame, + }); + Ok(()) + } + + fn flush_outbound(&mut self) -> Result> { + let mut retained = EffectQueue::new(); + let mut sent = 0usize; + for effect in self.effects.drain() { + match effect { + RuntimeEffect::SendFrame { + connection, + generation, + frame, + } if self + .connections + .registered(connection) + .is_some_and(|registered| registered.generation() == generation) => + { + self.transport + .send_frame(connection, frame) + .map_err(NodeRuntimeError::Transport)?; + sent += 1; + } + RuntimeEffect::SendFrame { .. } => {} + other => retained.push(other), + } + } + self.effects = retained; + self.transport + .flush() + .map_err(NodeRuntimeError::Transport)?; + Ok(sent) + } +} + +fn ingress_for(registered: &RegisteredConnection) -> Ingress { + match registered.direction() { + ConnectionDirection::Parent => Ingress::Parent, + ConnectionDirection::Child => Ingress::Child(registered.peer_path().to_vec()), + } +} + +#[cfg(test)] +mod tests { + use crate::alloc::string::String; + use crate::alloc::vec; + use crate::alloc::vec::Vec; + use crate::connections::{ + Connection, ConnectionDirection, ConnectionGeneration, ConnectionId, Connections, + }; + use crate::effects::RuntimeEffect; + use crate::transport::Transport; + use unshell_protocol::tree::{ChildRoute, ProtocolEndpoint}; + use unshell_protocol::{CallMessage, FrameBytes, PacketHeader, PacketType, encode_packet}; + + use super::{EndpointState, NodeRuntime, TickBudget}; + + #[derive(Debug, Default)] + struct RecordingTransport { + inbound: Option<(ConnectionId, FrameBytes)>, + sent: Vec<(ConnectionId, FrameBytes)>, + } + + impl Transport for RecordingTransport { + type Error = core::convert::Infallible; + + fn poll_recv(&mut self) -> Result, Self::Error> { + Ok(self.inbound.take()) + } + + fn send_frame( + &mut self, + connection: ConnectionId, + frame: FrameBytes, + ) -> Result<(), Self::Error> { + self.sent.push((connection, frame)); + Ok(()) + } + } + + #[test] + fn tick_derives_ingress_and_sends_forwarded_child_frame() { + let parent = ConnectionId::new(1); + let child = ConnectionId::new(2); + let mut connections = Connections::new(); + connections.push(Connection::registered( + parent, + ConnectionDirection::Parent, + vec![], + ConnectionGeneration::INITIAL, + )); + connections.push(Connection::registered( + child, + ConnectionDirection::Child, + vec![String::from("agent"), String::from("grand")], + ConnectionGeneration::INITIAL, + )); + + let endpoint = ProtocolEndpoint::new( + vec![String::from("agent")], + Some(vec![]), + vec![ChildRoute::registered(vec![ + String::from("agent"), + String::from("grand"), + ])], + vec![], + ); + + let frame = encode_packet( + &PacketHeader { + packet_type: PacketType::Call, + src_path: vec![], + dst_path: vec![String::from("agent"), String::from("grand")], + dst_leaf: None, + hook_id: None, + }, + &CallMessage { + procedure_id: String::from("org.example.v1.echo.invoke"), + data: vec![], + response_hook: None, + }, + ) + .expect("frame encodes"); + + let transport = RecordingTransport { + inbound: Some((parent, frame)), + sent: Vec::new(), + }; + let mut runtime = NodeRuntime::new(EndpointState::new(endpoint), connections, transport); + + let outcome = runtime.tick(TickBudget::default()).expect("tick succeeds"); + + assert_eq!(outcome.inbound_frames, 1); + assert_eq!(outcome.outbound_frames, 1); + assert!(runtime.effects().is_empty()); + assert_eq!(runtime.transport().sent[0].0, child); + } + + #[test] + fn receive_keeps_local_events_queued_for_leaf_dispatch() { + let parent = ConnectionId::new(1); + let mut connections = Connections::new(); + connections.push(Connection::registered( + parent, + ConnectionDirection::Parent, + vec![], + ConnectionGeneration::INITIAL, + )); + + let mut endpoint = + ProtocolEndpoint::new(vec![String::from("agent")], Some(vec![]), vec![], vec![]); + endpoint + .add_endpoint_procedure("org.example.v1.echo.invoke") + .expect("procedure registers"); + let frame = encode_packet( + &PacketHeader { + packet_type: PacketType::Call, + src_path: vec![], + dst_path: vec![String::from("agent")], + dst_leaf: None, + hook_id: None, + }, + &CallMessage { + procedure_id: String::from("org.example.v1.echo.invoke"), + data: vec![], + response_hook: None, + }, + ) + .expect("frame encodes"); + + let mut runtime = NodeRuntime::new( + EndpointState::new(endpoint), + connections, + RecordingTransport::default(), + ); + + runtime + .receive_frame(parent, frame) + .expect("frame processes"); + assert!(matches!(runtime.effects()[0], RuntimeEffect::Local(_))); + } +} diff --git a/unshell-runtime/src/node/state.rs b/unshell-runtime/src/node/state.rs new file mode 100644 index 0000000..d15102e --- /dev/null +++ b/unshell-runtime/src/node/state.rs @@ -0,0 +1,15 @@ +//! Node lifecycle state. + +/// Lifecycle state for a runtime node. +#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)] +pub enum NodeState { + /// The node has been constructed but has not started transport activity. + #[default] + Created, + /// The node is accepting local work and transport events. + Running, + /// The node is draining work before shutdown. + Stopping, + /// The node has stopped and should not accept new work. + Stopped, +} diff --git a/unshell-runtime/src/transport.rs b/unshell-runtime/src/transport.rs new file mode 100644 index 0000000..750f148 --- /dev/null +++ b/unshell-runtime/src/transport.rs @@ -0,0 +1,31 @@ +//! Nonblocking transport contract for the single-threaded runtime. +//! +//! Transports move already-framed protocol packets. They do not know tree paths, +//! leaf names, hook state, admission policy, or route decisions. + +use crate::connections::ConnectionId; +use unshell_protocol::FrameBytes; + +/// Nonblocking frame transport used by [`crate::node::NodeRuntime`]. +pub trait Transport { + /// Transport-specific error. + type Error; + + /// Polls for one inbound frame. + /// + /// `Ok(None)` means no frame is currently ready. Implementations must not + /// block inside this method; callers drive progress by calling `tick` again. + fn poll_recv(&mut self) -> Result, Self::Error>; + + /// Sends one framed packet on a registered connection. + fn send_frame( + &mut self, + connection: ConnectionId, + frame: FrameBytes, + ) -> Result<(), Self::Error>; + + /// Flushes buffered outbound transport data, if the transport has any. + fn flush(&mut self) -> Result<(), Self::Error> { + Ok(()) + } +} diff --git a/ush-obfuscate/src/lib.rs b/ush-obfuscate/src/lib.rs index cb37a22..9ce4a2a 100644 --- a/ush-obfuscate/src/lib.rs +++ b/ush-obfuscate/src/lib.rs @@ -1,4 +1,3 @@ -#![feature(proc_macro_quote)] #![feature(proc_macro_span)] #![allow(dead_code, unused_macros, unused_imports)]