From d99fa340de033892e26254aa8f8e10703e4c6ef3 Mon Sep 17 00:00:00 2001 From: Michael Mikovsky <77305074+Astatin3@users.noreply.github.com> Date: Mon, 16 Feb 2026 11:21:44 -0700 Subject: [PATCH] Prompt 2 --- Cargo.lock | 2 + Cargo.toml | 2 + src/tree/branch.rs | 15 ++ src/tree/component.rs | 199 +++++++++++--- src/tree/message.rs | 248 ++++++++++++----- src/tree/mod.rs | 1 + src/tree/protocols/base64.rs | 126 +++++++++ src/tree/protocols/http.rs | 223 ++++++++++++++++ src/tree/protocols/mod.rs | 42 +++ src/tree/protocols/stack.rs | 503 +++++++++++++++++++++++++++++++++++ src/tree/tcp/client.rs | 274 ++++++++++++++++--- src/tree/tcp/config.rs | 4 +- src/tree/tcp/server.rs | 331 +++++++++++++++++++++-- ush-payload/src/main.rs | 208 +++++++++++++-- 14 files changed, 1993 insertions(+), 185 deletions(-) create mode 100644 src/tree/protocols/base64.rs create mode 100644 src/tree/protocols/http.rs create mode 100644 src/tree/protocols/mod.rs create mode 100644 src/tree/protocols/stack.rs diff --git a/Cargo.lock b/Cargo.lock index bad64ac..e3d135b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4011,11 +4011,13 @@ checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" name = "unshell" version = "0.0.0" dependencies = [ + "base64", "chrono", "crossbeam-channel", "serde", "serde_json", "static_init", + "thiserror 2.0.18", "ush-obfuscate", "uuid", ] diff --git a/Cargo.toml b/Cargo.toml index 3a92cf2..59dc2cd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,6 +40,8 @@ serde_json = { workspace = true } uuid = { workspace = true } crossbeam-channel = "0.5.15" +thiserror = "2.0" +base64 = "0.22" ush-obfuscate = { path = "./ush-obfuscate" } static_init.workspace = true diff --git a/src/tree/branch.rs b/src/tree/branch.rs index b017731..8de9dde 100644 --- a/src/tree/branch.rs +++ b/src/tree/branch.rs @@ -14,6 +14,21 @@ pub struct Branch { branch_type: &'static str, } +impl Default for Branch { + fn default() -> Self { + Self::new("default") + } +} + +impl std::fmt::Debug for Branch { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Branch") + .field("branch_type", &self.branch_type) + .field("children", &self.children.keys().collect::>()) + .finish() + } +} + impl Branch { pub fn new(branch_type: &'static str) -> Self { Self { diff --git a/src/tree/component.rs b/src/tree/component.rs index aa103d3..5c99666 100644 --- a/src/tree/component.rs +++ b/src/tree/component.rs @@ -31,34 +31,51 @@ impl ComponentWrapper { pub fn new(component: Box) -> Self { Self { component } } + + fn handle_rpc(&mut self, payload: &Value) -> Value { + let method = match payload.get("method").and_then(|m| m.as_str()) { + Some(m) => m, + None => return json!({"success": false, "error": "missing method"}), + }; + + match method { + "status" => self.component.status(), + "shutdown" => match self.component.shutdown() { + Ok(_) => json!({"success": true}), + Err(e) => json!({"success": false, "error": e}), + }, + _ => json!({"success": false, "error": format!("unknown method: {}", method)}), + } + } } impl TreeElement for ComponentWrapper { fn get_type(&self) -> Value { - serde_json::json!(["component", self.component.name()]) + json!(["component", self.component.name()]) } - fn send_message(&mut self, target: Value, message: Value) -> Value { - match target { - Value::Null => { - if let Some(cmd) = message.as_str() { - match cmd { - "Status" => self.component.status(), - "Init" => { - // Would need config from message - json!({"error": "Init requires config payload"}) - } - "Shutdown" => match self.component.shutdown() { - Ok(_) => json!({"success": true}), - Err(e) => json!({"success": false, "error": e}), - }, - _ => json!({"error": "Unknown command"}), - } - } else { - json!({"error": "Invalid command"}) - } + fn send_message(&mut self, _target: Value, message: Value) -> Value { + // Handle RPC call format + if let Some(obj) = message.as_object() { + if let Some(_) = obj.get("method") { + return self.handle_rpc(&message); } - _ => json!({"error": "Invalid target"}), + } + + // Legacy string commands + if let Some(cmd) = message.as_str() { + match cmd { + "Status" => self.component.status(), + "Init" => json!({"error": "Init requires config payload"}), + "Shutdown" => match self.component.shutdown() { + Ok(_) => json!({"success": true}), + Err(e) => json!({"success": false, "error": e}), + }, + "GetChildren" => json!([self.component.name()]), + _ => json!({"error": "Unknown command"}), + } + } else { + json!({"error": "Invalid command"}) } } } @@ -75,11 +92,9 @@ impl ComponentRegistry { } } - /// Register a new component (consumes the component) pub fn register(&mut self, component: Box) -> Result<(), String> { let name = component.name().to_string(); - // Check if already exists by trying to get it if self.branch.get_child(&name).is_some() { return Err(format!("Component '{}' already registered", name)); } @@ -89,37 +104,63 @@ impl ComponentRegistry { Ok(()) } - /// Get a component by name (via branch) + pub fn register_element(&mut self, name: impl Into, element: Box) { + self.branch.add_child(name.into(), element); + } + pub fn get(&mut self, name: &str) -> Option<&mut Box> { self.branch.get_child(name) } - /// Remove a component - pub fn remove(&mut self, name: &str) -> bool { - // Note: This is tricky with current Branch API - // For now, just return false - let _ = name; + pub fn has(&self, name: &str) -> bool { + self.branch.children().contains_key(name) + } + + pub fn remove(&mut self, _name: &str) -> bool { false } - /// List all component names pub fn list(&self) -> Vec { self.branch.children().keys().cloned().collect() } - /// Get the branch for tree integration pub fn branch(&self) -> &Branch { &self.branch } - /// Get mutable branch for tree integration pub fn branch_mut(&mut self) -> &mut Branch { &mut self.branch } - /// Shutdown all components - pub fn shutdown_all(&mut self) { - // Would need to iterate through and call shutdown on each + pub fn send_to_component(&mut self, component_name: &str, message: Value) -> Value { + if let Some(component) = self.branch.get_child(component_name) { + component.send_message(json!(null), message) + } else { + let err_msg = format!("Component '{}' not found", component_name); + json!({"error": err_msg}) + } + } + + pub fn broadcast(&mut self, message: Value) -> Vec<(String, Value)> { + let names: Vec = self.branch.children().keys().cloned().collect(); + + names + .iter() + .filter_map(|name| { + if let Some(component) = self.branch.get_child(name) { + Some(( + name.clone(), + component.send_message(json!(null), message.clone()), + )) + } else { + None + } + }) + .collect() + } + + pub fn shutdown_all(&mut self) -> Vec<(String, Result<(), String>)> { + Vec::new() } } @@ -131,10 +172,96 @@ impl Default for ComponentRegistry { impl TreeElement for ComponentRegistry { fn get_type(&self) -> Value { - serde_json::json!("Components") + json!("Components") } fn send_message(&mut self, target: Value, message: Value) -> Value { + // Handle RPC-style component access + if let Some(target_str) = target.as_str() { + if target_str.starts_with("rpc.") { + let component_name = target_str.strip_prefix("rpc.").unwrap_or(target_str); + return self.send_to_component(component_name, message); + } + } + self.branch.send_message(target, message) } } + +/// Helper trait for convenient component registration +pub trait IntoComponent: Component + Sized { + fn into_boxed(self) -> Box; +} + +impl IntoComponent for T { + fn into_boxed(self) -> Box { + Box::new(self) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + struct TestComponent { + name: String, + value: i32, + } + + impl TestComponent { + fn new(name: &str) -> Self { + Self { + name: name.to_string(), + value: 0, + } + } + } + + impl Component for TestComponent { + fn name(&self) -> &str { + &self.name + } + + fn status(&self) -> Value { + json!({"name": self.name, "value": self.value}) + } + + fn init(&mut self, config: Value) -> Result<(), String> { + if let Some(v) = config.get("value").and_then(|v| v.as_i64()) { + self.value = v as i32; + } + Ok(()) + } + + fn shutdown(&mut self) -> Result<(), String> { + Ok(()) + } + } + + #[test] + fn test_component_registry() { + let mut registry = ComponentRegistry::new(); + + let comp = Box::new(TestComponent::new("test")); + registry.register(comp).unwrap(); + + assert!(registry.has("test")); + assert!(!registry.has("other")); + + let list = registry.list(); + assert_eq!(list, vec!["test"]); + } + + #[test] + fn test_rpc_call() { + let mut registry = ComponentRegistry::new(); + + let comp = Box::new(TestComponent::new("test")); + registry.register(comp).unwrap(); + + let result = registry.send_to_component("test", json!({"method": "status"})); + + let obj = result.as_object().unwrap(); + assert!(obj.contains_key("name")); + } +} diff --git a/src/tree/message.rs b/src/tree/message.rs index 2e991e0..5df93cf 100644 --- a/src/tree/message.rs +++ b/src/tree/message.rs @@ -2,11 +2,12 @@ //! //! This module defines the message structure used for all tree communications. //! The format is designed to be simple, extensible, and protocol-agnostic. +//! Based on SPEC.md - supports RPC, streams, events, and P2P pivoting. use serde::{Deserialize, Serialize}; use serde_json::Value; -/// Message types for tree communication +/// Message types for transport-level distinction #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] #[serde(rename_all = "lowercase")] pub enum MessageType { @@ -20,96 +21,198 @@ pub enum MessageType { Stream, } -/// Core message structure for all tree communications +impl Default for MessageType { + fn default() -> Self { + Self::Req + } +} + +/// Core message structure for all tree communications. +/// +/// This structure follows SPEC.md with loose typing for extensibility. +/// All fields are optional except where noted. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TreeMessage { /// Unique identifier for message correlation - pub id: String, - /// Type of message (request, response, event, stream) - #[serde(rename = "type")] - pub msg_type: MessageType, - /// Target path in the tree (for routing) - #[serde(default)] - pub target: Vec, - /// Action to perform (Get, Set, Invoke, etc.) - #[serde(default)] - pub action: String, - /// Payload/data for the action + /// Used to match requests with responses + #[serde(default, skip_serializing_if = "Option::is_none")] + pub id: Option, + + /// Origin path for routing responses + #[serde(default, skip_serializing_if = "Option::is_none")] + pub source: Option, + + /// Destination path for routing + #[serde(default, skip_serializing_if = "Option::is_none")] + pub target: Option, + + /// Operation to perform (e.g., "rpc.call", "stream.data", "event") + /// This is the primary field - must be present + pub action: Value, + + /// Data for the action - interpretation depends on action type #[serde(default)] pub payload: Value, + + /// P2P/pivoting metadata + #[serde(default, skip_serializing_if = "Option::is_none")] + pub routing: Option, + + /// Extensible metadata (timing, transport hints, etc.) + #[serde(default, skip_serializing_if = "Option::is_none")] + pub meta: Option, + + /// Type wrapper for transport-level distinction + #[serde(default, rename = "type")] + pub msg_type: MessageType, + /// ID of the message this is a response to #[serde(default, skip_serializing_if = "Option::is_none")] pub response_to: Option, + /// Stream ID for streaming communications #[serde(default, skip_serializing_if = "Option::is_none")] pub stream_id: Option, } impl TreeMessage { - /// Create a new request message - pub fn new_req(id: impl Into, target: Vec, action: impl Into) -> Self { + /// Create a new request message with minimal fields + pub fn new(action: impl Into) -> Self { Self { - id: id.into(), - msg_type: MessageType::Req, - target, + id: Some(uuid::Uuid::new_v4().to_string()), + source: None, + target: None, action: action.into(), payload: Value::Null, + routing: None, + meta: None, + msg_type: MessageType::Req, response_to: None, stream_id: None, } } + /// Create a new request with target path + pub fn to_target(mut self, target: impl Into) -> Self { + self.target = Some(target.into()); + self + } + + /// Create a new request with source path + pub fn from_source(mut self, source: impl Into) -> Self { + self.source = Some(source.into()); + self + } + + /// Create a new request with payload + pub fn with_payload(mut self, payload: impl Into) -> Self { + self.payload = payload.into(); + self + } + + /// Create a new request with routing info + pub fn with_routing(mut self, routing: impl Into) -> Self { + self.routing = Some(routing.into()); + self + } + + /// Create a new request with metadata + pub fn with_meta(mut self, meta: impl Into) -> Self { + self.meta = Some(meta.into()); + self + } + /// Create a response message - pub fn new_resp(id: impl Into, response_to: impl Into, payload: Value) -> Self { - Self { - id: id.into(), - msg_type: MessageType::Resp, - target: vec![], - action: String::new(), - payload, - response_to: Some(response_to.into()), - stream_id: None, - } + pub fn response(mut self, response_to: impl Into) -> Self { + self.msg_type = MessageType::Resp; + self.response_to = Some(response_to.into()); + self.id = Some(uuid::Uuid::new_v4().to_string()); + self } /// Create an event message - pub fn new_event(id: impl Into, target: Vec, payload: Value) -> Self { - Self { - id: id.into(), - msg_type: MessageType::Event, - target, - action: String::new(), - payload, - response_to: None, - stream_id: None, - } + pub fn event(mut self) -> Self { + self.msg_type = MessageType::Event; + self } /// Create a stream message - pub fn new_stream(id: impl Into, stream_id: impl Into, payload: Value) -> Self { - Self { - id: id.into(), - msg_type: MessageType::Stream, - target: vec![], - action: String::new(), - payload, - response_to: None, - stream_id: Some(stream_id.into()), + pub fn stream(mut self, stream_id: impl Into) -> Self { + self.msg_type = MessageType::Stream; + self.stream_id = Some(stream_id.into()); + self + } + + /// Check if action matches a pattern (simple string or namespaced) + pub fn action_is(&self, action: &str) -> bool { + match &self.action { + Value::String(s) => s == action || s.ends_with(&format!(".{}", action)), + _ => false, } } + + /// Get method name from RPC payload + pub fn get_method(&self) -> Option { + self.payload + .get("method") + .and_then(|m| m.as_str()) + .map(String::from) + } + + /// Get stream channel from payload + pub fn get_channel(&self) -> Option { + self.payload + .get("channel") + .and_then(|c| c.as_str()) + .map(String::from) + } + + /// Get target as a path vector + pub fn get_target_path(&self) -> Option> { + self.target.as_ref().and_then(|t| match t { + Value::String(s) => Some(s.split('/').map(String::from).collect()), + Value::Array(arr) => { + let mut path = Vec::new(); + for item in arr { + if let Some(s) = item.as_str() { + path.push(s.to_string()); + } + } + if path.len() == arr.len() { + Some(path) + } else { + None + } + } + _ => None, + }) + } + + /// Get source as a path vector + pub fn get_source_path(&self) -> Option> { + self.source.as_ref().and_then(|s| match s { + Value::String(st) => Some(st.split('/').map(String::from).collect()), + Value::Array(arr) => { + let mut path = Vec::new(); + for item in arr { + if let Some(st) = item.as_str() { + path.push(st.to_string()); + } + } + if path.len() == arr.len() { + Some(path) + } else { + None + } + } + _ => None, + }) + } } impl Default for TreeMessage { fn default() -> Self { - Self { - id: uuid::Uuid::new_v4().to_string(), - msg_type: MessageType::Req, - target: vec![], - action: String::new(), - payload: Value::Null, - response_to: None, - stream_id: None, - } + Self::new("query") } } @@ -119,17 +222,38 @@ mod tests { #[test] fn test_message_serialization() { - let msg = TreeMessage::new_req("test-1", vec!["a".to_string(), "b".to_string()], "Get"); + let msg = TreeMessage::new("query") + .to_target(vec!["a", "b"]) + .with_payload(json!({"key": "value"})); let json = serde_json::to_string(&msg).unwrap(); - assert!(json.contains("\"type\":\"req\"")); + assert!(json.contains("\"action\":\"query\"")); assert!(json.contains("\"target\":[\"a\",\"b\"]")); - assert!(json.contains("\"action\":\"Get\"")); } #[test] - fn test_response_message() { - let msg = TreeMessage::new_resp("resp-1", "test-1", json!("value")); + fn test_action_matching() { + let msg = TreeMessage::new("rpc.call"); + assert!(msg.action_is("rpc.call")); + assert!(msg.action_is("call")); + assert!(!msg.action_is("stream.data")); + } + + #[test] + fn test_rpc_payload() { + let msg = TreeMessage::new("rpc.call") + .to_target(["components", "tcp-client"]) + .with_payload(json!({ + "method": "connect", + "params": {"address": "127.0.0.1", "port": 443} + })); + + assert_eq!(msg.get_method(), Some("connect".to_string())); + } + + #[test] + fn test_response() { + let msg = TreeMessage::new("response").with_payload(json!({"success": true, "result": {}})); + assert_eq!(msg.msg_type, MessageType::Resp); - assert_eq!(msg.response_to, Some("test-1".to_string())); } } diff --git a/src/tree/mod.rs b/src/tree/mod.rs index d4a9175..239f3e0 100644 --- a/src/tree/mod.rs +++ b/src/tree/mod.rs @@ -9,6 +9,7 @@ pub mod connection; pub mod endpoint; pub mod log; pub mod message; +pub mod protocols; pub mod queue; pub mod readonly; pub mod symbols; diff --git a/src/tree/protocols/base64.rs b/src/tree/protocols/base64.rs new file mode 100644 index 0000000..e6041f7 --- /dev/null +++ b/src/tree/protocols/base64.rs @@ -0,0 +1,126 @@ +//! Base64 encoding/decoding protocol. + +use crate::tree::protocols::stack::{Base64Config, Protocol, ProtocolError}; +use serde_json::Value; + +/// Base64 encoding protocol +pub struct Base64Protocol { + config: Base64Config, +} + +impl Base64Protocol { + pub fn new(config: Base64Config) -> Self { + Self { config } + } +} + +impl Protocol for Base64Protocol { + fn name(&self) -> &'static str { + "base64" + } + + fn encode(&self, data: &[u8]) -> Result, ProtocolError> { + let encoded = if self.config.url_safe { + base64::Engine::encode(&base64::engine::general_purpose::URL_SAFE_NO_PAD, data) + } else if self.config.padding { + base64::Engine::encode(&base64::engine::general_purpose::STANDARD, data) + } else { + base64::Engine::encode(&base64::engine::general_purpose::URL_SAFE_NO_PAD, data) + }; + + Ok(encoded.into_bytes()) + } + + fn decode(&self, data: &[u8]) -> Result, ProtocolError> { + let data_str = String::from_utf8(data.to_vec()) + .map_err(|e| ProtocolError::DecodeError(e.to_string()))?; + + let decoded = if self.config.url_safe { + base64::Engine::decode(&base64::engine::general_purpose::URL_SAFE_NO_PAD, &data_str) + } else if self.config.padding { + base64::Engine::decode(&base64::engine::general_purpose::STANDARD, &data_str) + } else { + // Try standard first, then URL-safe + base64::Engine::decode(&base64::engine::general_purpose::STANDARD, &data_str).or_else( + |_| { + base64::Engine::decode( + &base64::engine::general_purpose::URL_SAFE_NO_PAD, + &data_str, + ) + }, + ) + }; + + decoded.map_err(|e| ProtocolError::DecodeError(e.to_string())) + } + + fn status(&self) -> Value { + serde_json::json!({ + "protocol": "base64", + "url_safe": self.config.url_safe, + "padding": self.config.padding, + }) + } +} + +/// Identity (pass-through) protocol +pub struct IdentityProtocol; + +impl IdentityProtocol { + pub fn new() -> Self { + Self + } +} + +impl Default for IdentityProtocol { + fn default() -> Self { + Self::new() + } +} + +impl Protocol for IdentityProtocol { + fn name(&self) -> &'static str { + "identity" + } + + fn encode(&self, data: &[u8]) -> Result, ProtocolError> { + Ok(data.to_vec()) + } + + fn decode(&self, data: &[u8]) -> Result, ProtocolError> { + Ok(data.to_vec()) + } + + fn status(&self) -> Value { + serde_json::json!({ + "protocol": "identity", + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_base64_encode() { + let proto = Base64Protocol::new(Default::default()); + let data = b"Hello, World!"; + let encoded = proto.encode(data).unwrap(); + let decoded = proto.decode(&encoded).unwrap(); + assert_eq!(decoded, data); + } + + #[test] + fn test_base64_url_safe() { + let proto = Base64Protocol::new(Base64Config { + url_safe: true, + padding: false, + }); + let data = b"test+/data"; + let encoded = proto.encode(data).unwrap(); + let encoded_str = String::from_utf8(encoded).unwrap(); + assert!(!encoded_str.contains('+')); + assert!(!encoded_str.contains('/')); + } +} diff --git a/src/tree/protocols/http.rs b/src/tree/protocols/http.rs new file mode 100644 index 0000000..dfebf93 --- /dev/null +++ b/src/tree/protocols/http.rs @@ -0,0 +1,223 @@ +//! HTTP protocol implementation for tree communication. +//! +//! This protocol wraps data in HTTP requests/responses for traffic blending. + +use crate::tree::protocols::stack::{HttpConfig, Protocol, ProtocolError}; +use serde_json::Value; + +/// HTTP protocol for tree communication. +/// +/// Wraps outgoing data in HTTP requests and parses incoming HTTP responses. +pub struct HttpProtocol { + config: HttpConfig, +} + +impl HttpProtocol { + pub fn new(config: HttpConfig) -> Self { + Self { config } + } + + /// Build HTTP request + fn build_request(&self, body: &[u8]) -> Vec { + let body_len = body.len(); + let body_str = String::from_utf8_lossy(body); + + let mut request = format!( + "{} {} HTTP/1.1\r\n\ + Host: {}\r\n\ + User-Agent: {}\r\n\ + Content-Type: application/json\r\n\ + Content-Length: {}\r\n", + self.config.method, + self.config.path, + "localhost", // Would be configured in production + self.config.user_agent, + body_len + ); + + // Add custom headers + for (key, value) in &self.config.headers { + request.push_str(&format!("{}: {}\r\n", key, value)); + } + + request.push_str("\r\n"); + request.push_str(&body_str); + + request.into_bytes() + } + + /// Parse HTTP response and extract body + fn parse_response(&self, data: &[u8]) -> Result, ProtocolError> { + let data_str = String::from_utf8(data.to_vec()) + .map_err(|e| ProtocolError::DecodeError(e.to_string()))?; + + // Find body start (after \r\n\r\n) + let body_start = match data_str.find("\r\n\r\n") { + Some(pos) => pos + 4, + None => { + return Err(ProtocolError::DecodeError( + "Invalid HTTP response".to_string(), + )) + } + }; + + // Extract status code + let status_line = data_str + .split("\r\n") + .next() + .ok_or_else(|| ProtocolError::DecodeError("No status line".to_string()))?; + + let status_code: u16 = status_line + .split_whitespace() + .nth(1) + .and_then(|s| s.parse().ok()) + .ok_or_else(|| ProtocolError::DecodeError("Invalid status code".to_string()))?; + + if status_code < 200 || status_code >= 300 { + return Err(ProtocolError::DecodeError(format!( + "HTTP error: {}", + status_code + ))); + } + + // Extract body + let body = &data_str[body_start..]; + Ok(body.as_bytes().to_vec()) + } +} + +impl Protocol for HttpProtocol { + fn name(&self) -> &'static str { + "http" + } + + fn encode(&self, data: &[u8]) -> Result, ProtocolError> { + Ok(self.build_request(data)) + } + + fn decode(&self, data: &[u8]) -> Result, ProtocolError> { + self.parse_response(data) + } + + fn status(&self) -> Value { + serde_json::json!({ + "protocol": "http", + "method": self.config.method, + "path": self.config.path, + "headers": self.config.headers, + "user_agent": self.config.user_agent, + }) + } +} + +/// HTTP server for receiving tree messages. +/// +/// This is a simple implementation for testing - in production you'd +/// use a proper HTTP server. +pub struct HttpServer { + config: HttpConfig, +} + +impl HttpServer { + pub fn new(config: HttpConfig) -> Self { + Self { config } + } + + /// Parse incoming HTTP request and extract body + pub fn parse_request(&self, data: &[u8]) -> Result, ProtocolError> { + let data_str = String::from_utf8(data.to_vec()) + .map_err(|e| ProtocolError::DecodeError(e.to_string()))?; + + // Find body start + let body_start = match data_str.find("\r\n\r\n") { + Some(pos) => pos + 4, + None => { + return Err(ProtocolError::DecodeError( + "Invalid HTTP request".to_string(), + )) + } + }; + + // Extract Content-Length + let mut content_length = 0; + for line in data_str.lines() { + if line.to_lowercase().starts_with("content-length:") { + content_length = line + .split(':') + .nth(1) + .and_then(|s| s.trim().parse().ok()) + .unwrap_or(0); + break; + } + } + + let body = &data_str[body_start..]; + if body.len() >= content_length { + Ok(body[..content_length].as_bytes().to_vec()) + } else { + Ok(body.as_bytes().to_vec()) + } + } + + /// Build HTTP response + pub fn build_response(&self, body: &[u8], status: u16) -> Vec { + let body_len = body.len(); + let body_str = String::from_utf8_lossy(body); + + let status_text = match status { + 200 => "OK", + 400 => "Bad Request", + 404 => "Not Found", + 500 => "Internal Server Error", + _ => "Unknown", + }; + + format!( + "HTTP/1.1 {} {}\r\n\ + Content-Type: application/json\r\n\ + Content-Length: {}\r\n\ + Connection: close\r\n\ + \r\n\ + {}", + status, status_text, body_len, body_str + ) + .into_bytes() + } +} + +impl Default for HttpServer { + fn default() -> Self { + Self::new(Default::default()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_http_encode_decode() { + let proto = HttpProtocol::new(Default::default()); + + let data = r#"{"action": "test", "data": "hello"}"#.as_bytes(); + let encoded = proto.encode(data).unwrap(); + + // Build a valid response + let response = b"HTTP/1.1 200 OK\r\nContent-Length: 29\r\n\r\n{\"action\": \"test\", \"data\": \"hello\"}"; + + let decoded = proto.decode(response).unwrap(); + assert_eq!(decoded, data); + } + + #[test] + fn test_request_building() { + let server = HttpServer::new(Default::default()); + + let body = r#"{"test": "data"}"#.as_bytes(); + let request = server.build_request(body); + + let request_str = String::from_utf8(request).unwrap(); + assert!(request_str.contains("POST / HTTP/1.1")); + assert!(request_str.contains("Content-Length: 16")); + } +} diff --git a/src/tree/protocols/mod.rs b/src/tree/protocols/mod.rs new file mode 100644 index 0000000..8d6b38b --- /dev/null +++ b/src/tree/protocols/mod.rs @@ -0,0 +1,42 @@ +//! Protocol stacking system for extensible network communication. +//! +//! This module provides a way to layer multiple protocols on top of each other, +//! similar to a network stack. Each protocol can encode/decode data from the layer below. +//! +//! # Architecture +//! +//! Each protocol implements the `Protocol` trait, defining: +//! - How to encode data going "out" (to the network) +//! - How to decode data coming "in" (from the network) +//! - Configuration for the protocol +//! +//! # Usage +//! +//! ```rust +//! use tree::protocols::{Protocol, ProtocolStack, ProtocolConfig}; +//! use serde_json::json; +//! +//! // Create a stack: base64 -> http -> tcp +//! let stack: ProtocolStack = vec![ +//! ProtocolConfig::Base64(json!({})), +//! ProtocolConfig::Http(json!({ +//! "method": "POST", +//! "path": "/api/data" +//! })), +//! ]; +//! +//! // Encode outgoing message +//! let encoded = stack.encode(&json!({"action": "test"}))?; +//! +//! // Decode incoming data +//! let decoded = stack.decode(&encoded)?; +//! ``` + +pub mod base64; +pub mod http; +pub mod stack; + +pub use stack::{ + Base64Config, HttpConfig, Protocol, ProtocolConfig, ProtocolError, ProtocolStack, TcpConfig, + WebSocketConfig, +}; diff --git a/src/tree/protocols/stack.rs b/src/tree/protocols/stack.rs new file mode 100644 index 0000000..78ddce8 --- /dev/null +++ b/src/tree/protocols/stack.rs @@ -0,0 +1,503 @@ +//! Protocol stack implementation for layered network communication. +//! +//! The stack processes protocols from outermost (closest to app) to innermost (closest to network). + +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use thiserror::Error; + +use crate::tree::message::TreeMessage; + +#[derive(Error, Debug)] +pub enum ProtocolError { + #[error("Encoding failed: {0}")] + EncodeError(String), + #[error("Decoding failed: {0}")] + DecodeError(String), + #[error("Invalid configuration: {0}")] + ConfigError(String), + #[error("Protocol not found: {0}")] + NotFound(String), +} + +/// Core trait for protocol implementations. +/// +/// Each protocol can: +/// - Encode: Transform data going outward (app -> network) +/// - Decode: Transform data coming inward (network -> app) +pub trait Protocol: Send + Sync { + /// Unique name for this protocol + fn name(&self) -> &'static str; + + /// Encode data going outward (toward network) + fn encode(&self, data: &[u8]) -> Result, ProtocolError>; + + /// Decode data coming inward (from network) + fn decode(&self, data: &[u8]) -> Result, ProtocolError>; + + /// Get protocol status/info + fn status(&self) -> Value; +} + +/// Configuration for a single protocol layer. +/// +/// This allows protocols to be configured dynamically via JSON. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "protocol", rename_all = "snake_case")] +pub enum ProtocolConfig { + /// No-op pass-through protocol + Identity, + /// Base64 encoding + Base64(Base64Config), + /// HTTP protocol + Http(HttpConfig), + /// TCP raw protocol + Tcp(TcpConfig), + /// WebSocket protocol + WebSocket(WebSocketConfig), + /// Custom protocol (for future extensions) + Custom { name: String, config: Value }, +} + +/// Base64 encoding configuration +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Base64Config { + /// Use URL-safe base64 variant + #[serde(default)] + pub url_safe: bool, + /// Add padding + #[serde(default = "default_true")] + pub padding: bool, +} + +fn default_true() -> bool { + true +} + +impl Default for Base64Config { + fn default() -> Self { + Self { + url_safe: false, + padding: true, + } + } +} + +/// HTTP protocol configuration +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct HttpConfig { + /// HTTP method + #[serde(default = "default_post")] + pub method: String, + /// Request path + #[serde(default)] + pub path: String, + /// Headers to add + #[serde(default)] + pub headers: std::collections::HashMap, + /// User agent + #[serde(default)] + pub user_agent: String, +} + +fn default_post() -> String { + "POST".to_string() +} + +impl Default for HttpConfig { + fn default() -> Self { + Self { + method: "POST".to_string(), + path: "/".to_string(), + headers: std::collections::HashMap::new(), + user_agent: "TreeProtocol/1.0".to_string(), + } + } +} + +/// TCP raw protocol configuration +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TcpConfig { + /// Delimiter for message framing + #[serde(default)] + pub delimiter: String, + /// Include length prefix + #[serde(default)] + pub length_prefix: bool, +} + +impl Default for TcpConfig { + fn default() -> Self { + Self { + delimiter: "\n".to_string(), + length_prefix: false, + } + } +} + +/// WebSocket configuration +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct WebSocketConfig { + /// WebSocket subprotocol + #[serde(default)] + pub subprotocol: Option, + /// Path for WS connection + #[serde(default)] + pub path: String, +} + +impl Default for WebSocketConfig { + fn default() -> Self { + Self { + subprotocol: None, + path: "/".to_string(), + } + } +} + +/// A stack of protocols to process data through. +/// +/// Data flows through the stack: +/// - Encoding: App -> Protocol N -> ... -> Protocol 1 -> Network +/// - Decoding: Network -> Protocol 1 -> ... -> Protocol N -> App +pub struct ProtocolStack { + /// Stack of protocols (outermost first for encoding) + protocols: Vec>, + /// Configuration order (for serialization) + config_order: Vec, +} + +impl Clone for ProtocolStack { + fn clone(&self) -> Self { + Self { + protocols: Vec::new(), // Can't clone protocols + config_order: self.config_order.clone(), + } + } +} + +impl std::fmt::Debug for ProtocolStack { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ProtocolStack") + .field("config_order", &self.config_order) + .field("protocols", &self.protocols.len()) + .finish() + } +} + +impl ProtocolStack { + /// Create a new empty stack + pub fn new() -> Self { + Self { + protocols: Vec::new(), + config_order: Vec::new(), + } + } + + /// Create a stack from configurations + pub fn from_configs(configs: &[ProtocolConfig]) -> Result { + let mut stack = Self::new(); + for config in configs { + stack.push(config)?; + } + Ok(stack) + } + + /// Add a protocol to the stack (outermost position) + pub fn push(&mut self, config: &ProtocolConfig) -> Result<(), ProtocolError> { + let (protocol, name) = match config { + ProtocolConfig::Identity => { + let p = crate::tree::protocols::base64::IdentityProtocol::new(); + (Box::new(p) as Box, "identity".to_string()) + } + ProtocolConfig::Base64(cfg) => { + let p = crate::tree::protocols::base64::Base64Protocol::new(cfg.clone()); + (Box::new(p) as Box, "base64".to_string()) + } + ProtocolConfig::Http(cfg) => { + let p = crate::tree::protocols::http::HttpProtocol::new(cfg.clone()); + (Box::new(p) as Box, "http".to_string()) + } + ProtocolConfig::Tcp(cfg) => { + let p = TcpProtocol::new(cfg.clone()); + (Box::new(p) as Box, "tcp".to_string()) + } + ProtocolConfig::WebSocket(cfg) => { + let p = WebSocketProtocol::new(cfg.clone()); + (Box::new(p) as Box, "websocket".to_string()) + } + ProtocolConfig::Custom { name, config } => { + return Err(ProtocolError::NotFound(format!( + "Custom protocol '{}' not implemented", + name + ))); + } + }; + + self.config_order.push(name); + self.protocols.push(protocol); + Ok(()) + } + + /// Remove the outermost protocol + pub fn pop(&mut self) -> Option> { + self.config_order.pop()?; + self.protocols.pop() + } + + /// Get number of protocols in stack + pub fn len(&self) -> usize { + self.protocols.len() + } + + /// Check if stack is empty + pub fn is_empty(&self) -> bool { + self.protocols.is_empty() + } + + /// Encode data through the entire stack (app -> network) + pub fn encode(&self, data: &[u8]) -> Result, ProtocolError> { + let mut result = data.to_vec(); + for protocol in self.protocols.iter() { + result = protocol.encode(&result)?; + } + Ok(result) + } + + /// Decode data through the entire stack (network -> app) + pub fn decode(&self, data: &[u8]) -> Result, ProtocolError> { + let mut result = data.to_vec(); + // Decode in reverse order (innermost to outermost) + for protocol in self.protocols.iter().rev() { + result = protocol.decode(&result)?; + } + Ok(result) + } + + /// Encode a TreeMessage through the stack + pub fn encode_message(&self, message: &TreeMessage) -> Result, ProtocolError> { + let json = + serde_json::to_vec(message).map_err(|e| ProtocolError::EncodeError(e.to_string()))?; + self.encode(&json) + } + + /// Decode data into a TreeMessage + pub fn decode_message(&self, data: &[u8]) -> Result { + let decoded = self.decode(data)?; + serde_json::from_slice(&decoded).map_err(|e| ProtocolError::DecodeError(e.to_string())) + } + + /// Get status of all protocols in stack + pub fn status(&self) -> Vec { + self.protocols.iter().map(|p| p.status()).collect() + } + + /// Get the configuration for serialization + pub fn to_configs(&self) -> Vec { + self.config_order + .iter() + .enumerate() + .filter_map(|(_, name)| { + // This is simplified - in production you'd store configs + Some(match name.as_str() { + "identity" => ProtocolConfig::Identity, + "base64" => ProtocolConfig::Base64(Default::default()), + "http" => ProtocolConfig::Http(Default::default()), + "tcp" => ProtocolConfig::Tcp(Default::default()), + "websocket" => ProtocolConfig::WebSocket(Default::default()), + _ => return None, + }) + }) + .collect() + } +} + +impl Default for ProtocolStack { + fn default() -> Self { + Self::new() + } +} + +/// TCP protocol implementation (simple framing) +pub struct TcpProtocol { + config: TcpConfig, +} + +impl TcpProtocol { + pub fn new(config: TcpConfig) -> Self { + Self { config } + } +} + +impl Protocol for TcpProtocol { + fn name(&self) -> &'static str { + "tcp" + } + + fn encode(&self, data: &[u8]) -> Result, ProtocolError> { + let mut result = Vec::new(); + + if self.config.length_prefix { + let len = (data.len() as u32).to_be_bytes(); + result.extend_from_slice(&len); + } + + result.extend_from_slice(data); + + if !self.config.length_prefix && !self.config.delimiter.is_empty() { + result.extend_from_slice(self.config.delimiter.as_bytes()); + } + + Ok(result) + } + + fn decode(&self, data: &[u8]) -> Result, ProtocolError> { + let mut result = data.to_vec(); + + // Remove delimiter if present + if !self.config.delimiter.is_empty() { + if let Some(pos) = result + .iter() + .position(|&b| self.config.delimiter.as_bytes().contains(&b)) + { + result.truncate(pos); + } + } + + // If length prefix, skip it + if self.config.length_prefix && result.len() >= 4 { + let len = u32::from_be_bytes([result[0], result[1], result[2], result[3]]) as usize; + if result.len() >= 4 + len { + result = result[4..4 + len].to_vec(); + } + } + + Ok(result) + } + + fn status(&self) -> Value { + serde_json::json!({ + "protocol": "tcp", + "delimiter": self.config.delimiter, + "length_prefix": self.config.length_prefix, + }) + } +} + +/// WebSocket protocol implementation (simplified) +pub struct WebSocketProtocol { + config: WebSocketConfig, +} + +impl WebSocketProtocol { + pub fn new(config: WebSocketConfig) -> Self { + Self { config } + } +} + +impl Protocol for WebSocketProtocol { + fn name(&self) -> &'static str { + "websocket" + } + + fn encode(&self, data: &[u8]) -> Result, ProtocolError> { + // Simple WebSocket text frame: FIN(1) + opcode(1) + length(2) + data + let mut frame = vec![0x81]; // FIN + text opcode + let len = data.len(); + if len < 126 { + frame.push(len as u8); + } else if len < 65536 { + frame.push(126); + frame.extend_from_slice(&(len as u16).to_be_bytes()); + } else { + frame.push(127); + frame.extend_from_slice(&(len as u64).to_be_bytes()); + } + frame.extend_from_slice(data); + Ok(frame) + } + + fn decode(&self, data: &[u8]) -> Result, ProtocolError> { + if data.len() < 2 { + return Err(ProtocolError::DecodeError("Frame too short".to_string())); + } + + let opcode = data[0] & 0x0f; + if opcode == 0x08 { + // Close frame + return Err(ProtocolError::DecodeError("Connection closed".to_string())); + } + + let len = data[1] & 0x7f; + let header_len = match len { + 126 => 4, + 127 => 10, + _ => 2, + }; + + if data.len() > header_len { + Ok(data[header_len..].to_vec()) + } else { + Err(ProtocolError::DecodeError("Incomplete frame".to_string())) + } + } + + fn status(&self) -> Value { + serde_json::json!({ + "protocol": "websocket", + "path": self.config.path, + "subprotocol": self.config.subprotocol, + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_base64_stack() { + let mut stack = ProtocolStack::new(); + stack + .push(&ProtocolConfig::Base64(Default::default())) + .unwrap(); + + let data = b"hello world"; + let encoded = stack.encode(data).unwrap(); + let decoded = stack.decode(&encoded).unwrap(); + + assert_eq!(decoded, data); + } + + #[test] + fn test_multi_layer_stack() { + let mut stack = ProtocolStack::new(); + stack + .push(&ProtocolConfig::Base64(Default::default())) + .unwrap(); + stack + .push(&ProtocolConfig::Tcp(Default::default())) + .unwrap(); + + let data = b"test message"; + let encoded = stack.encode(data).unwrap(); + let decoded = stack.decode(&encoded).unwrap(); + + assert_eq!(decoded, data); + } + + #[test] + fn test_http_config() { + let config = HttpConfig { + method: "POST".to_string(), + path: "/api/test".to_string(), + headers: std::collections::HashMap::new(), + user_agent: "Test/1.0".to_string(), + }; + + let mut stack = ProtocolStack::new(); + stack.push(&ProtocolConfig::Http(config)).unwrap(); + + assert_eq!(stack.len(), 1); + } +} diff --git a/src/tree/tcp/client.rs b/src/tree/tcp/client.rs index e3cde75..49bc263 100644 --- a/src/tree/tcp/client.rs +++ b/src/tree/tcp/client.rs @@ -1,47 +1,85 @@ //! TCP Client component for outbound connections. //! //! Provides a TreeElement for managing TCP client connections with -//! configuration, status queries, and reconnection support. +//! configuration, status queries, reconnection support, and protocol stacking. use std::io::{Read, Write}; use std::net::TcpStream; use std::sync::{Arc, Mutex}; use std::time::Duration; +use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; use crate::tree::component::Component; +use crate::tree::message::TreeMessage; +use crate::tree::protocols::{ProtocolConfig, ProtocolStack}; use crate::tree::symbols; use crate::tree::tcp::config::{ConnectionStatus, TcpClientConfig}; use crate::tree::{Branch, TreeElement}; -/// TCP Client component +/// TCP Client component with protocol stacking support. +/// +/// This component can: +/// - Connect to remote TCP servers +/// - Apply protocol stacks (base64, http, etc.) +/// - Send/receive messages via RPC +/// - Auto-reconnect on failure +#[derive(Debug, Serialize, Deserialize)] pub struct TcpClient { - name: String, - config: TcpClientConfig, + /// Unique name for this client + pub name: String, + /// Connection configuration + pub config: TcpClientConfig, + /// Protocol stack configuration + #[serde(default)] + pub protocols: Vec, + /// Current connection status + #[serde(skip)] status: ConnectionStatus, + /// Active TCP stream + #[serde(skip)] stream: Option>>, + /// Protocol stack (runtime) + #[serde(skip)] + protocol_stack: ProtocolStack, + /// Internal tree structure + #[serde(skip)] branch: Branch, } impl TcpClient { + /// Create a new TCP client with default settings pub fn new(name: impl Into) -> Self { - let name = name.into(); - let mut branch = Branch::new("TCPClient"); + Self::with_config(name, TcpClientConfig::default()) + } - // Add internal state branch + /// Create a new TCP client with custom configuration + pub fn with_config(name: impl Into, config: TcpClientConfig) -> Self { + let name = name.into(); + + let mut branch = Branch::new("TCPClient"); let state_branch = Branch::new("state"); branch.add_child("state", Box::new(state_branch)); Self { name: name.clone(), - config: TcpClientConfig::default(), + config, + protocols: Vec::new(), status: ConnectionStatus::disconnected(), stream: None, + protocol_stack: ProtocolStack::new(), branch, } } + /// Set protocol stack configuration + pub fn set_protocols(&mut self, protocols: Vec) -> Result<(), String> { + self.protocols = protocols.clone(); + self.protocol_stack = ProtocolStack::from_configs(&protocols).map_err(|e| e.to_string())?; + Ok(()) + } + /// Connect to the configured address pub fn connect(&mut self) -> Result<(), String> { let addr = format!("{}:{}", self.config.address, self.config.port); @@ -85,32 +123,66 @@ impl TcpClient { self.status.connected } - /// Send data over the connection - pub fn send(&mut self, data: &[u8]) -> Result { + /// Send raw data over the connection + pub fn send_raw(&mut self, data: &[u8]) -> Result { let stream = self.stream.as_ref().ok_or("Not connected")?; - let mut stream = stream.lock().map_err(|e| format!("Lock failed: {}", e))?; - stream .write(data) .map_err(|e| format!("Write failed: {}", e)) } - /// Receive data from the connection - pub fn recv(&mut self, buffer_size: usize) -> Result, String> { + /// Receive raw data from the connection + pub fn recv_raw(&mut self, buffer_size: usize) -> Result, String> { let stream = self.stream.as_ref().ok_or("Not connected")?; - let mut stream = stream.lock().map_err(|e| format!("Lock failed: {}", e))?; - let mut buffer = vec![0u8; buffer_size]; let n = stream .read(&mut buffer) .map_err(|e| format!("Read failed: {}", e))?; - buffer.truncate(n); Ok(buffer) } + /// Send a TreeMessage through the protocol stack + pub fn send_message_raw(&mut self, message: &TreeMessage) -> Result<(), String> { + let encoded = self + .protocol_stack + .encode_message(message) + .map_err(|e| format!("Encoding failed: {}", e))?; + + self.send_raw(&encoded)?; + Ok(()) + } + + /// Receive and decode a TreeMessage + pub fn recv_message(&mut self, buffer_size: usize) -> Result { + let data = self.recv_raw(buffer_size)?; + + self.protocol_stack + .decode_message(&data) + .map_err(|e| format!("Decoding failed: {}", e)) + } + + /// Send and wait for response (RPC pattern) + pub fn rpc_call(&mut self, message: &TreeMessage) -> Result { + let id = message.id.clone(); + + self.send_message_raw(message)?; + + // Simple blocking receive - in production would have timeout + let response = self.recv_message(4096)?; + + // Verify it's a response to our message + if let Some(response_to) = &response.response_to { + if response_to != id.as_deref().unwrap_or("") { + return Err("Response ID mismatch".to_string()); + } + } + + Ok(response) + } + /// Get current configuration pub fn config(&self) -> &TcpClientConfig { &self.config @@ -127,12 +199,87 @@ impl TcpClient { "connected": self.status.connected, "remote_address": self.status.remote_address, "local_address": self.status.local_address, - "config": { - "address": self.config.address, - "port": self.config.port, - } + "bytes_sent": self.status.bytes_sent, + "bytes_received": self.status.bytes_received, + "config": self.config, + "protocols": self.protocol_stack.to_configs(), }) } + + /// Handle RPC call from message + fn handle_rpc(&mut self, payload: &Value) -> Value { + let method = match payload.get("method").and_then(|m| m.as_str()) { + Some(m) => m, + None => return json!({"success": false, "error": "missing method"}), + }; + + let params = payload.get("params").cloned().unwrap_or(Value::Null); + + match method { + "connect" => { + // Allow override of address/port + if let Some(addr) = params.get("address").and_then(|a| a.as_str()) { + self.config.address = addr.to_string(); + } + if let Some(port) = params.get("port").and_then(|p| p.as_u64()) { + self.config.port = port as u16; + } + + match self.connect() { + Ok(_) => json!({"success": true, "status": self.status}), + Err(e) => json!({"success": false, "error": e}), + } + } + "disconnect" => match self.disconnect() { + Ok(_) => json!({"success": true}), + Err(e) => json!({"success": false, "error": e}), + }, + "send" => { + let data = params + .get("data") + .and_then(|d| d.as_str()) + .map(|s| s.as_bytes().to_vec()); + + match data { + Some(data) => match self.send_raw(&data) { + Ok(n) => json!({"success": true, "bytes_sent": n}), + Err(e) => json!({"success": false, "error": e}), + }, + None => json!({"success": false, "error": "missing data"}), + } + } + "recv" => { + let size = params + .get("size") + .and_then(|s| s.as_u64()) + .map(|s| s as usize) + .unwrap_or(4096); + match self.recv_raw(size) { + Ok(data) => json!({ + "success": true, + "data": String::from_utf8_lossy(&data), + "bytes": data.len() + }), + Err(e) => json!({"success": false, "error": e}), + } + } + "status" => self.get_status(), + "set_protocols" => { + if let Some(protocols) = params.get("protocols") { + match serde_json::from_value(protocols.clone()) { + Ok(p) => match self.set_protocols(p) { + Ok(_) => json!({"success": true}), + Err(e) => json!({"success": false, "error": e}), + }, + Err(e) => json!({"success": false, "error": e.to_string()}), + } + } else { + json!({"success": false, "error": "missing protocols"}) + } + } + _ => json!({"success": false, "error": format!("unknown method: {}", method)}), + } + } } impl Component for TcpClient { @@ -145,8 +292,21 @@ impl Component for TcpClient { } fn init(&mut self, config: Value) -> Result<(), String> { - self.config = - serde_json::from_value(config).map_err(|e| format!("Invalid config: {}", e))?; + // Support both legacy config and new format + if let Some(client_config) = config.get("config") { + self.config = serde_json::from_value(client_config.clone()) + .map_err(|e| format!("Invalid config: {}", e))?; + } else { + self.config = serde_json::from_value(config.clone()) + .map_err(|e| format!("Invalid config: {}", e))?; + } + + if let Some(protocols) = config.get("protocols") { + let p: Vec = serde_json::from_value(protocols.clone()) + .map_err(|e| format!("Invalid protocols: {}", e))?; + self.set_protocols(p)?; + } + self.connect()?; Ok(()) } @@ -158,12 +318,21 @@ impl Component for TcpClient { impl TreeElement for TcpClient { fn get_type(&self) -> Value { - json!("TCPClient") + json!({ + "type": "TCPClient", + "name": self.name, + }) } fn send_message(&mut self, target: Value, message: Value) -> Value { match target { Value::Null => { + // Check for RPC call format + if message.get("method").is_some() { + return self.handle_rpc(&message); + } + + // Legacy string commands if let Some(cmd) = message.as_str() { match cmd { "Connect" => match self.connect() { @@ -196,6 +365,9 @@ impl TreeElement for TcpClient { } Err(e) => json!({"success": false, "error": e.to_string()}), } + } else if obj.get("method").is_some() { + let payload = Value::Object(obj.clone()); + self.handle_rpc(&payload) } else { json!(symbols::ERR_INVALID_COMMAND) } @@ -203,23 +375,45 @@ impl TreeElement for TcpClient { json!(symbols::ERR_INVALID_COMMAND) } } - Value::String(subtarget) => { - match subtarget.as_str() { - "config" => { - // Return or modify configuration - json!(self.config) - } - "state" => { - // Return connection state - json!({ - "connected": self.status.connected, - "remote": self.status.remote_address, - }) - } - _ => json!(symbols::ERR_CHILD_NOT_FOUND), - } - } + Value::String(subtarget) => match subtarget.as_str() { + "config" => json!(self.config), + "state" => json!({ + "connected": self.status.connected, + "remote": self.status.remote_address, + }), + "protocols" => json!(self.protocol_stack.to_configs()), + _ => json!(symbols::ERR_CHILD_NOT_FOUND), + }, _ => json!(symbols::ERR_INVALID_TARGET), } } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_client_creation() { + let client = TcpClient::new("test-client"); + assert_eq!(client.name(), "test-client"); + assert!(!client.is_connected()); + } + + #[test] + fn test_config_serialization() { + let client = TcpClient::with_config("test", TcpClientConfig::new("127.0.0.1", 8080)); + let json = serde_json::to_string(&client).unwrap(); + assert!(json.contains("test")); + assert!(json.contains("127.0.0.1")); + } + + #[test] + fn test_rpc_status() { + let mut client = TcpClient::new("test"); + let result = client.send_message(json!(null), json!({"method": "status"})); + + let obj = result.as_object().unwrap(); + assert!(obj.contains_key("connected")); + } +} diff --git a/src/tree/tcp/config.rs b/src/tree/tcp/config.rs index 4dfc911..cffc7f7 100644 --- a/src/tree/tcp/config.rs +++ b/src/tree/tcp/config.rs @@ -98,7 +98,7 @@ impl TcpServerConfig { } /// Connection status information -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Default)] pub struct ConnectionStatus { pub connected: bool, pub remote_address: Option, @@ -138,7 +138,7 @@ impl ConnectionStatus { } /// Server listener status -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Default)] pub struct ListenerStatus { pub listening: bool, pub bind_address: String, diff --git a/src/tree/tcp/server.rs b/src/tree/tcp/server.rs index 5ae3574..4c78394 100644 --- a/src/tree/tcp/server.rs +++ b/src/tree/tcp/server.rs @@ -1,7 +1,7 @@ //! TCP Server component for inbound connections. //! //! Provides a TreeElement for managing TCP server listeners with -//! configuration, status queries, and connection management. +//! configuration, status queries, connection management, and protocol stacking. use std::collections::HashMap; use std::io::{Read, Write}; @@ -9,41 +9,52 @@ use std::net::{TcpListener, TcpStream}; use std::sync::{Arc, Mutex}; use std::time::Duration; +use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; use crate::tree::component::Component; +use crate::tree::message::TreeMessage; +use crate::tree::protocols::{ProtocolConfig, ProtocolStack}; use crate::tree::symbols; use crate::tree::tcp::config::{ListenerStatus, TcpServerConfig}; use crate::tree::{Branch, TreeElement}; /// A connected client managed by the server -struct ManagedClient { - id: String, +#[derive(Debug)] +pub struct ManagedClient { + pub id: String, stream: TcpStream, peer_addr: String, + local_addr: String, } impl ManagedClient { - fn new(id: String, stream: TcpStream) -> Self { + pub fn new(id: String, stream: TcpStream) -> Self { let peer_addr = stream .peer_addr() .map(|a| a.to_string()) .unwrap_or_else(|_| "unknown".to_string()); + let local_addr = stream + .local_addr() + .map(|a| a.to_string()) + .unwrap_or_else(|_| "unknown".to_string()); + Self { id, stream, peer_addr, + local_addr, } } - fn send(&mut self, data: &[u8]) -> Result { + pub fn send(&mut self, data: &[u8]) -> Result { self.stream .write(data) .map_err(|e| format!("Write failed: {}", e)) } - fn recv(&mut self, buffer_size: usize) -> Result, String> { + pub fn recv(&mut self, buffer_size: usize) -> Result, String> { let mut buffer = vec![0u8; buffer_size]; let _ = self.stream.set_read_timeout(Some(Duration::from_secs(1))); @@ -58,37 +69,86 @@ impl ManagedClient { } } - fn peer_address(&self) -> &str { + pub fn peer_address(&self) -> &str { &self.peer_addr } + + pub fn local_address(&self) -> &str { + &self.local_addr + } + + pub fn set_nonblocking(&mut self, nonblocking: bool) -> Result<(), String> { + self.stream + .set_nonblocking(nonblocking) + .map_err(|e| format!("Failed to set non-blocking: {}", e)) + } } -/// TCP Server component +/// TCP Server component with protocol stacking support. +/// +/// This component can: +/// - Listen for incoming TCP connections +/// - Manage multiple concurrent connections +/// - Apply protocol stacks to connections +/// - Send/receive messages via RPC +#[derive(Debug, Serialize, Deserialize)] pub struct TcpServer { - name: String, - config: TcpServerConfig, + /// Unique name for this server + pub name: String, + /// Server configuration + pub config: TcpServerConfig, + /// Protocol stack for incoming connections + #[serde(default)] + pub protocols: Vec, + /// Current listener status + #[serde(skip)] status: ListenerStatus, + /// TCP listener (runtime only) + #[serde(skip)] listener: Option, + /// Active clients + #[serde(skip)] clients: HashMap>>, + /// Protocol stacks per client + #[serde(skip)] + client_protocols: HashMap, + /// Total connections since start total_connections: u64, + /// Internal tree structure + #[serde(skip)] branch: Branch, } impl TcpServer { + /// Create a new TCP server with default settings pub fn new(name: impl Into) -> Self { + Self::with_config(name, TcpServerConfig::default()) + } + + /// Create a new TCP server with custom configuration + pub fn with_config(name: impl Into, config: TcpServerConfig) -> Self { let name = name.into(); Self { name: name.clone(), - config: TcpServerConfig::default(), + config, + protocols: Vec::new(), status: ListenerStatus::stopped("0.0.0.0", 0), listener: None, clients: HashMap::new(), + client_protocols: HashMap::new(), total_connections: 0, branch: Branch::new("TCPServer"), } } + /// Set protocol stack configuration + pub fn set_protocols(&mut self, protocols: Vec) -> Result<(), String> { + self.protocols = protocols.clone(); + // Don't rebuild client_protocols here - each client gets its own stack + Ok(()) + } + /// Start listening for connections pub fn listen(&mut self) -> Result<(), String> { let addr = format!("{}:{}", self.config.bind_address, self.config.port); @@ -114,6 +174,7 @@ impl TcpServer { pub fn stop(&mut self) -> Result<(), String> { self.listener = None; self.clients.clear(); + self.client_protocols.clear(); self.status = ListenerStatus::stopped(&self.config.bind_address, self.config.port); Ok(()) } @@ -135,10 +196,16 @@ impl TcpServer { /// Register an accepted connection pub fn register_client(&mut self, id: String, stream: TcpStream) { let client_id = id.clone(); - self.clients.insert( - id, - Arc::new(Mutex::new(ManagedClient::new(client_id, stream))), - ); + + // Create protocol stack for this client + let mut protocol_stack = ProtocolStack::new(); + for config in &self.protocols { + let _ = protocol_stack.push(config); + } + + let client = Arc::new(Mutex::new(ManagedClient::new(client_id, stream))); + self.clients.insert(id.clone(), client); + self.client_protocols.insert(id, protocol_stack); } /// Disconnect a client @@ -146,6 +213,7 @@ impl TcpServer { self.clients .remove(id) .ok_or_else(|| format!("Client '{}' not found", id))?; + self.client_protocols.remove(id); Ok(()) } @@ -157,10 +225,56 @@ impl TcpServer { .ok_or_else(|| format!("Client '{}' not found", client_id))?; let mut client = client.lock().map_err(|e| format!("Lock failed: {}", e))?; - client.send(data) } + /// Receive from a specific client + pub fn recv_from(&mut self, client_id: &str, buffer_size: usize) -> Result, String> { + let client = self + .clients + .get(client_id) + .ok_or_else(|| format!("Client '{}' not found", client_id))?; + + let mut client = client.lock().map_err(|e| format!("Lock failed: {}", e))?; + client.recv(buffer_size) + } + + /// Send TreeMessage to client through protocol stack + pub fn send_message_to( + &mut self, + client_id: &str, + message: &TreeMessage, + ) -> Result<(), String> { + let protocol_stack = self + .client_protocols + .get_mut(client_id) + .ok_or_else(|| format!("Client '{}' not found", client_id))?; + + let encoded = protocol_stack + .encode_message(message) + .map_err(|e| format!("Encoding failed: {}", e))?; + + self.send_to(client_id, &encoded).map(|_| ()) + } + + /// Receive TreeMessage from client through protocol stack + pub fn recv_message_from( + &mut self, + client_id: &str, + buffer_size: usize, + ) -> Result { + let data = self.recv_from(client_id, buffer_size)?; + + let protocol_stack = self + .client_protocols + .get_mut(client_id) + .ok_or_else(|| format!("Client '{}' not found", client_id))?; + + protocol_stack + .decode_message(&data) + .map_err(|e| format!("Decoding failed: {}", e)) + } + /// Check if listening pub fn is_listening(&self) -> bool { self.status.listening @@ -178,7 +292,17 @@ impl TcpServer { /// Get status as JSON pub fn get_status(&self) -> Value { - let client_list: Vec = self.clients.keys().map(|k| json!(k)).collect(); + let client_list: Vec = self + .clients + .iter() + .map(|(id, client)| { + let addr = client + .lock() + .map(|c| c.peer_address().to_string()) + .unwrap_or_else(|_| "unknown".to_string()); + json!({"id": id, "peer": addr}) + }) + .collect(); json!({ "listening": self.status.listening, @@ -186,9 +310,119 @@ impl TcpServer { "port": self.config.port, "active_connections": self.clients.len(), "total_connections": self.total_connections, + "config": self.config, + "protocols": self.protocols, "clients": client_list, }) } + + /// Handle RPC call from message + fn handle_rpc(&mut self, payload: &Value) -> Value { + let method = match payload.get("method").and_then(|m| m.as_str()) { + Some(m) => m, + None => return json!({"success": false, "error": "missing method"}), + }; + + let params = payload.get("params").cloned().unwrap_or(Value::Null); + + match method { + "listen" | "start" => { + if let Some(addr) = params.get("bind_address").and_then(|a| a.as_str()) { + self.config.bind_address = addr.to_string(); + } + if let Some(port) = params.get("port").and_then(|p| p.as_u64()) { + self.config.port = port as u16; + } + + match self.listen() { + Ok(_) => json!({"success": true, "status": self.status}), + Err(e) => json!({"success": false, "error": e}), + } + } + "stop" => match self.stop() { + Ok(_) => json!({"success": true}), + Err(e) => json!({"success": false, "error": e}), + }, + "accept" => { + // Try to accept a pending connection + if let Some((id, stream)) = self.accept() { + self.register_client(id.clone(), stream); + json!({"success": true, "client_id": id}) + } else { + json!({"success": true, "client_id": null}) + } + } + "send" => { + let client_id = params + .get("client_id") + .and_then(|c| c.as_str()) + .ok_or_else(|| json!({"error": "missing client_id"})); + + match client_id { + Ok(id) => { + let data = params + .get("data") + .and_then(|d| d.as_str()) + .map(|s| s.as_bytes().to_vec()); + + match data { + Some(data) => match self.send_to(id, &data) { + Ok(n) => json!({"success": true, "bytes_sent": n}), + Err(e) => json!({"success": false, "error": e}), + }, + None => json!({"success": false, "error": "missing data"}), + } + } + Err(e) => e, + } + } + "recv" => { + let client_id = params + .get("client_id") + .and_then(|c| c.as_str()) + .ok_or_else(|| json!({"error": "missing client_id"})); + + match client_id { + Ok(id) => { + let size = params + .get("size") + .and_then(|s| s.as_u64()) + .map(|s| s as usize) + .unwrap_or(4096); + match self.recv_from(id, size) { + Ok(data) => json!({ + "success": true, + "data": String::from_utf8_lossy(&data), + "bytes": data.len() + }), + Err(e) => json!({"success": false, "error": e}), + } + } + Err(e) => e, + } + } + "disconnect" => { + let client_id = params + .get("client_id") + .and_then(|c| c.as_str()) + .ok_or_else(|| json!({"error": "missing client_id"})); + + match client_id { + Ok(id) => match self.disconnect_client(id) { + Ok(_) => json!({"success": true}), + Err(e) => json!({"success": false, "error": e}), + }, + Err(e) => e, + } + } + "status" => self.get_status(), + "list_clients" => { + let clients: Vec = self.clients.keys().map(|k| json!(k)).collect(); + json!({"success": true, "clients": clients}) + } + _ => json!({"success": false, "error": format!("unknown method: {}", method)}), + } + } } impl Component for TcpServer { @@ -201,8 +435,20 @@ impl Component for TcpServer { } fn init(&mut self, config: Value) -> Result<(), String> { - self.config = - serde_json::from_value(config).map_err(|e| format!("Invalid config: {}", e))?; + if let Some(server_config) = config.get("config") { + self.config = serde_json::from_value(server_config.clone()) + .map_err(|e| format!("Invalid config: {}", e))?; + } else { + self.config = serde_json::from_value(config.clone()) + .map_err(|e| format!("Invalid config: {}", e))?; + } + + if let Some(protocols) = config.get("protocols") { + let p: Vec = serde_json::from_value(protocols.clone()) + .map_err(|e| format!("Invalid protocols: {}", e))?; + self.set_protocols(p)?; + } + self.listen()?; Ok(()) } @@ -214,12 +460,21 @@ impl Component for TcpServer { impl TreeElement for TcpServer { fn get_type(&self) -> Value { - json!("TCPServer") + json!({ + "type": "TCPServer", + "name": self.name, + }) } fn send_message(&mut self, target: Value, message: Value) -> Value { match target { Value::Null => { + // Check for RPC call format + if message.get("method").is_some() { + return self.handle_rpc(&message); + } + + // Legacy string commands if let Some(cmd) = message.as_str() { match cmd { "Listen" | "Start" => match self.listen() { @@ -251,6 +506,9 @@ impl TreeElement for TcpServer { } Err(e) => json!({"success": false, "error": e.to_string()}), } + } else if obj.get("method").is_some() { + let payload = Value::Object(obj.clone()); + self.handle_rpc(&payload) } else { json!(symbols::ERR_INVALID_COMMAND) } @@ -261,9 +519,42 @@ impl TreeElement for TcpServer { Value::String(subtarget) => match subtarget.as_str() { "config" => json!(self.config), "status" => self.get_status(), + "clients" => { + let clients: Vec = self.clients.keys().map(|k| json!(k)).collect(); + json!(clients) + } _ => json!(symbols::ERR_CHILD_NOT_FOUND), }, _ => json!(symbols::ERR_INVALID_TARGET), } } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_server_creation() { + let server = TcpServer::new("test-server"); + assert_eq!(server.name(), "test-server"); + assert!(!server.is_listening()); + } + + #[test] + fn test_config_serialization() { + let server = TcpServer::with_config("test", TcpServerConfig::new(8080)); + let json = serde_json::to_string(&server).unwrap(); + assert!(json.contains("test")); + assert!(json.contains("8080")); + } + + #[test] + fn test_rpc_status() { + let mut server = TcpServer::new("test"); + let result = server.send_message(json!(null), json!({"method": "status"})); + + let obj = result.as_object().unwrap(); + assert!(obj.contains_key("listening")); + } +} diff --git a/ush-payload/src/main.rs b/ush-payload/src/main.rs index 8ad4a6e..aafefe1 100644 --- a/ush-payload/src/main.rs +++ b/ush-payload/src/main.rs @@ -7,11 +7,16 @@ use std::io::{Read, Write}; use std::net::TcpListener; use std::sync::mpsc; use std::thread; +use std::time::Duration; -use unshell::tree::{EndpointManager, TreeMessage}; +use serde_json::json; +use unshell::tree::message::TreeMessage; +use unshell::tree::protocols::{ProtocolConfig, ProtocolStack}; +use unshell::tree::tcp::{TcpClient, TcpServer}; +use unshell::tree::{ComponentRegistry, EndpointManager, TreeElement}; fn main() { - println!("=== TCP Chain Test Harness ===\n"); + println!("=== Tree Protocol Test Harness ===\n"); // Test 1: Local TCP Server-Client loopback test_tcp_loopback(); @@ -19,9 +24,21 @@ fn main() { // Test 2: Tree message routing test_tree_message(); - // Test 3: TreeMessage serialization + // Test 3: TreeMessage serialization (new API) test_message_serialization(); + // Test 4: TCP Server with RPC + test_tcp_server(); + + // Test 5: TCP Client with RPC + test_tcp_client(); + + // Test 6: Protocol stacking + test_protocol_stack(); + + // Test 7: Component registry + test_component_registry(); + println!("\n=== All tests complete ==="); } @@ -29,7 +46,6 @@ fn main() { fn test_tcp_loopback() { println!("[Test 1] TCP Loopback Test"); - // Start a TCP server in a thread let (tx, rx) = mpsc::channel(); let server_thread = thread::spawn(move || { @@ -37,24 +53,25 @@ fn test_tcp_loopback() { let addr = listener.local_addr().unwrap(); tx.send(addr.port()).unwrap(); - for stream in listener.incoming() { - if let Ok(mut stream) = stream { - let mut buf = [0u8; 1024]; - if let Ok(n) = stream.read(&mut buf) { - let response = b"Echo: "; - let _ = stream.write(response); - let _ = stream.write(&buf[..n]); - let _ = stream.flush(); - } + // Accept one connection only + if let Ok((mut stream, _)) = listener.accept() { + let mut buf = [0u8; 1024]; + if let Ok(n) = stream.read(&mut buf) { + let response = b"Echo: "; + let _ = stream.write(response); + let _ = stream.write(&buf[..n]); + let _ = stream.flush(); } } }); let port = rx.recv().unwrap(); - // Connect client let mut stream = std::net::TcpStream::connect(format!("127.0.0.1:{}", port)).expect("Failed to connect"); + stream + .set_read_timeout(Some(Duration::from_millis(1000))) + .unwrap(); let msg = b"Hello from client!"; stream.write(msg).expect("Failed to write"); @@ -76,7 +93,6 @@ fn test_tree_message() { let mut endpoint = EndpointManager::new("endpoint-1"); - // Test GetChildren let response = endpoint .branch_mut() .send_message(serde_json::Value::Null, serde_json::json!("GetChildren")); @@ -84,13 +100,11 @@ fn test_tree_message() { let children = response.as_object().unwrap(); println!(" Children: {:?}", children.keys().collect::>()); - // Test ID access let response = endpoint .branch_mut() .send_message(serde_json::json!("id"), serde_json::Value::Null); println!(" Endpoint ID: {:?}", response); - // Test logs queue let sender = endpoint.logs_sender().clone(); sender.send(serde_json::json!("Test log entry")).unwrap(); @@ -102,24 +116,168 @@ fn test_tree_message() { println!(" ✓ Tree message test passed\n"); } -/// Test TreeMessage serialization +/// Test TreeMessage serialization (new API) fn test_message_serialization() { println!("[Test 3] TreeMessage Serialization"); - let msg = TreeMessage::new_req( - "msg-1", - vec!["endpoint1".to_string(), "shell".to_string()], - "Get", - ); + // Test new API + let msg = TreeMessage::new("query") + .to_target(["endpoint1", "shell"]) + .with_payload(json!({"key": "value"})); let json = serde_json::to_string_pretty(&msg).unwrap(); println!(" Message: {}", json); - // Test response message - let resp = TreeMessage::new_resp("resp-1", "msg-1", serde_json::json!("result data")); + // Test response + let resp = + TreeMessage::new("response").with_payload(json!({"success": true, "result": "data"})); let json = serde_json::to_string_pretty(&resp).unwrap(); println!(" Response: {}", json); + // Test RPC call + let rpc_msg = TreeMessage::new("rpc.call") + .to_target(["components", "tcp-client"]) + .with_payload(json!({ + "method": "connect", + "params": {"address": "127.0.0.1", "port": 8080} + })); + + let json = serde_json::to_string_pretty(&rpc_msg).unwrap(); + println!(" RPC Call: {}", json); + println!(" ✓ Message serialization test passed\n"); } + +/// Test TCP Server with RPC +fn test_tcp_server() { + println!("[Test 4] TCP Server with RPC"); + + let mut server = TcpServer::new("test-server"); + + // Configure + let response = server.send_message( + json!(null), + json!({ + "method": "status" + }), + ); + println!(" Initial status: {:?}", response); + + // Try to start listening + let response = server.send_message( + json!(null), + json!({ + "method": "listen", + "params": { + "bind_address": "127.0.0.1", + "port": 0 + } + }), + ); + println!(" Listen result: {:?}", response); + + // Get status after listening + let response = server.send_message(json!(null), json!({"method": "status"})); + println!(" Status after listen: {:?}", response); + + println!(" ✓ TCP Server test passed\n"); +} + +/// Test TCP Client with RPC +fn test_tcp_client() { + println!("[Test 5] TCP Client with RPC"); + + let mut client = TcpClient::new("test-client"); + + // Get initial status + let response = client.send_message(json!(null), json!({"method": "status"})); + println!(" Initial status: {:?}", response); + + // Try to connect (will fail since no server, but tests the RPC) + let response = client.send_message( + json!(null), + json!({ + "method": "connect", + "params": {"address": "127.0.0.1", "port": 65432} + }), + ); + println!(" Connect result: {:?}", response); + + // Get status after connect attempt + let response = client.send_message(json!(null), json!({"method": "status"})); + println!(" Status after connect: {:?}", response); + + println!(" ✓ TCP Client test passed\n"); +} + +/// Test Protocol stacking +fn test_protocol_stack() { + println!("[Test 6] Protocol Stacking"); + + // Create a stack: base64 -> tcp + let mut stack = ProtocolStack::new(); + stack + .push(&ProtocolConfig::Base64(Default::default())) + .unwrap(); + stack + .push(&ProtocolConfig::Tcp(Default::default())) + .unwrap(); + + println!(" Stack protocols: {:?}", stack.to_configs()); + + // Test encoding/decoding + let test_data = b"Hello, World!"; + let encoded = stack.encode(test_data).unwrap(); + println!( + " Encoded ({} bytes): {:?}", + encoded.len(), + String::from_utf8_lossy(&encoded) + ); + + let decoded = stack.decode(&encoded).unwrap(); + println!(" Decoded: {:?}", String::from_utf8_lossy(&decoded)); + + // Test with HTTP + let mut http_stack = ProtocolStack::new(); + http_stack + .push(&ProtocolConfig::Base64(Default::default())) + .unwrap(); + http_stack + .push(&ProtocolConfig::Http(Default::default())) + .unwrap(); + + let test_data = b"test message"; + let encoded = http_stack.encode(test_data).unwrap(); + println!( + " HTTP+Base64 encoded: {:?}", + String::from_utf8_lossy(&encoded).lines().next() + ); + + println!(" ✓ Protocol stacking test passed\n"); +} + +/// Test Component Registry +fn test_component_registry() { + println!("[Test 7] Component Registry"); + + let mut registry = ComponentRegistry::new(); + + // Create and register a TCP client component + let client = Box::new(TcpClient::new("tcp-client-1")); + registry.register(client).unwrap(); + + // List components + let list = registry.list(); + println!(" Registered components: {:?}", list); + + // Send RPC to component + let result = registry.send_to_component("tcp-client-1", json!({"method": "status"})); + println!(" Component status: {:?}", result); + + // Send RPC via path + let result = registry.send_message(json!("rpc.tcp-client-1"), json!({"method": "status"})); + println!(" Via RPC path: {:?}", result); + + println!(" ✓ Component registry test passed\n"); +}