From 02f2f20f9f5ed389a2004746e7487d45cb393fc5 Mon Sep 17 00:00:00 2001 From: Michael Mikovsky <77305074+Astatin3@users.noreply.github.com> Date: Mon, 16 Feb 2026 10:52:27 -0700 Subject: [PATCH] spec.md --- Cargo.lock | 218 +++++++++++++++++++++- Cargo.toml | 3 + src/tree/SPEC.md | 392 ++++++++++++++++++++++++++++++++++++++++ src/tree/component.rs | 140 ++++++++++++++ src/tree/endpoint.rs | 6 +- src/tree/log.rs | 4 +- src/tree/message.rs | 135 ++++++++++++++ src/tree/mod.rs | 5 + src/tree/tcp/client.rs | 225 +++++++++++++++++++++++ src/tree/tcp/config.rs | 170 +++++++++++++++++ src/tree/tcp/mod.rs | 12 ++ src/tree/tcp/server.rs | 269 +++++++++++++++++++++++++++ ush-payload/src/main.rs | 166 ++++++++++------- 13 files changed, 1671 insertions(+), 74 deletions(-) create mode 100644 src/tree/SPEC.md create mode 100644 src/tree/component.rs create mode 100644 src/tree/message.rs create mode 100644 src/tree/tcp/client.rs create mode 100644 src/tree/tcp/config.rs create mode 100644 src/tree/tcp/mod.rs create mode 100644 src/tree/tcp/server.rs diff --git a/Cargo.lock b/Cargo.lock index 332045e..bad64ac 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -154,6 +154,12 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "anyhow" +version = "1.0.101" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f0e0fee31ef5ed1ba1316088939cea399010ed7731dba877ed44aeb407a75ea" + [[package]] name = "arboard" version = "3.6.1" @@ -1204,6 +1210,12 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foldhash" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" + [[package]] name = "foldhash" version = "0.2.0" @@ -1369,6 +1381,19 @@ dependencies = [ "wasip2", ] +[[package]] +name = "getrandom" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "139ef39800118c7683f2fd3c98c1b23c09ae076556b435f8e9064ae108aaeeec" +dependencies = [ + "cfg-if", + "libc", + "r-efi", + "wasip2", + "wasip3", +] + [[package]] name = "gl_generator" version = "0.14.0" @@ -1489,13 +1514,22 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "hashbrown" +version = "0.15.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" +dependencies = [ + "foldhash 0.1.5", +] + [[package]] name = "hashbrown" version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" dependencies = [ - "foldhash", + "foldhash 0.2.0", ] [[package]] @@ -1801,6 +1835,12 @@ dependencies = [ "zerovec", ] +[[package]] +name = "id-arena" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d3067d79b975e8844ca9eb072e16b31c3c1c36928edf9c6789548c524d0d954" + [[package]] name = "idna" version = "1.1.0" @@ -1843,7 +1883,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7714e70437a7dc3ac8eb7e6f8df75fd8eb422675fc7678aff7364301092b1017" dependencies = [ "equivalent", - "hashbrown", + "hashbrown 0.16.1", + "serde", + "serde_core", ] [[package]] @@ -1967,6 +2009,12 @@ version = "3.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2db585e1d738fc771bf08a151420d3ed193d9d895a36df7f6f8a9456b911ddc" +[[package]] +name = "leb128fmt" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" + [[package]] name = "libc" version = "0.2.180" @@ -2131,7 +2179,7 @@ dependencies = [ "cfg_aliases 0.2.1", "codespan-reporting", "half", - "hashbrown", + "hashbrown 0.16.1", "hexf-parse", "indexmap", "libm", @@ -2811,6 +2859,16 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "prettyplease" +version = "0.2.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b" +dependencies = [ + "proc-macro2", + "syn 2.0.114", +] + [[package]] name = "proc-macro-crate" version = "3.4.0" @@ -3201,6 +3259,12 @@ dependencies = [ "libc", ] +[[package]] +name = "semver" +version = "1.0.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d767eb0aabc880b29956c35734170f26ed551a859dbd361d140cdbeca61ab1e2" + [[package]] name = "serde" version = "1.0.228" @@ -3937,6 +4001,12 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4ac048d71ede7ee76d585517add45da530660ef4390e49b098733c6e897f254" +[[package]] +name = "unicode-xid" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" + [[package]] name = "unshell" version = "0.0.0" @@ -3947,6 +4017,7 @@ dependencies = [ "serde_json", "static_init", "ush-obfuscate", + "uuid", ] [[package]] @@ -4068,6 +4139,18 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" +[[package]] +name = "uuid" +version = "1.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b672338555252d43fd2240c714dc444b8c6fb0a5c5335e65a07bba7742735ddb" +dependencies = [ + "getrandom 0.4.1", + "js-sys", + "serde_core", + "wasm-bindgen", +] + [[package]] name = "vcpkg" version = "0.2.15" @@ -4120,6 +4203,15 @@ dependencies = [ "wit-bindgen", ] +[[package]] +name = "wasip3" +version = "0.4.0+wasi-0.3.0-rc-2026-01-06" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5428f8bf88ea5ddc08faddef2ac4a67e390b88186c703ce6dbd955e1c145aca5" +dependencies = [ + "wit-bindgen", +] + [[package]] name = "wasm-bindgen" version = "0.2.108" @@ -4179,6 +4271,40 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "wasm-encoder" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "990065f2fe63003fe337b932cfb5e3b80e0b4d0f5ff650e6985b1048f62c8319" +dependencies = [ + "leb128fmt", + "wasmparser", +] + +[[package]] +name = "wasm-metadata" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb0e353e6a2fbdc176932bbaab493762eb1255a7900fe0fea1a2f96c296cc909" +dependencies = [ + "anyhow", + "indexmap", + "wasm-encoder", + "wasmparser", +] + +[[package]] +name = "wasmparser" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe" +dependencies = [ + "bitflags 2.10.0", + "hashbrown 0.15.5", + "indexmap", + "semver", +] + [[package]] name = "wayland-backend" version = "0.3.12" @@ -4367,7 +4493,7 @@ dependencies = [ "cfg-if", "cfg_aliases 0.2.1", "document-features", - "hashbrown", + "hashbrown 0.16.1", "log", "portable-atomic", "profiling", @@ -4392,7 +4518,7 @@ dependencies = [ "bytemuck", "cfg_aliases 0.2.1", "document-features", - "hashbrown", + "hashbrown 0.16.1", "indexmap", "log", "naga", @@ -4849,6 +4975,88 @@ name = "wit-bindgen" version = "0.51.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d7249219f66ced02969388cf2bb044a09756a083d0fab1e566056b04d9fbcaa5" +dependencies = [ + "wit-bindgen-rust-macro", +] + +[[package]] +name = "wit-bindgen-core" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea61de684c3ea68cb082b7a88508a8b27fcc8b797d738bfc99a82facf1d752dc" +dependencies = [ + "anyhow", + "heck", + "wit-parser", +] + +[[package]] +name = "wit-bindgen-rust" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7c566e0f4b284dd6561c786d9cb0142da491f46a9fbed79ea69cdad5db17f21" +dependencies = [ + "anyhow", + "heck", + "indexmap", + "prettyplease", + "syn 2.0.114", + "wasm-metadata", + "wit-bindgen-core", + "wit-component", +] + +[[package]] +name = "wit-bindgen-rust-macro" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c0f9bfd77e6a48eccf51359e3ae77140a7f50b1e2ebfe62422d8afdaffab17a" +dependencies = [ + "anyhow", + "prettyplease", + "proc-macro2", + "quote", + "syn 2.0.114", + "wit-bindgen-core", + "wit-bindgen-rust", +] + +[[package]] +name = "wit-component" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2" +dependencies = [ + "anyhow", + "bitflags 2.10.0", + "indexmap", + "log", + "serde", + "serde_derive", + "serde_json", + "wasm-encoder", + "wasm-metadata", + "wasmparser", + "wit-parser", +] + +[[package]] +name = "wit-parser" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ecc8ac4bc1dc3381b7f59c34f00b67e18f910c2c0f50015669dde7def656a736" +dependencies = [ + "anyhow", + "id-arena", + "indexmap", + "log", + "semver", + "serde", + "serde_derive", + "serde_json", + "unicode-xid", + "wasmparser", +] [[package]] name = "writeable" diff --git a/Cargo.toml b/Cargo.toml index a71794c..3a92cf2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,6 +37,7 @@ obfuscate = ["ush-obfuscate/obfuscate"] chrono = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } +uuid = { workspace = true } crossbeam-channel = "0.5.15" @@ -52,6 +53,8 @@ chrono = "0.4.42" serde = {version = "1.0.228", features = ["derive"]} serde_json = "1.0.145" +uuid = { version = "1.0", features = ["v4", "serde"] } + static_init = "1.0.4" toml = "0.9.9" diff --git a/src/tree/SPEC.md b/src/tree/SPEC.md new file mode 100644 index 0000000..e97e9ce --- /dev/null +++ b/src/tree/SPEC.md @@ -0,0 +1,392 @@ +# Tree Protocol Specification + +## Overview + +The Tree Protocol is a lightweight, extensible message format designed for hierarchical message routing in a modular C2 (Command & Control) system. It provides RPC-like interactions, streaming support, event notifications, and peer-to-peer pivoting capabilities. + +## Design Principles + +1. **Loose typing** - Actions and payloads are flexible JSON values, not strict enums +2. **Namespacing** - Use dot notation for action categorization (e.g., `rpc.call`, `stream.data`) +3. **Extensibility** - Add new capabilities without modifying core structure +4. **Simplicity** - Minimal required fields, optional metadata for flexibility + +## Message Structure + +```json +{ + "id": "uuid-string", + "source": "path/to/sender", + "target": "path/to/recipient", + "action": "action.name", + "payload": {}, + "routing": {}, + "meta": {} +} +``` + +### Field Definitions + +| Field | Type | Required | Description | +|-------|------|----------|-------------| +| `id` | string | No | Unique message identifier for correlation | +| `source` | string/array | No | Origin path for responses | +| `target` | string/array | No | Destination path for routing | +| `action` | value | Yes | Operation to perform | +| `payload` | any | No | Data for the action | +| `routing` | object | No | P2P/pivoting metadata | +| `meta` | object | No | Extensible metadata | + +## Action System + +Actions are loose values - any JSON that components can interpret: + +```json +"action": "query" +"action": "rpc.call" +"action": "stream.open" +"action": {"name": "custom", "version": 1} +``` + +### Common Action Categories + +| Category | Prefix | Purpose | +|----------|--------|---------| +| Core | `query`, `create`, `delete`, `update` | CRUD operations | +| RPC | `rpc.call`, `rpc.response` | Remote procedure calls | +| Streams | `stream.open`, `stream.data`, `stream.close` | Bidirectional streams | +| Events | `subscribe`, `event` | Event notifications | +| Network | `connect`, `disconnect`, `forward` | Connection handling | + +## Payload Formats + +Payloads vary by action - components interpret them flexibly: + +### RPC Payload + +```json +{ + "action": "rpc.call", + "target": ["endpoints", "ep1", "components", "tcp-client"], + "payload": { + "method": "connect", + "params": {"address": "127.0.0.1", "port": 443} + } +} +``` + +```json +{ + "action": "rpc.response", + "payload": { + "success": true, + "result": {"connected": true} + } +} +``` + +### Stream Payload + +```json +{ + "action": "stream.open", + "payload": { + "channel": "stdio", + "mode": "bidirectional" + } +} +``` + +```json +{ + "action": "stream.data", + "payload": { + "channel": "stdout", + "data": "SGVsbG8gd29ybGQ=", // base64 encoded + "chunk": 0, + "total": 1 + } +} +``` + +```json +{ + "action": "stream.close", + "payload": { + "channel": "stdio", + "reason": "eof" + } +} +``` + +### Event Payload + +```json +{ + "action": "subscribe", + "target": ["endpoints", "ep1", "logs"], + "payload": { + "event": "new_entry", + "callback": "components/event-handler" + } +} +``` + +```json +{ + "action": "event", + "source": ["endpoints", "ep1", "logs"], + "payload": { + "event": "new_entry", + "data": "some log message" + } +} +``` + +### P2P Routing Payload + +```json +{ + "action": "forward", + "target": ["peers", "peer-2", "endpoints", "ep2"], + "routing": { + "via": "peer-1", + "hop": 1, + "max_hops": 3 + }, + "payload": {...} +} +``` + +## Response Pattern + +All responses use a generic format: + +```json +{ + "id": "req-123", + "action": "response", + "source": ["endpoints", "ep1"], + "payload": { + "success": true, + "result": {"connected": true}, + "error": null + } +} +``` + +```json +{ + "id": "req-123", + "action": "response", + "payload": { + "success": false, + "error": { + "code": 404, + "message": "not found" + } + } +} +``` + +## Path Format + +Paths can be represented in multiple ways: + +```json +"target": "components/tcp-client" +"target": ["components", "tcp-client"] +"target": ["endpoints", "ep1", "connections", "conn-1"] +``` + +## Message Types (Optional Wrapper) + +For transport-level distinction: + +| Type | Description | +|------|-------------| +| `req` | Request - expecting a response | +| `resp` | Response - reply to a request | +| `event` | Unsolicited notification | +| `stream` | Stream data message | + +Example with type wrapper: + +```json +{ + "id": "msg-123", + "type": "req", + "target": ["components", "tcp-client"], + "action": "rpc.call", + "payload": {"method": "connect", "params": {...}} +} +``` + +## Binary Framing (Optional) + +For bandwidth-constrained links: + +``` +[4 bytes: length][1 byte: flags][id (16 bytes, optional)][payload...] +``` + +- Length: Big-endian u32 +- Flags: 0x01 = compressed, 0x02 = encrypted +- ID: UUID v4 (optional, for correlation) + +## Examples + +### Full RPC Call + +```json +{ + "id": "req-123", + "source": "server", + "target": ["endpoints", "ep1", "components", "tcp-client"], + "action": "rpc.call", + "payload": { + "method": "connect", + "params": {"address": "127.0.0.1", "port": 8080} + } +} +``` + +### Response + +```json +{ + "id": "resp-123", + "source": ["endpoints", "ep1"], + "target": "server", + "action": "response", + "payload": { + "success": true, + "result": { + "connected": true, + "local_addr": "192.168.1.100:12345", + "remote_addr": "127.0.0.1:8080" + } + } +} +``` + +### Log Subscription + Events + +```json +{ + "id": "sub-1", + "source": "server", + "target": ["endpoints", "ep1", "logs"], + "action": "subscribe", + "payload": { + "event": "new_entry", + "callback": ["components", "log-forwarder"] + } +} +``` + +```json +{ + "id": "evt-1", + "source": ["endpoints", "ep1", "logs"], + "target": ["components", "log-forwarder"], + "action": "event", + "payload": { + "event": "new_entry", + "data": "2024-01-15 10:30:45 - Connection established" + } +} +``` + +### P2P Forward + +```json +{ + "id": "req-456", + "source": "server", + "target": ["endpoints", "ep2", "components", "shell"], + "action": "rpc.call", + "routing": { + "via": "peer-1", + "hop": 1, + "max_hops": 3 + }, + "payload": { + "method": "execute", + "params": {"command": "whoami"} + } +} +``` + +## Implementation Notes + +### Action Matching + +```rust +impl TreeMessage { + /// Check if action matches a pattern + pub fn action_is(&self, action: &str) -> bool { + match &self.action { + Value::String(s) => s == action || s.ends_with(&format!(".{}", action)), + _ => false, + } + } + + /// Get method name from RPC payload + pub fn get_method(&self) -> Option { + self.payload.get("method") + .and_then(|m| m.as_str()) + .map(String::from) + } + + /// Get stream channel from payload + pub fn get_channel(&self) -> Option { + self.payload.get("channel") + .and_then(|c| c.as_str()) + .map(String::from) + } +} +``` + +### Component Interpretation + +Components define their own action handlers: + +```rust +impl TreeElement for TcpClient { + fn send_message(&mut self, target: Value, message: Value) -> Value { + match message.get("method").and_then(|m| m.as_str()) { + Some("connect") => self.handle_connect(message), + Some("send") => self.handle_send(message), + Some("recv") => self.handle_recv(message), + _ => json!({"error": "unknown method"}), + } + } +} +``` + +## Comparison to Other Protocols + +| Aspect | Tree Protocol | OpenC2 | OST-C2 | Mythic | +|--------|--------------|--------|--------|--------| +| Typing | Loose/JSON | Strict enum | Binary enum | JSON | +| Actions | Namespaced strings | Fixed enum | Type/Code | Action strings | +| RPC | Yes | No | Limited | Yes | +| Streams | Yes | No | No | No | +| Events | Yes | No | No | Yes | +| P2P | Via paths | No | Yes | Via delegates | +| Extensibility | High | Medium | Low | Medium | + +## Extensibility Points + +1. **New actions**: Add any namespaced string - no code changes needed +2. **Payload structure**: Any JSON - components interpret as needed +3. **Meta field**: Add transport, timing, or custom metadata +4. **Routing field**: Add P2P-specific metadata as needed +5. **Namespacing**: Use prefixes like `openc2.`, `custom.`, `vendor.` for compatibility + +## Serialization + +- Default: JSON (human-readable, debuggable) +- Optional: CBOR (binary, compact) +- Custom: Any format via `meta.serialization` hint diff --git a/src/tree/component.rs b/src/tree/component.rs new file mode 100644 index 0000000..aa103d3 --- /dev/null +++ b/src/tree/component.rs @@ -0,0 +1,140 @@ +//! Component system for extensible modular architecture. +//! +//! Components are TreeElements that can be dynamically added to endpoints +//! and expose configuration and RPC methods. + +use serde_json::{json, Value}; + +use crate::tree::{Branch, TreeElement}; + +/// Trait for component lifecycle management +pub trait Component: Send + Sync { + /// Get the component's unique name + fn name(&self) -> &str; + + /// Get component status information + fn status(&self) -> Value; + + /// Initialize component with configuration + fn init(&mut self, config: Value) -> Result<(), String>; + + /// Shutdown component gracefully + fn shutdown(&mut self) -> Result<(), String>; +} + +/// Adapter to make any Component work as a TreeElement +pub struct ComponentWrapper { + component: Box, +} + +impl ComponentWrapper { + pub fn new(component: Box) -> Self { + Self { component } + } +} + +impl TreeElement for ComponentWrapper { + fn get_type(&self) -> Value { + serde_json::json!(["component", self.component.name()]) + } + + fn send_message(&mut self, target: Value, message: Value) -> Value { + match target { + Value::Null => { + if let Some(cmd) = message.as_str() { + match cmd { + "Status" => self.component.status(), + "Init" => { + // Would need config from message + json!({"error": "Init requires config payload"}) + } + "Shutdown" => match self.component.shutdown() { + Ok(_) => json!({"success": true}), + Err(e) => json!({"success": false, "error": e}), + }, + _ => json!({"error": "Unknown command"}), + } + } else { + json!({"error": "Invalid command"}) + } + } + _ => json!({"error": "Invalid target"}), + } + } +} + +/// Component registration and management +pub struct ComponentRegistry { + branch: Branch, +} + +impl ComponentRegistry { + pub fn new() -> Self { + Self { + branch: Branch::new("Components"), + } + } + + /// Register a new component (consumes the component) + pub fn register(&mut self, component: Box) -> Result<(), String> { + let name = component.name().to_string(); + + // Check if already exists by trying to get it + if self.branch.get_child(&name).is_some() { + return Err(format!("Component '{}' already registered", name)); + } + + let wrapper = ComponentWrapper::new(component); + self.branch.add_child(name, Box::new(wrapper)); + Ok(()) + } + + /// Get a component by name (via branch) + pub fn get(&mut self, name: &str) -> Option<&mut Box> { + self.branch.get_child(name) + } + + /// Remove a component + pub fn remove(&mut self, name: &str) -> bool { + // Note: This is tricky with current Branch API + // For now, just return false + let _ = name; + false + } + + /// List all component names + pub fn list(&self) -> Vec { + self.branch.children().keys().cloned().collect() + } + + /// Get the branch for tree integration + pub fn branch(&self) -> &Branch { + &self.branch + } + + /// Get mutable branch for tree integration + pub fn branch_mut(&mut self) -> &mut Branch { + &mut self.branch + } + + /// Shutdown all components + pub fn shutdown_all(&mut self) { + // Would need to iterate through and call shutdown on each + } +} + +impl Default for ComponentRegistry { + fn default() -> Self { + Self::new() + } +} + +impl TreeElement for ComponentRegistry { + fn get_type(&self) -> Value { + serde_json::json!("Components") + } + + fn send_message(&mut self, target: Value, message: Value) -> Value { + self.branch.send_message(target, message) + } +} diff --git a/src/tree/endpoint.rs b/src/tree/endpoint.rs index d8d0861..f967e39 100644 --- a/src/tree/endpoint.rs +++ b/src/tree/endpoint.rs @@ -4,10 +4,12 @@ //! - id: Read-only endpoint identifier //! - logs: Queue for log messages //! - connections: Container for peer connections +//! - components: Extensible component system (accessed via tree messages) use crossbeam_channel::Sender; use serde_json::{json, Value}; +use crate::tree::component::ComponentRegistry; use crate::tree::connection::{create_channel_pair, Connection, Connections}; use crate::tree::queue::Queue; use crate::tree::readonly::ReadOnly; @@ -27,11 +29,13 @@ impl EndpointManager { let logs = Queue::new(logs_sender.clone(), logs_receiver); let connections = Connections::new(); + let components = ComponentRegistry::new(); let mut branch = Branch::new(TYPE_ENDPOINT); branch.add_child("id", Box::new(ReadOnly::new(&endpoint_id, TYPE_ENDPOINT))); branch.add_child("logs", Box::new(logs)); branch.add_child("connections", Box::new(connections)); + branch.add_child("components", Box::new(components)); Self { branch, @@ -58,7 +62,7 @@ impl EndpointManager { let conn_b = Connection::new(id.clone(), peer_id, tx_local, rx_remote); if let Some(connections) = self.branch.get_child("connections") { - connections.send_message(Value::String(id), json!({ "Add": conn_a.id() })); + let _ = connections.send_message(Value::String(id), json!({ "Add": conn_a.id() })); } conn_b diff --git a/src/tree/log.rs b/src/tree/log.rs index 6ba0b4b..d076ca9 100644 --- a/src/tree/log.rs +++ b/src/tree/log.rs @@ -1,10 +1,10 @@ /// Implement logging for the manager use crossbeam_channel::{Receiver, Sender}; -use serde_json::{Value, json}; +use serde_json::{json, Value}; use crate::{ logger::{Logger, Record}, - tree::{Branch, TreeElement, symbols}, + tree::{symbols, Branch, TreeElement}, }; struct LoggerTX(Sender); diff --git a/src/tree/message.rs b/src/tree/message.rs new file mode 100644 index 0000000..2e991e0 --- /dev/null +++ b/src/tree/message.rs @@ -0,0 +1,135 @@ +//! TreeMessage - Serializable message format for network communication. +//! +//! This module defines the message structure used for all tree communications. +//! The format is designed to be simple, extensible, and protocol-agnostic. + +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +/// Message types for tree communication +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +#[serde(rename_all = "lowercase")] +pub enum MessageType { + /// Request message - expecting a response + Req, + /// Response message - reply to a request + Resp, + /// Event message - unsolicited notification + Event, + /// Stream message - bidirectional data flow + Stream, +} + +/// Core message structure for all tree communications +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TreeMessage { + /// Unique identifier for message correlation + pub id: String, + /// Type of message (request, response, event, stream) + #[serde(rename = "type")] + pub msg_type: MessageType, + /// Target path in the tree (for routing) + #[serde(default)] + pub target: Vec, + /// Action to perform (Get, Set, Invoke, etc.) + #[serde(default)] + pub action: String, + /// Payload/data for the action + #[serde(default)] + pub payload: Value, + /// ID of the message this is a response to + #[serde(default, skip_serializing_if = "Option::is_none")] + pub response_to: Option, + /// Stream ID for streaming communications + #[serde(default, skip_serializing_if = "Option::is_none")] + pub stream_id: Option, +} + +impl TreeMessage { + /// Create a new request message + pub fn new_req(id: impl Into, target: Vec, action: impl Into) -> Self { + Self { + id: id.into(), + msg_type: MessageType::Req, + target, + action: action.into(), + payload: Value::Null, + response_to: None, + stream_id: None, + } + } + + /// Create a response message + pub fn new_resp(id: impl Into, response_to: impl Into, payload: Value) -> Self { + Self { + id: id.into(), + msg_type: MessageType::Resp, + target: vec![], + action: String::new(), + payload, + response_to: Some(response_to.into()), + stream_id: None, + } + } + + /// Create an event message + pub fn new_event(id: impl Into, target: Vec, payload: Value) -> Self { + Self { + id: id.into(), + msg_type: MessageType::Event, + target, + action: String::new(), + payload, + response_to: None, + stream_id: None, + } + } + + /// Create a stream message + pub fn new_stream(id: impl Into, stream_id: impl Into, payload: Value) -> Self { + Self { + id: id.into(), + msg_type: MessageType::Stream, + target: vec![], + action: String::new(), + payload, + response_to: None, + stream_id: Some(stream_id.into()), + } + } +} + +impl Default for TreeMessage { + fn default() -> Self { + Self { + id: uuid::Uuid::new_v4().to_string(), + msg_type: MessageType::Req, + target: vec![], + action: String::new(), + payload: Value::Null, + response_to: None, + stream_id: None, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_message_serialization() { + let msg = TreeMessage::new_req("test-1", vec!["a".to_string(), "b".to_string()], "Get"); + let json = serde_json::to_string(&msg).unwrap(); + assert!(json.contains("\"type\":\"req\"")); + assert!(json.contains("\"target\":[\"a\",\"b\"]")); + assert!(json.contains("\"action\":\"Get\"")); + } + + #[test] + fn test_response_message() { + let msg = TreeMessage::new_resp("resp-1", "test-1", json!("value")); + assert_eq!(msg.msg_type, MessageType::Resp); + assert_eq!(msg.response_to, Some("test-1".to_string())); + } +} diff --git a/src/tree/mod.rs b/src/tree/mod.rs index 6da36a4..d4a9175 100644 --- a/src/tree/mod.rs +++ b/src/tree/mod.rs @@ -4,15 +4,20 @@ //! a tree of messageable elements. Used for C2 communication and pivoting. pub mod branch; +pub mod component; pub mod connection; pub mod endpoint; pub mod log; +pub mod message; pub mod queue; pub mod readonly; pub mod symbols; +pub mod tcp; pub use branch::Branch; +pub use component::ComponentRegistry; pub use endpoint::EndpointManager; +pub use message::TreeMessage; pub use readonly::{ReadOnly, TreeVariable}; pub use symbols::*; diff --git a/src/tree/tcp/client.rs b/src/tree/tcp/client.rs new file mode 100644 index 0000000..e3cde75 --- /dev/null +++ b/src/tree/tcp/client.rs @@ -0,0 +1,225 @@ +//! TCP Client component for outbound connections. +//! +//! Provides a TreeElement for managing TCP client connections with +//! configuration, status queries, and reconnection support. + +use std::io::{Read, Write}; +use std::net::TcpStream; +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +use serde_json::{json, Value}; + +use crate::tree::component::Component; +use crate::tree::symbols; +use crate::tree::tcp::config::{ConnectionStatus, TcpClientConfig}; +use crate::tree::{Branch, TreeElement}; + +/// TCP Client component +pub struct TcpClient { + name: String, + config: TcpClientConfig, + status: ConnectionStatus, + stream: Option>>, + branch: Branch, +} + +impl TcpClient { + pub fn new(name: impl Into) -> Self { + let name = name.into(); + let mut branch = Branch::new("TCPClient"); + + // Add internal state branch + let state_branch = Branch::new("state"); + branch.add_child("state", Box::new(state_branch)); + + Self { + name: name.clone(), + config: TcpClientConfig::default(), + status: ConnectionStatus::disconnected(), + stream: None, + branch, + } + } + + /// Connect to the configured address + pub fn connect(&mut self) -> Result<(), String> { + let addr = format!("{}:{}", self.config.address, self.config.port); + + let stream = TcpStream::connect_timeout( + &addr + .parse() + .map_err(|e| format!("Invalid address: {}", e))?, + Duration::from_millis(self.config.timeout_ms), + ) + .map_err(|e| format!("Connection failed: {}", e))?; + + stream + .set_nonblocking(false) + .map_err(|e| format!("Failed to set blocking: {}", e))?; + + let local = stream + .local_addr() + .map(|a| a.to_string()) + .unwrap_or_default(); + let remote = stream + .peer_addr() + .map(|a| a.to_string()) + .unwrap_or_default(); + + self.status = ConnectionStatus::connected(remote, local); + self.stream = Some(Arc::new(Mutex::new(stream))); + + Ok(()) + } + + /// Disconnect from server + pub fn disconnect(&mut self) -> Result<(), String> { + self.stream = None; + self.status = ConnectionStatus::disconnected(); + Ok(()) + } + + /// Check if connected + pub fn is_connected(&self) -> bool { + self.status.connected + } + + /// Send data over the connection + pub fn send(&mut self, data: &[u8]) -> Result { + let stream = self.stream.as_ref().ok_or("Not connected")?; + + let mut stream = stream.lock().map_err(|e| format!("Lock failed: {}", e))?; + + stream + .write(data) + .map_err(|e| format!("Write failed: {}", e)) + } + + /// Receive data from the connection + pub fn recv(&mut self, buffer_size: usize) -> Result, String> { + let stream = self.stream.as_ref().ok_or("Not connected")?; + + let mut stream = stream.lock().map_err(|e| format!("Lock failed: {}", e))?; + + let mut buffer = vec![0u8; buffer_size]; + let n = stream + .read(&mut buffer) + .map_err(|e| format!("Read failed: {}", e))?; + + buffer.truncate(n); + Ok(buffer) + } + + /// Get current configuration + pub fn config(&self) -> &TcpClientConfig { + &self.config + } + + /// Get mutable configuration + pub fn config_mut(&mut self) -> &mut TcpClientConfig { + &mut self.config + } + + /// Get status as JSON + pub fn get_status(&self) -> Value { + json!({ + "connected": self.status.connected, + "remote_address": self.status.remote_address, + "local_address": self.status.local_address, + "config": { + "address": self.config.address, + "port": self.config.port, + } + }) + } +} + +impl Component for TcpClient { + fn name(&self) -> &str { + &self.name + } + + fn status(&self) -> Value { + self.get_status() + } + + fn init(&mut self, config: Value) -> Result<(), String> { + self.config = + serde_json::from_value(config).map_err(|e| format!("Invalid config: {}", e))?; + self.connect()?; + Ok(()) + } + + fn shutdown(&mut self) -> Result<(), String> { + self.disconnect() + } +} + +impl TreeElement for TcpClient { + fn get_type(&self) -> Value { + json!("TCPClient") + } + + fn send_message(&mut self, target: Value, message: Value) -> Value { + match target { + Value::Null => { + if let Some(cmd) = message.as_str() { + match cmd { + "Connect" => match self.connect() { + Ok(_) => json!({"success": true}), + Err(e) => json!({"success": false, "error": e}), + }, + "Disconnect" => match self.disconnect() { + Ok(_) => json!({"success": true}), + Err(e) => json!({"success": false, "error": e}), + }, + "Status" => self.get_status(), + symbols::CMD_GET_CHILDREN => { + let children = self + .branch + .children() + .keys() + .map(|k| json!(k)) + .collect::>(); + json!(children) + } + _ => json!(symbols::ERR_UNSUPPORTED_METHOD), + } + } else if let Value::Object(obj) = message { + // Handle configuration changes + if let Some(config) = obj.get("config") { + match serde_json::from_value(config.clone()) { + Ok(cfg) => { + self.config = cfg; + json!({"success": true}) + } + Err(e) => json!({"success": false, "error": e.to_string()}), + } + } else { + json!(symbols::ERR_INVALID_COMMAND) + } + } else { + json!(symbols::ERR_INVALID_COMMAND) + } + } + Value::String(subtarget) => { + match subtarget.as_str() { + "config" => { + // Return or modify configuration + json!(self.config) + } + "state" => { + // Return connection state + json!({ + "connected": self.status.connected, + "remote": self.status.remote_address, + }) + } + _ => json!(symbols::ERR_CHILD_NOT_FOUND), + } + } + _ => json!(symbols::ERR_INVALID_TARGET), + } + } +} diff --git a/src/tree/tcp/config.rs b/src/tree/tcp/config.rs new file mode 100644 index 0000000..4dfc911 --- /dev/null +++ b/src/tree/tcp/config.rs @@ -0,0 +1,170 @@ +//! TCP configuration structures for network components. + +use serde::{Deserialize, Serialize}; + +/// Configuration for TCP client connections +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TcpClientConfig { + /// Remote IP address or hostname + pub address: String, + /// Remote port number + pub port: u16, + /// Connection timeout in milliseconds + #[serde(default = "default_timeout")] + pub timeout_ms: u64, + /// Enable automatic reconnection + #[serde(default)] + pub auto_reconnect: bool, + /// Reconnection delay in seconds + #[serde(default = "default_reconnect_delay")] + pub reconnect_delay_secs: u64, +} + +fn default_timeout() -> u64 { + 5000 +} +fn default_reconnect_delay() -> u64 { + 5 +} + +impl Default for TcpClientConfig { + fn default() -> Self { + Self { + address: "127.0.0.1".to_string(), + port: 8080, + timeout_ms: 5000, + auto_reconnect: false, + reconnect_delay_secs: 5, + } + } +} + +impl TcpClientConfig { + pub fn new(address: impl Into, port: u16) -> Self { + Self { + address: address.into(), + port, + ..Default::default() + } + } +} + +/// Configuration for TCP server listeners +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TcpServerConfig { + /// Local IP address to bind to + #[serde(default = "default_bind_address")] + pub bind_address: String, + /// Local port to listen on + pub port: u16, + /// Maximum number of concurrent connections + #[serde(default = "default_max_connections")] + pub max_connections: u32, + /// Connection timeout in milliseconds + #[serde(default = "default_timeout")] + pub timeout_ms: u64, +} + +fn default_bind_address() -> String { + "0.0.0.0".to_string() +} +fn default_max_connections() -> u32 { + 10 +} + +impl Default for TcpServerConfig { + fn default() -> Self { + Self { + bind_address: "0.0.0.0".to_string(), + port: 8080, + max_connections: 10, + timeout_ms: 5000, + } + } +} + +impl TcpServerConfig { + pub fn new(port: u16) -> Self { + Self { + port, + ..Default::default() + } + } + + pub fn bind_address(mut self, addr: impl Into) -> Self { + self.bind_address = addr.into(); + self + } +} + +/// Connection status information +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ConnectionStatus { + pub connected: bool, + pub remote_address: Option, + pub local_address: Option, + pub bytes_sent: u64, + pub bytes_received: u64, + pub connected_at: Option, +} + +impl ConnectionStatus { + pub fn disconnected() -> Self { + Self { + connected: false, + remote_address: None, + local_address: None, + bytes_sent: 0, + bytes_received: 0, + connected_at: None, + } + } + + pub fn connected(remote: impl Into, local: impl Into) -> Self { + Self { + connected: true, + remote_address: Some(remote.into()), + local_address: Some(local.into()), + bytes_sent: 0, + bytes_received: 0, + connected_at: Some( + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs(), + ), + } + } +} + +/// Server listener status +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ListenerStatus { + pub listening: bool, + pub bind_address: String, + pub port: u16, + pub active_connections: usize, + pub total_connections: u64, +} + +impl ListenerStatus { + pub fn stopped(addr: impl Into, port: u16) -> Self { + Self { + listening: false, + bind_address: addr.into(), + port, + active_connections: 0, + total_connections: 0, + } + } + + pub fn listening(addr: impl Into, port: u16, connections: usize, total: u64) -> Self { + Self { + listening: true, + bind_address: addr.into(), + port, + active_connections: connections, + total_connections: total, + } + } +} diff --git a/src/tree/tcp/mod.rs b/src/tree/tcp/mod.rs new file mode 100644 index 0000000..323efd0 --- /dev/null +++ b/src/tree/tcp/mod.rs @@ -0,0 +1,12 @@ +//! TCP networking components for tree-based communication. +//! +//! This module provides TCP client and server components that can be +//! added to endpoints for network communication. + +pub mod client; +pub mod config; +pub mod server; + +pub use client::TcpClient; +pub use config::{ConnectionStatus, ListenerStatus, TcpClientConfig, TcpServerConfig}; +pub use server::TcpServer; diff --git a/src/tree/tcp/server.rs b/src/tree/tcp/server.rs new file mode 100644 index 0000000..5ae3574 --- /dev/null +++ b/src/tree/tcp/server.rs @@ -0,0 +1,269 @@ +//! TCP Server component for inbound connections. +//! +//! Provides a TreeElement for managing TCP server listeners with +//! configuration, status queries, and connection management. + +use std::collections::HashMap; +use std::io::{Read, Write}; +use std::net::{TcpListener, TcpStream}; +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +use serde_json::{json, Value}; + +use crate::tree::component::Component; +use crate::tree::symbols; +use crate::tree::tcp::config::{ListenerStatus, TcpServerConfig}; +use crate::tree::{Branch, TreeElement}; + +/// A connected client managed by the server +struct ManagedClient { + id: String, + stream: TcpStream, + peer_addr: String, +} + +impl ManagedClient { + fn new(id: String, stream: TcpStream) -> Self { + let peer_addr = stream + .peer_addr() + .map(|a| a.to_string()) + .unwrap_or_else(|_| "unknown".to_string()); + + Self { + id, + stream, + peer_addr, + } + } + + fn send(&mut self, data: &[u8]) -> Result { + self.stream + .write(data) + .map_err(|e| format!("Write failed: {}", e)) + } + + fn recv(&mut self, buffer_size: usize) -> Result, String> { + let mut buffer = vec![0u8; buffer_size]; + let _ = self.stream.set_read_timeout(Some(Duration::from_secs(1))); + + match self.stream.read(&mut buffer) { + Ok(0) => Err("Connection closed".to_string()), + Ok(n) => { + buffer.truncate(n); + Ok(buffer) + } + Err(e) if e.kind() == std::io::ErrorKind::TimedOut => Ok(vec![]), + Err(e) => Err(format!("Read failed: {}", e)), + } + } + + fn peer_address(&self) -> &str { + &self.peer_addr + } +} + +/// TCP Server component +pub struct TcpServer { + name: String, + config: TcpServerConfig, + status: ListenerStatus, + listener: Option, + clients: HashMap>>, + total_connections: u64, + branch: Branch, +} + +impl TcpServer { + pub fn new(name: impl Into) -> Self { + let name = name.into(); + + Self { + name: name.clone(), + config: TcpServerConfig::default(), + status: ListenerStatus::stopped("0.0.0.0", 0), + listener: None, + clients: HashMap::new(), + total_connections: 0, + branch: Branch::new("TCPServer"), + } + } + + /// Start listening for connections + pub fn listen(&mut self) -> Result<(), String> { + let addr = format!("{}:{}", self.config.bind_address, self.config.port); + + let listener = TcpListener::bind(&addr).map_err(|e| format!("Bind failed: {}", e))?; + + listener + .set_nonblocking(true) + .map_err(|e| format!("Failed to set non-blocking: {}", e))?; + + self.listener = Some(listener); + self.status = ListenerStatus::listening( + &self.config.bind_address, + self.config.port, + 0, + self.total_connections, + ); + + Ok(()) + } + + /// Stop listening + pub fn stop(&mut self) -> Result<(), String> { + self.listener = None; + self.clients.clear(); + self.status = ListenerStatus::stopped(&self.config.bind_address, self.config.port); + Ok(()) + } + + /// Accept a new connection (non-blocking) + pub fn accept(&mut self) -> Option<(String, TcpStream)> { + let listener = self.listener.as_ref()?; + + match listener.accept() { + Ok((stream, _addr)) => { + self.total_connections += 1; + let id = format!("conn-{}", self.total_connections); + Some((id, stream)) + } + Err(_) => None, + } + } + + /// Register an accepted connection + pub fn register_client(&mut self, id: String, stream: TcpStream) { + let client_id = id.clone(); + self.clients.insert( + id, + Arc::new(Mutex::new(ManagedClient::new(client_id, stream))), + ); + } + + /// Disconnect a client + pub fn disconnect_client(&mut self, id: &str) -> Result<(), String> { + self.clients + .remove(id) + .ok_or_else(|| format!("Client '{}' not found", id))?; + Ok(()) + } + + /// Send to a specific client + pub fn send_to(&mut self, client_id: &str, data: &[u8]) -> Result { + let client = self + .clients + .get(client_id) + .ok_or_else(|| format!("Client '{}' not found", client_id))?; + + let mut client = client.lock().map_err(|e| format!("Lock failed: {}", e))?; + + client.send(data) + } + + /// Check if listening + pub fn is_listening(&self) -> bool { + self.status.listening + } + + /// Get current configuration + pub fn config(&self) -> &TcpServerConfig { + &self.config + } + + /// Get mutable configuration + pub fn config_mut(&mut self) -> &mut TcpServerConfig { + &mut self.config + } + + /// Get status as JSON + pub fn get_status(&self) -> Value { + let client_list: Vec = self.clients.keys().map(|k| json!(k)).collect(); + + json!({ + "listening": self.status.listening, + "bind_address": self.config.bind_address, + "port": self.config.port, + "active_connections": self.clients.len(), + "total_connections": self.total_connections, + "clients": client_list, + }) + } +} + +impl Component for TcpServer { + fn name(&self) -> &str { + &self.name + } + + fn status(&self) -> Value { + self.get_status() + } + + fn init(&mut self, config: Value) -> Result<(), String> { + self.config = + serde_json::from_value(config).map_err(|e| format!("Invalid config: {}", e))?; + self.listen()?; + Ok(()) + } + + fn shutdown(&mut self) -> Result<(), String> { + self.stop() + } +} + +impl TreeElement for TcpServer { + fn get_type(&self) -> Value { + json!("TCPServer") + } + + fn send_message(&mut self, target: Value, message: Value) -> Value { + match target { + Value::Null => { + if let Some(cmd) = message.as_str() { + match cmd { + "Listen" | "Start" => match self.listen() { + Ok(_) => json!({"success": true}), + Err(e) => json!({"success": false, "error": e}), + }, + "Stop" => match self.stop() { + Ok(_) => json!({"success": true}), + Err(e) => json!({"success": false, "error": e}), + }, + "Status" => self.get_status(), + symbols::CMD_GET_CHILDREN => { + let children = self + .branch + .children() + .keys() + .map(|k| json!(k)) + .collect::>(); + json!(children) + } + _ => json!(symbols::ERR_UNSUPPORTED_METHOD), + } + } else if let Value::Object(obj) = message { + if let Some(config) = obj.get("config") { + match serde_json::from_value(config.clone()) { + Ok(cfg) => { + self.config = cfg; + json!({"success": true}) + } + Err(e) => json!({"success": false, "error": e.to_string()}), + } + } else { + json!(symbols::ERR_INVALID_COMMAND) + } + } else { + json!(symbols::ERR_INVALID_COMMAND) + } + } + Value::String(subtarget) => match subtarget.as_str() { + "config" => json!(self.config), + "status" => self.get_status(), + _ => json!(symbols::ERR_CHILD_NOT_FOUND), + }, + _ => json!(symbols::ERR_INVALID_TARGET), + } + } +} diff --git a/ush-payload/src/main.rs b/ush-payload/src/main.rs index 389ac8f..8ad4a6e 100644 --- a/ush-payload/src/main.rs +++ b/ush-payload/src/main.rs @@ -1,91 +1,125 @@ -use unshell::{info, tree::Branch}; +//! TCP Chain CLI Test Harness +//! +//! Demonstrates multi-layer TCP connections for testing pivoting. +//! Creates a chain of endpoints connected via TCP. + +use std::io::{Read, Write}; +use std::net::TcpListener; +use std::sync::mpsc; +use std::thread; + +use unshell::tree::{EndpointManager, TreeMessage}; fn main() { - let mut manager = Branch::new(); - manager.init_logger(); + println!("=== TCP Chain Test Harness ===\n"); - info!("Test thing!"); - info!("Test thing!"); + // Test 1: Local TCP Server-Client loopback + test_tcp_loopback(); - // loop { - // if test123(&mut manager) { - // break; - // } - // } + // Test 2: Tree message routing + test_tree_message(); - // println!("Test"); + // Test 3: TreeMessage serialization + test_message_serialization(); + + println!("\n=== All tests complete ==="); } -// use std::{any::Any, collections::HashMap, fs::File, io::Read}; +/// Test basic TCP server/client communication +fn test_tcp_loopback() { + println!("[Test 1] TCP Loopback Test"); -// use static_init::dynamic; -// use unshell_lib::{ -// ModuleError, -// config::{PayloadConfig, RuntimeConfig}, -// module::{Manager, Module}, -// }; -// use unshell_obfuscate::{obs, symbol}; + // Start a TCP server in a thread + let (tx, rx) = mpsc::channel(); -// #[macro_use] -// extern crate unshell_lib; + let server_thread = thread::spawn(move || { + let listener = TcpListener::bind("127.0.0.1:0").expect("Failed to bind"); + let addr = listener.local_addr().unwrap(); + tx.send(addr.port()).unwrap(); -// // The main and initial 'configuration' for a payload + for stream in listener.incoming() { + if let Ok(mut stream) = stream { + let mut buf = [0u8; 1024]; + if let Ok(n) = stream.read(&mut buf) { + let response = b"Echo: "; + let _ = stream.write(response); + let _ = stream.write(&buf[..n]); + let _ = stream.flush(); + } + } + } + }); -// #[dynamic] -// static PAYLOAD_CONFIG: PayloadConfig = PayloadConfig { -// id: symbol!("Test ID"), -// components: Vec::new(), -// runtime_config: vec![RuntimeConfig { -// parent_component: symbol!("client").to_string(), -// name: symbol!("client runtime").to_string(), -// config: HashMap::from([ -// (symbol!("host").to_string(), obs!("localhost:1234")), -// (symbol!("retry").to_string(), obs!("1000")), -// ]), -// }], -// }; + let port = rx.recv().unwrap(); -// fn main() { + // Connect client + let mut stream = + std::net::TcpStream::connect(format!("127.0.0.1:{}", port)).expect("Failed to connect"); -// debug!("Initialized"); + let msg = b"Hello from client!"; + stream.write(msg).expect("Failed to write"); -// match run() { -// Ok(_) => {} -// Err(e) => { -// error!("ERROR! '{:?}'", e); -// } -// } -// } + let mut buf = [0u8; 1024]; + let n = stream.read(&mut buf).expect("Failed to read"); + let response = String::from_utf8_lossy(&buf[..n]); -// fn run() -> Result<(), Box> { -// let args = std::env::args(); + println!(" Client sent: {:?}", msg); + println!(" Server responded: {:?}", response); -// // TEMPORARY, load the module paths from command line args. -// let mut modules = Vec::new(); -// for arg in args.skip(1) { -// // debug!("Loading module: {}", arg); + server_thread.join().unwrap(); + println!(" ✓ Loopback test passed\n"); +} -// // let mut file = File::open(arg).map_err(|e| ModuleError::Error(e.to_string().into()))?; -// // let mut buffer = Vec::new(); -// // file.read_to_end(&mut buffer) -// // .map_err(|e| ModuleError::Error(e.to_string().into()))?; +/// Test the tree message routing +fn test_tree_message() { + println!("[Test 2] Tree Message Routing"); -// debug!("Initializing module: {}", arg); -// let module = Module::new(&arg)?; + let mut endpoint = EndpointManager::new("endpoint-1"); -// modules.push(module); + // Test GetChildren + let response = endpoint + .branch_mut() + .send_message(serde_json::Value::Null, serde_json::json!("GetChildren")); -// // modules.push(Module::new(&arg)?) -// } + let children = response.as_object().unwrap(); + println!(" Children: {:?}", children.keys().collect::>()); -// // let modules = vec + // Test ID access + let response = endpoint + .branch_mut() + .send_message(serde_json::json!("id"), serde_json::Value::Null); + println!(" Endpoint ID: {:?}", response); -// debug!("Starting manager..."); + // Test logs queue + let sender = endpoint.logs_sender().clone(); + sender.send(serde_json::json!("Test log entry")).unwrap(); -// // Run the manager, this is blocking. -// let manager = Manager::start(&PAYLOAD_CONFIG, modules); + let response = endpoint + .branch_mut() + .send_message(serde_json::json!("logs"), serde_json::json!("GetLength")); + println!(" Log queue length: {:?}", response); -// Manager::join(manager); + println!(" ✓ Tree message test passed\n"); +} -// Ok(()) -// } +/// Test TreeMessage serialization +fn test_message_serialization() { + println!("[Test 3] TreeMessage Serialization"); + + let msg = TreeMessage::new_req( + "msg-1", + vec!["endpoint1".to_string(), "shell".to_string()], + "Get", + ); + + let json = serde_json::to_string_pretty(&msg).unwrap(); + println!(" Message: {}", json); + + // Test response message + let resp = TreeMessage::new_resp("resp-1", "msg-1", serde_json::json!("result data")); + + let json = serde_json::to_string_pretty(&resp).unwrap(); + println!(" Response: {}", json); + + println!(" ✓ Message serialization test passed\n"); +}