This commit is contained in:
Michael Mikovsky
2026-02-16 11:21:44 -07:00
parent 02f2f20f9f
commit d99fa340de
14 changed files with 1993 additions and 185 deletions
Generated
+2
View File
@@ -4011,11 +4011,13 @@ checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853"
name = "unshell" name = "unshell"
version = "0.0.0" version = "0.0.0"
dependencies = [ dependencies = [
"base64",
"chrono", "chrono",
"crossbeam-channel", "crossbeam-channel",
"serde", "serde",
"serde_json", "serde_json",
"static_init", "static_init",
"thiserror 2.0.18",
"ush-obfuscate", "ush-obfuscate",
"uuid", "uuid",
] ]
+2
View File
@@ -40,6 +40,8 @@ serde_json = { workspace = true }
uuid = { workspace = true } uuid = { workspace = true }
crossbeam-channel = "0.5.15" crossbeam-channel = "0.5.15"
thiserror = "2.0"
base64 = "0.22"
ush-obfuscate = { path = "./ush-obfuscate" } ush-obfuscate = { path = "./ush-obfuscate" }
static_init.workspace = true static_init.workspace = true
+15
View File
@@ -14,6 +14,21 @@ pub struct Branch {
branch_type: &'static str, branch_type: &'static str,
} }
impl Default for Branch {
fn default() -> Self {
Self::new("default")
}
}
impl std::fmt::Debug for Branch {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Branch")
.field("branch_type", &self.branch_type)
.field("children", &self.children.keys().collect::<Vec<_>>())
.finish()
}
}
impl Branch { impl Branch {
pub fn new(branch_type: &'static str) -> Self { pub fn new(branch_type: &'static str) -> Self {
Self { Self {
+163 -36
View File
@@ -31,34 +31,51 @@ impl ComponentWrapper {
pub fn new(component: Box<dyn Component>) -> Self { pub fn new(component: Box<dyn Component>) -> Self {
Self { component } Self { component }
} }
fn handle_rpc(&mut self, payload: &Value) -> Value {
let method = match payload.get("method").and_then(|m| m.as_str()) {
Some(m) => m,
None => return json!({"success": false, "error": "missing method"}),
};
match method {
"status" => self.component.status(),
"shutdown" => match self.component.shutdown() {
Ok(_) => json!({"success": true}),
Err(e) => json!({"success": false, "error": e}),
},
_ => json!({"success": false, "error": format!("unknown method: {}", method)}),
}
}
} }
impl TreeElement for ComponentWrapper { impl TreeElement for ComponentWrapper {
fn get_type(&self) -> Value { fn get_type(&self) -> Value {
serde_json::json!(["component", self.component.name()]) json!(["component", self.component.name()])
} }
fn send_message(&mut self, target: Value, message: Value) -> Value { fn send_message(&mut self, _target: Value, message: Value) -> Value {
match target { // Handle RPC call format
Value::Null => { if let Some(obj) = message.as_object() {
if let Some(cmd) = message.as_str() { if let Some(_) = obj.get("method") {
match cmd { return self.handle_rpc(&message);
"Status" => self.component.status(),
"Init" => {
// Would need config from message
json!({"error": "Init requires config payload"})
}
"Shutdown" => match self.component.shutdown() {
Ok(_) => json!({"success": true}),
Err(e) => json!({"success": false, "error": e}),
},
_ => json!({"error": "Unknown command"}),
}
} else {
json!({"error": "Invalid command"})
}
} }
_ => json!({"error": "Invalid target"}), }
// Legacy string commands
if let Some(cmd) = message.as_str() {
match cmd {
"Status" => self.component.status(),
"Init" => json!({"error": "Init requires config payload"}),
"Shutdown" => match self.component.shutdown() {
Ok(_) => json!({"success": true}),
Err(e) => json!({"success": false, "error": e}),
},
"GetChildren" => json!([self.component.name()]),
_ => json!({"error": "Unknown command"}),
}
} else {
json!({"error": "Invalid command"})
} }
} }
} }
@@ -75,11 +92,9 @@ impl ComponentRegistry {
} }
} }
/// Register a new component (consumes the component)
pub fn register(&mut self, component: Box<dyn Component>) -> Result<(), String> { pub fn register(&mut self, component: Box<dyn Component>) -> Result<(), String> {
let name = component.name().to_string(); let name = component.name().to_string();
// Check if already exists by trying to get it
if self.branch.get_child(&name).is_some() { if self.branch.get_child(&name).is_some() {
return Err(format!("Component '{}' already registered", name)); return Err(format!("Component '{}' already registered", name));
} }
@@ -89,37 +104,63 @@ impl ComponentRegistry {
Ok(()) Ok(())
} }
/// Get a component by name (via branch) pub fn register_element(&mut self, name: impl Into<String>, element: Box<dyn TreeElement>) {
self.branch.add_child(name.into(), element);
}
pub fn get(&mut self, name: &str) -> Option<&mut Box<dyn TreeElement>> { pub fn get(&mut self, name: &str) -> Option<&mut Box<dyn TreeElement>> {
self.branch.get_child(name) self.branch.get_child(name)
} }
/// Remove a component pub fn has(&self, name: &str) -> bool {
pub fn remove(&mut self, name: &str) -> bool { self.branch.children().contains_key(name)
// Note: This is tricky with current Branch API }
// For now, just return false
let _ = name; pub fn remove(&mut self, _name: &str) -> bool {
false false
} }
/// List all component names
pub fn list(&self) -> Vec<String> { pub fn list(&self) -> Vec<String> {
self.branch.children().keys().cloned().collect() self.branch.children().keys().cloned().collect()
} }
/// Get the branch for tree integration
pub fn branch(&self) -> &Branch { pub fn branch(&self) -> &Branch {
&self.branch &self.branch
} }
/// Get mutable branch for tree integration
pub fn branch_mut(&mut self) -> &mut Branch { pub fn branch_mut(&mut self) -> &mut Branch {
&mut self.branch &mut self.branch
} }
/// Shutdown all components pub fn send_to_component(&mut self, component_name: &str, message: Value) -> Value {
pub fn shutdown_all(&mut self) { if let Some(component) = self.branch.get_child(component_name) {
// Would need to iterate through and call shutdown on each component.send_message(json!(null), message)
} else {
let err_msg = format!("Component '{}' not found", component_name);
json!({"error": err_msg})
}
}
pub fn broadcast(&mut self, message: Value) -> Vec<(String, Value)> {
let names: Vec<String> = self.branch.children().keys().cloned().collect();
names
.iter()
.filter_map(|name| {
if let Some(component) = self.branch.get_child(name) {
Some((
name.clone(),
component.send_message(json!(null), message.clone()),
))
} else {
None
}
})
.collect()
}
pub fn shutdown_all(&mut self) -> Vec<(String, Result<(), String>)> {
Vec::new()
} }
} }
@@ -131,10 +172,96 @@ impl Default for ComponentRegistry {
impl TreeElement for ComponentRegistry { impl TreeElement for ComponentRegistry {
fn get_type(&self) -> Value { fn get_type(&self) -> Value {
serde_json::json!("Components") json!("Components")
} }
fn send_message(&mut self, target: Value, message: Value) -> Value { fn send_message(&mut self, target: Value, message: Value) -> Value {
// Handle RPC-style component access
if let Some(target_str) = target.as_str() {
if target_str.starts_with("rpc.") {
let component_name = target_str.strip_prefix("rpc.").unwrap_or(target_str);
return self.send_to_component(component_name, message);
}
}
self.branch.send_message(target, message) self.branch.send_message(target, message)
} }
} }
/// Helper trait for convenient component registration
pub trait IntoComponent: Component + Sized {
fn into_boxed(self) -> Box<dyn Component>;
}
impl<T: Component + Sized + 'static> IntoComponent for T {
fn into_boxed(self) -> Box<dyn Component> {
Box::new(self)
}
}
#[cfg(test)]
mod tests {
use super::*;
struct TestComponent {
name: String,
value: i32,
}
impl TestComponent {
fn new(name: &str) -> Self {
Self {
name: name.to_string(),
value: 0,
}
}
}
impl Component for TestComponent {
fn name(&self) -> &str {
&self.name
}
fn status(&self) -> Value {
json!({"name": self.name, "value": self.value})
}
fn init(&mut self, config: Value) -> Result<(), String> {
if let Some(v) = config.get("value").and_then(|v| v.as_i64()) {
self.value = v as i32;
}
Ok(())
}
fn shutdown(&mut self) -> Result<(), String> {
Ok(())
}
}
#[test]
fn test_component_registry() {
let mut registry = ComponentRegistry::new();
let comp = Box::new(TestComponent::new("test"));
registry.register(comp).unwrap();
assert!(registry.has("test"));
assert!(!registry.has("other"));
let list = registry.list();
assert_eq!(list, vec!["test"]);
}
#[test]
fn test_rpc_call() {
let mut registry = ComponentRegistry::new();
let comp = Box::new(TestComponent::new("test"));
registry.register(comp).unwrap();
let result = registry.send_to_component("test", json!({"method": "status"}));
let obj = result.as_object().unwrap();
assert!(obj.contains_key("name"));
}
}
+186 -62
View File
@@ -2,11 +2,12 @@
//! //!
//! This module defines the message structure used for all tree communications. //! This module defines the message structure used for all tree communications.
//! The format is designed to be simple, extensible, and protocol-agnostic. //! The format is designed to be simple, extensible, and protocol-agnostic.
//! Based on SPEC.md - supports RPC, streams, events, and P2P pivoting.
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::Value; use serde_json::Value;
/// Message types for tree communication /// Message types for transport-level distinction
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "lowercase")] #[serde(rename_all = "lowercase")]
pub enum MessageType { pub enum MessageType {
@@ -20,96 +21,198 @@ pub enum MessageType {
Stream, Stream,
} }
/// Core message structure for all tree communications impl Default for MessageType {
fn default() -> Self {
Self::Req
}
}
/// Core message structure for all tree communications.
///
/// This structure follows SPEC.md with loose typing for extensibility.
/// All fields are optional except where noted.
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TreeMessage { pub struct TreeMessage {
/// Unique identifier for message correlation /// Unique identifier for message correlation
pub id: String, /// Used to match requests with responses
/// Type of message (request, response, event, stream) #[serde(default, skip_serializing_if = "Option::is_none")]
#[serde(rename = "type")] pub id: Option<String>,
pub msg_type: MessageType,
/// Target path in the tree (for routing) /// Origin path for routing responses
#[serde(default)] #[serde(default, skip_serializing_if = "Option::is_none")]
pub target: Vec<String>, pub source: Option<Value>,
/// Action to perform (Get, Set, Invoke, etc.)
#[serde(default)] /// Destination path for routing
pub action: String, #[serde(default, skip_serializing_if = "Option::is_none")]
/// Payload/data for the action pub target: Option<Value>,
/// Operation to perform (e.g., "rpc.call", "stream.data", "event")
/// This is the primary field - must be present
pub action: Value,
/// Data for the action - interpretation depends on action type
#[serde(default)] #[serde(default)]
pub payload: Value, pub payload: Value,
/// P2P/pivoting metadata
#[serde(default, skip_serializing_if = "Option::is_none")]
pub routing: Option<Value>,
/// Extensible metadata (timing, transport hints, etc.)
#[serde(default, skip_serializing_if = "Option::is_none")]
pub meta: Option<Value>,
/// Type wrapper for transport-level distinction
#[serde(default, rename = "type")]
pub msg_type: MessageType,
/// ID of the message this is a response to /// ID of the message this is a response to
#[serde(default, skip_serializing_if = "Option::is_none")] #[serde(default, skip_serializing_if = "Option::is_none")]
pub response_to: Option<String>, pub response_to: Option<String>,
/// Stream ID for streaming communications /// Stream ID for streaming communications
#[serde(default, skip_serializing_if = "Option::is_none")] #[serde(default, skip_serializing_if = "Option::is_none")]
pub stream_id: Option<String>, pub stream_id: Option<String>,
} }
impl TreeMessage { impl TreeMessage {
/// Create a new request message /// Create a new request message with minimal fields
pub fn new_req(id: impl Into<String>, target: Vec<String>, action: impl Into<String>) -> Self { pub fn new(action: impl Into<Value>) -> Self {
Self { Self {
id: id.into(), id: Some(uuid::Uuid::new_v4().to_string()),
msg_type: MessageType::Req, source: None,
target, target: None,
action: action.into(), action: action.into(),
payload: Value::Null, payload: Value::Null,
routing: None,
meta: None,
msg_type: MessageType::Req,
response_to: None, response_to: None,
stream_id: None, stream_id: None,
} }
} }
/// Create a new request with target path
pub fn to_target(mut self, target: impl Into<Value>) -> Self {
self.target = Some(target.into());
self
}
/// Create a new request with source path
pub fn from_source(mut self, source: impl Into<Value>) -> Self {
self.source = Some(source.into());
self
}
/// Create a new request with payload
pub fn with_payload(mut self, payload: impl Into<Value>) -> Self {
self.payload = payload.into();
self
}
/// Create a new request with routing info
pub fn with_routing(mut self, routing: impl Into<Value>) -> Self {
self.routing = Some(routing.into());
self
}
/// Create a new request with metadata
pub fn with_meta(mut self, meta: impl Into<Value>) -> Self {
self.meta = Some(meta.into());
self
}
/// Create a response message /// Create a response message
pub fn new_resp(id: impl Into<String>, response_to: impl Into<String>, payload: Value) -> Self { pub fn response(mut self, response_to: impl Into<String>) -> Self {
Self { self.msg_type = MessageType::Resp;
id: id.into(), self.response_to = Some(response_to.into());
msg_type: MessageType::Resp, self.id = Some(uuid::Uuid::new_v4().to_string());
target: vec![], self
action: String::new(),
payload,
response_to: Some(response_to.into()),
stream_id: None,
}
} }
/// Create an event message /// Create an event message
pub fn new_event(id: impl Into<String>, target: Vec<String>, payload: Value) -> Self { pub fn event(mut self) -> Self {
Self { self.msg_type = MessageType::Event;
id: id.into(), self
msg_type: MessageType::Event,
target,
action: String::new(),
payload,
response_to: None,
stream_id: None,
}
} }
/// Create a stream message /// Create a stream message
pub fn new_stream(id: impl Into<String>, stream_id: impl Into<String>, payload: Value) -> Self { pub fn stream(mut self, stream_id: impl Into<String>) -> Self {
Self { self.msg_type = MessageType::Stream;
id: id.into(), self.stream_id = Some(stream_id.into());
msg_type: MessageType::Stream, self
target: vec![], }
action: String::new(),
payload, /// Check if action matches a pattern (simple string or namespaced)
response_to: None, pub fn action_is(&self, action: &str) -> bool {
stream_id: Some(stream_id.into()), match &self.action {
Value::String(s) => s == action || s.ends_with(&format!(".{}", action)),
_ => false,
} }
} }
/// Get method name from RPC payload
pub fn get_method(&self) -> Option<String> {
self.payload
.get("method")
.and_then(|m| m.as_str())
.map(String::from)
}
/// Get stream channel from payload
pub fn get_channel(&self) -> Option<String> {
self.payload
.get("channel")
.and_then(|c| c.as_str())
.map(String::from)
}
/// Get target as a path vector
pub fn get_target_path(&self) -> Option<Vec<String>> {
self.target.as_ref().and_then(|t| match t {
Value::String(s) => Some(s.split('/').map(String::from).collect()),
Value::Array(arr) => {
let mut path = Vec::new();
for item in arr {
if let Some(s) = item.as_str() {
path.push(s.to_string());
}
}
if path.len() == arr.len() {
Some(path)
} else {
None
}
}
_ => None,
})
}
/// Get source as a path vector
pub fn get_source_path(&self) -> Option<Vec<String>> {
self.source.as_ref().and_then(|s| match s {
Value::String(st) => Some(st.split('/').map(String::from).collect()),
Value::Array(arr) => {
let mut path = Vec::new();
for item in arr {
if let Some(st) = item.as_str() {
path.push(st.to_string());
}
}
if path.len() == arr.len() {
Some(path)
} else {
None
}
}
_ => None,
})
}
} }
impl Default for TreeMessage { impl Default for TreeMessage {
fn default() -> Self { fn default() -> Self {
Self { Self::new("query")
id: uuid::Uuid::new_v4().to_string(),
msg_type: MessageType::Req,
target: vec![],
action: String::new(),
payload: Value::Null,
response_to: None,
stream_id: None,
}
} }
} }
@@ -119,17 +222,38 @@ mod tests {
#[test] #[test]
fn test_message_serialization() { fn test_message_serialization() {
let msg = TreeMessage::new_req("test-1", vec!["a".to_string(), "b".to_string()], "Get"); let msg = TreeMessage::new("query")
.to_target(vec!["a", "b"])
.with_payload(json!({"key": "value"}));
let json = serde_json::to_string(&msg).unwrap(); let json = serde_json::to_string(&msg).unwrap();
assert!(json.contains("\"type\":\"req\"")); assert!(json.contains("\"action\":\"query\""));
assert!(json.contains("\"target\":[\"a\",\"b\"]")); assert!(json.contains("\"target\":[\"a\",\"b\"]"));
assert!(json.contains("\"action\":\"Get\""));
} }
#[test] #[test]
fn test_response_message() { fn test_action_matching() {
let msg = TreeMessage::new_resp("resp-1", "test-1", json!("value")); let msg = TreeMessage::new("rpc.call");
assert!(msg.action_is("rpc.call"));
assert!(msg.action_is("call"));
assert!(!msg.action_is("stream.data"));
}
#[test]
fn test_rpc_payload() {
let msg = TreeMessage::new("rpc.call")
.to_target(["components", "tcp-client"])
.with_payload(json!({
"method": "connect",
"params": {"address": "127.0.0.1", "port": 443}
}));
assert_eq!(msg.get_method(), Some("connect".to_string()));
}
#[test]
fn test_response() {
let msg = TreeMessage::new("response").with_payload(json!({"success": true, "result": {}}));
assert_eq!(msg.msg_type, MessageType::Resp); assert_eq!(msg.msg_type, MessageType::Resp);
assert_eq!(msg.response_to, Some("test-1".to_string()));
} }
} }
+1
View File
@@ -9,6 +9,7 @@ pub mod connection;
pub mod endpoint; pub mod endpoint;
pub mod log; pub mod log;
pub mod message; pub mod message;
pub mod protocols;
pub mod queue; pub mod queue;
pub mod readonly; pub mod readonly;
pub mod symbols; pub mod symbols;
+126
View File
@@ -0,0 +1,126 @@
//! Base64 encoding/decoding protocol.
use crate::tree::protocols::stack::{Base64Config, Protocol, ProtocolError};
use serde_json::Value;
/// Base64 encoding protocol
pub struct Base64Protocol {
config: Base64Config,
}
impl Base64Protocol {
pub fn new(config: Base64Config) -> Self {
Self { config }
}
}
impl Protocol for Base64Protocol {
fn name(&self) -> &'static str {
"base64"
}
fn encode(&self, data: &[u8]) -> Result<Vec<u8>, ProtocolError> {
let encoded = if self.config.url_safe {
base64::Engine::encode(&base64::engine::general_purpose::URL_SAFE_NO_PAD, data)
} else if self.config.padding {
base64::Engine::encode(&base64::engine::general_purpose::STANDARD, data)
} else {
base64::Engine::encode(&base64::engine::general_purpose::URL_SAFE_NO_PAD, data)
};
Ok(encoded.into_bytes())
}
fn decode(&self, data: &[u8]) -> Result<Vec<u8>, ProtocolError> {
let data_str = String::from_utf8(data.to_vec())
.map_err(|e| ProtocolError::DecodeError(e.to_string()))?;
let decoded = if self.config.url_safe {
base64::Engine::decode(&base64::engine::general_purpose::URL_SAFE_NO_PAD, &data_str)
} else if self.config.padding {
base64::Engine::decode(&base64::engine::general_purpose::STANDARD, &data_str)
} else {
// Try standard first, then URL-safe
base64::Engine::decode(&base64::engine::general_purpose::STANDARD, &data_str).or_else(
|_| {
base64::Engine::decode(
&base64::engine::general_purpose::URL_SAFE_NO_PAD,
&data_str,
)
},
)
};
decoded.map_err(|e| ProtocolError::DecodeError(e.to_string()))
}
fn status(&self) -> Value {
serde_json::json!({
"protocol": "base64",
"url_safe": self.config.url_safe,
"padding": self.config.padding,
})
}
}
/// Identity (pass-through) protocol
pub struct IdentityProtocol;
impl IdentityProtocol {
pub fn new() -> Self {
Self
}
}
impl Default for IdentityProtocol {
fn default() -> Self {
Self::new()
}
}
impl Protocol for IdentityProtocol {
fn name(&self) -> &'static str {
"identity"
}
fn encode(&self, data: &[u8]) -> Result<Vec<u8>, ProtocolError> {
Ok(data.to_vec())
}
fn decode(&self, data: &[u8]) -> Result<Vec<u8>, ProtocolError> {
Ok(data.to_vec())
}
fn status(&self) -> Value {
serde_json::json!({
"protocol": "identity",
})
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_base64_encode() {
let proto = Base64Protocol::new(Default::default());
let data = b"Hello, World!";
let encoded = proto.encode(data).unwrap();
let decoded = proto.decode(&encoded).unwrap();
assert_eq!(decoded, data);
}
#[test]
fn test_base64_url_safe() {
let proto = Base64Protocol::new(Base64Config {
url_safe: true,
padding: false,
});
let data = b"test+/data";
let encoded = proto.encode(data).unwrap();
let encoded_str = String::from_utf8(encoded).unwrap();
assert!(!encoded_str.contains('+'));
assert!(!encoded_str.contains('/'));
}
}
+223
View File
@@ -0,0 +1,223 @@
//! HTTP protocol implementation for tree communication.
//!
//! This protocol wraps data in HTTP requests/responses for traffic blending.
use crate::tree::protocols::stack::{HttpConfig, Protocol, ProtocolError};
use serde_json::Value;
/// HTTP protocol for tree communication.
///
/// Wraps outgoing data in HTTP requests and parses incoming HTTP responses.
pub struct HttpProtocol {
config: HttpConfig,
}
impl HttpProtocol {
pub fn new(config: HttpConfig) -> Self {
Self { config }
}
/// Build HTTP request
fn build_request(&self, body: &[u8]) -> Vec<u8> {
let body_len = body.len();
let body_str = String::from_utf8_lossy(body);
let mut request = format!(
"{} {} HTTP/1.1\r\n\
Host: {}\r\n\
User-Agent: {}\r\n\
Content-Type: application/json\r\n\
Content-Length: {}\r\n",
self.config.method,
self.config.path,
"localhost", // Would be configured in production
self.config.user_agent,
body_len
);
// Add custom headers
for (key, value) in &self.config.headers {
request.push_str(&format!("{}: {}\r\n", key, value));
}
request.push_str("\r\n");
request.push_str(&body_str);
request.into_bytes()
}
/// Parse HTTP response and extract body
fn parse_response(&self, data: &[u8]) -> Result<Vec<u8>, ProtocolError> {
let data_str = String::from_utf8(data.to_vec())
.map_err(|e| ProtocolError::DecodeError(e.to_string()))?;
// Find body start (after \r\n\r\n)
let body_start = match data_str.find("\r\n\r\n") {
Some(pos) => pos + 4,
None => {
return Err(ProtocolError::DecodeError(
"Invalid HTTP response".to_string(),
))
}
};
// Extract status code
let status_line = data_str
.split("\r\n")
.next()
.ok_or_else(|| ProtocolError::DecodeError("No status line".to_string()))?;
let status_code: u16 = status_line
.split_whitespace()
.nth(1)
.and_then(|s| s.parse().ok())
.ok_or_else(|| ProtocolError::DecodeError("Invalid status code".to_string()))?;
if status_code < 200 || status_code >= 300 {
return Err(ProtocolError::DecodeError(format!(
"HTTP error: {}",
status_code
)));
}
// Extract body
let body = &data_str[body_start..];
Ok(body.as_bytes().to_vec())
}
}
impl Protocol for HttpProtocol {
fn name(&self) -> &'static str {
"http"
}
fn encode(&self, data: &[u8]) -> Result<Vec<u8>, ProtocolError> {
Ok(self.build_request(data))
}
fn decode(&self, data: &[u8]) -> Result<Vec<u8>, ProtocolError> {
self.parse_response(data)
}
fn status(&self) -> Value {
serde_json::json!({
"protocol": "http",
"method": self.config.method,
"path": self.config.path,
"headers": self.config.headers,
"user_agent": self.config.user_agent,
})
}
}
/// HTTP server for receiving tree messages.
///
/// This is a simple implementation for testing - in production you'd
/// use a proper HTTP server.
pub struct HttpServer {
config: HttpConfig,
}
impl HttpServer {
pub fn new(config: HttpConfig) -> Self {
Self { config }
}
/// Parse incoming HTTP request and extract body
pub fn parse_request(&self, data: &[u8]) -> Result<Vec<u8>, ProtocolError> {
let data_str = String::from_utf8(data.to_vec())
.map_err(|e| ProtocolError::DecodeError(e.to_string()))?;
// Find body start
let body_start = match data_str.find("\r\n\r\n") {
Some(pos) => pos + 4,
None => {
return Err(ProtocolError::DecodeError(
"Invalid HTTP request".to_string(),
))
}
};
// Extract Content-Length
let mut content_length = 0;
for line in data_str.lines() {
if line.to_lowercase().starts_with("content-length:") {
content_length = line
.split(':')
.nth(1)
.and_then(|s| s.trim().parse().ok())
.unwrap_or(0);
break;
}
}
let body = &data_str[body_start..];
if body.len() >= content_length {
Ok(body[..content_length].as_bytes().to_vec())
} else {
Ok(body.as_bytes().to_vec())
}
}
/// Build HTTP response
pub fn build_response(&self, body: &[u8], status: u16) -> Vec<u8> {
let body_len = body.len();
let body_str = String::from_utf8_lossy(body);
let status_text = match status {
200 => "OK",
400 => "Bad Request",
404 => "Not Found",
500 => "Internal Server Error",
_ => "Unknown",
};
format!(
"HTTP/1.1 {} {}\r\n\
Content-Type: application/json\r\n\
Content-Length: {}\r\n\
Connection: close\r\n\
\r\n\
{}",
status, status_text, body_len, body_str
)
.into_bytes()
}
}
impl Default for HttpServer {
fn default() -> Self {
Self::new(Default::default())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_http_encode_decode() {
let proto = HttpProtocol::new(Default::default());
let data = r#"{"action": "test", "data": "hello"}"#.as_bytes();
let encoded = proto.encode(data).unwrap();
// Build a valid response
let response = b"HTTP/1.1 200 OK\r\nContent-Length: 29\r\n\r\n{\"action\": \"test\", \"data\": \"hello\"}";
let decoded = proto.decode(response).unwrap();
assert_eq!(decoded, data);
}
#[test]
fn test_request_building() {
let server = HttpServer::new(Default::default());
let body = r#"{"test": "data"}"#.as_bytes();
let request = server.build_request(body);
let request_str = String::from_utf8(request).unwrap();
assert!(request_str.contains("POST / HTTP/1.1"));
assert!(request_str.contains("Content-Length: 16"));
}
}
+42
View File
@@ -0,0 +1,42 @@
//! Protocol stacking system for extensible network communication.
//!
//! This module provides a way to layer multiple protocols on top of each other,
//! similar to a network stack. Each protocol can encode/decode data from the layer below.
//!
//! # Architecture
//!
//! Each protocol implements the `Protocol` trait, defining:
//! - How to encode data going "out" (to the network)
//! - How to decode data coming "in" (from the network)
//! - Configuration for the protocol
//!
//! # Usage
//!
//! ```rust
//! use tree::protocols::{Protocol, ProtocolStack, ProtocolConfig};
//! use serde_json::json;
//!
//! // Create a stack: base64 -> http -> tcp
//! let stack: ProtocolStack = vec![
//! ProtocolConfig::Base64(json!({})),
//! ProtocolConfig::Http(json!({
//! "method": "POST",
//! "path": "/api/data"
//! })),
//! ];
//!
//! // Encode outgoing message
//! let encoded = stack.encode(&json!({"action": "test"}))?;
//!
//! // Decode incoming data
//! let decoded = stack.decode(&encoded)?;
//! ```
pub mod base64;
pub mod http;
pub mod stack;
pub use stack::{
Base64Config, HttpConfig, Protocol, ProtocolConfig, ProtocolError, ProtocolStack, TcpConfig,
WebSocketConfig,
};
+503
View File
@@ -0,0 +1,503 @@
//! Protocol stack implementation for layered network communication.
//!
//! The stack processes protocols from outermost (closest to app) to innermost (closest to network).
use serde::{Deserialize, Serialize};
use serde_json::Value;
use thiserror::Error;
use crate::tree::message::TreeMessage;
#[derive(Error, Debug)]
pub enum ProtocolError {
#[error("Encoding failed: {0}")]
EncodeError(String),
#[error("Decoding failed: {0}")]
DecodeError(String),
#[error("Invalid configuration: {0}")]
ConfigError(String),
#[error("Protocol not found: {0}")]
NotFound(String),
}
/// Core trait for protocol implementations.
///
/// Each protocol can:
/// - Encode: Transform data going outward (app -> network)
/// - Decode: Transform data coming inward (network -> app)
pub trait Protocol: Send + Sync {
/// Unique name for this protocol
fn name(&self) -> &'static str;
/// Encode data going outward (toward network)
fn encode(&self, data: &[u8]) -> Result<Vec<u8>, ProtocolError>;
/// Decode data coming inward (from network)
fn decode(&self, data: &[u8]) -> Result<Vec<u8>, ProtocolError>;
/// Get protocol status/info
fn status(&self) -> Value;
}
/// Configuration for a single protocol layer.
///
/// This allows protocols to be configured dynamically via JSON.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "protocol", rename_all = "snake_case")]
pub enum ProtocolConfig {
/// No-op pass-through protocol
Identity,
/// Base64 encoding
Base64(Base64Config),
/// HTTP protocol
Http(HttpConfig),
/// TCP raw protocol
Tcp(TcpConfig),
/// WebSocket protocol
WebSocket(WebSocketConfig),
/// Custom protocol (for future extensions)
Custom { name: String, config: Value },
}
/// Base64 encoding configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Base64Config {
/// Use URL-safe base64 variant
#[serde(default)]
pub url_safe: bool,
/// Add padding
#[serde(default = "default_true")]
pub padding: bool,
}
fn default_true() -> bool {
true
}
impl Default for Base64Config {
fn default() -> Self {
Self {
url_safe: false,
padding: true,
}
}
}
/// HTTP protocol configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HttpConfig {
/// HTTP method
#[serde(default = "default_post")]
pub method: String,
/// Request path
#[serde(default)]
pub path: String,
/// Headers to add
#[serde(default)]
pub headers: std::collections::HashMap<String, String>,
/// User agent
#[serde(default)]
pub user_agent: String,
}
fn default_post() -> String {
"POST".to_string()
}
impl Default for HttpConfig {
fn default() -> Self {
Self {
method: "POST".to_string(),
path: "/".to_string(),
headers: std::collections::HashMap::new(),
user_agent: "TreeProtocol/1.0".to_string(),
}
}
}
/// TCP raw protocol configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TcpConfig {
/// Delimiter for message framing
#[serde(default)]
pub delimiter: String,
/// Include length prefix
#[serde(default)]
pub length_prefix: bool,
}
impl Default for TcpConfig {
fn default() -> Self {
Self {
delimiter: "\n".to_string(),
length_prefix: false,
}
}
}
/// WebSocket configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WebSocketConfig {
/// WebSocket subprotocol
#[serde(default)]
pub subprotocol: Option<String>,
/// Path for WS connection
#[serde(default)]
pub path: String,
}
impl Default for WebSocketConfig {
fn default() -> Self {
Self {
subprotocol: None,
path: "/".to_string(),
}
}
}
/// A stack of protocols to process data through.
///
/// Data flows through the stack:
/// - Encoding: App -> Protocol N -> ... -> Protocol 1 -> Network
/// - Decoding: Network -> Protocol 1 -> ... -> Protocol N -> App
pub struct ProtocolStack {
/// Stack of protocols (outermost first for encoding)
protocols: Vec<Box<dyn Protocol>>,
/// Configuration order (for serialization)
config_order: Vec<String>,
}
impl Clone for ProtocolStack {
fn clone(&self) -> Self {
Self {
protocols: Vec::new(), // Can't clone protocols
config_order: self.config_order.clone(),
}
}
}
impl std::fmt::Debug for ProtocolStack {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ProtocolStack")
.field("config_order", &self.config_order)
.field("protocols", &self.protocols.len())
.finish()
}
}
impl ProtocolStack {
/// Create a new empty stack
pub fn new() -> Self {
Self {
protocols: Vec::new(),
config_order: Vec::new(),
}
}
/// Create a stack from configurations
pub fn from_configs(configs: &[ProtocolConfig]) -> Result<Self, ProtocolError> {
let mut stack = Self::new();
for config in configs {
stack.push(config)?;
}
Ok(stack)
}
/// Add a protocol to the stack (outermost position)
pub fn push(&mut self, config: &ProtocolConfig) -> Result<(), ProtocolError> {
let (protocol, name) = match config {
ProtocolConfig::Identity => {
let p = crate::tree::protocols::base64::IdentityProtocol::new();
(Box::new(p) as Box<dyn Protocol>, "identity".to_string())
}
ProtocolConfig::Base64(cfg) => {
let p = crate::tree::protocols::base64::Base64Protocol::new(cfg.clone());
(Box::new(p) as Box<dyn Protocol>, "base64".to_string())
}
ProtocolConfig::Http(cfg) => {
let p = crate::tree::protocols::http::HttpProtocol::new(cfg.clone());
(Box::new(p) as Box<dyn Protocol>, "http".to_string())
}
ProtocolConfig::Tcp(cfg) => {
let p = TcpProtocol::new(cfg.clone());
(Box::new(p) as Box<dyn Protocol>, "tcp".to_string())
}
ProtocolConfig::WebSocket(cfg) => {
let p = WebSocketProtocol::new(cfg.clone());
(Box::new(p) as Box<dyn Protocol>, "websocket".to_string())
}
ProtocolConfig::Custom { name, config } => {
return Err(ProtocolError::NotFound(format!(
"Custom protocol '{}' not implemented",
name
)));
}
};
self.config_order.push(name);
self.protocols.push(protocol);
Ok(())
}
/// Remove the outermost protocol
pub fn pop(&mut self) -> Option<Box<dyn Protocol>> {
self.config_order.pop()?;
self.protocols.pop()
}
/// Get number of protocols in stack
pub fn len(&self) -> usize {
self.protocols.len()
}
/// Check if stack is empty
pub fn is_empty(&self) -> bool {
self.protocols.is_empty()
}
/// Encode data through the entire stack (app -> network)
pub fn encode(&self, data: &[u8]) -> Result<Vec<u8>, ProtocolError> {
let mut result = data.to_vec();
for protocol in self.protocols.iter() {
result = protocol.encode(&result)?;
}
Ok(result)
}
/// Decode data through the entire stack (network -> app)
pub fn decode(&self, data: &[u8]) -> Result<Vec<u8>, ProtocolError> {
let mut result = data.to_vec();
// Decode in reverse order (innermost to outermost)
for protocol in self.protocols.iter().rev() {
result = protocol.decode(&result)?;
}
Ok(result)
}
/// Encode a TreeMessage through the stack
pub fn encode_message(&self, message: &TreeMessage) -> Result<Vec<u8>, ProtocolError> {
let json =
serde_json::to_vec(message).map_err(|e| ProtocolError::EncodeError(e.to_string()))?;
self.encode(&json)
}
/// Decode data into a TreeMessage
pub fn decode_message(&self, data: &[u8]) -> Result<TreeMessage, ProtocolError> {
let decoded = self.decode(data)?;
serde_json::from_slice(&decoded).map_err(|e| ProtocolError::DecodeError(e.to_string()))
}
/// Get status of all protocols in stack
pub fn status(&self) -> Vec<Value> {
self.protocols.iter().map(|p| p.status()).collect()
}
/// Get the configuration for serialization
pub fn to_configs(&self) -> Vec<ProtocolConfig> {
self.config_order
.iter()
.enumerate()
.filter_map(|(_, name)| {
// This is simplified - in production you'd store configs
Some(match name.as_str() {
"identity" => ProtocolConfig::Identity,
"base64" => ProtocolConfig::Base64(Default::default()),
"http" => ProtocolConfig::Http(Default::default()),
"tcp" => ProtocolConfig::Tcp(Default::default()),
"websocket" => ProtocolConfig::WebSocket(Default::default()),
_ => return None,
})
})
.collect()
}
}
impl Default for ProtocolStack {
fn default() -> Self {
Self::new()
}
}
/// TCP protocol implementation (simple framing)
pub struct TcpProtocol {
config: TcpConfig,
}
impl TcpProtocol {
pub fn new(config: TcpConfig) -> Self {
Self { config }
}
}
impl Protocol for TcpProtocol {
fn name(&self) -> &'static str {
"tcp"
}
fn encode(&self, data: &[u8]) -> Result<Vec<u8>, ProtocolError> {
let mut result = Vec::new();
if self.config.length_prefix {
let len = (data.len() as u32).to_be_bytes();
result.extend_from_slice(&len);
}
result.extend_from_slice(data);
if !self.config.length_prefix && !self.config.delimiter.is_empty() {
result.extend_from_slice(self.config.delimiter.as_bytes());
}
Ok(result)
}
fn decode(&self, data: &[u8]) -> Result<Vec<u8>, ProtocolError> {
let mut result = data.to_vec();
// Remove delimiter if present
if !self.config.delimiter.is_empty() {
if let Some(pos) = result
.iter()
.position(|&b| self.config.delimiter.as_bytes().contains(&b))
{
result.truncate(pos);
}
}
// If length prefix, skip it
if self.config.length_prefix && result.len() >= 4 {
let len = u32::from_be_bytes([result[0], result[1], result[2], result[3]]) as usize;
if result.len() >= 4 + len {
result = result[4..4 + len].to_vec();
}
}
Ok(result)
}
fn status(&self) -> Value {
serde_json::json!({
"protocol": "tcp",
"delimiter": self.config.delimiter,
"length_prefix": self.config.length_prefix,
})
}
}
/// WebSocket protocol implementation (simplified)
pub struct WebSocketProtocol {
config: WebSocketConfig,
}
impl WebSocketProtocol {
pub fn new(config: WebSocketConfig) -> Self {
Self { config }
}
}
impl Protocol for WebSocketProtocol {
fn name(&self) -> &'static str {
"websocket"
}
fn encode(&self, data: &[u8]) -> Result<Vec<u8>, ProtocolError> {
// Simple WebSocket text frame: FIN(1) + opcode(1) + length(2) + data
let mut frame = vec![0x81]; // FIN + text opcode
let len = data.len();
if len < 126 {
frame.push(len as u8);
} else if len < 65536 {
frame.push(126);
frame.extend_from_slice(&(len as u16).to_be_bytes());
} else {
frame.push(127);
frame.extend_from_slice(&(len as u64).to_be_bytes());
}
frame.extend_from_slice(data);
Ok(frame)
}
fn decode(&self, data: &[u8]) -> Result<Vec<u8>, ProtocolError> {
if data.len() < 2 {
return Err(ProtocolError::DecodeError("Frame too short".to_string()));
}
let opcode = data[0] & 0x0f;
if opcode == 0x08 {
// Close frame
return Err(ProtocolError::DecodeError("Connection closed".to_string()));
}
let len = data[1] & 0x7f;
let header_len = match len {
126 => 4,
127 => 10,
_ => 2,
};
if data.len() > header_len {
Ok(data[header_len..].to_vec())
} else {
Err(ProtocolError::DecodeError("Incomplete frame".to_string()))
}
}
fn status(&self) -> Value {
serde_json::json!({
"protocol": "websocket",
"path": self.config.path,
"subprotocol": self.config.subprotocol,
})
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_base64_stack() {
let mut stack = ProtocolStack::new();
stack
.push(&ProtocolConfig::Base64(Default::default()))
.unwrap();
let data = b"hello world";
let encoded = stack.encode(data).unwrap();
let decoded = stack.decode(&encoded).unwrap();
assert_eq!(decoded, data);
}
#[test]
fn test_multi_layer_stack() {
let mut stack = ProtocolStack::new();
stack
.push(&ProtocolConfig::Base64(Default::default()))
.unwrap();
stack
.push(&ProtocolConfig::Tcp(Default::default()))
.unwrap();
let data = b"test message";
let encoded = stack.encode(data).unwrap();
let decoded = stack.decode(&encoded).unwrap();
assert_eq!(decoded, data);
}
#[test]
fn test_http_config() {
let config = HttpConfig {
method: "POST".to_string(),
path: "/api/test".to_string(),
headers: std::collections::HashMap::new(),
user_agent: "Test/1.0".to_string(),
};
let mut stack = ProtocolStack::new();
stack.push(&ProtocolConfig::Http(config)).unwrap();
assert_eq!(stack.len(), 1);
}
}
+234 -40
View File
@@ -1,47 +1,85 @@
//! TCP Client component for outbound connections. //! TCP Client component for outbound connections.
//! //!
//! Provides a TreeElement for managing TCP client connections with //! Provides a TreeElement for managing TCP client connections with
//! configuration, status queries, and reconnection support. //! configuration, status queries, reconnection support, and protocol stacking.
use std::io::{Read, Write}; use std::io::{Read, Write};
use std::net::TcpStream; use std::net::TcpStream;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::time::Duration; use std::time::Duration;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value}; use serde_json::{json, Value};
use crate::tree::component::Component; use crate::tree::component::Component;
use crate::tree::message::TreeMessage;
use crate::tree::protocols::{ProtocolConfig, ProtocolStack};
use crate::tree::symbols; use crate::tree::symbols;
use crate::tree::tcp::config::{ConnectionStatus, TcpClientConfig}; use crate::tree::tcp::config::{ConnectionStatus, TcpClientConfig};
use crate::tree::{Branch, TreeElement}; use crate::tree::{Branch, TreeElement};
/// TCP Client component /// TCP Client component with protocol stacking support.
///
/// This component can:
/// - Connect to remote TCP servers
/// - Apply protocol stacks (base64, http, etc.)
/// - Send/receive messages via RPC
/// - Auto-reconnect on failure
#[derive(Debug, Serialize, Deserialize)]
pub struct TcpClient { pub struct TcpClient {
name: String, /// Unique name for this client
config: TcpClientConfig, pub name: String,
/// Connection configuration
pub config: TcpClientConfig,
/// Protocol stack configuration
#[serde(default)]
pub protocols: Vec<ProtocolConfig>,
/// Current connection status
#[serde(skip)]
status: ConnectionStatus, status: ConnectionStatus,
/// Active TCP stream
#[serde(skip)]
stream: Option<Arc<Mutex<TcpStream>>>, stream: Option<Arc<Mutex<TcpStream>>>,
/// Protocol stack (runtime)
#[serde(skip)]
protocol_stack: ProtocolStack,
/// Internal tree structure
#[serde(skip)]
branch: Branch, branch: Branch,
} }
impl TcpClient { impl TcpClient {
/// Create a new TCP client with default settings
pub fn new(name: impl Into<String>) -> Self { pub fn new(name: impl Into<String>) -> Self {
let name = name.into(); Self::with_config(name, TcpClientConfig::default())
let mut branch = Branch::new("TCPClient"); }
// Add internal state branch /// Create a new TCP client with custom configuration
pub fn with_config(name: impl Into<String>, config: TcpClientConfig) -> Self {
let name = name.into();
let mut branch = Branch::new("TCPClient");
let state_branch = Branch::new("state"); let state_branch = Branch::new("state");
branch.add_child("state", Box::new(state_branch)); branch.add_child("state", Box::new(state_branch));
Self { Self {
name: name.clone(), name: name.clone(),
config: TcpClientConfig::default(), config,
protocols: Vec::new(),
status: ConnectionStatus::disconnected(), status: ConnectionStatus::disconnected(),
stream: None, stream: None,
protocol_stack: ProtocolStack::new(),
branch, branch,
} }
} }
/// Set protocol stack configuration
pub fn set_protocols(&mut self, protocols: Vec<ProtocolConfig>) -> Result<(), String> {
self.protocols = protocols.clone();
self.protocol_stack = ProtocolStack::from_configs(&protocols).map_err(|e| e.to_string())?;
Ok(())
}
/// Connect to the configured address /// Connect to the configured address
pub fn connect(&mut self) -> Result<(), String> { pub fn connect(&mut self) -> Result<(), String> {
let addr = format!("{}:{}", self.config.address, self.config.port); let addr = format!("{}:{}", self.config.address, self.config.port);
@@ -85,32 +123,66 @@ impl TcpClient {
self.status.connected self.status.connected
} }
/// Send data over the connection /// Send raw data over the connection
pub fn send(&mut self, data: &[u8]) -> Result<usize, String> { pub fn send_raw(&mut self, data: &[u8]) -> Result<usize, String> {
let stream = self.stream.as_ref().ok_or("Not connected")?; let stream = self.stream.as_ref().ok_or("Not connected")?;
let mut stream = stream.lock().map_err(|e| format!("Lock failed: {}", e))?; let mut stream = stream.lock().map_err(|e| format!("Lock failed: {}", e))?;
stream stream
.write(data) .write(data)
.map_err(|e| format!("Write failed: {}", e)) .map_err(|e| format!("Write failed: {}", e))
} }
/// Receive data from the connection /// Receive raw data from the connection
pub fn recv(&mut self, buffer_size: usize) -> Result<Vec<u8>, String> { pub fn recv_raw(&mut self, buffer_size: usize) -> Result<Vec<u8>, String> {
let stream = self.stream.as_ref().ok_or("Not connected")?; let stream = self.stream.as_ref().ok_or("Not connected")?;
let mut stream = stream.lock().map_err(|e| format!("Lock failed: {}", e))?; let mut stream = stream.lock().map_err(|e| format!("Lock failed: {}", e))?;
let mut buffer = vec![0u8; buffer_size]; let mut buffer = vec![0u8; buffer_size];
let n = stream let n = stream
.read(&mut buffer) .read(&mut buffer)
.map_err(|e| format!("Read failed: {}", e))?; .map_err(|e| format!("Read failed: {}", e))?;
buffer.truncate(n); buffer.truncate(n);
Ok(buffer) Ok(buffer)
} }
/// Send a TreeMessage through the protocol stack
pub fn send_message_raw(&mut self, message: &TreeMessage) -> Result<(), String> {
let encoded = self
.protocol_stack
.encode_message(message)
.map_err(|e| format!("Encoding failed: {}", e))?;
self.send_raw(&encoded)?;
Ok(())
}
/// Receive and decode a TreeMessage
pub fn recv_message(&mut self, buffer_size: usize) -> Result<TreeMessage, String> {
let data = self.recv_raw(buffer_size)?;
self.protocol_stack
.decode_message(&data)
.map_err(|e| format!("Decoding failed: {}", e))
}
/// Send and wait for response (RPC pattern)
pub fn rpc_call(&mut self, message: &TreeMessage) -> Result<TreeMessage, String> {
let id = message.id.clone();
self.send_message_raw(message)?;
// Simple blocking receive - in production would have timeout
let response = self.recv_message(4096)?;
// Verify it's a response to our message
if let Some(response_to) = &response.response_to {
if response_to != id.as_deref().unwrap_or("") {
return Err("Response ID mismatch".to_string());
}
}
Ok(response)
}
/// Get current configuration /// Get current configuration
pub fn config(&self) -> &TcpClientConfig { pub fn config(&self) -> &TcpClientConfig {
&self.config &self.config
@@ -127,12 +199,87 @@ impl TcpClient {
"connected": self.status.connected, "connected": self.status.connected,
"remote_address": self.status.remote_address, "remote_address": self.status.remote_address,
"local_address": self.status.local_address, "local_address": self.status.local_address,
"config": { "bytes_sent": self.status.bytes_sent,
"address": self.config.address, "bytes_received": self.status.bytes_received,
"port": self.config.port, "config": self.config,
} "protocols": self.protocol_stack.to_configs(),
}) })
} }
/// Handle RPC call from message
fn handle_rpc(&mut self, payload: &Value) -> Value {
let method = match payload.get("method").and_then(|m| m.as_str()) {
Some(m) => m,
None => return json!({"success": false, "error": "missing method"}),
};
let params = payload.get("params").cloned().unwrap_or(Value::Null);
match method {
"connect" => {
// Allow override of address/port
if let Some(addr) = params.get("address").and_then(|a| a.as_str()) {
self.config.address = addr.to_string();
}
if let Some(port) = params.get("port").and_then(|p| p.as_u64()) {
self.config.port = port as u16;
}
match self.connect() {
Ok(_) => json!({"success": true, "status": self.status}),
Err(e) => json!({"success": false, "error": e}),
}
}
"disconnect" => match self.disconnect() {
Ok(_) => json!({"success": true}),
Err(e) => json!({"success": false, "error": e}),
},
"send" => {
let data = params
.get("data")
.and_then(|d| d.as_str())
.map(|s| s.as_bytes().to_vec());
match data {
Some(data) => match self.send_raw(&data) {
Ok(n) => json!({"success": true, "bytes_sent": n}),
Err(e) => json!({"success": false, "error": e}),
},
None => json!({"success": false, "error": "missing data"}),
}
}
"recv" => {
let size = params
.get("size")
.and_then(|s| s.as_u64())
.map(|s| s as usize)
.unwrap_or(4096);
match self.recv_raw(size) {
Ok(data) => json!({
"success": true,
"data": String::from_utf8_lossy(&data),
"bytes": data.len()
}),
Err(e) => json!({"success": false, "error": e}),
}
}
"status" => self.get_status(),
"set_protocols" => {
if let Some(protocols) = params.get("protocols") {
match serde_json::from_value(protocols.clone()) {
Ok(p) => match self.set_protocols(p) {
Ok(_) => json!({"success": true}),
Err(e) => json!({"success": false, "error": e}),
},
Err(e) => json!({"success": false, "error": e.to_string()}),
}
} else {
json!({"success": false, "error": "missing protocols"})
}
}
_ => json!({"success": false, "error": format!("unknown method: {}", method)}),
}
}
} }
impl Component for TcpClient { impl Component for TcpClient {
@@ -145,8 +292,21 @@ impl Component for TcpClient {
} }
fn init(&mut self, config: Value) -> Result<(), String> { fn init(&mut self, config: Value) -> Result<(), String> {
self.config = // Support both legacy config and new format
serde_json::from_value(config).map_err(|e| format!("Invalid config: {}", e))?; if let Some(client_config) = config.get("config") {
self.config = serde_json::from_value(client_config.clone())
.map_err(|e| format!("Invalid config: {}", e))?;
} else {
self.config = serde_json::from_value(config.clone())
.map_err(|e| format!("Invalid config: {}", e))?;
}
if let Some(protocols) = config.get("protocols") {
let p: Vec<ProtocolConfig> = serde_json::from_value(protocols.clone())
.map_err(|e| format!("Invalid protocols: {}", e))?;
self.set_protocols(p)?;
}
self.connect()?; self.connect()?;
Ok(()) Ok(())
} }
@@ -158,12 +318,21 @@ impl Component for TcpClient {
impl TreeElement for TcpClient { impl TreeElement for TcpClient {
fn get_type(&self) -> Value { fn get_type(&self) -> Value {
json!("TCPClient") json!({
"type": "TCPClient",
"name": self.name,
})
} }
fn send_message(&mut self, target: Value, message: Value) -> Value { fn send_message(&mut self, target: Value, message: Value) -> Value {
match target { match target {
Value::Null => { Value::Null => {
// Check for RPC call format
if message.get("method").is_some() {
return self.handle_rpc(&message);
}
// Legacy string commands
if let Some(cmd) = message.as_str() { if let Some(cmd) = message.as_str() {
match cmd { match cmd {
"Connect" => match self.connect() { "Connect" => match self.connect() {
@@ -196,6 +365,9 @@ impl TreeElement for TcpClient {
} }
Err(e) => json!({"success": false, "error": e.to_string()}), Err(e) => json!({"success": false, "error": e.to_string()}),
} }
} else if obj.get("method").is_some() {
let payload = Value::Object(obj.clone());
self.handle_rpc(&payload)
} else { } else {
json!(symbols::ERR_INVALID_COMMAND) json!(symbols::ERR_INVALID_COMMAND)
} }
@@ -203,23 +375,45 @@ impl TreeElement for TcpClient {
json!(symbols::ERR_INVALID_COMMAND) json!(symbols::ERR_INVALID_COMMAND)
} }
} }
Value::String(subtarget) => { Value::String(subtarget) => match subtarget.as_str() {
match subtarget.as_str() { "config" => json!(self.config),
"config" => { "state" => json!({
// Return or modify configuration "connected": self.status.connected,
json!(self.config) "remote": self.status.remote_address,
} }),
"state" => { "protocols" => json!(self.protocol_stack.to_configs()),
// Return connection state _ => json!(symbols::ERR_CHILD_NOT_FOUND),
json!({ },
"connected": self.status.connected,
"remote": self.status.remote_address,
})
}
_ => json!(symbols::ERR_CHILD_NOT_FOUND),
}
}
_ => json!(symbols::ERR_INVALID_TARGET), _ => json!(symbols::ERR_INVALID_TARGET),
} }
} }
} }
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_client_creation() {
let client = TcpClient::new("test-client");
assert_eq!(client.name(), "test-client");
assert!(!client.is_connected());
}
#[test]
fn test_config_serialization() {
let client = TcpClient::with_config("test", TcpClientConfig::new("127.0.0.1", 8080));
let json = serde_json::to_string(&client).unwrap();
assert!(json.contains("test"));
assert!(json.contains("127.0.0.1"));
}
#[test]
fn test_rpc_status() {
let mut client = TcpClient::new("test");
let result = client.send_message(json!(null), json!({"method": "status"}));
let obj = result.as_object().unwrap();
assert!(obj.contains_key("connected"));
}
}
+2 -2
View File
@@ -98,7 +98,7 @@ impl TcpServerConfig {
} }
/// Connection status information /// Connection status information
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ConnectionStatus { pub struct ConnectionStatus {
pub connected: bool, pub connected: bool,
pub remote_address: Option<String>, pub remote_address: Option<String>,
@@ -138,7 +138,7 @@ impl ConnectionStatus {
} }
/// Server listener status /// Server listener status
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ListenerStatus { pub struct ListenerStatus {
pub listening: bool, pub listening: bool,
pub bind_address: String, pub bind_address: String,
+311 -20
View File
@@ -1,7 +1,7 @@
//! TCP Server component for inbound connections. //! TCP Server component for inbound connections.
//! //!
//! Provides a TreeElement for managing TCP server listeners with //! Provides a TreeElement for managing TCP server listeners with
//! configuration, status queries, and connection management. //! configuration, status queries, connection management, and protocol stacking.
use std::collections::HashMap; use std::collections::HashMap;
use std::io::{Read, Write}; use std::io::{Read, Write};
@@ -9,41 +9,52 @@ use std::net::{TcpListener, TcpStream};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::time::Duration; use std::time::Duration;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value}; use serde_json::{json, Value};
use crate::tree::component::Component; use crate::tree::component::Component;
use crate::tree::message::TreeMessage;
use crate::tree::protocols::{ProtocolConfig, ProtocolStack};
use crate::tree::symbols; use crate::tree::symbols;
use crate::tree::tcp::config::{ListenerStatus, TcpServerConfig}; use crate::tree::tcp::config::{ListenerStatus, TcpServerConfig};
use crate::tree::{Branch, TreeElement}; use crate::tree::{Branch, TreeElement};
/// A connected client managed by the server /// A connected client managed by the server
struct ManagedClient { #[derive(Debug)]
id: String, pub struct ManagedClient {
pub id: String,
stream: TcpStream, stream: TcpStream,
peer_addr: String, peer_addr: String,
local_addr: String,
} }
impl ManagedClient { impl ManagedClient {
fn new(id: String, stream: TcpStream) -> Self { pub fn new(id: String, stream: TcpStream) -> Self {
let peer_addr = stream let peer_addr = stream
.peer_addr() .peer_addr()
.map(|a| a.to_string()) .map(|a| a.to_string())
.unwrap_or_else(|_| "unknown".to_string()); .unwrap_or_else(|_| "unknown".to_string());
let local_addr = stream
.local_addr()
.map(|a| a.to_string())
.unwrap_or_else(|_| "unknown".to_string());
Self { Self {
id, id,
stream, stream,
peer_addr, peer_addr,
local_addr,
} }
} }
fn send(&mut self, data: &[u8]) -> Result<usize, String> { pub fn send(&mut self, data: &[u8]) -> Result<usize, String> {
self.stream self.stream
.write(data) .write(data)
.map_err(|e| format!("Write failed: {}", e)) .map_err(|e| format!("Write failed: {}", e))
} }
fn recv(&mut self, buffer_size: usize) -> Result<Vec<u8>, String> { pub fn recv(&mut self, buffer_size: usize) -> Result<Vec<u8>, String> {
let mut buffer = vec![0u8; buffer_size]; let mut buffer = vec![0u8; buffer_size];
let _ = self.stream.set_read_timeout(Some(Duration::from_secs(1))); let _ = self.stream.set_read_timeout(Some(Duration::from_secs(1)));
@@ -58,37 +69,86 @@ impl ManagedClient {
} }
} }
fn peer_address(&self) -> &str { pub fn peer_address(&self) -> &str {
&self.peer_addr &self.peer_addr
} }
pub fn local_address(&self) -> &str {
&self.local_addr
}
pub fn set_nonblocking(&mut self, nonblocking: bool) -> Result<(), String> {
self.stream
.set_nonblocking(nonblocking)
.map_err(|e| format!("Failed to set non-blocking: {}", e))
}
} }
/// TCP Server component /// TCP Server component with protocol stacking support.
///
/// This component can:
/// - Listen for incoming TCP connections
/// - Manage multiple concurrent connections
/// - Apply protocol stacks to connections
/// - Send/receive messages via RPC
#[derive(Debug, Serialize, Deserialize)]
pub struct TcpServer { pub struct TcpServer {
name: String, /// Unique name for this server
config: TcpServerConfig, pub name: String,
/// Server configuration
pub config: TcpServerConfig,
/// Protocol stack for incoming connections
#[serde(default)]
pub protocols: Vec<ProtocolConfig>,
/// Current listener status
#[serde(skip)]
status: ListenerStatus, status: ListenerStatus,
/// TCP listener (runtime only)
#[serde(skip)]
listener: Option<TcpListener>, listener: Option<TcpListener>,
/// Active clients
#[serde(skip)]
clients: HashMap<String, Arc<Mutex<ManagedClient>>>, clients: HashMap<String, Arc<Mutex<ManagedClient>>>,
/// Protocol stacks per client
#[serde(skip)]
client_protocols: HashMap<String, ProtocolStack>,
/// Total connections since start
total_connections: u64, total_connections: u64,
/// Internal tree structure
#[serde(skip)]
branch: Branch, branch: Branch,
} }
impl TcpServer { impl TcpServer {
/// Create a new TCP server with default settings
pub fn new(name: impl Into<String>) -> Self { pub fn new(name: impl Into<String>) -> Self {
Self::with_config(name, TcpServerConfig::default())
}
/// Create a new TCP server with custom configuration
pub fn with_config(name: impl Into<String>, config: TcpServerConfig) -> Self {
let name = name.into(); let name = name.into();
Self { Self {
name: name.clone(), name: name.clone(),
config: TcpServerConfig::default(), config,
protocols: Vec::new(),
status: ListenerStatus::stopped("0.0.0.0", 0), status: ListenerStatus::stopped("0.0.0.0", 0),
listener: None, listener: None,
clients: HashMap::new(), clients: HashMap::new(),
client_protocols: HashMap::new(),
total_connections: 0, total_connections: 0,
branch: Branch::new("TCPServer"), branch: Branch::new("TCPServer"),
} }
} }
/// Set protocol stack configuration
pub fn set_protocols(&mut self, protocols: Vec<ProtocolConfig>) -> Result<(), String> {
self.protocols = protocols.clone();
// Don't rebuild client_protocols here - each client gets its own stack
Ok(())
}
/// Start listening for connections /// Start listening for connections
pub fn listen(&mut self) -> Result<(), String> { pub fn listen(&mut self) -> Result<(), String> {
let addr = format!("{}:{}", self.config.bind_address, self.config.port); let addr = format!("{}:{}", self.config.bind_address, self.config.port);
@@ -114,6 +174,7 @@ impl TcpServer {
pub fn stop(&mut self) -> Result<(), String> { pub fn stop(&mut self) -> Result<(), String> {
self.listener = None; self.listener = None;
self.clients.clear(); self.clients.clear();
self.client_protocols.clear();
self.status = ListenerStatus::stopped(&self.config.bind_address, self.config.port); self.status = ListenerStatus::stopped(&self.config.bind_address, self.config.port);
Ok(()) Ok(())
} }
@@ -135,10 +196,16 @@ impl TcpServer {
/// Register an accepted connection /// Register an accepted connection
pub fn register_client(&mut self, id: String, stream: TcpStream) { pub fn register_client(&mut self, id: String, stream: TcpStream) {
let client_id = id.clone(); let client_id = id.clone();
self.clients.insert(
id, // Create protocol stack for this client
Arc::new(Mutex::new(ManagedClient::new(client_id, stream))), let mut protocol_stack = ProtocolStack::new();
); for config in &self.protocols {
let _ = protocol_stack.push(config);
}
let client = Arc::new(Mutex::new(ManagedClient::new(client_id, stream)));
self.clients.insert(id.clone(), client);
self.client_protocols.insert(id, protocol_stack);
} }
/// Disconnect a client /// Disconnect a client
@@ -146,6 +213,7 @@ impl TcpServer {
self.clients self.clients
.remove(id) .remove(id)
.ok_or_else(|| format!("Client '{}' not found", id))?; .ok_or_else(|| format!("Client '{}' not found", id))?;
self.client_protocols.remove(id);
Ok(()) Ok(())
} }
@@ -157,10 +225,56 @@ impl TcpServer {
.ok_or_else(|| format!("Client '{}' not found", client_id))?; .ok_or_else(|| format!("Client '{}' not found", client_id))?;
let mut client = client.lock().map_err(|e| format!("Lock failed: {}", e))?; let mut client = client.lock().map_err(|e| format!("Lock failed: {}", e))?;
client.send(data) client.send(data)
} }
/// Receive from a specific client
pub fn recv_from(&mut self, client_id: &str, buffer_size: usize) -> Result<Vec<u8>, String> {
let client = self
.clients
.get(client_id)
.ok_or_else(|| format!("Client '{}' not found", client_id))?;
let mut client = client.lock().map_err(|e| format!("Lock failed: {}", e))?;
client.recv(buffer_size)
}
/// Send TreeMessage to client through protocol stack
pub fn send_message_to(
&mut self,
client_id: &str,
message: &TreeMessage,
) -> Result<(), String> {
let protocol_stack = self
.client_protocols
.get_mut(client_id)
.ok_or_else(|| format!("Client '{}' not found", client_id))?;
let encoded = protocol_stack
.encode_message(message)
.map_err(|e| format!("Encoding failed: {}", e))?;
self.send_to(client_id, &encoded).map(|_| ())
}
/// Receive TreeMessage from client through protocol stack
pub fn recv_message_from(
&mut self,
client_id: &str,
buffer_size: usize,
) -> Result<TreeMessage, String> {
let data = self.recv_from(client_id, buffer_size)?;
let protocol_stack = self
.client_protocols
.get_mut(client_id)
.ok_or_else(|| format!("Client '{}' not found", client_id))?;
protocol_stack
.decode_message(&data)
.map_err(|e| format!("Decoding failed: {}", e))
}
/// Check if listening /// Check if listening
pub fn is_listening(&self) -> bool { pub fn is_listening(&self) -> bool {
self.status.listening self.status.listening
@@ -178,7 +292,17 @@ impl TcpServer {
/// Get status as JSON /// Get status as JSON
pub fn get_status(&self) -> Value { pub fn get_status(&self) -> Value {
let client_list: Vec<Value> = self.clients.keys().map(|k| json!(k)).collect(); let client_list: Vec<Value> = self
.clients
.iter()
.map(|(id, client)| {
let addr = client
.lock()
.map(|c| c.peer_address().to_string())
.unwrap_or_else(|_| "unknown".to_string());
json!({"id": id, "peer": addr})
})
.collect();
json!({ json!({
"listening": self.status.listening, "listening": self.status.listening,
@@ -186,9 +310,119 @@ impl TcpServer {
"port": self.config.port, "port": self.config.port,
"active_connections": self.clients.len(), "active_connections": self.clients.len(),
"total_connections": self.total_connections, "total_connections": self.total_connections,
"config": self.config,
"protocols": self.protocols,
"clients": client_list, "clients": client_list,
}) })
} }
/// Handle RPC call from message
fn handle_rpc(&mut self, payload: &Value) -> Value {
let method = match payload.get("method").and_then(|m| m.as_str()) {
Some(m) => m,
None => return json!({"success": false, "error": "missing method"}),
};
let params = payload.get("params").cloned().unwrap_or(Value::Null);
match method {
"listen" | "start" => {
if let Some(addr) = params.get("bind_address").and_then(|a| a.as_str()) {
self.config.bind_address = addr.to_string();
}
if let Some(port) = params.get("port").and_then(|p| p.as_u64()) {
self.config.port = port as u16;
}
match self.listen() {
Ok(_) => json!({"success": true, "status": self.status}),
Err(e) => json!({"success": false, "error": e}),
}
}
"stop" => match self.stop() {
Ok(_) => json!({"success": true}),
Err(e) => json!({"success": false, "error": e}),
},
"accept" => {
// Try to accept a pending connection
if let Some((id, stream)) = self.accept() {
self.register_client(id.clone(), stream);
json!({"success": true, "client_id": id})
} else {
json!({"success": true, "client_id": null})
}
}
"send" => {
let client_id = params
.get("client_id")
.and_then(|c| c.as_str())
.ok_or_else(|| json!({"error": "missing client_id"}));
match client_id {
Ok(id) => {
let data = params
.get("data")
.and_then(|d| d.as_str())
.map(|s| s.as_bytes().to_vec());
match data {
Some(data) => match self.send_to(id, &data) {
Ok(n) => json!({"success": true, "bytes_sent": n}),
Err(e) => json!({"success": false, "error": e}),
},
None => json!({"success": false, "error": "missing data"}),
}
}
Err(e) => e,
}
}
"recv" => {
let client_id = params
.get("client_id")
.and_then(|c| c.as_str())
.ok_or_else(|| json!({"error": "missing client_id"}));
match client_id {
Ok(id) => {
let size = params
.get("size")
.and_then(|s| s.as_u64())
.map(|s| s as usize)
.unwrap_or(4096);
match self.recv_from(id, size) {
Ok(data) => json!({
"success": true,
"data": String::from_utf8_lossy(&data),
"bytes": data.len()
}),
Err(e) => json!({"success": false, "error": e}),
}
}
Err(e) => e,
}
}
"disconnect" => {
let client_id = params
.get("client_id")
.and_then(|c| c.as_str())
.ok_or_else(|| json!({"error": "missing client_id"}));
match client_id {
Ok(id) => match self.disconnect_client(id) {
Ok(_) => json!({"success": true}),
Err(e) => json!({"success": false, "error": e}),
},
Err(e) => e,
}
}
"status" => self.get_status(),
"list_clients" => {
let clients: Vec<Value> = self.clients.keys().map(|k| json!(k)).collect();
json!({"success": true, "clients": clients})
}
_ => json!({"success": false, "error": format!("unknown method: {}", method)}),
}
}
} }
impl Component for TcpServer { impl Component for TcpServer {
@@ -201,8 +435,20 @@ impl Component for TcpServer {
} }
fn init(&mut self, config: Value) -> Result<(), String> { fn init(&mut self, config: Value) -> Result<(), String> {
self.config = if let Some(server_config) = config.get("config") {
serde_json::from_value(config).map_err(|e| format!("Invalid config: {}", e))?; self.config = serde_json::from_value(server_config.clone())
.map_err(|e| format!("Invalid config: {}", e))?;
} else {
self.config = serde_json::from_value(config.clone())
.map_err(|e| format!("Invalid config: {}", e))?;
}
if let Some(protocols) = config.get("protocols") {
let p: Vec<ProtocolConfig> = serde_json::from_value(protocols.clone())
.map_err(|e| format!("Invalid protocols: {}", e))?;
self.set_protocols(p)?;
}
self.listen()?; self.listen()?;
Ok(()) Ok(())
} }
@@ -214,12 +460,21 @@ impl Component for TcpServer {
impl TreeElement for TcpServer { impl TreeElement for TcpServer {
fn get_type(&self) -> Value { fn get_type(&self) -> Value {
json!("TCPServer") json!({
"type": "TCPServer",
"name": self.name,
})
} }
fn send_message(&mut self, target: Value, message: Value) -> Value { fn send_message(&mut self, target: Value, message: Value) -> Value {
match target { match target {
Value::Null => { Value::Null => {
// Check for RPC call format
if message.get("method").is_some() {
return self.handle_rpc(&message);
}
// Legacy string commands
if let Some(cmd) = message.as_str() { if let Some(cmd) = message.as_str() {
match cmd { match cmd {
"Listen" | "Start" => match self.listen() { "Listen" | "Start" => match self.listen() {
@@ -251,6 +506,9 @@ impl TreeElement for TcpServer {
} }
Err(e) => json!({"success": false, "error": e.to_string()}), Err(e) => json!({"success": false, "error": e.to_string()}),
} }
} else if obj.get("method").is_some() {
let payload = Value::Object(obj.clone());
self.handle_rpc(&payload)
} else { } else {
json!(symbols::ERR_INVALID_COMMAND) json!(symbols::ERR_INVALID_COMMAND)
} }
@@ -261,9 +519,42 @@ impl TreeElement for TcpServer {
Value::String(subtarget) => match subtarget.as_str() { Value::String(subtarget) => match subtarget.as_str() {
"config" => json!(self.config), "config" => json!(self.config),
"status" => self.get_status(), "status" => self.get_status(),
"clients" => {
let clients: Vec<Value> = self.clients.keys().map(|k| json!(k)).collect();
json!(clients)
}
_ => json!(symbols::ERR_CHILD_NOT_FOUND), _ => json!(symbols::ERR_CHILD_NOT_FOUND),
}, },
_ => json!(symbols::ERR_INVALID_TARGET), _ => json!(symbols::ERR_INVALID_TARGET),
} }
} }
} }
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_server_creation() {
let server = TcpServer::new("test-server");
assert_eq!(server.name(), "test-server");
assert!(!server.is_listening());
}
#[test]
fn test_config_serialization() {
let server = TcpServer::with_config("test", TcpServerConfig::new(8080));
let json = serde_json::to_string(&server).unwrap();
assert!(json.contains("test"));
assert!(json.contains("8080"));
}
#[test]
fn test_rpc_status() {
let mut server = TcpServer::new("test");
let result = server.send_message(json!(null), json!({"method": "status"}));
let obj = result.as_object().unwrap();
assert!(obj.contains_key("listening"));
}
}
+183 -25
View File
@@ -7,11 +7,16 @@ use std::io::{Read, Write};
use std::net::TcpListener; use std::net::TcpListener;
use std::sync::mpsc; use std::sync::mpsc;
use std::thread; use std::thread;
use std::time::Duration;
use unshell::tree::{EndpointManager, TreeMessage}; use serde_json::json;
use unshell::tree::message::TreeMessage;
use unshell::tree::protocols::{ProtocolConfig, ProtocolStack};
use unshell::tree::tcp::{TcpClient, TcpServer};
use unshell::tree::{ComponentRegistry, EndpointManager, TreeElement};
fn main() { fn main() {
println!("=== TCP Chain Test Harness ===\n"); println!("=== Tree Protocol Test Harness ===\n");
// Test 1: Local TCP Server-Client loopback // Test 1: Local TCP Server-Client loopback
test_tcp_loopback(); test_tcp_loopback();
@@ -19,9 +24,21 @@ fn main() {
// Test 2: Tree message routing // Test 2: Tree message routing
test_tree_message(); test_tree_message();
// Test 3: TreeMessage serialization // Test 3: TreeMessage serialization (new API)
test_message_serialization(); test_message_serialization();
// Test 4: TCP Server with RPC
test_tcp_server();
// Test 5: TCP Client with RPC
test_tcp_client();
// Test 6: Protocol stacking
test_protocol_stack();
// Test 7: Component registry
test_component_registry();
println!("\n=== All tests complete ==="); println!("\n=== All tests complete ===");
} }
@@ -29,7 +46,6 @@ fn main() {
fn test_tcp_loopback() { fn test_tcp_loopback() {
println!("[Test 1] TCP Loopback Test"); println!("[Test 1] TCP Loopback Test");
// Start a TCP server in a thread
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
let server_thread = thread::spawn(move || { let server_thread = thread::spawn(move || {
@@ -37,24 +53,25 @@ fn test_tcp_loopback() {
let addr = listener.local_addr().unwrap(); let addr = listener.local_addr().unwrap();
tx.send(addr.port()).unwrap(); tx.send(addr.port()).unwrap();
for stream in listener.incoming() { // Accept one connection only
if let Ok(mut stream) = stream { if let Ok((mut stream, _)) = listener.accept() {
let mut buf = [0u8; 1024]; let mut buf = [0u8; 1024];
if let Ok(n) = stream.read(&mut buf) { if let Ok(n) = stream.read(&mut buf) {
let response = b"Echo: "; let response = b"Echo: ";
let _ = stream.write(response); let _ = stream.write(response);
let _ = stream.write(&buf[..n]); let _ = stream.write(&buf[..n]);
let _ = stream.flush(); let _ = stream.flush();
}
} }
} }
}); });
let port = rx.recv().unwrap(); let port = rx.recv().unwrap();
// Connect client
let mut stream = let mut stream =
std::net::TcpStream::connect(format!("127.0.0.1:{}", port)).expect("Failed to connect"); std::net::TcpStream::connect(format!("127.0.0.1:{}", port)).expect("Failed to connect");
stream
.set_read_timeout(Some(Duration::from_millis(1000)))
.unwrap();
let msg = b"Hello from client!"; let msg = b"Hello from client!";
stream.write(msg).expect("Failed to write"); stream.write(msg).expect("Failed to write");
@@ -76,7 +93,6 @@ fn test_tree_message() {
let mut endpoint = EndpointManager::new("endpoint-1"); let mut endpoint = EndpointManager::new("endpoint-1");
// Test GetChildren
let response = endpoint let response = endpoint
.branch_mut() .branch_mut()
.send_message(serde_json::Value::Null, serde_json::json!("GetChildren")); .send_message(serde_json::Value::Null, serde_json::json!("GetChildren"));
@@ -84,13 +100,11 @@ fn test_tree_message() {
let children = response.as_object().unwrap(); let children = response.as_object().unwrap();
println!(" Children: {:?}", children.keys().collect::<Vec<_>>()); println!(" Children: {:?}", children.keys().collect::<Vec<_>>());
// Test ID access
let response = endpoint let response = endpoint
.branch_mut() .branch_mut()
.send_message(serde_json::json!("id"), serde_json::Value::Null); .send_message(serde_json::json!("id"), serde_json::Value::Null);
println!(" Endpoint ID: {:?}", response); println!(" Endpoint ID: {:?}", response);
// Test logs queue
let sender = endpoint.logs_sender().clone(); let sender = endpoint.logs_sender().clone();
sender.send(serde_json::json!("Test log entry")).unwrap(); sender.send(serde_json::json!("Test log entry")).unwrap();
@@ -102,24 +116,168 @@ fn test_tree_message() {
println!(" ✓ Tree message test passed\n"); println!(" ✓ Tree message test passed\n");
} }
/// Test TreeMessage serialization /// Test TreeMessage serialization (new API)
fn test_message_serialization() { fn test_message_serialization() {
println!("[Test 3] TreeMessage Serialization"); println!("[Test 3] TreeMessage Serialization");
let msg = TreeMessage::new_req( // Test new API
"msg-1", let msg = TreeMessage::new("query")
vec!["endpoint1".to_string(), "shell".to_string()], .to_target(["endpoint1", "shell"])
"Get", .with_payload(json!({"key": "value"}));
);
let json = serde_json::to_string_pretty(&msg).unwrap(); let json = serde_json::to_string_pretty(&msg).unwrap();
println!(" Message: {}", json); println!(" Message: {}", json);
// Test response message // Test response
let resp = TreeMessage::new_resp("resp-1", "msg-1", serde_json::json!("result data")); let resp =
TreeMessage::new("response").with_payload(json!({"success": true, "result": "data"}));
let json = serde_json::to_string_pretty(&resp).unwrap(); let json = serde_json::to_string_pretty(&resp).unwrap();
println!(" Response: {}", json); println!(" Response: {}", json);
// Test RPC call
let rpc_msg = TreeMessage::new("rpc.call")
.to_target(["components", "tcp-client"])
.with_payload(json!({
"method": "connect",
"params": {"address": "127.0.0.1", "port": 8080}
}));
let json = serde_json::to_string_pretty(&rpc_msg).unwrap();
println!(" RPC Call: {}", json);
println!(" ✓ Message serialization test passed\n"); println!(" ✓ Message serialization test passed\n");
} }
/// Test TCP Server with RPC
fn test_tcp_server() {
println!("[Test 4] TCP Server with RPC");
let mut server = TcpServer::new("test-server");
// Configure
let response = server.send_message(
json!(null),
json!({
"method": "status"
}),
);
println!(" Initial status: {:?}", response);
// Try to start listening
let response = server.send_message(
json!(null),
json!({
"method": "listen",
"params": {
"bind_address": "127.0.0.1",
"port": 0
}
}),
);
println!(" Listen result: {:?}", response);
// Get status after listening
let response = server.send_message(json!(null), json!({"method": "status"}));
println!(" Status after listen: {:?}", response);
println!(" ✓ TCP Server test passed\n");
}
/// Test TCP Client with RPC
fn test_tcp_client() {
println!("[Test 5] TCP Client with RPC");
let mut client = TcpClient::new("test-client");
// Get initial status
let response = client.send_message(json!(null), json!({"method": "status"}));
println!(" Initial status: {:?}", response);
// Try to connect (will fail since no server, but tests the RPC)
let response = client.send_message(
json!(null),
json!({
"method": "connect",
"params": {"address": "127.0.0.1", "port": 65432}
}),
);
println!(" Connect result: {:?}", response);
// Get status after connect attempt
let response = client.send_message(json!(null), json!({"method": "status"}));
println!(" Status after connect: {:?}", response);
println!(" ✓ TCP Client test passed\n");
}
/// Test Protocol stacking
fn test_protocol_stack() {
println!("[Test 6] Protocol Stacking");
// Create a stack: base64 -> tcp
let mut stack = ProtocolStack::new();
stack
.push(&ProtocolConfig::Base64(Default::default()))
.unwrap();
stack
.push(&ProtocolConfig::Tcp(Default::default()))
.unwrap();
println!(" Stack protocols: {:?}", stack.to_configs());
// Test encoding/decoding
let test_data = b"Hello, World!";
let encoded = stack.encode(test_data).unwrap();
println!(
" Encoded ({} bytes): {:?}",
encoded.len(),
String::from_utf8_lossy(&encoded)
);
let decoded = stack.decode(&encoded).unwrap();
println!(" Decoded: {:?}", String::from_utf8_lossy(&decoded));
// Test with HTTP
let mut http_stack = ProtocolStack::new();
http_stack
.push(&ProtocolConfig::Base64(Default::default()))
.unwrap();
http_stack
.push(&ProtocolConfig::Http(Default::default()))
.unwrap();
let test_data = b"test message";
let encoded = http_stack.encode(test_data).unwrap();
println!(
" HTTP+Base64 encoded: {:?}",
String::from_utf8_lossy(&encoded).lines().next()
);
println!(" ✓ Protocol stacking test passed\n");
}
/// Test Component Registry
fn test_component_registry() {
println!("[Test 7] Component Registry");
let mut registry = ComponentRegistry::new();
// Create and register a TCP client component
let client = Box::new(TcpClient::new("tcp-client-1"));
registry.register(client).unwrap();
// List components
let list = registry.list();
println!(" Registered components: {:?}", list);
// Send RPC to component
let result = registry.send_to_component("tcp-client-1", json!({"method": "status"}));
println!(" Component status: {:?}", result);
// Send RPC via path
let result = registry.send_message(json!("rpc.tcp-client-1"), json!({"method": "status"}));
println!(" Via RPC path: {:?}", result);
println!(" ✓ Component registry test passed\n");
}