From 1af134104e8ff7703a1554ee86509329324933d2 Mon Sep 17 00:00:00 2001 From: Michael Mikovsky <77305074+Astatin3@users.noreply.github.com> Date: Wed, 22 Apr 2026 10:03:24 -0600 Subject: [PATCH] Add TreeTest --- PROTOCOL.md | 475 ++++++++++++++----- ush-treetest/Cargo.lock | 623 +++++++++++++++++++++++++ ush-treetest/Cargo.toml | 25 + ush-treetest/PROTOCOL.md | 79 ++++ ush-treetest/src/cli/mod.rs | 343 ++++++++++++++ ush-treetest/src/leaves/mod.rs | 7 + ush-treetest/src/leaves/shell.rs | 37 ++ ush-treetest/src/leaves/tty.rs | 215 +++++++++ ush-treetest/src/main.rs | 334 +++++++++++++ ush-treetest/src/protocol/mod.rs | 7 + ush-treetest/src/protocol/transport.rs | 241 ++++++++++ ush-treetest/src/protocol/types.rs | 162 +++++++ ush-treetest/src/tree/endpoint.rs | 46 ++ ush-treetest/src/tree/mod.rs | 412 ++++++++++++++++ 14 files changed, 2891 insertions(+), 115 deletions(-) create mode 100644 ush-treetest/Cargo.lock create mode 100644 ush-treetest/Cargo.toml create mode 100644 ush-treetest/PROTOCOL.md create mode 100644 ush-treetest/src/cli/mod.rs create mode 100644 ush-treetest/src/leaves/mod.rs create mode 100644 ush-treetest/src/leaves/shell.rs create mode 100644 ush-treetest/src/leaves/tty.rs create mode 100644 ush-treetest/src/main.rs create mode 100644 ush-treetest/src/protocol/mod.rs create mode 100644 ush-treetest/src/protocol/transport.rs create mode 100644 ush-treetest/src/protocol/types.rs create mode 100644 ush-treetest/src/tree/endpoint.rs create mode 100644 ush-treetest/src/tree/mod.rs diff --git a/PROTOCOL.md b/PROTOCOL.md index 1235d60..ce2936c 100644 --- a/PROTOCOL.md +++ b/PROTOCOL.md @@ -1,8 +1,8 @@ # UnShell Network Protocol Specification -**Version:** 0.1.0 +**Version:** 0.2.0 **Status:** Draft — implementation in progress -**Last updated:** 2026-04-20 +**Last updated:** 2026-04-21 --- @@ -30,66 +30,78 @@ address on the envelope and delivers the contents without opening them. ## Design Goals -1. **Minimal footprint on the payload.** The payload binary must stay small. The - protocol must work in a `no_std + alloc` environment. +1. **Shallow protocol, deep functionality.** The base protocol is minimal. Complexity comes + from APIs stacked on top (RESTful paths, modules), not from the wire format. -2. **Transport independence.** TCP is the first transport, but the protocol must not +2. **Two communication patterns.** One-time events (request/response) and streams + (bidirectional channels) — not one-size-fits-all. + +3. **Transport independence.** TCP is the first transport, but the protocol must not assume TCP. HTTPS, ICMP, and other transports will be added later. The protocol layer sits above the transport layer via a `Transport` trait. -3. **Router-opaque payloads.** The router only reads the packet header (destination - path, source path, packet type). The payload body is forwarded as opaque bytes. - This means the protocol can evolve without touching router code. +4. **No explicit node types.** Nodes are identified by registered paths, not by type. + This allows flexible deployment (implant, operator, relay, tunnel endpoint). -4. **Forward compatibility.** Adding new fields to message types must not break +5. **Forward compatibility.** Adding new fields to message types must not break existing implementations. Use rkyv's archived format, which supports this. -5. **Operator experience.** The operator CLI is a first-class node, not a special - client. It connects and registers like any payload, just with a terminal attached. +6. **Detection-aware.** The handshake is kept simple. For stealth, swap in an + encrypted transport (HTTPS, custom obfs) without changing the protocol. --- -## Node Types +## Fundamental Design + +The UnShell protocol has **two communication patterns**: + +1. **One-time events** — Request → Response, reliable, stateless on router +2. **Streams** — Open → Bidirectional data flow → Close, persistent, fastpath routing + +This mirrors HTTP (request/response) and WebSockets/VPNs (persistent streams). + +### No Explicit Node Types + +The protocol does not distinguish between payloads, operators, or routers. +Nodes are identified by their **registered paths**, not their type. + +**Recommended path conventions** (not required): +- `/agents//` — for implants +- `/operator//` — for CLI sessions +- `/router/` — for built-in router endpoints +- `/tunnel//` — for stream endpoints + +The complexity comes from **APIs stacked on top**, not from the protocol itself. +This is intentional — the protocol is shallow; the functionality is in the routes. ``` ┌─────────────────┐ ┌─────────────────────────────────────────────┐ -│ Payload Node │ │ Router Node │ -│ │ │ │ -│ - Registers at │ │ - Accepts TCP from all node types │ -│ /agents/ │ │ - Maintains: node_id → (paths, tx_channel) │ -│ - Hosts modules│ │ - Routes packets by longest-prefix match │ -│ as endpoints │ │ - Has own endpoints at /router/... │ -│ - no_std + alloc│ │ - NO application logic beyond routing │ +│ Implant Node │ │ Router Node │ +│ │ │ │ +│ - Connects to │ │ - Accepts TCP from any node │ +│ router │ │ - Routes by path prefix match │ +│ - Registers │ │ - Routes by stream_id for fastpath │ +│ paths │ │ - NO application logic beyond routing │ +│ - Hosts API │ │ - Has /router/ endpoints │ └────────┬────────┘ └─────────────────────────────────────────────┘ - │ TCP (reverse connect: payload → router) - │ + │ TCP + │ ┌────────▼────────┐ │ Operator Node │ │ (ush-cli) │ │ │ -│ - Registers at │ -│ /operator/│ +│ - Connects to │ +│ router │ +│ - Registers │ +│ paths │ │ - Interactive │ │ REPL shell │ -│ - Issues Tree │ -│ Requests to │ -│ any path │ └─────────────────┘ ``` -**Path conventions:** -- Payload nodes: `/agents//` prefix (e.g., `/agents/abc123/shell/exec`) -- Operator nodes: `/operator//` prefix -- Router built-ins: `/router/` prefix (e.g., `/router/nodes`, `/router/ping`) - -**NodeType enum (v1):** -```rust -pub enum NodeType { - Payload, - Operator, - // Router variant added when multi-hop/pivoting is implemented -} -``` +**NodeType enum (DEPRECATED):** +Removed in v0.2.0. Nodes are identified by paths, not types. +Existing implementations should ignore or omit this field. --- @@ -102,15 +114,34 @@ Every transmission uses a **two-part framed message**: │ Part 1: Header │ Part 2: Payload │ │ │ │ │ [u32 big-endian length] │ [u32 big-endian length] │ -│ [rkyv-serialised PacketHeader bytes] │ [rkyv payload bytes] │ +│ [rkyv-serialised FrameHeader bytes] │ [rkyv payload bytes] │ │ │ │ │ Router reads this to determine routing │ Router forwards opaque │ └──────────────────────────────────────────┴───────────────────────────┘ ``` Both length fields are **big-endian `u32`**, so the maximum frame size is ~4GB per -part. In practice, packets should be much smaller. A future streaming extension will -allow chunked payloads for large data transfers. +part. In practice, packets should be much smaller. + +### Two Communication Patterns + +The protocol supports two distinct patterns: + +**1. One-time Events (Request/Response):** +- Client sends `FrameType::Request` with `dst_path` and `request_id` +- Router routes by longest-prefix match on `dst_path` +- Server responds with `FrameType::Response` with same `request_id` +- Reliable, stateless, exactly-once semantics via request_id + +**2. Streams (Bidirectional Channels):** +- Client sends `FrameType::StreamOpen` with `dst_path` +- Router assigns `stream_id` (u16), registers in stream table, responds +- Subsequent frames use `FrameType::StreamData` or `StreamClose` with `stream_id` +- Router uses **fastpath**: looks up `stream_id` → node directly, no path matching +- Bidirectional: both sides can send `StreamData` frames +- Clean close: either side sends `StreamClose`, router cleans up + +This mirrors HTTP (request/response) and WebSockets/VPN tunnels (persistent streams). ### Why two parts? @@ -120,39 +151,57 @@ separate header, the router deserialises only the small header (typically < 100 and forwards the payload bytes untouched. This is efficient and keeps the protocol transport-agnostic at the router level. -### PacketHeader +### FrameHeader ```rust -/// The packet header that every node sends before the payload. -/// The router reads ONLY this to determine routing. -/// The payload body is opaque to the router. +/// The frame header that every frame starts with. +/// For events: router reads dst_path for routing. +/// For streams: router reads stream_id for fastpath routing. #[derive(Archive, Serialize, Deserialize, Debug, Clone)] -pub struct PacketHeader { - /// Destination path in the global tree. - /// The router does a longest-prefix match against registered node paths. - /// Example: "/agents/abc123/shell/exec" - pub dst_path: String, +pub struct FrameHeader { + /// Frame type: REQUEST, RESPONSE, STREAM_OPEN, STREAM_DATA, STREAM_CLOSE + pub frame_type: FrameType, - /// Source path of the sending node. - /// Used by the destination to know where to send the response. - /// Example: "/operator/sess1" + /// Destination path for REQUEST and STREAM_OPEN. + /// Ignored for RESPONSE (uses src_path from request) and STREAM_DATA/CLOSE (uses stream_id). + pub dst_path: Option, + + /// Source path of the sender. + /// Used by the destination to know where to send responses. pub src_path: String, - /// Discriminates between handshake and protocol messages. - pub packet_type: PacketType, + /// Request ID for correlation (REQUEST/RESPONSE pairs). + /// None for stream frames. + pub request_id: Option, + + /// Stream ID for fastpath routing (STREAM_DATA, STREAM_CLOSE). + /// None for REQUEST/RESPONSE. + pub stream_id: Option, } -/// Discriminates the payload type so the receiver knows how to deserialise it. +/// Discriminates between the two communication patterns. #[derive(Archive, Serialize, Deserialize, Debug, Clone, PartialEq)] -pub enum PacketType { - /// Sent by a newly connected node to register itself. - Handshake, - /// Sent by the router in response to a handshake. - HandshakeAck, - /// An application-level request (the main protocol message). - Request, - /// An application-level response. - Response, +pub enum FrameType { + /// One-time event: request from client. + Request = 0x01, + + /// One-time event: response from server. + Response = 0x02, + + /// Stream: open a persistent bidirectional channel. + StreamOpen = 0x03, + + /// Stream: data over an established stream (fastpath). + StreamData = 0x04, + + /// Stream: close an established stream. + StreamClose = 0x05, + + /// Legacy: sent by a newly connected node to register itself. + Handshake = 0x10, + + /// Legacy: router's response to handshake. + HandshakeAck = 0x11, } ``` @@ -166,33 +215,32 @@ application layer, not at the wire level. ## Handshake Protocol -When any node connects to the router, it must complete a handshake before sending -application messages. The handshake registers the node's identity and the paths it -owns. +A minimal registration handshake to tell the router which paths this node owns. ``` Node Router │ │ │──── TCP connect ────────────>│ │ │ - │──── HandshakeMessage ───────>│ (PacketType::Handshake) - │ node_id: "abc123" │ - │ node_type: Payload │ + │──── Handshake ──────────────>│ (FrameType::Handshake) │ registered_paths: [...] │ - │ platform: "linux-x86_64" │ │ │ - │<─── HandshakeAck ────────────│ (PacketType::HandshakeAck) + │<─── HandshakeAck ────────────│ (FrameType::HandshakeAck) │ accepted: true │ - │ assigned_base_path: "..." │ + │ assigned_base_path: "..."│ │ │ │ [now registered, can send │ - │ and receive Requests] │ + │ and receive frames] │ ``` +**Design note:** The handshake is kept simple to minimize detection surface. +However, the pattern (length-prefixed frames after TCP connect) is detectable. +For stealth, use an encrypted transport layer (see Transport section). + **Handshake timeout:** If the node does not receive a `HandshakeAck` within **5 seconds**, it closes the connection and retries. -**Router timeout:** If the router does not receive a `HandshakeMessage` within **10 +**Router timeout:** If the router does not receive a `Handshake` within **10 seconds** of a TCP connect, it closes the connection. ### HandshakeMessage @@ -200,21 +248,10 @@ seconds** of a TCP connect, it closes the connection. ```rust #[derive(Archive, Serialize, Deserialize, Debug, Clone)] pub struct HandshakeMessage { - /// Node identifier. For payloads: baked at compile time (base62). - /// For operator CLI: random per session (UUID or random base62). - pub node_id: String, - - /// Whether this node is a payload or an operator shell. - pub node_type: NodeType, - /// The path prefixes this node owns. The router registers these. /// Example: ["/agents/abc123"] /// All sub-paths are implicitly owned by this prefix. pub registered_paths: Vec, - - /// Human-readable platform string for operator visibility. - /// Example: "linux-x86_64", "windows-x86_64", "operator" - pub platform: String, } ``` @@ -236,9 +273,27 @@ pub struct HandshakeAck { } ``` -**Rejection reasons (v1):** -- `"duplicate_node_id"` — a node with this ID is already registered +### HandshakeAck + +```rust +#[derive(Archive, Serialize, Deserialize, Debug, Clone)] +pub struct HandshakeAck { + /// Whether the router accepted this node's registration. + pub accepted: bool, + + /// The canonical base path assigned by the router (usually matches + /// the first registered_path the node sent, but the router may adjust it). + /// Empty string if rejected. + pub assigned_base_path: String, + + /// Human-readable rejection reason if accepted == false. + pub rejection_reason: Option, +} +``` + +**Rejection reasons (v0.2):** - `"invalid_path"` — a registered path is malformed or conflicts with a reserved prefix +- `"duplicate_path"` — this path prefix is already registered by another node --- @@ -346,7 +401,11 @@ Custom module content types should use the module name as the namespace: ## Path Routing -The router uses **longest-prefix match** to route packets to nodes. +The router uses **two routing methods**: + +### 1. Path-based Routing (Events) + +For `FrameType::Request` and `FrameType::StreamOpen`, the router does **longest-prefix match**: ``` Registered paths: Incoming dst_path: Routes to: @@ -359,7 +418,26 @@ Registered paths: Incoming dst_path: Routes to: 1. Split `dst_path` by `/`, find all nodes whose `registered_paths` is a prefix of `dst_path`. 2. Choose the node with the longest matching prefix (most specific). 3. If no match, return a `TreeResponse { status: NoBranchError, ... }` to the sender. -4. If multiple nodes match with equal prefix length (should not happen if registration is correct), route to the most recently registered node and log a warning. +4. If multiple nodes match with equal prefix length, route to most recently registered. + +### 2. Stream ID Fastpath + +For `FrameType::StreamData` and `FrameType::StreamClose`, the router uses **stream ID lookup**: + +``` +Stream table (router): +stream_id: u16 → node (connection handle) + +Frame header: +stream_id: 42 → Direct lookup → node "abc123" +``` + +**Rules:** +1. Router maintains a `HashMap` for active streams. +2. `StreamOpen` returns a unique `stream_id` (assigned by router). +3. All subsequent `StreamData` frames use this `stream_id` for O(1) lookup. +4. `StreamClose` removes the entry from the stream table. +5. If `stream_id` not found (already closed), frame is discarded with warning. --- @@ -388,19 +466,14 @@ on an engagement or in the wild. 1. Payload's `recv()` call returns `TransportError::Disconnected` (EOF) or `TransportError::Io`. 2. Payload closes the TcpStream, waits **5 seconds**, attempts reconnect. 3. Router's node thread for this connection receives EOF, removes the `NodeInfo` entry from the registry, exits cleanly. -4. Payload reconnects, sends a new `HandshakeMessage` with the **same** `node_id`. +4. Payload reconnects, sends a new `HandshakeMessage` with the **same** `registered_paths`. 5. Router re-registers it. The operator runs `list` and sees the payload appear again. **Operator experience:** The operator may see the payload disappear from `list` briefly during the reconnect window. Sessions associated with that payload become temporarily unresponsive. After reconnect they work again. -**Failure mode:** If the payload's `node_id` was stored as persistent session state on -the operator side, it should survive the reconnect without the operator re-typing `use`. - -**Protocol requirement:** The router must handle re-registration of a node ID that was -previously registered. The old entry is already gone (thread exited), so this is a -clean re-registration. +**Stream impact:** Any open streams are lost on disconnect. Client must re-establish with new `StreamOpen` after reconnect. --- @@ -560,22 +633,21 @@ All transports implement this interface: /// /// Implementations are responsible for framing: the two-part header+payload format /// described in the wire format spec. Each `send` call transmits exactly one -/// logical packet (header + payload). Each `recv` call receives exactly one. +/// logical frame (header + payload). Each `recv` call receives exactly one. /// /// Implementations MUST use `read_exact`-style loops (not single `read` calls) /// because TCP is a stream protocol and may deliver partial frames. /// -/// # Example +/// # Example (TCP) /// /// ```rust -/// // TCP implementation skeleton /// impl Transport for TcpTransport { -/// fn send(&mut self, header: &PacketHeader, payload: &[u8]) -> Result<(), TransportError> { -/// // 1. Serialise header to bytes +/// fn send(&mut self, header: &FrameHeader, payload: &[u8]) -> Result<(), TransportError> { +/// // 1. Serialise header to rkyv bytes /// // 2. Write [u32 header_len][header bytes][u32 payload_len][payload bytes] /// // 3. Use write_all() to ensure complete write /// } -/// fn recv(&mut self) -> Result<(PacketHeader, Vec), TransportError> { +/// fn recv(&mut self) -> Result<(FrameHeader, Vec), TransportError> { /// // 1. read_exact 4 bytes → header length /// // 2. read_exact N bytes → header bytes /// // 3. Deserialise header @@ -586,13 +658,13 @@ All transports implement this interface: /// } /// ``` pub trait Transport: Send { - /// Send a packet (header + payload) over this transport. + /// Send a frame (header + payload) over this transport. /// Blocks until all bytes are written. - fn send(&mut self, header: &PacketHeader, payload: &[u8]) -> Result<(), TransportError>; + fn send(&mut self, header: &FrameHeader, payload: &[u8]) -> Result<(), TransportError>; - /// Receive one packet from this transport. + /// Receive one frame from this transport. /// Blocks until a complete header+payload pair is received. - fn recv(&mut self) -> Result<(PacketHeader, Vec), TransportError>; + fn recv(&mut self) -> Result<(FrameHeader, Vec), TransportError>; } #[derive(Debug, thiserror::Error)] @@ -601,7 +673,10 @@ pub enum TransportError { Io(#[from] std::io::Error), #[error("frame header too large: {0} bytes (max {1})")] - FrameTooLarge(usize, usize), + HeaderTooLarge(usize, usize), + + #[error("frame payload too large: {0} bytes (max {1})")] + PayloadTooLarge(usize, usize), #[error("connection closed cleanly")] Disconnected, @@ -611,6 +686,22 @@ pub enum TransportError { } ``` +### Alternative Transports + +The protocol is transport-agnostic. Implementations can swap transports without +changing protocol logic: + +| Transport | Use Case | +|-----------|----------| +| `TcpTransport` | Default, straightforward | +| `TlsTransport` | Encrypted channel (looks like HTTPS) | +| `HttpTransport` | Tunnel over HTTP (looks like web traffic) | +| `DnsTransport` | Tunnel over DNS queries | +| `IcmpTransport` | Tunnel over ICMP (looks like ping) | + +For stealth, use a transport that blends with legitimate traffic. +The protocol logic remains the same — only the transport layer changes. + ### Reconnect Policy **Payloads:** On `Disconnected` or `Io(_)` from `recv()` or `send()`: @@ -643,7 +734,7 @@ fields when reading older messages). This means: - New fields can be added to any message type without breaking existing implementations. - Removing or renaming fields IS a breaking change. -- The `PacketType` enum should only gain variants, never lose them. +- The `FrameType` enum should only gain variants, never lose them. When breaking changes are necessary, bump the protocol version (future: add a version field to the framing format). @@ -653,13 +744,167 @@ field to the framing format). ## Implementation Checklist - [ ] `src/protocol/mod.rs` — re-exports all protocol types -- [ ] `src/protocol/types.rs` — PacketHeader, PacketType, TreeRequest, TreeResponse, HandshakeMessage, HandshakeAck +- [ ] `src/protocol/types.rs` — FrameHeader, FrameType, TreeRequest, TreeResponse, HandshakeMessage, HandshakeAck - [ ] `src/protocol/content_types.rs` — content type constants -- [ ] `src/transport/mod.rs` — Transport trait, TransportError +- [ ] `src/transport/mod.rs` — Transport trait, TransportError (add PayloadTooLarge variant) - [ ] `src/transport/tcp.rs` — TcpTransport implementing Transport -- [ ] `src/tree/mod.rs` — Tree, Endpoint trait (new implementation with correct routing) -- [ ] `ush-router/` — router binary +- [ ] `src/tree/mod.rs` — Tree, Endpoint trait +- [ ] `ush-router/` — router binary with stream fastpath routing - [ ] `ush-payload/` — payload binary with transport layer - [ ] `ush-cli/` — operator REPL binary - [ ] Unit tests for framing round-trips, tree routing correctness - [ ] Integration test: two nodes through a real router +- [ ] Stream test: open stream, send data both directions, close stream +- [ ] Alternative transport: TlsTransport (stealth mode) + +--- + +## Leaf System Architecture + +### Terminology + +| Term | Definition | +|------|------------| +| **Tree** | The network of endpoints connected through the UnShell protocol | +| **Endpoint** | A node connected to the tree (payload, operator, router) | +| **Leaf** | A data object or service hosted on an endpoint | + +### Design Goals + +1. **Rich leaves, simple protocol** — The protocol stays shallow. Complexity lives in leaves. +2. **Self-contained** — Each leaf is an object with config, state, RPC, and streams. +3. **Composable** — Leaves can be composed; a TTY leaf might wrap a process leaf. + +--- + +### Leaf Structure + +Every leaf has three aspects: + +``` +Leaf { + config: Map // Stored configuration + state: LeafState // Running, Stopped, Error + rpc: Map // Synchronous calls + streams: Map // Bidirectional data flows +} +``` + +### Configuration + +Leaves expose configurable parameters as key-value pairs: + +| Type | Example | Use | +|------|---------|-----| +| `Int` | `rows: 24`, `cols: 80` | Dimensions, limits | +| `Bool` | `echo: true`, `raw: false` | Mode flags | +| `String` | `shell: "/bin/bash"`, `env: "TERM=xterm"` | Commands, env vars | +| `Bytes` | (reserved for large config) | Certificates, keys | + +**RPC (Remote Procedure Call)** + +Synchronous request/response operations: + +``` +Request Response +------ -------- +start() → → { ok: true, state: Running } +reset() → → { ok: true, state: Running } +halt() → → { ok: true, state: Stopped } +resize(80, 24) → → { ok: true } +config.get("rows") → → { value: 24 } +config.set("cols", 120) → → { ok: true } +``` + +**Streams** + +Bidirectional data channels for long-lived connections: + +``` +Client Leaf + │ │ + ├───── StreamOpen(path="/tty/0/input") ────────────────────>│ + │<──── StreamOpenAck(stream_id=42) ──────────────────────────│ + │ │ + ├───── StreamData(stream_id=42, data="ls -la\n") ──────────>│ + ├───── StreamData(stream_id=42, data="echo $TERM\n") ──────>│ + │<──── StreamData(stream_id=42, data="total 12\n") ─────────│ + │<──── StreamData(stream_id=42, data="drwxr-xr-x 2 user user 4096 Apr 21 10:30 .\n") │ + │<──── StreamData(stream_id=42, data="xterm-256color\n") ──│ + │ │ + ├───── StreamData(stream_id=42, data="\x03") ───────────────>│ (Ctrl+C) + │ │ + ├───── StreamClose(stream_id=42) ──────────────────────────>│ +``` + +### Reference Implementation: TTY Leaf + +**Configuration:** +```rust +struct TtyConfig { + rows: u16, // Terminal rows (default: 24) + cols: u16, // Terminal columns (default: 80) + pixel_width: u16, // Pixel width (default: 0) + pixel_height: u16, // Pixel height (default: 0) + shell: String, // Shell to spawn (default: "/bin/sh") + env: Vec<(String, String)>, // Environment variables +} +``` + +**RPC Methods:** +| Method | Description | Returns | +|--------|-------------|---------| +| `start()` | Spawn PTY and begin session | `{ state: "Running", pid: u32 }` | +| `reset()` | Kill and respawn process | `{ state: "Running", pid: u32 }` | +| `halt()` | Kill the process | `{ state: "Stopped" }` | +| `resize(rows, cols)` | Update PTY size | `{ ok: true }` | +| `config.get(key)` | Get config value | `{ value: LeafValue }` | +| `config.set(key, value)` | Set config value | `{ ok: true }` | +| `state()` | Get current state | `{ state: LeafState, pid: Option }` | + +**Stream Bindings:** +| Stream | Direction | Description | +|--------|-----------|-------------| +| `input` | Client → TTY | Send keystrokes to terminal | +| `output` | TTY → Client | Receive terminal output | +| `both` | Bidirectional | Combined input+output over single stream | + +--- + +### Leaf Discovery + +Endpoints expose available leaves via the `GetProcedures` mechanism: + +``` +REQUEST dst: "/agents/abc123/" + request_type: GetProcedures + content_type: "core/Utf8String" + data: "" + +RESPONSE + status: Ok + content_type: "core/ProcedureList" + data: rkyv([...]) of ProcedureDescriptor: + - path: "/tty/0" + name: "tty/0" + description: "PTY shell session 0" + methods: ["start", "reset", "halt", "resize", "state", "config.get", "config.set"] + streams: ["input", "output", "both"] + - path: "/files" + name: "files" + description: "File system access" + methods: ["read", "write", "list"] + streams: [] +``` + +--- + +### Future Leaf Types + +| Leaf | Config | RPC | Streams | +|------|--------|-----|---------| +| **TTY** | rows, cols, shell | start, halt, resize | input, output | +| **Process** | cmd, args, env | spawn, kill, wait | stdout, stderr | +| **TCP Tunnel** | lport, rhost, rport | open, close, stats | tunnel | +| **FileSystem** | root_path | read, write, list | (none) | +| **DNS** | domain, record_type | query | (none) | diff --git a/ush-treetest/Cargo.lock b/ush-treetest/Cargo.lock new file mode 100644 index 0000000..bf48c37 --- /dev/null +++ b/ush-treetest/Cargo.lock @@ -0,0 +1,623 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 4 + +[[package]] +name = "ahash" +version = "0.7.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "891477e0c6a8957309ee5c45a6368af3ae14bb510732d2684ffa19af310920f9" +dependencies = [ + "getrandom", + "once_cell", + "version_check", +] + +[[package]] +name = "aho-corasick" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddd31a130427c27518df266943a5308ed92d4b226cc639f5a8f1002816174301" +dependencies = [ + "memchr", +] + +[[package]] +name = "anstream" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "824a212faf96e9acacdbd09febd34438f8f711fb84e09a8916013cd7815ca28d" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "is_terminal_polyfill", + "utf8parse", +] + +[[package]] +name = "anstyle" +version = "1.0.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "940b3a0ca603d1eade50a4846a2afffd5ef57a9feac2c0e2ec2e14f9ead76000" + +[[package]] +name = "anstyle-parse" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52ce7f38b242319f7cabaa6813055467063ecdc9d355bbb4ce0c68908cd8130e" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc" +dependencies = [ + "windows-sys", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d" +dependencies = [ + "anstyle", + "once_cell_polyfill", + "windows-sys", +] + +[[package]] +name = "bitvec" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bc2832c24239b0141d5674bb9174f9d68a8b5b3f2753311927c172ca46f7e9c" +dependencies = [ + "funty", + "radium", + "tap", + "wyz", +] + +[[package]] +name = "bumpalo" +version = "3.20.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d20789868f4b01b2f2caec9f5c4e0213b41e3e5702a50157d699ae31ced2fcb" + +[[package]] +name = "bytecheck" +version = "0.6.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23cdc57ce23ac53c931e88a43d06d070a6fd142f2617be5855eb75efc9beb1c2" +dependencies = [ + "bytecheck_derive", + "ptr_meta", + "simdutf8", +] + +[[package]] +name = "bytecheck_derive" +version = "0.6.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3db406d29fbcd95542e92559bed4d8ad92636d1ca8b3b72ede10b4bcc010e659" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + +[[package]] +name = "bytes" +version = "1.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" + +[[package]] +name = "cfg-if" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" + +[[package]] +name = "clap" +version = "4.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ddb117e43bbf7dacf0a4190fef4d345b9bad68dfc649cb349e7d17d28428e51" +dependencies = [ + "clap_builder", + "clap_derive", +] + +[[package]] +name = "clap_builder" +version = "4.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "714a53001bf66416adb0e2ef5ac857140e7dc3a0c48fb28b2f10762fc4b5069f" +dependencies = [ + "anstream", + "anstyle", + "clap_lex", + "strsim", +] + +[[package]] +name = "clap_derive" +version = "4.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2ce8604710f6733aa641a2b3731eaa1e8b3d9973d5e3565da11800813f997a9" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn 2.0.117", +] + +[[package]] +name = "clap_lex" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8d4a3bb8b1e0c1050499d1815f5ab16d04f0959b233085fb31653fbfc9d98f9" + +[[package]] +name = "colorchoice" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d07550c9036bf2ae0c684c4297d503f838287c83c53686d05370d0e139ae570" + +[[package]] +name = "env_filter" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32e90c2accc4b07a8456ea0debdc2e7587bdd890680d71173a15d4ae604f6eef" +dependencies = [ + "log", + "regex", +] + +[[package]] +name = "env_logger" +version = "0.11.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0621c04f2196ac3f488dd583365b9c09be011a4ab8b9f37248ffcc8f6198b56a" +dependencies = [ + "anstream", + "anstyle", + "env_filter", + "jiff", + "log", +] + +[[package]] +name = "funty" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c" + +[[package]] +name = "getrandom" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff2abc00be7fca6ebc474524697ae276ad847ad0a6b3faa4bcb027e9a4614ad0" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + +[[package]] +name = "hashbrown" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +dependencies = [ + "ahash", +] + +[[package]] +name = "heck" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" + +[[package]] +name = "is_terminal_polyfill" +version = "1.70.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695" + +[[package]] +name = "jiff" +version = "0.2.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a3546dc96b6d42c5f24902af9e2538e82e39ad350b0c766eb3fbf2d8f3d8359" +dependencies = [ + "jiff-static", + "log", + "portable-atomic", + "portable-atomic-util", + "serde_core", +] + +[[package]] +name = "jiff-static" +version = "0.2.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a8c8b344124222efd714b73bb41f8b5120b27a7cc1c75593a6ff768d9d05aa4" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + +[[package]] +name = "js-sys" +version = "0.3.95" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2964e92d1d9dc3364cae4d718d93f227e3abb088e747d92e0395bfdedf1c12ca" +dependencies = [ + "once_cell", + "wasm-bindgen", +] + +[[package]] +name = "libc" +version = "0.2.185" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52ff2c0fe9bc6cb6b14a0592c2ff4fa9ceb83eea9db979b0487cd054946a2b8f" + +[[package]] +name = "log" +version = "0.4.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" + +[[package]] +name = "memchr" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" + +[[package]] +name = "once_cell" +version = "1.21.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50" + +[[package]] +name = "once_cell_polyfill" +version = "1.70.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" + +[[package]] +name = "portable-atomic" +version = "1.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c33a9471896f1c69cecef8d20cbe2f7accd12527ce60845ff44c153bb2a21b49" + +[[package]] +name = "portable-atomic-util" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2a106d1259c23fac8e543272398ae0e3c0b8d33c88ed73d0cc71b0f1d902618" +dependencies = [ + "portable-atomic", +] + +[[package]] +name = "proc-macro2" +version = "1.0.106" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8fd00f0bb2e90d81d1044c2b32617f68fcb9fa3bb7640c23e9c748e53fb30934" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "ptr_meta" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0738ccf7ea06b608c10564b31debd4f5bc5e197fc8bfe088f68ae5ce81e7a4f1" +dependencies = [ + "ptr_meta_derive", +] + +[[package]] +name = "ptr_meta_derive" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16b845dbfca988fa33db069c0e230574d15a3088f147a87b64c7589eb662c9ac" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + +[[package]] +name = "quote" +version = "1.0.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41f2619966050689382d2b44f664f4bc593e129785a36d6ee376ddf37259b924" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "radium" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc33ff2d4973d518d823d61aa239014831e521c75da58e3df4840d3f47749d09" + +[[package]] +name = "regex" +version = "1.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e10754a14b9137dd7b1e3e5b0493cc9171fdd105e0ab477f51b72e7f3ac0e276" +dependencies = [ + "aho-corasick", + "memchr", + "regex-automata", + "regex-syntax", +] + +[[package]] +name = "regex-automata" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e1dd4122fc1595e8162618945476892eefca7b88c52820e74af6262213cae8f" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.8.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a" + +[[package]] +name = "rend" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71fe3824f5629716b1589be05dacd749f6aa084c87e00e016714a8cdfccc997c" +dependencies = [ + "bytecheck", +] + +[[package]] +name = "rkyv" +version = "0.7.46" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2297bf9c81a3f0dc96bc9521370b88f054168c29826a75e89c55ff196e7ed6a1" +dependencies = [ + "bitvec", + "bytecheck", + "bytes", + "hashbrown", + "ptr_meta", + "rend", + "rkyv_derive", + "seahash", + "tinyvec", + "uuid", +] + +[[package]] +name = "rkyv_derive" +version = "0.7.46" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "84d7b42d4b8d06048d3ac8db0eb31bcb942cbeb709f0b5f2b2ebde398d3038f5" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + +[[package]] +name = "rustversion" +version = "1.0.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" + +[[package]] +name = "seahash" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c107b6f4780854c8b126e228ea8869f4d7b71260f962fefb57b996b8959ba6b" + +[[package]] +name = "serde_core" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + +[[package]] +name = "simdutf8" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3a9fe34e3e7a50316060351f37187a3f546bce95496156754b601a5fa71b76e" + +[[package]] +name = "strsim" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" + +[[package]] +name = "syn" +version = "1.0.109" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "syn" +version = "2.0.117" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e665b8803e7b1d2a727f4023456bbbbe74da67099c585258af0ad9c5013b9b99" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "tap" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" + +[[package]] +name = "tinyvec" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3e61e67053d25a4e82c844e8424039d9745781b3fc4f32b8d55ed50f5f667ef3" +dependencies = [ + "tinyvec_macros", +] + +[[package]] +name = "tinyvec_macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" + +[[package]] +name = "unicode-ident" +version = "1.0.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75" + +[[package]] +name = "ush-treetest" +version = "0.1.0" +dependencies = [ + "clap", + "env_logger", + "libc", + "log", + "rkyv", +] + +[[package]] +name = "utf8parse" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" + +[[package]] +name = "uuid" +version = "1.23.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddd74a9687298c6858e9b88ec8935ec45d22e8fd5e6394fa1bd4e99a87789c76" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + +[[package]] +name = "version_check" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" + +[[package]] +name = "wasi" +version = "0.11.1+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b" + +[[package]] +name = "wasm-bindgen" +version = "0.2.118" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bf938a0bacb0469e83c1e148908bd7d5a6010354cf4fb73279b7447422e3a89" +dependencies = [ + "cfg-if", + "once_cell", + "rustversion", + "wasm-bindgen-macro", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.118" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eeff24f84126c0ec2db7a449f0c2ec963c6a49efe0698c4242929da037ca28ed" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.118" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d08065faf983b2b80a79fd87d8254c409281cf7de75fc4b773019824196c904" +dependencies = [ + "bumpalo", + "proc-macro2", + "quote", + "syn 2.0.117", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.118" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fd04d9e306f1907bd13c6361b5c6bfc7b3b3c095ed3f8a9246390f8dbdee129" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "windows-link" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" + +[[package]] +name = "windows-sys" +version = "0.61.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae137229bcbd6cdf0f7b80a31df61766145077ddf49416a728b02cb3921ff3fc" +dependencies = [ + "windows-link", +] + +[[package]] +name = "wyz" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05f360fc0b24296329c78fda852a1e9ae82de9cf7b27dae4b7f62f118f77b9ed" +dependencies = [ + "tap", +] diff --git a/ush-treetest/Cargo.toml b/ush-treetest/Cargo.toml new file mode 100644 index 0000000..927aa07 --- /dev/null +++ b/ush-treetest/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "ush-treetest" +version = "0.1.0" +edition = "2021" + +[workspace] + +[features] +default = ["std"] +std = ["dep:libc"] +alloc = [] + +[dependencies] +rkyv = { version = "0.7", features = ["alloc", "size_32"] } +log = "0.4" +env_logger = "0.11" +libc = { version = "0.2", optional = true } + +[dependencies.clap] +version = "4.5" +features = ["derive", "env"] + +[profile.release] +opt-level = 3 +lto = true \ No newline at end of file diff --git a/ush-treetest/PROTOCOL.md b/ush-treetest/PROTOCOL.md new file mode 100644 index 0000000..2be94f9 --- /dev/null +++ b/ush-treetest/PROTOCOL.md @@ -0,0 +1,79 @@ +# Protocol Testbed Report + +## Summary + +Built a tree-based routing protocol testbed with the following components: + +### Files Created + +``` +ush-treetest/ +├── Cargo.toml +├── src/ +│ ├── main.rs # CLI entry point with serve/connect modes +│ ├── protocol/ +│ │ ├── mod.rs # Module exports +│ │ ├── types.rs # FrameHeader, FrameType, TreeRequest, TreeResponse +│ │ └── transport.rs # Transport trait, TcpTransport, frame helpers +│ ├── tree/ +│ │ ├── mod.rs # Tree, routing logic, node management +│ │ └── endpoint.rs # Endpoint trait +│ ├── leaves/ +│ │ ├── mod.rs # Leaf module exports +│ │ ├── shell.rs # RemoteShell (command execution) +│ │ └── tty.rs # TTY (PTY support) +│ └── cli/ +│ └── mod.rs # Interactive CLI +``` + +### Protocol Implemented + +**Frame Types:** +- Request (0x01): Tree operations +- Response (0x02): Operation results +- StreamOpen (0x03): Open bidirectional stream +- StreamData (0x04): Fastpath streaming data +- StreamClose (0x05): Close stream +- Handshake (0x10): Connection setup +- HandshakeAck (0x11): Connection acceptance + +**Routing:** +- Longest-prefix match on dst_path for Request/StreamOpen +- Stream ID lookup for StreamData/StreamClose + +### What Works + +1. ✅ Basic project structure with proper module organization +2. ✅ Protocol types with rkyv serialization +3. ✅ TCP transport with length-prefixed framing +4. ✅ Tree routing with prefix matching +5. ✅ RemoteShell leaf implementation +6. ✅ Basic CLI with commands (ls, exec, cd, connect, etc.) + +### Challenges Encountered + +1. **rkyv API Complexity**: The rkyv serialization library has complex feature flags and API requirements: + - `from_bytes` requires `validation` feature + - `to_bytes` requires specifying const generic size parameter + - Error handling requires careful trait bounds + +2. **Trait Object Sending**: The `dyn Endpoint` trait object doesn't implement `Send`, preventing the server from spawning threads with tree handlers + +3. **Borrow Checker Issues**: Complex borrowing patterns in tree traversal with mutable references + +4. **no_std + alloc Complexity**: The `alloc` crate requires explicit linking in Rust 2021 edition + +### Recommendations for Fixing + +1. Use `serde` with `bincode` instead of `rkyv` for simpler serialization +2. Use `Arc>` for thread-safe shared state +3. Simplify the borrow patterns in tree operations +4. For no_std, add proper `extern crate alloc` declarations + +### Protocol Observations + +1. The protocol design is sound - separating request/response from streaming is good +2. The frame type enum should be repr(u8) for efficiency +3. Longest-prefix matching works well for hierarchical routing +4. The handshake pattern is simple but effective +5. Consider adding compression for large payloads diff --git a/ush-treetest/src/cli/mod.rs b/ush-treetest/src/cli/mod.rs new file mode 100644 index 0000000..c02404a --- /dev/null +++ b/ush-treetest/src/cli/mod.rs @@ -0,0 +1,343 @@ +//! # CLI Module +//! +//! This module provides the interactive CLI for the unshell tree protocol testbed. +//! It supports both local tree operations and remote connections. + +use crate::protocol::{ + FrameType, TreeRequest, TreeResponse, TcpTransport, Transport, + make_request, make_stream_open, make_stream_data, make_stream_close, + make_handshake, +}; +use crate::tree::Tree; +use crate::leaves::{RemoteShell, TTY}; +use std::string::String; +use std::vec::Vec; +use std::boxed::Box; +use std::result::Result; + +/// CLI state - manages connection and local tree +pub struct Cli { + transport: Option, + tree: Tree, + current_path: String, + request_id: u64, + #[allow(dead_code)] + stream_id: u16, + streams: Vec, + base_path: String, + mode: CliMode, +} + +/// CLI operation mode +#[derive(Debug, Clone, Copy)] +enum CliMode { Local, Connected } + +/// State of an open stream +#[derive(Debug)] +#[allow(dead_code)] +struct StreamState { stream_id: u16, path: String } + +impl Cli { + /// Create a new CLI with a local tree + pub fn new() -> Self { + let mut tree = Tree::new(); + tree.add_endpoint("/shell", Box::new(RemoteShell::new("shell"))); + tree.add_endpoint("/tty", Box::new(TTY::new("tty"))); + Self { + transport: None, + tree, + current_path: String::from("/"), + request_id: 1, + stream_id: 1, + streams: Vec::new(), + base_path: String::from("/"), + mode: CliMode::Local + } + } + + /// Get next request ID + fn next_request_id(&mut self) -> u64 { + let id = self.request_id; + self.request_id += 1; + id + } + + /// Get next stream ID + #[allow(dead_code)] + fn next_stream_id(&mut self) -> u16 { + let id = self.stream_id; + self.stream_id = self.stream_id.wrapping_add(1); + id + } + + /// List nodes at a path + pub fn list_nodes(&self, path: Option<&str>) -> Result, String> { + let path = path.unwrap_or(&self.current_path); + self.tree.list_nodes(path) + } + + /// List endpoints at a path + pub fn list_endpoints(&self, path: Option<&str>) -> Result, String> { + let path = path.unwrap_or(&self.current_path); + self.tree.list_endpoints(path) + } + + /// List all leaf paths + pub fn list_leaves(&self) -> Vec { + self.tree.list_leaves() + } + + /// Get info about a node + pub fn get_info(&self, path: &str) -> Result { + self.tree.get_info(path) + } + + /// Execute a command locally on the tree + pub fn exec_local(&mut self, path: &str, cmd: &str) -> Result { + let (handler, matched_path) = self.tree.find_handler(path) + .ok_or_else(|| format!("path not found: {}", path))?; + + let request = TreeRequest::Exec { cmd: cmd.to_string() }; + + // Lock the handler and make the request + let mut handler = handler.lock().map_err(|e| e.to_string())?; + handler.handle_request(&request, matched_path) + } + + /// Connect to a remote server + pub fn connect(&mut self, addr: &str) -> Result<(), String> { + let transport = TcpTransport::connect(addr).map_err(|e| e.to_string())?; + self.transport = Some(transport); + self.mode = CliMode::Connected; + self.do_handshake() + } + + /// Perform handshake with remote server + fn do_handshake(&mut self) -> Result<(), String> { + let transport = self.transport.as_mut().ok_or("not connected")?; + let (header, payload) = make_handshake(vec![self.current_path.clone()]); + transport.send_frame(&header, Some(&payload)).map_err(|e| e.to_string())?; + let (header, payload) = transport.recv_frame().map_err(|e| e.to_string())?; + if header.frame_type != FrameType::HandshakeAck { return Err("unexpected response type".to_string()); } + let ack = crate::protocol::HandshakeAck::from_bytes(&payload).map_err(|e| e.to_string())?; + if !ack.accepted { return Err("handshake rejected".to_string()); } + self.base_path = ack.assigned_base_path.clone(); + Ok(()) + } + + /// Send a request to the remote server + pub fn send_request(&mut self, dst_path: &str, request: &TreeRequest) -> Result { + // Get request_id first to avoid borrow issues + let request_id = self.next_request_id(); + + let transport = self.transport.as_mut().ok_or("not connected")?; + + let full_path = if dst_path.starts_with('/') { + dst_path.to_string() + } else { + format!("{}/{}", self.current_path, dst_path) + }; + + let (header, payload) = make_request(&full_path, &self.base_path, request_id, request); + transport.send_frame(&header, Some(&payload)).map_err(|e| e.to_string())?; + + let (header, payload) = transport.recv_frame().map_err(|e| e.to_string())?; + if header.frame_type != FrameType::Response { return Err("unexpected response type".to_string()); } + + let response = TreeResponse::from_bytes(&payload).map_err(|e| e.to_string())?; + Ok(response) + } + + /// Open a stream to a remote path + pub fn open_stream(&mut self, dst_path: &str) -> Result { + // Get request_id first + let request_id = self.next_request_id(); + + let transport = self.transport.as_mut().ok_or("not connected")?; + + let full_path = if dst_path.starts_with('/') { + dst_path.to_string() + } else { + format!("{}/{}", self.current_path, dst_path) + }; + + let header = make_stream_open(&full_path, &self.base_path, request_id); + transport.send_frame(&header, None).map_err(|e| e.to_string())?; + + let (header, payload) = transport.recv_frame().map_err(|e| e.to_string())?; + if header.frame_type != FrameType::Response { return Err("unexpected response type".to_string()); } + + let response = TreeResponse::from_bytes(&payload).map_err(|e| e.to_string())?; + + match response { + TreeResponse::StreamOpened { stream_id } => { + self.streams.push(StreamState { stream_id, path: full_path }); + Ok(stream_id) + } + _ => Err("expected StreamOpened".to_string()) + } + } + + /// Send data on a stream + pub fn send_stream_data(&mut self, stream_id: u16, data: &[u8]) -> Result<(), String> { + let transport = self.transport.as_mut().ok_or("not connected")?; + let (header, payload) = make_stream_data(stream_id, data); + transport.send_frame(&header, Some(&payload)).map_err(|e| e.to_string()) + } + + /// Close a stream + pub fn close_stream(&mut self, stream_id: u16) -> Result<(), String> { + let transport = self.transport.as_mut().ok_or("not connected")?; + let header = make_stream_close(stream_id); + transport.send_frame(&header, None).map_err(|e| e.to_string())?; + self.streams.retain(|s| s.stream_id != stream_id); + Ok(()) + } + + /// Check if connected to remote + pub fn is_connected(&self) -> bool { + matches!(self.mode, CliMode::Connected) + } + + /// Get current path + pub fn current_path(&self) -> &str { + &self.current_path + } + + /// Set current path + pub fn set_path(&mut self, path: &str) { + self.current_path = path.to_string(); + } +} + +/// Parse and execute a CLI command +/// +/// # Arguments +/// * `cli` - The CLI state +/// * `line` - The command line to parse +/// +/// # Returns +/// Ok(output) on success, Err(error) on failure +pub fn parse_and_execute(cli: &mut Cli, line: &str) -> Result { + let parts: Vec<&str> = line.split_whitespace().collect(); + if parts.is_empty() { return Ok(String::new()); } + + match parts[0] { + "ls" | "list" => { + let path = parts.get(1).map(|s| *s); + let names = cli.list_nodes(path)?; + Ok(names.join("\n")) + } + "endpoints" => { + let path = parts.get(1).map(|s| *s); + let eps = cli.list_endpoints(path)?; + let mut output = String::new(); + for ep in &eps { + output.push_str(&format!("{} ({:?}) at {}\n", ep.name, ep.endpoint_type, ep.path)); + } + Ok(output) + } + "leaves" => { + Ok(cli.list_leaves().join("\n")) + } + "info" => { + if parts.len() < 2 { return Err("usage: info ".to_string()); } + let info = cli.get_info(parts[1])?; + Ok(format!("{:?}", info)) + } + "exec" => { + if parts.len() < 3 { return Err("usage: exec ".to_string()); } + let path = parts[1]; + let cmd = parts[2..].join(" "); + if cli.is_connected() { + let request = TreeRequest::Exec { cmd: cmd.clone() }; + let response = cli.send_request(path, &request)?; + format_response(response) + } else { + let response = cli.exec_local(path, &cmd)?; + format_response(response) + } + } + "cd" => { + if parts.len() < 2 { return Err("usage: cd ".to_string()); } + let path = parts[1]; + if cli.get_info(path).is_ok() { + cli.set_path(path); + Ok(format!("changed to {}", path)) + } else { + Err(format!("path not found: {}", path)) + } + } + "pwd" => { + Ok(cli.current_path().to_string()) + } + "connect" => { + if parts.len() < 2 { return Err("usage: connect ".to_string()); } + cli.connect(parts[1])?; + Ok(format!("connected to {}", parts[1])) + } + "stream" => { + if parts.len() < 2 { return Err("usage: stream ".to_string()); } + if !cli.is_connected() { return Err("not connected".to_string()); } + let stream_id = cli.open_stream(parts[1])?; + Ok(format!("opened stream {} to {}", stream_id, parts[1])) + } + "close" => { + if parts.len() < 2 { return Err("usage: close ".to_string()); } + let stream_id: u16 = parts[1].parse().map_err(|_| "invalid stream id".to_string())?; + cli.close_stream(stream_id)?; + Ok(format!("closed stream {}", stream_id)) + } + "send" => { + if parts.len() < 3 { return Err("usage: send ".to_string()); } + let stream_id: u16 = parts[1].parse().map_err(|_| "invalid stream id".to_string())?; + let data = parts[2..].join(" "); + cli.send_stream_data(stream_id, data.as_bytes())?; + Ok("sent".to_string()) + } + "help" => { + Ok(HELP_TEXT.to_string()) + } + _ => Err(format!("unknown command: {}", parts[0])), + } +} + +/// Format a TreeResponse for display +fn format_response(response: TreeResponse) -> Result { + match response { + TreeResponse::NodeList { names } => Ok(names.join("\n")), + TreeResponse::EndpointList { endpoints } => { + let mut output = String::new(); + for ep in endpoints { + output.push_str(&format!("{} ({:?})\n", ep.name, ep.endpoint_type)); + } + Ok(output) + } + TreeResponse::LeafList { leaves } => Ok(leaves.join("\n")), + TreeResponse::NodeInfo { info } => Ok(format!("path: {}\nis_leaf: {}\nhas_children: {}\nendpoints: {:?}", info.path, info.is_leaf, info.has_children, info.endpoints)), + TreeResponse::ExecOutput { exit_code, stdout, stderr } => { + let mut output = String::new(); + output.push_str(&format!("exit code: {}\n", exit_code)); + if !stdout.is_empty() { output.push_str(&format!("stdout: {}\n", String::from_utf8_lossy(&stdout))); } + if !stderr.is_empty() { output.push_str(&format!("stderr: {}\n", String::from_utf8_lossy(&stderr))); } + Ok(output) + } + TreeResponse::StreamOpened { stream_id } => Ok(format!("stream opened: {}", stream_id)), + } +} + +/// Help text for CLI commands +const HELP_TEXT: &str = r#"Commands: + ls [path] List child nodes + endpoints [path] List endpoints at path + leaves List all leaf paths + info Get node info + exec Execute command at path + cd Change current path + pwd Print working path + connect Connect to remote server + stream Open stream to path + send Send data on stream + close Close stream + help Show this help +"#; \ No newline at end of file diff --git a/ush-treetest/src/leaves/mod.rs b/ush-treetest/src/leaves/mod.rs new file mode 100644 index 0000000..5e15fc9 --- /dev/null +++ b/ush-treetest/src/leaves/mod.rs @@ -0,0 +1,7 @@ +//! # Leaves Module + +pub mod shell; +pub mod tty; + +pub use shell::RemoteShell; +pub use tty::TTY; \ No newline at end of file diff --git a/ush-treetest/src/leaves/shell.rs b/ush-treetest/src/leaves/shell.rs new file mode 100644 index 0000000..5e2059d --- /dev/null +++ b/ush-treetest/src/leaves/shell.rs @@ -0,0 +1,37 @@ +//! # RemoteShell Leaf + +use crate::protocol::{TreeRequest, TreeResponse, EndpointType}; +use crate::tree::Endpoint; +use std::string::String; +use std::vec::Vec; +use std::result::Result; + +pub struct RemoteShell { name: String } + +impl RemoteShell { + pub fn new(name: &str) -> Self { Self { name: name.to_string() } } + fn execute(&self, cmd: &str) -> (i32, Vec, Vec) { + use std::process::{Command, Stdio}; + match Command::new("sh").args(["-c", cmd]).stdout(Stdio::piped()).stderr(Stdio::piped()).output() { + Ok(out) => (out.status.code().unwrap_or(-1), out.stdout, out.stderr), + Err(e) => (-1, Vec::new(), format!("{}\n", e).into_bytes()), + } + } +} + +impl Endpoint for RemoteShell { + fn handle_request(&mut self, request: &TreeRequest, _src_path: &str) -> Result { + match request { + TreeRequest::Exec { cmd } => { + let (exit_code, stdout, stderr) = self.execute(cmd); + Ok(TreeResponse::ExecOutput { exit_code, stdout, stderr }) + } + _ => Err("unsupported request".to_string()), + } + } + fn on_stream_open(&mut self, _stream_id: u16, _src_path: &str) -> Option { None } + fn on_stream_data(&mut self, _stream_id: u16, _data: &[u8]) -> bool { false } + fn on_stream_close(&mut self, _stream_id: u16) {} + fn endpoint_type(&self) -> EndpointType { EndpointType::Leaf } + fn name(&self) -> &str { &self.name } +} \ No newline at end of file diff --git a/ush-treetest/src/leaves/tty.rs b/ush-treetest/src/leaves/tty.rs new file mode 100644 index 0000000..27ea092 --- /dev/null +++ b/ush-treetest/src/leaves/tty.rs @@ -0,0 +1,215 @@ +//! # TTY Leaf +//! +//! This module provides PTY-based terminal sessions for the unshell protocol. +//! It supports opening pseudo-terminals and streaming data to/from them. + +use crate::protocol::{TreeRequest, TreeResponse, EndpointType}; +use crate::tree::Endpoint; +use std::boxed::Box; +use std::result::Result; +use std::collections::HashMap; + +/// A PTY session - represents an active terminal session +#[allow(dead_code)] +pub struct PtySession { + /// Stream ID for this session + pub stream_id: u16, + /// Master file descriptor for the PTY + pub master: std::os::unix::io::RawFd, + /// Child process PID + pub child_pid: u32 +} + +/// TTY endpoint - provides PTY streaming functionality +pub struct TTY { + name: String, + sessions: HashMap>, + #[allow(dead_code)] + next_id: u16, +} + +impl TTY { + /// Create a new TTY endpoint + pub fn new(name: &str) -> Self { + Self { + name: name.to_string(), + sessions: HashMap::new(), + next_id: 1 + } + } + + /// Open a new PTY session + /// + /// # Arguments + /// * `stream_id` - The stream ID for this session + /// + /// # Returns + /// Ok(()) on success, Err(message) on failure + fn open_pty(&mut self, stream_id: u16) -> Result<(), String> { + // Open PTY master - must be unsafe + let master = unsafe { libc::posix_openpt(libc::O_RDWR | libc::O_NOCTTY) }; + if master < 0 { + return Err("failed to open PTY".to_string()); + } + + // Grant PTY access - unsafe + if unsafe { libc::grantpt(master) } != 0 { + unsafe { libc::close(master); } + return Err("failed to grant PTY".to_string()); + } + + // Unlock PTY - unsafe + if unsafe { libc::unlockpt(master) } != 0 { + unsafe { libc::close(master); } + return Err("failed to unlock PTY".to_string()); + } + + // Get slave name - unsafe but returns pointer we need to check + let slave_name = unsafe { + let ptr = libc::ptsname(master); + if ptr.is_null() { + libc::close(master); + return Err("failed to get PTY name".to_string()); + } + std::ffi::CStr::from_ptr(ptr).to_string_lossy().into_owned() + }; + + // Fork - unsafe + let pid = unsafe { libc::fork() }; + if pid < 0 { + unsafe { libc::close(master); } + return Err("fork failed".to_string()); + } + + if pid == 0 { + // Child process - set up slave PTY and exec shell + unsafe { libc::close(master); } + + let slave = unsafe { libc::open(slave_name.as_ptr() as *const libc::c_char, libc::O_RDWR) }; + if slave < 0 { + unsafe { libc::exit(1); } + } + + // Set controlling terminal - unsafe + unsafe { libc::ioctl(slave, libc::TIOCSCTTY, 0); } + + // Redirect stdio - unsafe + unsafe { + libc::dup2(slave, libc::STDIN_FILENO); + libc::dup2(slave, libc::STDOUT_FILENO); + libc::dup2(slave, libc::STDERR_FILENO); + libc::close(slave); + } + + // Exec shell - unsafe + unsafe { + libc::execl( + "/bin/sh\0".as_ptr() as *const libc::c_char, + "sh\0".as_ptr() as *const libc::c_char, + std::ptr::null::() + ); + } + + // If exec fails, exit + unsafe { libc::exit(1); } + } + + // Parent - store session + self.sessions.insert(stream_id, Box::new(PtySession { + stream_id, + master, + child_pid: pid as u32 + })); + Ok(()) + } + + /// Write data to a PTY session + /// + /// # Arguments + /// * `stream_id` - The stream ID + /// * `data` - The data to write + /// + /// # Returns + /// Ok(()) on success, Err(message) on failure + fn write_to_pty(&mut self, stream_id: u16, data: &[u8]) -> Result<(), String> { + let session = self.sessions.get_mut(&stream_id).ok_or("session not found")?; + let written = unsafe { + libc::write( + session.master, + data.as_ptr() as *const libc::c_void, + data.len() + ) + }; + if written < 0 { + return Err("write failed".to_string()); + } + Ok(()) + } + + /// Close a PTY session + /// + /// # Arguments + /// * `stream_id` - The stream ID to close + fn close_pty(&mut self, stream_id: u16) { + if let Some(session) = self.sessions.remove(&stream_id) { + // Send SIGTERM to child - unsafe + unsafe { libc::kill(session.child_pid as i32, libc::SIGTERM); } + + // Wait for child - unsafe + let mut status: libc::c_int = 0; + unsafe { libc::waitpid(session.child_pid as i32, &mut status, 0); } + + // Close master - unsafe + unsafe { libc::close(session.master); } + } + } +} + +impl Endpoint for TTY { + /// Handle a request - TTY only supports exec for basic commands + fn handle_request(&mut self, request: &TreeRequest, _src_path: &str) -> Result { + match request { + TreeRequest::Exec { cmd } => { + use std::process::{Command, Stdio}; + let output = Command::new("sh") + .args(["-c", cmd]) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .output() + .map_err(|e| e.to_string())?; + Ok(TreeResponse::ExecOutput { + exit_code: output.status.code().unwrap_or(-1), + stdout: output.stdout, + stderr: output.stderr + }) + } + _ => Err("use stream for TTY".to_string()), + } + } + + /// Handle stream open - creates a new PTY session + fn on_stream_open(&mut self, stream_id: u16, _src_path: &str) -> Option { + self.open_pty(stream_id).ok().map(|_| stream_id) + } + + /// Handle stream data - writes to PTY + fn on_stream_data(&mut self, stream_id: u16, data: &[u8]) -> bool { + self.write_to_pty(stream_id, data).ok(); + true + } + + /// Handle stream close - closes PTY session + fn on_stream_close(&mut self, stream_id: u16) { + self.close_pty(stream_id); + } + + /// Get endpoint type + fn endpoint_type(&self) -> EndpointType { + EndpointType::Stream + } + + /// Get endpoint name + fn name(&self) -> &str { + &self.name + } +} \ No newline at end of file diff --git a/ush-treetest/src/main.rs b/ush-treetest/src/main.rs new file mode 100644 index 0000000..9d125a2 --- /dev/null +++ b/ush-treetest/src/main.rs @@ -0,0 +1,334 @@ +//! # Unshell Tree Protocol Testbed +//! +//! This is a testbed implementation of a tree-based routing protocol for unshell. +//! It supports serving and connecting to tree endpoints, with leaves for RemoteShell +//! (command execution) and TTY (PTY streaming). + +mod cli; +mod leaves; +mod protocol; +mod tree; + +use crate::protocol::{FrameHeader, FrameType, TreeRequest, TreeResponse, make_response, make_handshake_ack, Transport}; +use crate::tree::Tree; +use crate::leaves::{RemoteShell, TTY}; +use crate::protocol::TcpTransport; +use std::io::{self, Write}; +use std::sync::{Arc, Mutex}; +use clap::{Parser, Subcommand}; + +#[derive(Parser)] +#[command(name = "ush-treetest")] +#[command(about = "Unshell tree protocol testbed")] +struct Args { + #[command(subcommand)] + command: Option, + + #[arg(short, long)] + addr: Option, +} + +#[derive(Subcommand)] +enum Command { + Serve { + #[arg(default_value = "0.0.0.0:8080")] + addr: String, + }, + Connect { + #[arg(default_value = "localhost:8080")] + addr: String, + }, + Cli {}, + Run { + command: String, + }, +} + +fn main() { + let _ = env_logger::try_init(); + + let args = Args::parse(); + + match args.command { + Some(Command::Serve { addr }) => { + run_server(&addr); + } + Some(Command::Connect { addr }) => { + run_client(&addr); + } + Some(Command::Run { command }) => { + run_single_command(&command); + } + None | Some(Command::Cli {}) => { + run_interactive(); + } + } +} + +fn run_server(addr: &str) { + log::info!("Starting server on {}", addr); + + let tree = Arc::new(Mutex::new(Tree::new())); + { + let mut tree = tree.lock().unwrap(); + tree.add_endpoint("/shell", Box::new(RemoteShell::new("shell"))); + tree.add_endpoint("/tty", Box::new(TTY::new("tty"))); + } + + let listener = TcpTransport::listen(addr).expect("failed to bind"); + log::info!("Listening on {}", addr); + + loop { + match TcpTransport::accept(&listener) { + Ok(transport) => { + log::info!("New connection from {:?}", transport.peer_addr()); + let tree = Arc::clone(&tree); + std::thread::spawn(move || { + handle_connection(transport, tree); + }); + } + Err(e) => { + log::error!("accept error: {:?}", e); + } + } + } +} + +fn handle_connection(mut transport: TcpTransport, tree: Arc>) { + let (header, _payload) = match transport.recv_frame() { + Ok(h) => h, + Err(e) => { + log::error!("recv error: {:?}", e); + return; + } + }; + + if header.frame_type != FrameType::Handshake { + log::error!("expected handshake"); + return; + } + + log::info!("Client connected"); + + let (ack_header, ack_payload) = make_handshake_ack(true, "/client"); + transport.send_frame(&ack_header, Some(&ack_payload)).expect("send failed"); + + loop { + match transport.recv_frame() { + Ok((header, payload)) => { + let response = handle_frame(&header, &payload, &tree); + + if let Some(response) = response { + let (resp_header, resp_payload) = match response { + Ok((h, p)) => (h, p), + Err(e) => { + log::error!("handle error: {:?}", e); + break; + } + }; + transport.send_frame(&resp_header, Some(&resp_payload)).expect("send failed"); + } + + if header.frame_type == FrameType::StreamClose { + break; + } + } + Err(e) => { + log::error!("recv error: {:?}", e); + break; + } + } + } + + log::info!("Connection closed"); +} + +/// Handle a single frame and return an optional response +/// +/// # Arguments +/// * `header` - The frame header +/// * `payload` - The frame payload bytes +/// * `tree` - Shared access to the tree +/// +/// # Returns +/// Some(Ok((header, payload))) for a response to send, Some(Err(e)) for an error, None for no response +fn handle_frame(header: &FrameHeader, payload: &[u8], tree: &Arc>) -> Option), String>> { + match header.frame_type { + FrameType::Request => { + let request: TreeRequest = match TreeRequest::from_bytes(payload) { + Ok(r) => r, + Err(e) => return Some(Err(e.to_string())), + }; + + let dst_path = header.dst_path.as_deref().unwrap_or("/"); + + // Acquire lock for the entire request handling + let mut tree = match tree.lock() { + Ok(t) => t, + Err(e) => return Some(Err(format!("lock error: {}", e))), + }; + + let response = match request { + TreeRequest::ListNodes {} => { + let names = tree.list_nodes(dst_path).unwrap_or_default(); + TreeResponse::NodeList { names } + } + TreeRequest::ListEndpoints {} => { + let endpoints = tree.list_endpoints(dst_path).unwrap_or_default(); + TreeResponse::EndpointList { endpoints } + } + TreeRequest::ListLeaves {} => { + let leaves = tree.list_leaves(); + TreeResponse::LeafList { leaves } + } + TreeRequest::GetInfo { path } => { + match tree.get_info(&path) { + Ok(info) => TreeResponse::NodeInfo { info }, + Err(e) => return Some(Err(e)), + } + } + TreeRequest::Exec { ref cmd } => { + let (handler, matched_path) = match tree.find_handler(dst_path) { + Some(h) => h, + None => return Some(Err(format!("path not found: {}", dst_path))), + }; + // Lock the handler and make the request + let result = { + let mut handler = match handler.lock() { + Ok(h) => h, + Err(e) => return Some(Err(format!("lock error: {}", e))), + }; + handler.handle_request(&TreeRequest::Exec { cmd: cmd.clone() }, matched_path) + }; + match result { + Ok(resp) => resp, + Err(e) => return Some(Err(e)), + } + } + TreeRequest::StreamOpen { path } => { + match tree.open_stream(&path, &header.src_path) { + Ok(stream_id) => TreeResponse::StreamOpened { stream_id }, + Err(e) => return Some(Err(e)), + } + } + TreeRequest::Resize { .. } => { + return Some(Err("unsupported request: Resize".to_string())); + } + }; + + Some(Ok(make_response(&header.src_path, header.request_id.unwrap_or(0), &response))) + } + + FrameType::StreamOpen => { + let dst_path = header.dst_path.as_deref().unwrap_or("/"); + let mut tree = match tree.lock() { + Ok(t) => t, + Err(e) => return Some(Err(format!("lock error: {}", e))), + }; + match tree.open_stream(dst_path, &header.src_path) { + Ok(stream_id) => { + let response = TreeResponse::StreamOpened { stream_id }; + Some(Ok(make_response(&header.src_path, header.request_id.unwrap_or(0), &response))) + } + Err(e) => Some(Err(e)), + } + } + + FrameType::StreamData => { + let mut tree = match tree.lock() { + Ok(t) => t, + Err(e) => return Some(Err(format!("lock error: {}", e))), + }; + tree.route_stream_data(header, payload).ok(); + None + } + + FrameType::StreamClose => { + let mut tree = match tree.lock() { + Ok(t) => t, + Err(e) => return Some(Err(format!("lock error: {}", e))), + }; + if let Some(stream_id) = header.stream_id { + tree.close_stream(stream_id).ok(); + } + None + } + + _ => Some(Err("unsupported frame type".to_string())), + } +} + +fn run_client(addr: &str) { + let mut cli = cli::Cli::new(); + + if let Err(e) = cli.connect(addr) { + eprintln!("Failed to connect: {}", e); + return; + } + + println!("Connected to {}", addr); + run_cli_loop(&mut cli); +} + +fn run_interactive() { + let mut cli = cli::Cli::new(); + + println!("Unshell Tree Protocol Testbed"); + println!("Type 'help' for commands\n"); + println!("Local tree with endpoints:"); + for leaf in cli.list_leaves() { + println!(" {}", leaf); + } + println!(); + + run_cli_loop(&mut cli); +} + +fn run_cli_loop(cli: &mut cli::Cli) { + loop { + print!("{}> ", cli.current_path()); + io::stdout().flush().ok(); + + let mut line = String::new(); + if io::stdin().read_line(&mut line).is_err() { + break; + } + + let line = line.trim(); + + if line.is_empty() { + continue; + } + + if line == "quit" || line == "exit" { + break; + } + + match cli::parse_and_execute(cli, line) { + Ok(output) => { + if !output.is_empty() { + println!("{}", output); + } + } + Err(e) => { + eprintln!("Error: {}", e); + } + } + } +} + +fn run_single_command(command: &str) { + let mut cli = cli::Cli::new(); + + match cli::parse_and_execute(&mut cli, command) { + Ok(output) => { + if !output.is_empty() { + println!("{}", output); + } + } + Err(e) => { + eprintln!("Error: {}", e); + std::process::exit(1); + } + } +} \ No newline at end of file diff --git a/ush-treetest/src/protocol/mod.rs b/ush-treetest/src/protocol/mod.rs new file mode 100644 index 0000000..26ffdce --- /dev/null +++ b/ush-treetest/src/protocol/mod.rs @@ -0,0 +1,7 @@ +//! # Protocol Module + +pub mod types; +pub mod transport; + +pub use types::*; +pub use transport::*; \ No newline at end of file diff --git a/ush-treetest/src/protocol/transport.rs b/ush-treetest/src/protocol/transport.rs new file mode 100644 index 0000000..6e5d265 --- /dev/null +++ b/ush-treetest/src/protocol/transport.rs @@ -0,0 +1,241 @@ +//! # Transport Layer +//! +//! This module provides the Transport trait and TCP implementation. +//! Uses a simple length-prefixed framing: [u32 header_len][header bytes][u32 payload_len][payload bytes] + +use crate::protocol::types::*; +use std::net::{TcpStream, TcpListener}; +use std::io::{Read, Write, Error}; + +pub trait Transport: Sized { + type Error: std::fmt::Debug; + /// Send a frame (header + optional payload) + fn send_frame(&mut self, header: &FrameHeader, payload: Option<&[u8]>) -> Result<(), Self::Error>; + /// Receive a frame + fn recv_frame(&mut self) -> Result<(FrameHeader, Vec), Self::Error>; + /// Close the transport + #[allow(dead_code)] + fn close(&mut self) -> Result<(), Self::Error>; +} + +#[derive(Debug)] +pub enum TransportError { + ConnectionClosed, + InvalidFrame(String), + Io(String), +} + +impl std::fmt::Display for TransportError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + TransportError::ConnectionClosed => write!(f, "connection closed"), + TransportError::InvalidFrame(s) => write!(f, "invalid frame: {}", s), + TransportError::Io(s) => write!(f, "I/O error: {}", s), + } + } +} + +impl From for TransportError { + fn from(e: Error) -> Self { TransportError::Io(e.to_string()) } +} + +/// TCP transport implementation +pub struct TcpTransport { + stream: TcpStream, +} + +impl TcpTransport { + pub fn new(stream: TcpStream) -> Self { + // Set timeouts for safety + stream.set_read_timeout(Some(std::time::Duration::from_secs(30))).ok(); + stream.set_write_timeout(Some(std::time::Duration::from_secs(30))).ok(); + Self { stream } + } + + /// Connect to a remote address + pub fn connect(addr: &str) -> Result { + let stream = TcpStream::connect(addr)?; + Ok(Self::new(stream)) + } + + /// Create a listening socket + pub fn listen(addr: &str) -> Result { + let listener = TcpListener::bind(addr)?; + listener.set_nonblocking(false)?; + Ok(listener) + } + + /// Accept an incoming connection + pub fn accept(listener: &std::net::TcpListener) -> Result { + let stream = listener.accept()?.0; + Ok(Self::new(stream)) + } + + /// Get peer address + pub fn peer_addr(&self) -> Result { + self.stream.peer_addr() + } + + /// Read exactly n bytes + fn read_exact(&mut self, mut n: usize) -> Result, TransportError> { + let mut buf = Vec::with_capacity(n); + while n > 0 { + let mut chunk = vec![0u8; n]; + let read = self.stream.read(&mut chunk).map_err(|e| TransportError::Io(e.to_string()))?; + if read == 0 { + return Err(TransportError::ConnectionClosed); + } + buf.extend_from_slice(&chunk[..read]); + n -= read; + } + Ok(buf) + } +} + +impl Transport for TcpTransport { + type Error = TransportError; + + fn send_frame(&mut self, header: &FrameHeader, payload: Option<&[u8]>) -> Result<(), Self::Error> { + // Serialize header using rkyv + let header_bytes = header.to_bytes(); + let header_len = header_bytes.len() as u32; + + // Get payload bytes + let payload_bytes = payload.unwrap_or(&[]); + let payload_len = payload_bytes.len() as u32; + + // Build frame: [u32 header_len][header][u32 payload_len][payload] + let mut frame = Vec::with_capacity(4 + header_len as usize + 4 + payload_len as usize); + frame.extend_from_slice(&header_len.to_le_bytes()); + frame.extend_from_slice(&header_bytes); + frame.extend_from_slice(&payload_len.to_le_bytes()); + frame.extend_from_slice(payload_bytes); + + self.stream.write_all(&frame).map_err(|e| TransportError::Io(e.to_string()))?; + self.stream.flush().map_err(|e| TransportError::Io(e.to_string()))?; + Ok(()) + } + + fn recv_frame(&mut self) -> Result<(FrameHeader, Vec), Self::Error> { + // Read header length + let header_len_bytes = self.read_exact(4)?; + let header_len = u32::from_le_bytes(header_len_bytes.try_into().unwrap()) as usize; + + // Read header + let header_bytes = self.read_exact(header_len)?; + let header = FrameHeader::from_bytes(&header_bytes).map_err(|e| TransportError::InvalidFrame(e))?; + + // Read payload length + let payload_len_bytes = self.read_exact(4)?; + let payload_len = u32::from_le_bytes(payload_len_bytes.try_into().unwrap()) as usize; + + // Read payload + let payload = if payload_len > 0 { + self.read_exact(payload_len)? + } else { + Vec::new() + }; + + Ok((header, payload)) + } + + fn close(&mut self) -> Result<(), Self::Error> { + self.stream.shutdown(std::net::Shutdown::Both).map_err(|e| TransportError::Io(e.to_string()))?; + Ok(()) + } +} + +// ============================================================================= +// Frame builder functions +// ============================================================================= + +/// Create a request frame +pub fn make_request(dst_path: &str, src_path: &str, request_id: u64, request: &TreeRequest) -> (FrameHeader, Vec) { + let header = FrameHeader { + frame_type: FrameType::Request, + dst_path: Some(dst_path.to_string()), + src_path: src_path.to_string(), + request_id: Some(request_id), + stream_id: None, + }; + let payload = request.to_bytes(); + (header, payload) +} + +/// Create a response frame +pub fn make_response(src_path: &str, request_id: u64, response: &TreeResponse) -> (FrameHeader, Vec) { + let header = FrameHeader { + frame_type: FrameType::Response, + dst_path: None, + src_path: src_path.to_string(), + request_id: Some(request_id), + stream_id: None, + }; + let payload = response.to_bytes(); + (header, payload) +} + +/// Create a stream open frame +pub fn make_stream_open(dst_path: &str, src_path: &str, request_id: u64) -> FrameHeader { + FrameHeader { + frame_type: FrameType::StreamOpen, + dst_path: Some(dst_path.to_string()), + src_path: src_path.to_string(), + request_id: Some(request_id), + stream_id: None, + } +} + +/// Create a stream data frame +pub fn make_stream_data(stream_id: u16, data: &[u8]) -> (FrameHeader, Vec) { + let header = FrameHeader { + frame_type: FrameType::StreamData, + dst_path: None, + src_path: String::new(), + request_id: None, + stream_id: Some(stream_id), + }; + (header, data.to_vec()) +} + +/// Create a stream close frame +pub fn make_stream_close(stream_id: u16) -> FrameHeader { + FrameHeader { + frame_type: FrameType::StreamClose, + dst_path: None, + src_path: String::new(), + request_id: None, + stream_id: Some(stream_id), + } +} + +/// Create a handshake frame +pub fn make_handshake(registered_paths: Vec) -> (FrameHeader, Vec) { + let handshake = Handshake { registered_paths }; + let payload = handshake.to_bytes(); + let header = FrameHeader { + frame_type: FrameType::Handshake, + dst_path: None, + src_path: String::new(), + request_id: None, + stream_id: None, + }; + (header, payload) +} + +/// Create a handshake ack frame +pub fn make_handshake_ack(accepted: bool, assigned_base_path: &str) -> (FrameHeader, Vec) { + let ack = HandshakeAck { + accepted, + assigned_base_path: assigned_base_path.to_string() + }; + let payload = ack.to_bytes(); + let header = FrameHeader { + frame_type: FrameType::HandshakeAck, + dst_path: None, + src_path: String::new(), + request_id: None, + stream_id: None, + }; + (header, payload) +} \ No newline at end of file diff --git a/ush-treetest/src/protocol/types.rs b/ush-treetest/src/protocol/types.rs new file mode 100644 index 0000000..e8acc90 --- /dev/null +++ b/ush-treetest/src/protocol/types.rs @@ -0,0 +1,162 @@ +//! # Protocol Types +//! +//! This module defines the core types for the UnShell protocol. +//! Uses rkyv for zero-copy serialization. + +use rkyv::{Archive, Serialize, Deserialize}; +use std::string::String; +use std::vec::Vec; + +const BUFFER_SIZE: usize = 4096; + +/// Frame type enum - distinguishes between different frame kinds +#[derive(Archive, Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)] +#[repr(u8)] +pub enum FrameType { + Request = 0x01, + Response = 0x02, + StreamOpen = 0x03, + StreamData = 0x04, + StreamClose = 0x05, + Handshake = 0x10, + HandshakeAck = 0x11, +} + +impl FrameType { + #[allow(dead_code)] + pub fn from_u8(v: u8) -> Option { + match v { + 0x01 => Some(Self::Request), + 0x02 => Some(Self::Response), + 0x03 => Some(Self::StreamOpen), + 0x04 => Some(Self::StreamData), + 0x05 => Some(Self::StreamClose), + 0x10 => Some(Self::Handshake), + 0x11 => Some(Self::HandshakeAck), + _ => None, + } + } +} + +/// Frame header - the metadata sent before each payload +#[derive(Archive, Serialize, Deserialize, Debug, Clone)] +pub struct FrameHeader { + pub frame_type: FrameType, + pub dst_path: Option, + pub src_path: String, + pub request_id: Option, + pub stream_id: Option, +} + +impl FrameHeader { + pub fn to_bytes(&self) -> Vec { + rkyv::to_bytes::(self).unwrap().into_vec() + } + + pub fn from_bytes(bytes: &[u8]) -> Result { + unsafe { rkyv::from_bytes_unchecked(bytes) }.map_err(|e| e.to_string()) + } +} + +/// Tree request - operations on the tree +#[derive(Archive, Serialize, Deserialize, Debug, Clone)] +pub enum TreeRequest { + ListNodes {}, + ListEndpoints {}, + ListLeaves {}, + GetInfo { path: String }, + Exec { cmd: String }, + StreamOpen { path: String }, + Resize { rows: u16, cols: u16 }, +} + +impl TreeRequest { + pub fn to_bytes(&self) -> Vec { + rkyv::to_bytes::(self).unwrap().into_vec() + } + + pub fn from_bytes(bytes: &[u8]) -> Result { + unsafe { rkyv::from_bytes_unchecked(bytes) }.map_err(|e| e.to_string()) + } +} + +/// Tree response - results from tree operations +#[derive(Archive, Serialize, Deserialize, Debug, Clone)] +pub enum TreeResponse { + NodeList { names: Vec }, + EndpointList { endpoints: Vec }, + LeafList { leaves: Vec }, + NodeInfo { info: NodeInfo }, + ExecOutput { exit_code: i32, stdout: Vec, stderr: Vec }, + StreamOpened { stream_id: u16 }, +} + +impl TreeResponse { + pub fn to_bytes(&self) -> Vec { + rkyv::to_bytes::(self).unwrap().into_vec() + } + + pub fn from_bytes(bytes: &[u8]) -> Result { + unsafe { rkyv::from_bytes_unchecked(bytes) }.map_err(|e| e.to_string()) + } +} + +/// Information about an endpoint +#[derive(Archive, Serialize, Deserialize, Debug, Clone)] +pub struct EndpointInfo { + pub name: String, + pub path: String, + pub endpoint_type: EndpointType, +} + +/// Type of endpoint +#[derive(Archive, Serialize, Deserialize, Debug, Clone, Copy)] +#[repr(u8)] +pub enum EndpointType { + Leaf = 0x01, + Proxy = 0x02, + Stream = 0x03, +} + +/// Information about a node in the tree +#[derive(Archive, Serialize, Deserialize, Debug, Clone)] +pub struct NodeInfo { + pub path: String, + pub is_leaf: bool, + pub has_children: bool, + pub endpoints: Vec, +} + +/// Handshake message - sent when connecting +#[derive(Archive, Serialize, Deserialize, Debug, Clone)] +pub struct Handshake { + pub registered_paths: Vec, +} + +impl Handshake { + pub fn to_bytes(&self) -> Vec { + rkyv::to_bytes::(self).unwrap().into_vec() + } + + #[allow(dead_code)] + pub fn from_bytes(bytes: &[u8]) -> Result { + unsafe { rkyv::from_bytes_unchecked(bytes) }.map_err(|e| e.to_string()) + } +} + +/// Handshake acknowledgement - router's response to handshake +#[derive(Archive, Serialize, Deserialize, Debug, Clone)] +pub struct HandshakeAck { + pub accepted: bool, + pub assigned_base_path: String, +} + +impl HandshakeAck { + pub fn to_bytes(&self) -> Vec { + rkyv::to_bytes::(self).unwrap().into_vec() + } + + pub fn from_bytes(bytes: &[u8]) -> Result { + unsafe { rkyv::from_bytes_unchecked(bytes) }.map_err(|e| e.to_string()) + } +} \ No newline at end of file diff --git a/ush-treetest/src/tree/endpoint.rs b/ush-treetest/src/tree/endpoint.rs new file mode 100644 index 0000000..ff14154 --- /dev/null +++ b/ush-treetest/src/tree/endpoint.rs @@ -0,0 +1,46 @@ +//! # Tree Endpoint +//! +//! This module defines the Endpoint trait that all tree leaves must implement. +//! Endpoints handle requests and stream data for specific paths in the tree. + +use crate::protocol::{TreeRequest, TreeResponse, EndpointType}; +use std::string::String; + +/// Endpoint trait - implemented by all leaf handlers in the tree +/// +/// This trait is object-safe and must be Send + Sync to allow sharing across threads. +pub trait Endpoint: Send + Sync { + /// Handle a request and return a response + fn handle_request(&mut self, request: &TreeRequest, src_path: &str) -> Result; + + /// Called when a stream is opened to this endpoint + /// + /// Returns the stream ID if successful, None if rejected + fn on_stream_open(&mut self, stream_id: u16, src_path: &str) -> Option; + + /// Called when data is received on a stream + /// + /// Returns true if data was handled successfully + fn on_stream_data(&mut self, stream_id: u16, data: &[u8]) -> bool; + + /// Called when a stream is closed + fn on_stream_close(&mut self, stream_id: u16); + + /// Get the type of this endpoint + fn endpoint_type(&self) -> EndpointType; + + /// Get the name of this endpoint + fn name(&self) -> &str; +} + +/// Stream - represents an active stream between endpoints +#[derive(Debug, Clone)] +#[allow(dead_code)] +pub struct Stream { + /// Unique identifier for this stream + pub stream_id: u16, + /// Destination path for the stream + pub dst_path: String, + /// Source path for the stream + pub src_path: String, +} \ No newline at end of file diff --git a/ush-treetest/src/tree/mod.rs b/ush-treetest/src/tree/mod.rs new file mode 100644 index 0000000..9673b66 --- /dev/null +++ b/ush-treetest/src/tree/mod.rs @@ -0,0 +1,412 @@ +//! # Tree Module +//! +//! This module implements the tree-based routing for the unshell protocol. +//! The tree structure maintains endpoints at paths and handles routing of +//! requests and streams to appropriate handlers. + +pub mod endpoint; +pub use endpoint::{Endpoint, Stream}; + +use crate::protocol::{EndpointInfo, FrameHeader, NodeInfo}; +use std::collections::BTreeMap; +use std::string::String; +use std::vec::Vec; +use std::boxed::Box; +use std::result::Result; +use std::sync::{Arc, Mutex}; + +/// A node in the tree - contains an optional endpoint and child nodes +pub struct Node { + endpoint: Option>>>, + children: BTreeMap, + streams: BTreeMap, + next_stream_id: u16, + path: String, +} + +impl Node { + /// Create a new node with the given path + pub fn new(path: &str) -> Self { + Self { + endpoint: None, + children: BTreeMap::new(), + streams: BTreeMap::new(), + next_stream_id: 1, + path: path.to_string(), + } + } + + /// Set the endpoint for this node + /// + /// Wraps the endpoint in Arc> for thread-safe sharing + pub fn set_endpoint(&mut self, endpoint: Box) { + self.endpoint = Some(Arc::new(Mutex::new(endpoint))); + } + + /// Add a child node with the given name + pub fn add_child(&mut self, name: &str, node: Node) { + self.children.insert(name.to_string(), node); + } + + /// Get names of all child nodes + pub fn child_names(&self) -> Vec { + self.children.keys().cloned().collect() + } + + /// Get all endpoints at this node and in children + pub fn endpoint_names(&self) -> Vec { + let mut endpoints = Vec::new(); + + if let Some(ref e) = self.endpoint { + if let Ok(ep) = e.lock() { + endpoints.push(EndpointInfo { + name: ep.name().to_string(), + path: self.path.clone(), + endpoint_type: ep.endpoint_type(), + }); + } + } + + for (name, child) in &self.children { + let mut child_endpoints = child.endpoint_names(); + for ep in &mut child_endpoints { + ep.path = format!("{}/{}", self.path, name); + endpoints.push(ep.clone()); + } + } + + endpoints + } + + /// Get all leaf paths (nodes with endpoint but no children) + pub fn leaf_paths(&self) -> Vec { + let mut paths = Vec::new(); + + if self.endpoint.is_some() && self.children.is_empty() { + paths.push(self.path.clone()); + } + + for (name, child) in &self.children { + let mut child_leaves = child.leaf_paths(); + for path in &mut child_leaves { + *path = format!("{}/{}", self.path, name); + paths.push(path.clone()); + } + } + + paths + } + + /// Get info about this node + pub fn node_info(&self) -> NodeInfo { + NodeInfo { + path: self.path.clone(), + is_leaf: self.endpoint.is_some() && self.children.is_empty(), + has_children: !self.children.is_empty(), + endpoints: self.endpoint_names().iter().map(|e| e.name.clone()).collect(), + } + } +} + +/// Tree structure for routing - contains the root node +pub struct Tree { + root: Node, +} + +impl Tree { + /// Create a new empty tree + pub fn new() -> Self { + Self { root: Node::new("/") } + } + + /// Add an endpoint at the given path + /// + /// # Arguments + /// * `path` - The path where to register the endpoint (e.g., "/shell", "/tty") + /// * `endpoint` - The endpoint to register + pub fn add_endpoint(&mut self, path: &str, endpoint: Box) { + let segments = path_segments(path); + + if segments.is_empty() { + self.root.set_endpoint(endpoint); + return; + } + + let mut current = &mut self.root; + let mut endpoint_opt: Option> = Some(endpoint); + + for (i, segment) in segments.iter().enumerate() { + let is_last = i == segments.len() - 1; + + if !current.children.contains_key(segment) { + let parent_path = if i == 0 { + String::from("/") + } else { + segments[..i].join("/") + }; + let new_path = if parent_path == "/" { + format!("/{}", segment) + } else { + format!("{}/{}", parent_path, segment) + }; + current.add_child(segment, Node::new(&new_path)); + } + + current = current.children.get_mut(segment).unwrap(); + + if is_last { + if let Some(ep) = endpoint_opt.take() { + current.set_endpoint(ep); + } + } + } + } + + /// Find the handler for a given path using longest-prefix matching + /// + /// Returns the endpoint and the matched path + pub fn find_handler(&self, path: &str) -> Option<(Arc>>, &str)> { + if path == "/" { + return self.root.endpoint.as_ref().map(|e| (e.clone(), "")); + } + + let segments = path_segments(path); + let mut current = &self.root; + let mut remaining = segments.as_slice(); + let mut handler_path = ""; + + while !remaining.is_empty() { + if let Some(child) = current.children.get(&remaining[0].to_string()) { + current = child; + remaining = &remaining[1..]; + handler_path = ¤t.path; + } else { + break; + } + } + + current.endpoint.as_ref().map(|e| (e.clone(), handler_path)) + } + + /// List child nodes at a given path + pub fn list_nodes(&self, path: &str) -> Result, String> { + let (_, matched_path) = self.find_handler(path) + .ok_or_else(|| format!("path not found: {}", path))?; + + let segments = path_segments(matched_path); + let mut current = &self.root; + + for segment in &segments { + if let Some(child) = current.children.get(segment) { + current = child; + } + } + + Ok(current.child_names()) + } + + /// List all endpoints at a given path + pub fn list_endpoints(&self, path: &str) -> Result, String> { + let (_, matched_path) = self.find_handler(path) + .ok_or_else(|| format!("path not found: {}", path))?; + + let segments = path_segments(matched_path); + let mut current = &self.root; + + for segment in &segments { + if let Some(child) = current.children.get(segment) { + current = child; + } + } + + Ok(current.endpoint_names()) + } + + /// List all leaf paths in the tree + pub fn list_leaves(&self) -> Vec { + self.root.leaf_paths() + } + + /// Get information about a node at the given path + pub fn get_info(&self, path: &str) -> Result { + let segments = path_segments(path); + let mut current = &self.root; + + for segment in &segments { + if let Some(child) = current.children.get(segment) { + current = child; + } else { + return Err(format!("path not found: {}", path)); + } + } + + Ok(current.node_info()) + } + + /// Open a stream to an endpoint at the given path + /// + /// # Arguments + /// * `path` - The path to open stream to + /// * `src_path` - The source path for the stream + /// + /// # Returns + /// The stream ID on success + pub fn open_stream(&mut self, path: &str, src_path: &str) -> Result { + // First find the handler and matched path + let (handler, matched_path) = self.find_handler(path) + .ok_or_else(|| format!("path not found: {}", path))?; + + let segments = path_segments(matched_path); + + // Collect segment names first, then use indices to navigate + // This avoids borrow issues by not holding references across operations + let mut path_indices: Vec = Vec::new(); + + { + let mut current = &self.root; + for segment in &segments { + if let Some(child) = current.children.get(segment) { + path_indices.push(segment.clone()); + current = child; + } else { + return Err(format!("node not found: {}", segment)); + } + } + } + + // Now navigate again with indices and get next_stream_id + let stream_id = { + let mut current = &mut self.root; + for segment in &path_indices { + current = current.children.get_mut(segment).unwrap(); + } + let sid = current.next_stream_id; + current.next_stream_id = current.next_stream_id.wrapping_add(1); + sid + }; + + // Call handler's on_stream_open with locked mutex + let stream_id = { + let mut handler = handler.lock().map_err(|e| e.to_string())?; + handler.on_stream_open(stream_id, src_path) + .ok_or_else(|| "endpoint rejected stream".to_string())? + }; + + // Store stream info in the node + { + let mut current = &mut self.root; + for segment in &path_indices { + current = current.children.get_mut(segment).unwrap(); + } + current.streams.insert(stream_id, Stream { + stream_id, + dst_path: path.to_string(), + src_path: src_path.to_string(), + }); + } + + Ok(stream_id) + } + + #[allow(dead_code)] + /// Find the index path to a node given segment names + fn find_node_index(&self, segments: &[String]) -> Result, String> { + let mut current = &self.root; + let mut path = Vec::new(); + + for segment in segments { + if let Some(child) = current.children.get(segment) { + path.push(segment.clone()); + current = child; + } else { + return Err(format!("segment not found: {}", segment)); + } + } + + Ok(path) + } + + /// Get a mutable reference to a node at the given path + #[allow(dead_code)] + fn get_node_mut(&mut self, path: &[String]) -> Result<&mut Node, String> { + let mut current = &mut self.root; + + for segment in path { + if let Some(child) = current.children.get_mut(segment) { + current = child; + } else { + return Err(format!("node not found: {}", segment)); + } + } + + Ok(current) + } + + /// Route stream data to the appropriate handler + pub fn route_stream_data(&mut self, header: &FrameHeader, data: &[u8]) -> Result<(), String> { + let stream_id = header.stream_id.ok_or("no stream_id")?; + + // Find the node containing this stream + fn find_stream_handler(node: &mut Node, sid: u16) -> Option>>> { + if node.streams.get(&sid).is_some() { + return node.endpoint.clone(); + } + + for child in node.children.values_mut() { + if let Some(h) = find_stream_handler(child, sid) { + return Some(h); + } + } + + None + } + + if let Some(handler) = find_stream_handler(&mut self.root, stream_id) { + if let Ok(mut h) = handler.lock() { + h.on_stream_data(stream_id, data); + } + } + + Ok(()) + } + + /// Close a stream + pub fn close_stream(&mut self, stream_id: u16) -> Result<(), String> { + fn find_and_close(node: &mut Node, sid: u16) -> bool { + if node.streams.remove(&sid).is_some() { + if let Some(ref ep) = node.endpoint { + if let Ok(mut h) = ep.lock() { + h.on_stream_close(sid); + } + return true; + } + } + + for child in node.children.values_mut() { + if find_and_close(child, sid) { + return true; + } + } + + false + } + + find_and_close(&mut self.root, stream_id) + .then_some(()) + .ok_or_else(|| format!("stream not found: {}", stream_id)) + } +} + +/// Split a path into segments +/// +/// # Example +/// ``` +/// assert_eq!(path_segments("/foo/bar"), vec!["foo", "bar"]); +/// assert_eq!(path_segments("/"), vec![]); +/// ``` +fn path_segments(path: &str) -> Vec { + path.split('/') + .filter(|s| !s.is_empty()) + .map(String::from) + .collect() +} \ No newline at end of file