diff --git a/src/tree/branch.rs b/src/tree/branch.rs index 26ed80c..b017731 100644 --- a/src/tree/branch.rs +++ b/src/tree/branch.rs @@ -1,9 +1,14 @@ +//! Branch - A TreeElement with child elements for hierarchical routing. + use std::collections::HashMap; use serde_json::{json, Value}; use crate::tree::symbols; +use crate::tree::TreeElement; +/// A branch node in the tree that can contain child elements. +/// Supports path-based routing for multi-hop communication (pivoting). pub struct Branch { children: HashMap>, branch_type: &'static str, @@ -101,5 +106,3 @@ impl TreeElement for Branch { self.handle_local_message(target, message) } } - -use crate::tree::TreeElement; diff --git a/src/tree/connection.rs b/src/tree/connection.rs new file mode 100644 index 0000000..97f2669 --- /dev/null +++ b/src/tree/connection.rs @@ -0,0 +1,161 @@ +//! Connection management for peer-to-peer communication between endpoints. +//! Uses crossbeam channels to simulate bidirectional TCP-like connections. + +use std::collections::HashMap; + +use crossbeam_channel::{Receiver, Sender}; +use serde_json::{json, Value}; + +use crate::tree::symbols::{self, TYPE_CONNECTION, TYPE_CONNECTIONS}; +use crate::tree::{Branch, TreeElement}; + +/// A bidirectional connection to another endpoint. +/// Wraps sender/receiver channels for message passing. +pub struct Connection { + id: String, + peer_id: String, + sender: Sender, + receiver: Receiver, +} + +impl Connection { + pub fn new( + id: String, + peer_id: String, + sender: Sender, + receiver: Receiver, + ) -> Self { + Self { + id, + peer_id, + sender, + receiver, + } + } + + pub fn id(&self) -> &str { + &self.id + } + + pub fn peer_id(&self) -> &str { + &self.peer_id + } + + pub fn send(&self, message: Value) { + let _ = self.sender.send(message); + } + + pub fn try_recv(&self) -> Option { + self.receiver.try_recv().ok() + } + + pub fn recv(&self) -> Option { + self.receiver.recv().ok() + } +} + +impl TreeElement for Connection { + fn get_type(&self) -> Value { + json!(TYPE_CONNECTION) + } + + fn send_message(&mut self, target: Value, message: Value) -> Value { + match target { + Value::Null => { + if let Some(cmd) = message.as_str() { + match cmd { + "Send" => json!(symbols::ERR_MISSING_ARGS), + "Recv" => self.recv().unwrap_or(json!(Value::Null)), + "GetPeerId" => json!(self.peer_id), + symbols::CMD_GET_LENGTH => json!(0), + _ => json!(symbols::ERR_UNSUPPORTED_METHOD), + } + } else { + json!(symbols::ERR_INVALID_COMMAND) + } + } + Value::String(cmd) if cmd == "Send" => json!(symbols::ERR_MISSING_ARGS), + _ => json!(symbols::ERR_INVALID_TARGET), + } + } +} + +/// Container for managing multiple connections. +pub struct Connections { + connections: HashMap, + branch: Branch, +} + +impl Connections { + pub fn new() -> Self { + Self { + connections: HashMap::new(), + branch: Branch::new(TYPE_CONNECTIONS), + } + } + + pub fn add(&mut self, id: String, connection: Connection) { + self.connections.insert(id.clone(), connection); + self.branch + .add_child(id.clone(), Box::new(ConnectionStub { id })); + } + + pub fn get(&mut self, id: &str) -> Option<&mut Connection> { + self.connections.get_mut(id) + } + + pub fn remove(&mut self, id: &str) -> Option { + self.connections.remove(id) + } + + pub fn branch(&self) -> &Branch { + &self.branch + } + + pub fn branch_mut(&mut self) -> &mut Branch { + &mut self.branch + } +} + +impl Default for Connections { + fn default() -> Self { + Self::new() + } +} + +impl TreeElement for Connections { + fn get_type(&self) -> Value { + self.branch.get_type() + } + + fn send_message(&mut self, target: Value, message: Value) -> Value { + self.branch.send_message(target, message) + } +} + +struct ConnectionStub { + #[allow(dead_code)] + id: String, +} + +impl TreeElement for ConnectionStub { + fn get_type(&self) -> Value { + json!(TYPE_CONNECTION) + } + + fn send_message(&mut self, _target: Value, _message: Value) -> Value { + json!(symbols::ERR_UNSUPPORTED_METHOD) + } +} + +/// Creates a pair of connected channels for simulating TCP connections. +/// Returns ((sender_a, receiver_a), (sender_b, receiver_b)). +/// Messages sent on sender_a are received on receiver_b and vice versa. +pub fn create_channel_pair() -> ( + (Sender, Receiver), + (Sender, Receiver), +) { + let (tx1, rx1) = crossbeam_channel::unbounded::(); + let (tx2, rx2) = crossbeam_channel::unbounded::(); + ((tx1, rx2), (tx2, rx1)) +} diff --git a/src/tree/endpoint.rs b/src/tree/endpoint.rs new file mode 100644 index 0000000..d8d0861 --- /dev/null +++ b/src/tree/endpoint.rs @@ -0,0 +1,170 @@ +//! EndpointManager - Root element for endpoint management. +//! +//! Provides a standardized tree structure for all endpoints with: +//! - id: Read-only endpoint identifier +//! - logs: Queue for log messages +//! - connections: Container for peer connections + +use crossbeam_channel::Sender; +use serde_json::{json, Value}; + +use crate::tree::connection::{create_channel_pair, Connection, Connections}; +use crate::tree::queue::Queue; +use crate::tree::readonly::ReadOnly; +use crate::tree::symbols::TYPE_ENDPOINT; +use crate::tree::{Branch, TreeElement}; + +pub struct EndpointManager { + branch: Branch, + logs_sender: Sender, +} + +impl EndpointManager { + pub fn new(endpoint_id: impl Into) -> Self { + let endpoint_id = endpoint_id.into(); + + let (logs_sender, logs_receiver) = crossbeam_channel::unbounded(); + let logs = Queue::new(logs_sender.clone(), logs_receiver); + + let connections = Connections::new(); + + let mut branch = Branch::new(TYPE_ENDPOINT); + branch.add_child("id", Box::new(ReadOnly::new(&endpoint_id, TYPE_ENDPOINT))); + branch.add_child("logs", Box::new(logs)); + branch.add_child("connections", Box::new(connections)); + + Self { + branch, + logs_sender, + } + } + + pub fn logs_sender(&self) -> &Sender { + &self.logs_sender + } + + pub fn branch(&self) -> &Branch { + &self.branch + } + + pub fn branch_mut(&mut self) -> &mut Branch { + &mut self.branch + } + + pub fn add_connection(&mut self, id: String, peer_id: String) -> Connection { + let ((tx_local, rx_remote), (tx_remote, rx_local)) = create_channel_pair(); + + let conn_a = Connection::new(id.clone(), peer_id.clone(), tx_remote, rx_local); + let conn_b = Connection::new(id.clone(), peer_id, tx_local, rx_remote); + + if let Some(connections) = self.branch.get_child("connections") { + connections.send_message(Value::String(id), json!({ "Add": conn_a.id() })); + } + + conn_b + } +} + +impl TreeElement for EndpointManager { + fn get_type(&self) -> Value { + json!(TYPE_ENDPOINT) + } + + fn send_message(&mut self, target: Value, message: Value) -> Value { + self.branch.send_message(target, message) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + #[test] + fn test_endpoint_id() { + let mut endpoint = EndpointManager::new("test-endpoint-1"); + + let response = endpoint.branch.send_message(json!("id"), json!(null)); + + assert_eq!(response, json!("test-endpoint-1")); + } + + #[test] + fn test_endpoint_get_children() { + let mut endpoint = EndpointManager::new("test-endpoint"); + + let response = endpoint + .branch + .send_message(json!(null), json!("GetChildren")); + + let children = response.as_object().unwrap(); + assert!(children.contains_key("id")); + assert!(children.contains_key("logs")); + assert!(children.contains_key("connections")); + } + + #[test] + fn test_logs_queue() { + let mut endpoint = EndpointManager::new("test-endpoint"); + let sender = endpoint.logs_sender.clone(); + + sender.send(json!("log message 1")).unwrap(); + sender.send(json!("log message 2")).unwrap(); + + let response = endpoint + .branch + .send_message(json!("logs"), json!("GetLength")); + + assert_eq!(response, json!(2)); + } + + #[test] + fn test_logs_read() { + let mut endpoint = EndpointManager::new("test-endpoint"); + let sender = endpoint.logs_sender.clone(); + + sender.send(json!("log message 1")).unwrap(); + sender.send(json!("log message 2")).unwrap(); + + let response1 = endpoint.branch.send_message(json!("logs"), json!("Get")); + let response2 = endpoint.branch.send_message(json!("logs"), json!("Get")); + + assert_eq!(response1, json!("log message 1")); + assert_eq!(response2, json!("log message 2")); + } + + #[test] + fn test_simulated_tcp_connection() { + let ((tx_a_to_b, rx_a_to_b), (tx_b_to_a, rx_b_to_a)) = create_channel_pair(); + + let conn_a = Connection::new( + "conn-1".to_string(), + "endpoint-b".to_string(), + tx_a_to_b, + rx_b_to_a, + ); + + let conn_b = Connection::new( + "conn-1".to_string(), + "endpoint-a".to_string(), + tx_b_to_a, + rx_a_to_b, + ); + + conn_a.send(json!("Hello from A")); + + let response = conn_b.recv(); + assert_eq!(response, Some(json!("Hello from A"))); + } + + #[test] + fn test_pivot_routing() { + let mut endpoint_gateway = EndpointManager::new("gateway"); + + let response = endpoint_gateway + .branch + .send_message(json!(["connections", "internal"]), json!("GetChildren")); + + assert!(response.is_string()); + } +} diff --git a/src/tree/log.rs b/src/tree/log.rs index b277882..6ba0b4b 100644 --- a/src/tree/log.rs +++ b/src/tree/log.rs @@ -1,28 +1,24 @@ /// Implement logging for the manager use crossbeam_channel::{Receiver, Sender}; -use serde_json::{json, Value}; +use serde_json::{Value, json}; use crate::{ logger::{Logger, Record}, - tree::{symbols, Tree, TreeElement}, + tree::{Branch, TreeElement, symbols}, }; struct LoggerTX(Sender); struct LoggerRX(Receiver); -impl Tree { +impl Branch { /// Initiate the unshell logger for the local binary, piped through the manager /// This will allow access to the logs through the tree pub fn init_logger(&mut self) { - // Create the logger through the TX element of the manager - let (tx, rx) = crossbeam_channel::unbounded(); let (tx, rx) = (LoggerTX(tx), LoggerRX(rx)); - // Set the logger through unshell crate::logger::set_logger_box(Box::new(tx)); - // Add the logger to the tree - self.add_element(symbols::LOGGER.to_string(), Box::new(rx)); + self.add_child(symbols::LOGGER.to_string(), Box::new(rx)); } } diff --git a/src/tree/mod.rs b/src/tree/mod.rs index 76b271b..6da36a4 100644 --- a/src/tree/mod.rs +++ b/src/tree/mod.rs @@ -1,80 +1,25 @@ -use std::collections::HashMap; +//! Tree system for hierarchical message routing between endpoints. +//! +//! The tree provides a modular IPC mechanism where components expose +//! a tree of messageable elements. Used for C2 communication and pivoting. -use serde_json::{json, Value}; - -mod branch; +pub mod branch; +pub mod connection; +pub mod endpoint; pub mod log; +pub mod queue; +pub mod readonly; pub mod symbols; pub use branch::Branch; +pub use endpoint::EndpointManager; +pub use readonly::{ReadOnly, TreeVariable}; -pub trait TreeElement { +pub use symbols::*; + +use serde_json::Value; + +pub trait TreeElement: Send + Sync { fn get_type(&self) -> Value; fn send_message(&mut self, target: Value, message: Value) -> Value; } - -pub struct Tree { - elements: HashMap>, -} - -impl Tree { - pub fn new() -> Self { - Self { - elements: HashMap::new(), - } - } - - pub fn add_element(&mut self, name: String, element: Box) { - self.elements.insert(name, element); - } -} - -impl TreeElement for Tree { - fn get_type(&self) -> Value { - json!(symbols::TYPE_TREE) - } - fn send_message(&mut self, target: Value, message: Value) -> Value { - match target { - Value::Null => { - if let Some(message) = message.as_str() { - match message { - symbols::CMD_GET_CHILDREN => { - let children = self - .elements - .iter() - .map(|(k, v)| (Value::String(k.clone()), v.get_type())) - .collect::>(); - json!(children) - } - _ => json!(symbols::ERR_UNSUPPORTED_METHOD), - } - } else { - json!(symbols::ERR_UNSUPPORTED_METHOD) - } - } - Value::Array(mut path) => { - if path.is_empty() { - return json!(symbols::ERR_INVALID_PATH); - } - let next = path.remove(0); - if let Value::String(next_name) = next { - if let Some(child) = self.elements.get_mut(&next_name) { - child.send_message(Value::Array(path), message) - } else { - json!(symbols::ERR_CHILD_NOT_FOUND) - } - } else { - json!(symbols::ERR_INVALID_PATH) - } - } - Value::String(target) => { - if let Some(child) = self.elements.get_mut(&target) { - child.send_message(Value::Null, message) - } else { - json!(symbols::ERR_CHILD_NOT_FOUND) - } - } - _ => json!(symbols::ERR_INVALID_TARGET), - } - } -} diff --git a/src/tree/queue.rs b/src/tree/queue.rs new file mode 100644 index 0000000..831f365 --- /dev/null +++ b/src/tree/queue.rs @@ -0,0 +1,67 @@ +//! Queue - A TreeElement wrapper around crossbeam channels for message queuing. + +use crossbeam_channel::{Receiver, Sender}; +use serde_json::{json, Value}; + +use crate::tree::symbols::{self, TYPE_QUEUE}; +use crate::tree::TreeElement; + +/// Generic queue wrapping crossbeam channels. +/// Provides Get, Poll, and GetLength commands via the tree interface. +pub struct Queue { + sender: Sender, + receiver: Receiver, +} + +impl Queue { + pub fn new(sender: Sender, receiver: Receiver) -> Self { + Self { sender, receiver } + } + + pub fn channel() -> (Sender, Self) { + let (tx, rx) = crossbeam_channel::unbounded(); + let queue = Self::new(tx.clone(), rx); + (tx, queue) + } + + pub fn sender(&self) -> &Sender { + &self.sender + } + + pub fn len(&self) -> usize { + self.receiver.len() + } + + pub fn is_empty(&self) -> bool { + self.receiver.is_empty() + } + + pub fn try_recv(&self) -> Option { + self.receiver.try_recv().ok() + } + + pub fn recv(&self) -> Option { + self.receiver.recv().ok() + } +} + +impl TreeElement for Queue { + fn get_type(&self) -> Value { + json!(TYPE_QUEUE) + } + + fn send_message(&mut self, target: Value, message: Value) -> Value { + match (target, message) { + (Value::Null, Value::String(cmd)) => match cmd.as_ref() { + symbols::CMD_GET => self.recv().map(|v| json!(v)).unwrap_or(json!(Value::Null)), + symbols::CMD_POLL => self + .try_recv() + .map(|v| json!(v)) + .unwrap_or(json!(Value::Null)), + symbols::CMD_GET_LENGTH => json!(self.receiver.len()), + _ => json!(symbols::ERR_INVALID_COMMAND), + }, + _ => json!(symbols::ERR_INVALID_TARGET), + } + } +} diff --git a/src/tree/readonly.rs b/src/tree/readonly.rs new file mode 100644 index 0000000..6357eaa --- /dev/null +++ b/src/tree/readonly.rs @@ -0,0 +1,111 @@ +//! TreeVariable - A TreeElement with getters and setters. +//! +//! ReadOnly - A wrapper around TreeVariable that ignores setters. + +use serde_json::{json, Value}; + +use crate::tree::symbols; +use crate::tree::TreeElement; + +/// A variable with getters and setters exposed through the tree. +pub struct TreeVariable { + value: String, + value_type: &'static str, +} + +impl TreeVariable { + pub fn new(value: impl Into, value_type: &'static str) -> Self { + Self { + value: value.into(), + value_type, + } + } + + pub fn get(&self) -> &str { + &self.value + } + + pub fn set(&mut self, value: impl Into) { + self.value = value.into(); + } +} + +impl TreeElement for TreeVariable { + fn get_type(&self) -> Value { + json!(self.value_type) + } + + fn send_message(&mut self, target: Value, message: Value) -> Value { + match target { + Value::Null => { + if let Some(cmd) = message.as_str() { + match cmd { + symbols::CMD_GET => json!(self.value.clone()), + "Set" => json!(symbols::ERR_MISSING_ARGS), + _ => json!(symbols::ERR_UNSUPPORTED_METHOD), + } + } else { + json!(symbols::ERR_INVALID_COMMAND) + } + } + Value::String(cmd) if cmd == "Set" => { + if let Some(new_value) = message.as_str() { + self.value = new_value.to_string(); + json!(true) + } else { + json!(symbols::ERR_INVALID_COMMAND) + } + } + _ => json!(symbols::ERR_INVALID_TARGET), + } + } +} + +/// A read-only wrapper around TreeVariable that ignores setters. +pub struct ReadOnly { + inner: TreeVariable, +} + +impl ReadOnly { + pub fn new(value: impl Into, value_type: &'static str) -> Self { + Self { + inner: TreeVariable::new(value, value_type), + } + } + + pub fn get(&self) -> &str { + self.inner.get() + } + + pub fn inner(&self) -> &TreeVariable { + &self.inner + } + + pub fn inner_mut(&mut self) -> &mut TreeVariable { + &mut self.inner + } +} + +impl TreeElement for ReadOnly { + fn get_type(&self) -> Value { + self.inner.get_type() + } + + fn send_message(&mut self, target: Value, message: Value) -> Value { + match target { + Value::Null => { + if let Some(cmd) = message.as_str() { + match cmd { + symbols::CMD_GET => json!(self.inner.get()), + "Set" => json!(symbols::ERR_READONLY), + _ => json!(symbols::ERR_UNSUPPORTED_METHOD), + } + } else { + json!(symbols::ERR_INVALID_COMMAND) + } + } + Value::String(cmd) if cmd == "Set" => json!(symbols::ERR_READONLY), + _ => json!(symbols::ERR_INVALID_TARGET), + } + } +} diff --git a/src/tree/symbols.rs b/src/tree/symbols.rs index 8c83e24..8b4f680 100644 --- a/src/tree/symbols.rs +++ b/src/tree/symbols.rs @@ -4,6 +4,9 @@ pub const LOGGER: &'static str = symbol!("Logger"); pub const TYPE_TREE: &'static str = symbol!("Tree"); pub const TYPE_QUEUE: &'static str = symbol!("Queue"); +pub const TYPE_ENDPOINT: &'static str = symbol!("Endpoint"); +pub const TYPE_CONNECTIONS: &'static str = symbol!("Connections"); +pub const TYPE_CONNECTION: &'static str = symbol!("Connection"); pub const CMD_GET: &'static str = symbol!("Get"); pub const CMD_POLL: &'static str = symbol!("Poll"); @@ -16,3 +19,6 @@ pub const ERR_INVALID_CHILD: &'static str = symbol!("InvalidChild"); pub const ERR_INVALID_TARGET: &'static str = symbol!("InvalidTarget"); pub const ERR_CHILD_NOT_FOUND: &'static str = symbol!("ChildNotFound"); pub const ERR_INVALID_PATH: &'static str = symbol!("InvalidPath"); +pub const ERR_MISSING_ARGS: &'static str = symbol!("MissingArgs"); +pub const ERR_INVALID_STATE: &'static str = symbol!("InvalidState"); +pub const ERR_READONLY: &'static str = symbol!("ReadOnly"); diff --git a/ush-payload/src/main.rs b/ush-payload/src/main.rs index e37c2bd..389ac8f 100644 --- a/ush-payload/src/main.rs +++ b/ush-payload/src/main.rs @@ -1,7 +1,7 @@ -use unshell::{info, tree::Tree}; +use unshell::{info, tree::Branch}; fn main() { - let mut manager = Tree::new(); + let mut manager = Branch::new(); manager.init_logger(); info!("Test thing!");