From cd301dea67263b15ae7b79a427a70bcdca0a2a73 Mon Sep 17 00:00:00 2001 From: Michael Mikovsky <77305074+Astatin3@users.noreply.github.com> Date: Wed, 22 Apr 2026 10:25:03 -0600 Subject: [PATCH] Add documentation to treetest --- ush-treetest/Cargo.lock | 158 ++++++++- ush-treetest/Cargo.toml | 3 + ush-treetest/src/cli/cli.rs | 451 +++++++++++++++++++++++++ ush-treetest/src/cli/mod.rs | 351 +------------------ ush-treetest/src/client.rs | 434 ++++++++++++++++++++++++ ush-treetest/src/leaves/shell.rs | 82 ++++- ush-treetest/src/leaves/tty.rs | 248 +++++++------- ush-treetest/src/main.rs | 309 +++++------------ ush-treetest/src/protocol/mod.rs | 38 +++ ush-treetest/src/protocol/transport.rs | 321 ++++++++++++++---- ush-treetest/src/protocol/types.rs | 298 ++++++++++++++-- ush-treetest/src/server.rs | 264 +++++++++++++++ ush-treetest/src/tree/endpoint.rs | 3 +- ush-treetest/src/tree/mod.rs | 25 +- 14 files changed, 2216 insertions(+), 769 deletions(-) create mode 100644 ush-treetest/src/cli/cli.rs create mode 100644 ush-treetest/src/client.rs create mode 100644 ush-treetest/src/server.rs diff --git a/ush-treetest/Cargo.lock b/ush-treetest/Cargo.lock index bf48c37..4d1e49a 100644 --- a/ush-treetest/Cargo.lock +++ b/ush-treetest/Cargo.lock @@ -193,6 +193,12 @@ dependencies = [ "log", ] +[[package]] +name = "equivalent" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" + [[package]] name = "funty" version = "2.0.0" @@ -210,6 +216,12 @@ dependencies = [ "wasi", ] +[[package]] +name = "glob" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280" + [[package]] name = "hashbrown" version = "0.12.3" @@ -219,18 +231,40 @@ dependencies = [ "ahash", ] +[[package]] +name = "hashbrown" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f467dd6dccf739c208452f8014c75c18bb8301b050ad1cfb27153803edb0f51" + [[package]] name = "heck" version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" +[[package]] +name = "indexmap" +version = "2.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d466e9454f08e4a911e14806c24e16fba1b4c121d1ea474396f396069cf949d9" +dependencies = [ + "equivalent", + "hashbrown 0.17.0", +] + [[package]] name = "is_terminal_polyfill" version = "1.70.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695" +[[package]] +name = "itoa" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f42a60cbdf9a97f5d2305f08a87dc4e09308d1276d28c869c684d7777685682" + [[package]] name = "jiff" version = "0.2.23" @@ -401,7 +435,7 @@ dependencies = [ "bitvec", "bytecheck", "bytes", - "hashbrown", + "hashbrown 0.12.3", "ptr_meta", "rend", "rkyv_derive", @@ -433,6 +467,15 @@ version = "4.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1c107b6f4780854c8b126e228ea8869f4d7b71260f962fefb57b996b8959ba6b" +[[package]] +name = "serde" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e" +dependencies = [ + "serde_core", +] + [[package]] name = "serde_core" version = "1.0.228" @@ -453,6 +496,28 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "serde_json" +version = "1.0.149" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83fc039473c5595ace860d8c4fafa220ff474b3fc6bfdb4293327f1a37e94d86" +dependencies = [ + "itoa", + "memchr", + "serde", + "serde_core", + "zmij", +] + +[[package]] +name = "serde_spanned" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6662b5879511e06e8999a8a235d848113e942c9124f211511b16466ee2995f26" +dependencies = [ + "serde_core", +] + [[package]] name = "simdutf8" version = "0.1.5" @@ -493,6 +558,21 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" +[[package]] +name = "target-triple" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "591ef38edfb78ca4771ee32cf494cb8771944bee237a9b91fc9c1424ac4b777b" + +[[package]] +name = "termcolor" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06794f8f6c5c898b3275aebefa6b8a1cb24cd2c6c79397ab15774837a0bc5755" +dependencies = [ + "winapi-util", +] + [[package]] name = "tinyvec" version = "1.11.0" @@ -508,6 +588,60 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" +[[package]] +name = "toml" +version = "1.1.2+spec-1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81f3d15e84cbcd896376e6730314d59fb5a87f31e4b038454184435cd57defee" +dependencies = [ + "indexmap", + "serde_core", + "serde_spanned", + "toml_datetime", + "toml_parser", + "toml_writer", + "winnow", +] + +[[package]] +name = "toml_datetime" +version = "1.1.1+spec-1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3165f65f62e28e0115a00b2ebdd37eb6f3b641855f9d636d3cd4103767159ad7" +dependencies = [ + "serde_core", +] + +[[package]] +name = "toml_parser" +version = "1.1.2+spec-1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2abe9b86193656635d2411dc43050282ca48aa31c2451210f4202550afb7526" +dependencies = [ + "winnow", +] + +[[package]] +name = "toml_writer" +version = "1.1.1+spec-1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "756daf9b1013ebe47a8776667b466417e2d4c5679d441c26230efd9ef78692db" + +[[package]] +name = "trybuild" +version = "1.0.116" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47c635f0191bd3a2941013e5062667100969f8c4e9cd787c14f977265d73616e" +dependencies = [ + "glob", + "serde", + "serde_derive", + "serde_json", + "target-triple", + "termcolor", + "toml", +] + [[package]] name = "unicode-ident" version = "1.0.24" @@ -523,6 +657,7 @@ dependencies = [ "libc", "log", "rkyv", + "trybuild", ] [[package]] @@ -598,6 +733,15 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "winapi-util" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" +dependencies = [ + "windows-sys", +] + [[package]] name = "windows-link" version = "0.2.1" @@ -613,6 +757,12 @@ dependencies = [ "windows-link", ] +[[package]] +name = "winnow" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2ee1708bef14716a11bae175f579062d4554d95be2c6829f518df847b7b3fdd0" + [[package]] name = "wyz" version = "0.5.1" @@ -621,3 +771,9 @@ checksum = "05f360fc0b24296329c78fda852a1e9ae82de9cf7b27dae4b7f62f118f77b9ed" dependencies = [ "tap", ] + +[[package]] +name = "zmij" +version = "1.0.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8848ee67ecc8aedbaf3e4122217aff892639231befc6a1b58d29fff4c2cabaa" diff --git a/ush-treetest/Cargo.toml b/ush-treetest/Cargo.toml index 927aa07..aab9dc6 100644 --- a/ush-treetest/Cargo.toml +++ b/ush-treetest/Cargo.toml @@ -20,6 +20,9 @@ libc = { version = "0.2", optional = true } version = "4.5" features = ["derive", "env"] +[dev-dependencies] +trybuild = "1.0" + [profile.release] opt-level = 3 lto = true \ No newline at end of file diff --git a/ush-treetest/src/cli/cli.rs b/ush-treetest/src/cli/cli.rs new file mode 100644 index 0000000..546f0ad --- /dev/null +++ b/ush-treetest/src/cli/cli.rs @@ -0,0 +1,451 @@ +//! # CLI Implementation +//! +//! This module provides the interactive CLI implementation for the unshell tree protocol testbed. + +use crate::protocol::{ + FrameType, TreeRequest, TreeResponse, TcpTransport, Transport, + make_request, make_stream_open, make_stream_data, make_stream_close, + make_handshake, +}; +use crate::tree::Tree; +use crate::leaves::{RemoteShell, TTY}; +use std::string::String; +use std::vec::Vec; + +/// CLI state - manages connection and local tree. +/// +/// # Example +/// ``` +/// use ush_treetest::cli::Cli; +/// +/// let mut cli = Cli::new(); +/// println!("Leaves: {:?}", cli.list_leaves()); +/// ``` +/// +/// # Fields +/// * `transport` - Optional TCP transport for remote connection +/// * `tree` - Local tree for local operations +/// * `current_path` - Current working path +/// * `request_id` - Next request ID to send +/// * `stream_id` - Next stream ID to allocate +/// * `streams` - Active streams +/// * `base_path` - Base path assigned by server +/// * `mode` - Operation mode (Local or Connected) +pub struct Cli { + transport: Option, + tree: Tree, + current_path: String, + request_id: u64, + #[allow(dead_code)] + stream_id: u16, + streams: Vec, + base_path: String, + mode: CliMode, +} + +/// CLI operation mode. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum CliMode { + /// Local-only mode + Local, + /// Connected to remote server + Connected, +} + +/// State of an active stream. +/// +/// # Fields +/// * `stream_id` - The stream identifier +/// * `path` - The path this stream is connected to +#[derive(Debug, Clone)] +#[allow(dead_code)] +struct StreamState { + stream_id: u16, + path: String, +} + +impl Cli { + /// Create a new CLI with a local tree. + /// + /// The local tree has `/shell` and `/tty` endpoints registered. + /// + /// # Example + /// ``` + /// let cli = Cli::new(); + /// let leaves = cli.list_leaves(); + /// assert!(leaves.contains(&"/shell".to_string())); + /// ``` + pub fn new() -> Self { + let mut tree = Tree::new(); + tree.add_endpoint("/shell", Box::new(RemoteShell::new("shell"))); + tree.add_endpoint("/tty", Box::new(TTY::new("tty"))); + + Self { + transport: None, + tree, + current_path: String::from("/"), + request_id: 1, + stream_id: 1, + streams: Vec::new(), + base_path: String::from("/"), + mode: CliMode::Local, + } + } + + /// Get next request ID. + fn next_request_id(&mut self) -> u64 { + let id = self.request_id; + self.request_id += 1; + id + } + + /// Get next stream ID. + #[allow(dead_code)] + fn next_stream_id(&mut self) -> u16 { + let id = self.stream_id; + self.stream_id = self.stream_id.wrapping_add(1); + id + } + + /// List nodes at a path. + /// + /// # Arguments + /// * `path` - Optional path (defaults to current path) + /// + /// # Returns + /// List of child node names + pub fn list_nodes(&self, path: Option<&str>) -> Result, String> { + let path = path.unwrap_or(&self.current_path); + self.tree.list_nodes(path) + } + + /// List endpoints at a path. + /// + /// # Arguments + /// * `path` - Optional path (defaults to current path) + pub fn list_endpoints( + &self, + path: Option<&str>, + ) -> Result, String> { + let path = path.unwrap_or(&self.current_path); + self.tree.list_endpoints(path) + } + + /// List all leaf paths. + pub fn list_leaves(&self) -> Vec { + self.tree.list_leaves() + } + + /// Get info about a node. + pub fn get_info(&self, path: &str) -> Result { + self.tree.get_info(path) + } + + /// Execute a command locally on the tree. + pub fn exec_local(&mut self, path: &str, cmd: &str) -> Result { + let (handler, matched_path) = self + .tree + .find_handler(path) + .ok_or_else(|| format!("path not found: {}", path))?; + + let request = TreeRequest::Exec { + cmd: cmd.to_string(), + }; + + let mut handler = handler.lock().map_err(|e| e.to_string())?; + handler.handle_request(&request, matched_path) + } + + /// Connect to a remote server. + pub fn connect(&mut self, addr: &str) -> Result<(), String> { + let transport = TcpTransport::connect(addr).map_err(|e| e.to_string())?; + self.transport = Some(transport); + self.mode = CliMode::Connected; + self.do_handshake() + } + + /// Perform handshake with remote server. + fn do_handshake(&mut self) -> Result<(), String> { + let transport = self.transport.as_mut().ok_or("not connected")?; + let (header, payload) = make_handshake(vec![self.current_path.clone()]); + transport + .send_frame(&header, Some(&payload)) + .map_err(|e| e.to_string())?; + let (header, payload) = transport.recv_frame().map_err(|e| e.to_string())?; + if header.frame_type != FrameType::HandshakeAck { + return Err("unexpected response type".to_string()); + } + let ack = crate::protocol::HandshakeAck::from_bytes(&payload) + .map_err(|e| e.to_string())?; + if !ack.accepted { + return Err("handshake rejected".to_string()); + } + self.base_path = ack.assigned_base_path.clone(); + Ok(()) + } + + /// Send a request to the remote server. + pub fn send_request( + &mut self, + dst_path: &str, + request: &TreeRequest, + ) -> Result { + let request_id = self.next_request_id(); + + let transport = self.transport.as_mut().ok_or("not connected")?; + + let full_path = if dst_path.starts_with('/') { + dst_path.to_string() + } else { + format!("{}/{}", self.current_path, dst_path) + }; + + let (header, payload) = make_request(&full_path, &self.base_path, request_id, request); + transport + .send_frame(&header, Some(&payload)) + .map_err(|e| e.to_string())?; + + let (header, payload) = transport.recv_frame().map_err(|e| e.to_string())?; + if header.frame_type != FrameType::Response { + return Err("unexpected response type".to_string()); + } + + let response = TreeResponse::from_bytes(&payload).map_err(|e| e.to_string())?; + Ok(response) + } + + /// Open a stream to a remote path. + pub fn open_stream(&mut self, dst_path: &str) -> Result { + let request_id = self.next_request_id(); + + let transport = self.transport.as_mut().ok_or("not connected")?; + + let full_path = if dst_path.starts_with('/') { + dst_path.to_string() + } else { + format!("{}/{}", self.current_path, dst_path) + }; + + let header = make_stream_open(&full_path, &self.base_path, request_id); + transport.send_frame(&header, None).map_err(|e| e.to_string())?; + + let (header, payload) = transport.recv_frame().map_err(|e| e.to_string())?; + if header.frame_type != FrameType::Response { + return Err("unexpected response type".to_string()); + } + + let response = TreeResponse::from_bytes(&payload).map_err(|e| e.to_string())?; + + match response { + TreeResponse::StreamOpened { stream_id } => { + self.streams.push(StreamState { + stream_id, + path: full_path, + }); + Ok(stream_id) + } + _ => Err("expected StreamOpened".to_string()), + } + } + + /// Send data on a stream. + pub fn send_stream_data(&mut self, stream_id: u16, data: &[u8]) -> Result<(), String> { + let transport = self.transport.as_mut().ok_or("not connected")?; + let (header, payload) = make_stream_data(stream_id, data); + transport + .send_frame(&header, Some(&payload)) + .map_err(|e| e.to_string()) + } + + /// Close a stream. + pub fn close_stream(&mut self, stream_id: u16) -> Result<(), String> { + let transport = self.transport.as_mut().ok_or("not connected")?; + let header = make_stream_close(stream_id); + transport + .send_frame(&header, None) + .map_err(|e| e.to_string())?; + self.streams.retain(|s| s.stream_id != stream_id); + Ok(()) + } + + /// Check if connected to remote. + pub fn is_connected(&self) -> bool { + matches!(self.mode, CliMode::Connected) + } + + /// Get current path. + pub fn current_path(&self) -> &str { + &self.current_path + } + + /// Set current path. + pub fn set_path(&mut self, path: &str) { + self.current_path = path.to_string(); + } +} + +/// Parse and execute a CLI command. +/// +/// # Arguments +/// * `cli` - The CLI state +/// * `line` - The command line to parse +/// +/// # Returns +/// Ok(output) on success, Err(error) on failure +/// +/// # Example +/// ``` +/// use ush_treetest::cli::{Cli, parse_and_execute}; +/// +/// let mut cli = Cli::new(); +/// let output = parse_and_execute(&mut cli, "leaves").unwrap(); +/// assert!(output.contains("shell")); +/// ``` +pub fn parse_and_execute(cli: &mut Cli, line: &str) -> Result { + let parts: Vec<&str> = line.split_whitespace().collect(); + if parts.is_empty() { + return Ok(String::new()); + } + + match parts[0] { + "ls" | "list" => { + let path = parts.get(1).copied(); + let names = cli.list_nodes(path)?; + Ok(names.join("\n")) + } + "endpoints" => { + let path = parts.get(1).copied(); + let eps = cli.list_endpoints(path)?; + let mut output = String::new(); + for ep in &eps { + output.push_str(&format!("{} ({:?}) at {}\n", ep.name, ep.endpoint_type, ep.path)); + } + Ok(output) + } + "leaves" => Ok(cli.list_leaves().join("\n")), + "info" => { + if parts.len() < 2 { + return Err("usage: info ".to_string()); + } + let info = cli.get_info(parts[1])?; + Ok(format!("{:?}", info)) + } + "exec" => { + if parts.len() < 3 { + return Err("usage: exec ".to_string()); + } + let path = parts[1]; + let cmd = parts[2..].join(" "); + if cli.is_connected() { + let request = TreeRequest::Exec { + cmd: cmd.clone(), + }; + let response = cli.send_request(path, &request)?; + format_response(response) + } else { + let response = cli.exec_local(path, &cmd)?; + format_response(response) + } + } + "cd" => { + if parts.len() < 2 { + return Err("usage: cd ".to_string()); + } + let path = parts[1]; + if cli.get_info(path).is_ok() { + cli.set_path(path); + Ok(format!("changed to {}", path)) + } else { + Err(format!("path not found: {}", path)) + } + } + "pwd" => Ok(cli.current_path().to_string()), + "connect" => { + if parts.len() < 2 { + return Err("usage: connect ".to_string()); + } + cli.connect(parts[1])?; + Ok(format!("connected to {}", parts[1])) + } + "stream" => { + if parts.len() < 2 { + return Err("usage: stream ".to_string()); + } + if !cli.is_connected() { + return Err("not connected".to_string()); + } + let stream_id = cli.open_stream(parts[1])?; + Ok(format!("opened stream {} to {}", stream_id, parts[1])) + } + "close" => { + if parts.len() < 2 { + return Err("usage: close ".to_string()); + } + let stream_id: u16 = parts[1].parse().map_err(|_| "invalid stream id".to_string())?; + cli.close_stream(stream_id)?; + Ok(format!("closed stream {}", stream_id)) + } + "send" => { + if parts.len() < 3 { + return Err("usage: send ".to_string()); + } + let stream_id: u16 = parts[1].parse().map_err(|_| "invalid stream id".to_string())?; + let data = parts[2..].join(" "); + cli.send_stream_data(stream_id, data.as_bytes())?; + Ok("sent".to_string()) + } + "help" => Ok(HELP_TEXT.to_string()), + _ => Err(format!("unknown command: {}", parts[0])), + } +} + +/// Format a TreeResponse for display. +fn format_response(response: TreeResponse) -> Result { + match response { + TreeResponse::NodeList { names } => Ok(names.join("\n")), + TreeResponse::EndpointList { endpoints } => { + let mut output = String::new(); + for ep in endpoints { + output.push_str(&format!("{} ({:?})\n", ep.name, ep.endpoint_type)); + } + Ok(output) + } + TreeResponse::LeafList { leaves } => Ok(leaves.join("\n")), + TreeResponse::NodeInfo { info } => Ok(format!( + "path: {}\nis_leaf: {}\nhas_children: {}\nendpoints: {:?}", + info.path, info.is_leaf, info.has_children, info.endpoints + )), + TreeResponse::ExecOutput { + exit_code, + stdout, + stderr, + } => { + let mut output = String::new(); + output.push_str(&format!("exit code: {}\n", exit_code)); + if !stdout.is_empty() { + output.push_str(&format!("stdout: {}\n", String::from_utf8_lossy(&stdout))); + } + if !stderr.is_empty() { + output.push_str(&format!("stderr: {}\n", String::from_utf8_lossy(&stderr))); + } + Ok(output) + } + TreeResponse::StreamOpened { stream_id } => Ok(format!("stream opened: {}", stream_id)), + } +} + +/// Help text for CLI commands. +const HELP_TEXT: &str = r#"Commands: + ls [path] List child nodes + endpoints [path] List endpoints at path + leaves List all leaf paths + info Get node info + exec Execute command at path + cd Change current path + pwd Print working path + connect Connect to remote server + stream Open stream to path + send Send data on stream + close Close stream + help Show this help +"#; \ No newline at end of file diff --git a/ush-treetest/src/cli/mod.rs b/ush-treetest/src/cli/mod.rs index c02404a..23ea32e 100644 --- a/ush-treetest/src/cli/mod.rs +++ b/ush-treetest/src/cli/mod.rs @@ -1,343 +1,18 @@ //! # CLI Module -//! +//! //! This module provides the interactive CLI for the unshell tree protocol testbed. //! It supports both local tree operations and remote connections. +//! +//! # Usage +//! +//! ```no_run +//! use ush_treetest::cli::{Cli, parse_and_execute}; +//! +//! let mut cli = Cli::new(); +//! let output = parse_and_execute(&mut cli, "leaves").unwrap(); +//! println!("{}", output); +//! ``` -use crate::protocol::{ - FrameType, TreeRequest, TreeResponse, TcpTransport, Transport, - make_request, make_stream_open, make_stream_data, make_stream_close, - make_handshake, -}; -use crate::tree::Tree; -use crate::leaves::{RemoteShell, TTY}; -use std::string::String; -use std::vec::Vec; -use std::boxed::Box; -use std::result::Result; +pub mod cli; -/// CLI state - manages connection and local tree -pub struct Cli { - transport: Option, - tree: Tree, - current_path: String, - request_id: u64, - #[allow(dead_code)] - stream_id: u16, - streams: Vec, - base_path: String, - mode: CliMode, -} - -/// CLI operation mode -#[derive(Debug, Clone, Copy)] -enum CliMode { Local, Connected } - -/// State of an open stream -#[derive(Debug)] -#[allow(dead_code)] -struct StreamState { stream_id: u16, path: String } - -impl Cli { - /// Create a new CLI with a local tree - pub fn new() -> Self { - let mut tree = Tree::new(); - tree.add_endpoint("/shell", Box::new(RemoteShell::new("shell"))); - tree.add_endpoint("/tty", Box::new(TTY::new("tty"))); - Self { - transport: None, - tree, - current_path: String::from("/"), - request_id: 1, - stream_id: 1, - streams: Vec::new(), - base_path: String::from("/"), - mode: CliMode::Local - } - } - - /// Get next request ID - fn next_request_id(&mut self) -> u64 { - let id = self.request_id; - self.request_id += 1; - id - } - - /// Get next stream ID - #[allow(dead_code)] - fn next_stream_id(&mut self) -> u16 { - let id = self.stream_id; - self.stream_id = self.stream_id.wrapping_add(1); - id - } - - /// List nodes at a path - pub fn list_nodes(&self, path: Option<&str>) -> Result, String> { - let path = path.unwrap_or(&self.current_path); - self.tree.list_nodes(path) - } - - /// List endpoints at a path - pub fn list_endpoints(&self, path: Option<&str>) -> Result, String> { - let path = path.unwrap_or(&self.current_path); - self.tree.list_endpoints(path) - } - - /// List all leaf paths - pub fn list_leaves(&self) -> Vec { - self.tree.list_leaves() - } - - /// Get info about a node - pub fn get_info(&self, path: &str) -> Result { - self.tree.get_info(path) - } - - /// Execute a command locally on the tree - pub fn exec_local(&mut self, path: &str, cmd: &str) -> Result { - let (handler, matched_path) = self.tree.find_handler(path) - .ok_or_else(|| format!("path not found: {}", path))?; - - let request = TreeRequest::Exec { cmd: cmd.to_string() }; - - // Lock the handler and make the request - let mut handler = handler.lock().map_err(|e| e.to_string())?; - handler.handle_request(&request, matched_path) - } - - /// Connect to a remote server - pub fn connect(&mut self, addr: &str) -> Result<(), String> { - let transport = TcpTransport::connect(addr).map_err(|e| e.to_string())?; - self.transport = Some(transport); - self.mode = CliMode::Connected; - self.do_handshake() - } - - /// Perform handshake with remote server - fn do_handshake(&mut self) -> Result<(), String> { - let transport = self.transport.as_mut().ok_or("not connected")?; - let (header, payload) = make_handshake(vec![self.current_path.clone()]); - transport.send_frame(&header, Some(&payload)).map_err(|e| e.to_string())?; - let (header, payload) = transport.recv_frame().map_err(|e| e.to_string())?; - if header.frame_type != FrameType::HandshakeAck { return Err("unexpected response type".to_string()); } - let ack = crate::protocol::HandshakeAck::from_bytes(&payload).map_err(|e| e.to_string())?; - if !ack.accepted { return Err("handshake rejected".to_string()); } - self.base_path = ack.assigned_base_path.clone(); - Ok(()) - } - - /// Send a request to the remote server - pub fn send_request(&mut self, dst_path: &str, request: &TreeRequest) -> Result { - // Get request_id first to avoid borrow issues - let request_id = self.next_request_id(); - - let transport = self.transport.as_mut().ok_or("not connected")?; - - let full_path = if dst_path.starts_with('/') { - dst_path.to_string() - } else { - format!("{}/{}", self.current_path, dst_path) - }; - - let (header, payload) = make_request(&full_path, &self.base_path, request_id, request); - transport.send_frame(&header, Some(&payload)).map_err(|e| e.to_string())?; - - let (header, payload) = transport.recv_frame().map_err(|e| e.to_string())?; - if header.frame_type != FrameType::Response { return Err("unexpected response type".to_string()); } - - let response = TreeResponse::from_bytes(&payload).map_err(|e| e.to_string())?; - Ok(response) - } - - /// Open a stream to a remote path - pub fn open_stream(&mut self, dst_path: &str) -> Result { - // Get request_id first - let request_id = self.next_request_id(); - - let transport = self.transport.as_mut().ok_or("not connected")?; - - let full_path = if dst_path.starts_with('/') { - dst_path.to_string() - } else { - format!("{}/{}", self.current_path, dst_path) - }; - - let header = make_stream_open(&full_path, &self.base_path, request_id); - transport.send_frame(&header, None).map_err(|e| e.to_string())?; - - let (header, payload) = transport.recv_frame().map_err(|e| e.to_string())?; - if header.frame_type != FrameType::Response { return Err("unexpected response type".to_string()); } - - let response = TreeResponse::from_bytes(&payload).map_err(|e| e.to_string())?; - - match response { - TreeResponse::StreamOpened { stream_id } => { - self.streams.push(StreamState { stream_id, path: full_path }); - Ok(stream_id) - } - _ => Err("expected StreamOpened".to_string()) - } - } - - /// Send data on a stream - pub fn send_stream_data(&mut self, stream_id: u16, data: &[u8]) -> Result<(), String> { - let transport = self.transport.as_mut().ok_or("not connected")?; - let (header, payload) = make_stream_data(stream_id, data); - transport.send_frame(&header, Some(&payload)).map_err(|e| e.to_string()) - } - - /// Close a stream - pub fn close_stream(&mut self, stream_id: u16) -> Result<(), String> { - let transport = self.transport.as_mut().ok_or("not connected")?; - let header = make_stream_close(stream_id); - transport.send_frame(&header, None).map_err(|e| e.to_string())?; - self.streams.retain(|s| s.stream_id != stream_id); - Ok(()) - } - - /// Check if connected to remote - pub fn is_connected(&self) -> bool { - matches!(self.mode, CliMode::Connected) - } - - /// Get current path - pub fn current_path(&self) -> &str { - &self.current_path - } - - /// Set current path - pub fn set_path(&mut self, path: &str) { - self.current_path = path.to_string(); - } -} - -/// Parse and execute a CLI command -/// -/// # Arguments -/// * `cli` - The CLI state -/// * `line` - The command line to parse -/// -/// # Returns -/// Ok(output) on success, Err(error) on failure -pub fn parse_and_execute(cli: &mut Cli, line: &str) -> Result { - let parts: Vec<&str> = line.split_whitespace().collect(); - if parts.is_empty() { return Ok(String::new()); } - - match parts[0] { - "ls" | "list" => { - let path = parts.get(1).map(|s| *s); - let names = cli.list_nodes(path)?; - Ok(names.join("\n")) - } - "endpoints" => { - let path = parts.get(1).map(|s| *s); - let eps = cli.list_endpoints(path)?; - let mut output = String::new(); - for ep in &eps { - output.push_str(&format!("{} ({:?}) at {}\n", ep.name, ep.endpoint_type, ep.path)); - } - Ok(output) - } - "leaves" => { - Ok(cli.list_leaves().join("\n")) - } - "info" => { - if parts.len() < 2 { return Err("usage: info ".to_string()); } - let info = cli.get_info(parts[1])?; - Ok(format!("{:?}", info)) - } - "exec" => { - if parts.len() < 3 { return Err("usage: exec ".to_string()); } - let path = parts[1]; - let cmd = parts[2..].join(" "); - if cli.is_connected() { - let request = TreeRequest::Exec { cmd: cmd.clone() }; - let response = cli.send_request(path, &request)?; - format_response(response) - } else { - let response = cli.exec_local(path, &cmd)?; - format_response(response) - } - } - "cd" => { - if parts.len() < 2 { return Err("usage: cd ".to_string()); } - let path = parts[1]; - if cli.get_info(path).is_ok() { - cli.set_path(path); - Ok(format!("changed to {}", path)) - } else { - Err(format!("path not found: {}", path)) - } - } - "pwd" => { - Ok(cli.current_path().to_string()) - } - "connect" => { - if parts.len() < 2 { return Err("usage: connect ".to_string()); } - cli.connect(parts[1])?; - Ok(format!("connected to {}", parts[1])) - } - "stream" => { - if parts.len() < 2 { return Err("usage: stream ".to_string()); } - if !cli.is_connected() { return Err("not connected".to_string()); } - let stream_id = cli.open_stream(parts[1])?; - Ok(format!("opened stream {} to {}", stream_id, parts[1])) - } - "close" => { - if parts.len() < 2 { return Err("usage: close ".to_string()); } - let stream_id: u16 = parts[1].parse().map_err(|_| "invalid stream id".to_string())?; - cli.close_stream(stream_id)?; - Ok(format!("closed stream {}", stream_id)) - } - "send" => { - if parts.len() < 3 { return Err("usage: send ".to_string()); } - let stream_id: u16 = parts[1].parse().map_err(|_| "invalid stream id".to_string())?; - let data = parts[2..].join(" "); - cli.send_stream_data(stream_id, data.as_bytes())?; - Ok("sent".to_string()) - } - "help" => { - Ok(HELP_TEXT.to_string()) - } - _ => Err(format!("unknown command: {}", parts[0])), - } -} - -/// Format a TreeResponse for display -fn format_response(response: TreeResponse) -> Result { - match response { - TreeResponse::NodeList { names } => Ok(names.join("\n")), - TreeResponse::EndpointList { endpoints } => { - let mut output = String::new(); - for ep in endpoints { - output.push_str(&format!("{} ({:?})\n", ep.name, ep.endpoint_type)); - } - Ok(output) - } - TreeResponse::LeafList { leaves } => Ok(leaves.join("\n")), - TreeResponse::NodeInfo { info } => Ok(format!("path: {}\nis_leaf: {}\nhas_children: {}\nendpoints: {:?}", info.path, info.is_leaf, info.has_children, info.endpoints)), - TreeResponse::ExecOutput { exit_code, stdout, stderr } => { - let mut output = String::new(); - output.push_str(&format!("exit code: {}\n", exit_code)); - if !stdout.is_empty() { output.push_str(&format!("stdout: {}\n", String::from_utf8_lossy(&stdout))); } - if !stderr.is_empty() { output.push_str(&format!("stderr: {}\n", String::from_utf8_lossy(&stderr))); } - Ok(output) - } - TreeResponse::StreamOpened { stream_id } => Ok(format!("stream opened: {}", stream_id)), - } -} - -/// Help text for CLI commands -const HELP_TEXT: &str = r#"Commands: - ls [path] List child nodes - endpoints [path] List endpoints at path - leaves List all leaf paths - info Get node info - exec Execute command at path - cd Change current path - pwd Print working path - connect Connect to remote server - stream Open stream to path - send Send data on stream - close Close stream - help Show this help -"#; \ No newline at end of file +pub use cli::{Cli, parse_and_execute}; \ No newline at end of file diff --git a/ush-treetest/src/client.rs b/ush-treetest/src/client.rs new file mode 100644 index 0000000..b425292 --- /dev/null +++ b/ush-treetest/src/client.rs @@ -0,0 +1,434 @@ +//! # Client Implementation +//! +//! This module provides the client functionality for connecting to servers, +//! sending requests, and managing streams. + +use crate::protocol::{ + FrameType, TreeRequest, TreeResponse, TcpTransport, Transport, + make_request, make_stream_open, make_stream_data, make_stream_close, + make_handshake, +}; +use crate::tree::Tree; +use crate::leaves::{RemoteShell, TTY}; +use std::string::String; +use std::vec::Vec; +use std::fmt; + +/// Client state - manages connection and local tree. +/// +/// # Example +/// ``` +/// use ush_treetest::client::Client; +/// +/// // Start with local tree +/// let mut client = Client::new_local(); +/// println!("Leaves: {:?}", client.list_leaves()); +/// +/// // Connect to remote server +/// client.connect("localhost:8080").unwrap(); +/// ``` +/// +/// # Fields +/// * `transport` - Optional TCP transport for remote connection +/// * `tree` - Local tree for local operations +/// * `current_path` - Current working path +/// * `request_id` - Next request ID to send +/// * `stream_id` - Next stream ID to allocate +/// * `streams` - Active streams +/// * `base_path` - Base path assigned by server +/// * `mode` - Operation mode (Local or Connected) +#[allow(dead_code)] +pub struct Client { + transport: Option, + #[allow(dead_code)] + tree: Tree, + current_path: String, + request_id: u64, + #[allow(dead_code)] + stream_id: u16, + streams: Vec, + base_path: String, + mode: ClientMode, +} + +impl fmt::Debug for Client { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Client") + .field("transport", &self.transport.is_some()) + .field("current_path", &self.current_path) + .field("mode", &self.mode) + .finish() + } +} + +/// Client operation mode. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[allow(dead_code)] +enum ClientMode { + /// Local-only mode (no remote connection) + Local, + /// Connected to remote server + Connected, +} + +/// State of an open stream. +/// +/// # Fields +/// * `stream_id` - The stream identifier +/// * `path` - The path this stream is connected to +#[derive(Debug, Clone)] +#[allow(dead_code)] +struct StreamState { + stream_id: u16, + path: String, +} + +#[allow(dead_code)] +impl Client { + /// Create a new client with a local tree. + /// + /// The local tree has `/shell` and `/tty` endpoints registered. + /// + /// # Example + /// ``` + /// let mut client = Client::new_local(); + /// let leaves = client.list_leaves(); + /// assert!(leaves.contains(&"/shell".to_string())); + /// ``` + pub fn new_local() -> Self { + let mut tree = Tree::new(); + tree.add_endpoint("/shell", Box::new(RemoteShell::new("shell"))); + tree.add_endpoint("/tty", Box::new(TTY::new("tty"))); + + Self { + transport: None, + tree, + current_path: String::from("/"), + request_id: 1, + stream_id: 1, + streams: Vec::new(), + base_path: String::from("/"), + mode: ClientMode::Local, + } + } + + /// Get the next request ID. + /// + /// Each request gets a unique incrementing ID. + fn next_request_id(&mut self) -> u64 { + let id = self.request_id; + self.request_id += 1; + id + } + + /// Get the next stream ID. + #[allow(dead_code)] + fn next_stream_id(&mut self) -> u16 { + let id = self.stream_id; + self.stream_id = self.stream_id.wrapping_add(1); + id + } + + /// List nodes at a path. + /// + /// # Arguments + /// * `path` - Optional path (defaults to current path) + /// + /// # Returns + /// List of child node names + /// + /// # Example + /// ``` + /// let mut client = Client::new_local(); + /// let nodes = client.list_nodes(None).unwrap(); + /// ``` + pub fn list_nodes(&self, path: Option<&str>) -> Result, String> { + let path = path.unwrap_or(&self.current_path); + self.tree.list_nodes(path) + } + + /// List endpoints at a path. + /// + /// # Arguments + /// * `path` - Optional path (defaults to current path) + /// + /// # Returns + /// List of endpoint information + pub fn list_endpoints( + &self, + path: Option<&str>, + ) -> Result, String> { + let path = path.unwrap_or(&self.current_path); + self.tree.list_endpoints(path) + } + + /// List all leaf paths. + /// + /// # Returns + /// List of leaf node paths + /// + /// # Example + /// ``` + /// let client = Client::new_local(); + /// let leaves = client.list_leaves(); + /// assert!(!leaves.is_empty()); + /// ``` + pub fn list_leaves(&self) -> Vec { + self.tree.list_leaves() + } + + /// Get information about a node. + /// + /// # Arguments + /// * `path` - The path to get info about + /// + /// # Returns + /// Node information + /// + /// # Example + /// ``` + /// let client = Client::new_local(); + /// let info = client.get_info("/shell").unwrap(); + /// assert!(info.is_leaf); + /// ``` + pub fn get_info(&self, path: &str) -> Result { + self.tree.get_info(path) + } + + /// Execute a command locally on the tree. + /// + /// # Arguments + /// * `path` - The path to execute at + /// * `cmd` - The command to execute + /// + /// # Returns + /// Execution response with exit code and output + pub fn exec_local(&mut self, path: &str, cmd: &str) -> Result { + let (handler, matched_path) = self + .tree + .find_handler(path) + .ok_or_else(|| format!("path not found: {}", path))?; + + let request = TreeRequest::Exec { + cmd: cmd.to_string(), + }; + + let mut handler = handler.lock().map_err(|e| e.to_string())?; + handler.handle_request(&request, matched_path) + } + + /// Connect to a remote server. + /// + /// # Arguments + /// * `addr` - The server address (e.g., "localhost:8080") + /// + /// # Returns + /// Ok(()) on success + /// + /// # Example + /// ``` + /// let mut client = Client::new_local(); + /// client.connect("localhost:8080").unwrap(); + /// ``` + pub fn connect(&mut self, addr: &str) -> Result<(), String> { + let transport = TcpTransport::connect(addr).map_err(|e| e.to_string())?; + self.transport = Some(transport); + self.mode = ClientMode::Connected; + self.do_handshake() + } + + /// Perform handshake with remote server. + fn do_handshake(&mut self) -> Result<(), String> { + let transport = self.transport.as_mut().ok_or("not connected")?; + let (header, payload) = make_handshake(vec![self.current_path.clone()]); + transport + .send_frame(&header, Some(&payload)) + .map_err(|e| e.to_string())?; + let (header, payload) = transport.recv_frame().map_err(|e| e.to_string())?; + if header.frame_type != FrameType::HandshakeAck { + return Err("unexpected response type".to_string()); + } + let ack = crate::protocol::HandshakeAck::from_bytes(&payload) + .map_err(|e| e.to_string())?; + if !ack.accepted { + return Err("handshake rejected".to_string()); + } + self.base_path = ack.assigned_base_path.clone(); + Ok(()) + } + + /// Send a request to the remote server. + /// + /// # Arguments + /// * `dst_path` - The destination path + /// * `request` - The request to send + /// + /// # Returns + /// The response from the server + /// + /// # Example + /// ``` + /// let mut client = Client::new_local(); + /// client.connect("localhost:8080").unwrap(); + /// + /// let request = TreeRequest::Exec { cmd: "echo hello".to_string() }; + /// let response = client.send_request("/shell", &request).unwrap(); + /// ``` + pub fn send_request(&mut self, dst_path: &str, request: &TreeRequest) -> Result { + let request_id = self.next_request_id(); + + let transport = self.transport.as_mut().ok_or("not connected")?; + + let full_path = if dst_path.starts_with('/') { + dst_path.to_string() + } else { + format!("{}/{}", self.current_path, dst_path) + }; + + let (header, payload) = make_request(&full_path, &self.base_path, request_id, request); + transport + .send_frame(&header, Some(&payload)) + .map_err(|e| e.to_string())?; + + let (header, payload) = transport.recv_frame().map_err(|e| e.to_string())?; + if header.frame_type != FrameType::Response { + return Err("unexpected response type".to_string()); + } + + let response = TreeResponse::from_bytes(&payload).map_err(|e| e.to_string())?; + Ok(response) + } + + /// Open a stream to a remote path. + /// + /// # Arguments + /// * `dst_path` - The destination path + /// + /// # Returns + /// The stream ID on success + /// + /// # Example + /// ``` + /// let mut client = Client::new_local(); + /// client.connect("localhost:8080").unwrap(); + /// let stream_id = client.open_stream("/tty").unwrap(); + /// ``` + pub fn open_stream(&mut self, dst_path: &str) -> Result { + let request_id = self.next_request_id(); + + let transport = self.transport.as_mut().ok_or("not connected")?; + + let full_path = if dst_path.starts_with('/') { + dst_path.to_string() + } else { + format!("{}/{}", self.current_path, dst_path) + }; + + let header = make_stream_open(&full_path, &self.base_path, request_id); + transport.send_frame(&header, None).map_err(|e| e.to_string())?; + + let (header, payload) = transport.recv_frame().map_err(|e| e.to_string())?; + if header.frame_type != FrameType::Response { + return Err("unexpected response type".to_string()); + } + + let response = TreeResponse::from_bytes(&payload).map_err(|e| e.to_string())?; + + match response { + TreeResponse::StreamOpened { stream_id } => { + self.streams.push(StreamState { + stream_id, + path: full_path, + }); + Ok(stream_id) + } + _ => Err("expected StreamOpened".to_string()), + } + } + + /// Send data on a stream. + /// + /// # Arguments + /// * `stream_id` - The stream ID + /// * `data` - The data to send + /// + /// # Example + /// ``` + /// let mut client = Client::new_local(); + /// client.connect("localhost:8080").unwrap(); + /// let stream_id = client.open_stream("/tty").unwrap(); + /// client.send_stream_data(stream_id, b"hello").unwrap(); + /// ``` + pub fn send_stream_data(&mut self, stream_id: u16, data: &[u8]) -> Result<(), String> { + let transport = self.transport.as_mut().ok_or("not connected")?; + let (header, payload) = make_stream_data(stream_id, data); + transport + .send_frame(&header, Some(&payload)) + .map_err(|e| e.to_string()) + } + + /// Close a stream. + /// + /// # Arguments + /// * `stream_id` - The stream ID to close + /// + /// # Example + /// ``` + /// let mut client = Client::new_local(); + /// client.connect("localhost:8080").unwrap(); + /// let stream_id = client.open_stream("/tty").unwrap(); + /// client.close_stream(stream_id).unwrap(); + /// ``` + pub fn close_stream(&mut self, stream_id: u16) -> Result<(), String> { + let transport = self.transport.as_mut().ok_or("not connected")?; + let header = make_stream_close(stream_id); + transport + .send_frame(&header, None) + .map_err(|e| e.to_string())?; + self.streams.retain(|s| s.stream_id != stream_id); + Ok(()) + } + + /// Check if connected to remote. + /// + /// # Returns + /// True if connected to a remote server + /// + /// # Example + /// ``` + /// let client = Client::new_local(); + /// assert!(!client.is_connected()); + /// ``` + pub fn is_connected(&self) -> bool { + matches!(self.mode, ClientMode::Connected) + } + + /// Get current path. + /// + /// # Returns + /// The current working path + /// + /// # Example + /// ``` + /// let client = Client::new_local(); + /// assert_eq!(client.current_path(), "/"); + /// ``` + pub fn current_path(&self) -> &str { + &self.current_path + } + + /// Set current path. + /// + /// # Arguments + /// * `path` - The new current path + /// + /// # Example + /// ``` + /// let mut client = Client::new_local(); + /// client.set_path("/shell"); + /// assert_eq!(client.current_path(), "/shell"); + /// ``` + pub fn set_path(&mut self, path: &str) { + self.current_path = path.to_string(); + } +} \ No newline at end of file diff --git a/ush-treetest/src/leaves/shell.rs b/ush-treetest/src/leaves/shell.rs index 5e2059d..f307770 100644 --- a/ush-treetest/src/leaves/shell.rs +++ b/ush-treetest/src/leaves/shell.rs @@ -1,37 +1,101 @@ //! # RemoteShell Leaf +//! +//! This module provides command execution functionality. use crate::protocol::{TreeRequest, TreeResponse, EndpointType}; use crate::tree::Endpoint; use std::string::String; use std::vec::Vec; use std::result::Result; +use std::fmt; -pub struct RemoteShell { name: String } +/// RemoteShell - executes commands locally. +/// +/// # Example +/// ``` +/// use ush_treetest::leaves::RemoteShell; +/// +/// let shell = RemoteShell::new("shell"); +/// ``` +pub struct RemoteShell { + name: String, +} impl RemoteShell { - pub fn new(name: &str) -> Self { Self { name: name.to_string() } } + /// Create a new RemoteShell endpoint. + /// + /// # Arguments + /// * `name` - The name for this endpoint + pub fn new(name: &str) -> Self { + Self { + name: name.to_string(), + } + } + fn execute(&self, cmd: &str) -> (i32, Vec, Vec) { use std::process::{Command, Stdio}; - match Command::new("sh").args(["-c", cmd]).stdout(Stdio::piped()).stderr(Stdio::piped()).output() { + match Command::new("sh") + .args(["-c", cmd]) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .output() + { Ok(out) => (out.status.code().unwrap_or(-1), out.stdout, out.stderr), Err(e) => (-1, Vec::new(), format!("{}\n", e).into_bytes()), } } } +impl fmt::Debug for RemoteShell { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("RemoteShell") + .field("name", &self.name) + .finish() + } +} + impl Endpoint for RemoteShell { - fn handle_request(&mut self, request: &TreeRequest, _src_path: &str) -> Result { + fn handle_request( + &mut self, + request: &TreeRequest, + _src_path: &str, + ) -> Result { match request { TreeRequest::Exec { cmd } => { let (exit_code, stdout, stderr) = self.execute(cmd); - Ok(TreeResponse::ExecOutput { exit_code, stdout, stderr }) + Ok(TreeResponse::ExecOutput { + exit_code, + stdout, + stderr, + }) } _ => Err("unsupported request".to_string()), } } - fn on_stream_open(&mut self, _stream_id: u16, _src_path: &str) -> Option { None } - fn on_stream_data(&mut self, _stream_id: u16, _data: &[u8]) -> bool { false } + + fn on_stream_open( + &mut self, + _stream_id: u16, + _src_path: &str, + ) -> Option { + None + } + + fn on_stream_data( + &mut self, + _stream_id: u16, + _data: &[u8], + ) -> bool { + false + } + fn on_stream_close(&mut self, _stream_id: u16) {} - fn endpoint_type(&self) -> EndpointType { EndpointType::Leaf } - fn name(&self) -> &str { &self.name } + + fn endpoint_type(&self) -> EndpointType { + EndpointType::Leaf + } + + fn name(&self) -> &str { + &self.name + } } \ No newline at end of file diff --git a/ush-treetest/src/leaves/tty.rs b/ush-treetest/src/leaves/tty.rs index 27ea092..0bd0639 100644 --- a/ush-treetest/src/leaves/tty.rs +++ b/ush-treetest/src/leaves/tty.rs @@ -8,19 +8,29 @@ use crate::tree::Endpoint; use std::boxed::Box; use std::result::Result; use std::collections::HashMap; +use std::fmt; -/// A PTY session - represents an active terminal session +/// A PTY session - represents an active terminal session. #[allow(dead_code)] -pub struct PtySession { +pub struct PtySession { /// Stream ID for this session - pub stream_id: u16, + pub stream_id: u16, /// Master file descriptor for the PTY - pub master: std::os::unix::io::RawFd, + pub master: std::os::unix::io::RawFd, /// Child process PID - pub child_pid: u32 + pub child_pid: u32, } -/// TTY endpoint - provides PTY streaming functionality +impl fmt::Debug for PtySession { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("PtySession") + .field("stream_id", &self.stream_id) + .field("child_pid", &self.child_pid) + .finish() + } +} + +/// TTY endpoint - provides PTY streaming functionality. pub struct TTY { name: String, sessions: HashMap>, @@ -28,146 +38,149 @@ pub struct TTY { next_id: u16, } +impl fmt::Debug for TTY { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("TTY") + .field("name", &self.name) + .field("sessions", &self.sessions.len()) + .finish() + } +} + impl TTY { - /// Create a new TTY endpoint + /// Create a new TTY endpoint. + /// + /// # Arguments + /// * `name` - The name for this endpoint pub fn new(name: &str) -> Self { - Self { - name: name.to_string(), - sessions: HashMap::new(), - next_id: 1 + Self { + name: name.to_string(), + sessions: HashMap::new(), + next_id: 1, } } - - /// Open a new PTY session - /// + + /// Open a new PTY session. + /// /// # Arguments /// * `stream_id` - The stream ID for this session - /// + /// /// # Returns /// Ok(()) on success, Err(message) on failure fn open_pty(&mut self, stream_id: u16) -> Result<(), String> { - // Open PTY master - must be unsafe let master = unsafe { libc::posix_openpt(libc::O_RDWR | libc::O_NOCTTY) }; - if master < 0 { - return Err("failed to open PTY".to_string()); + if master < 0 { + return Err("failed to open PTY".to_string()); } - - // Grant PTY access - unsafe - if unsafe { libc::grantpt(master) } != 0 { - unsafe { libc::close(master); } - return Err("failed to grant PTY".to_string()); + + if unsafe { libc::grantpt(master) } != 0 { + unsafe { libc::close(master) }; + return Err("failed to grant PTY".to_string()); } - - // Unlock PTY - unsafe - if unsafe { libc::unlockpt(master) } != 0 { - unsafe { libc::close(master); } - return Err("failed to unlock PTY".to_string()); + + if unsafe { libc::unlockpt(master) } != 0 { + unsafe { libc::close(master) }; + return Err("failed to unlock PTY".to_string()); } - - // Get slave name - unsafe but returns pointer we need to check + let slave_name = unsafe { let ptr = libc::ptsname(master); - if ptr.is_null() { - libc::close(master); - return Err("failed to get PTY name".to_string()); + if ptr.is_null() { + libc::close(master); + return Err("failed to get PTY name".to_string()); } std::ffi::CStr::from_ptr(ptr).to_string_lossy().into_owned() }; - - // Fork - unsafe + let pid = unsafe { libc::fork() }; - if pid < 0 { - unsafe { libc::close(master); } - return Err("fork failed".to_string()); + if pid < 0 { + unsafe { libc::close(master) }; + return Err("fork failed".to_string()); } - + if pid == 0 { - // Child process - set up slave PTY and exec shell - unsafe { libc::close(master); } - - let slave = unsafe { libc::open(slave_name.as_ptr() as *const libc::c_char, libc::O_RDWR) }; - if slave < 0 { - unsafe { libc::exit(1); } + unsafe { libc::close(master) }; + + let slave = unsafe { + libc::open(slave_name.as_ptr() as *const libc::c_char, libc::O_RDWR) + }; + if slave < 0 { + unsafe { libc::exit(1) }; } - - // Set controlling terminal - unsafe - unsafe { libc::ioctl(slave, libc::TIOCSCTTY, 0); } - - // Redirect stdio - unsafe - unsafe { - libc::dup2(slave, libc::STDIN_FILENO); - libc::dup2(slave, libc::STDOUT_FILENO); - libc::dup2(slave, libc::STDERR_FILENO); - libc::close(slave); + + unsafe { libc::ioctl(slave, libc::TIOCSCTTY, 0) }; + + unsafe { + libc::dup2(slave, libc::STDIN_FILENO); + libc::dup2(slave, libc::STDOUT_FILENO); + libc::dup2(slave, libc::STDERR_FILENO); + libc::close(slave); } - - // Exec shell - unsafe - unsafe { + + unsafe { libc::execl( - "/bin/sh\0".as_ptr() as *const libc::c_char, - "sh\0".as_ptr() as *const libc::c_char, - std::ptr::null::() + "/bin/sh\0".as_ptr() as *const libc::c_char, + "sh\0".as_ptr() as *const libc::c_char, + std::ptr::null::(), ); } - - // If exec fails, exit - unsafe { libc::exit(1); } + + unsafe { libc::exit(1) }; } - - // Parent - store session - self.sessions.insert(stream_id, Box::new(PtySession { - stream_id, - master, - child_pid: pid as u32 + + self.sessions.insert(stream_id, Box::new(PtySession { + stream_id, + master, + child_pid: pid as u32, })); Ok(()) } - - /// Write data to a PTY session - /// + + /// Write data to a PTY session. + /// /// # Arguments /// * `stream_id` - The stream ID /// * `data` - The data to write - /// + /// /// # Returns /// Ok(()) on success, Err(message) on failure fn write_to_pty(&mut self, stream_id: u16, data: &[u8]) -> Result<(), String> { let session = self.sessions.get_mut(&stream_id).ok_or("session not found")?; - let written = unsafe { + let written = unsafe { libc::write( - session.master, - data.as_ptr() as *const libc::c_void, - data.len() - ) + session.master, + data.as_ptr() as *const libc::c_void, + data.len(), + ) }; - if written < 0 { - return Err("write failed".to_string()); + if written < 0 { + return Err("write failed".to_string()); } Ok(()) } - - /// Close a PTY session - /// + + /// Close a PTY session. + /// /// # Arguments /// * `stream_id` - The stream ID to close fn close_pty(&mut self, stream_id: u16) { if let Some(session) = self.sessions.remove(&stream_id) { - // Send SIGTERM to child - unsafe - unsafe { libc::kill(session.child_pid as i32, libc::SIGTERM); } - - // Wait for child - unsafe + unsafe { libc::kill(session.child_pid as i32, libc::SIGTERM) }; + let mut status: libc::c_int = 0; - unsafe { libc::waitpid(session.child_pid as i32, &mut status, 0); } - - // Close master - unsafe - unsafe { libc::close(session.master); } + unsafe { libc::waitpid(session.child_pid as i32, &mut status, 0) }; + + unsafe { libc::close(session.master) }; } } } impl Endpoint for TTY { - /// Handle a request - TTY only supports exec for basic commands - fn handle_request(&mut self, request: &TreeRequest, _src_path: &str) -> Result { + fn handle_request( + &mut self, + request: &TreeRequest, + _src_path: &str, + ) -> Result { match request { TreeRequest::Exec { cmd } => { use std::process::{Command, Stdio}; @@ -177,39 +190,42 @@ impl Endpoint for TTY { .stderr(Stdio::piped()) .output() .map_err(|e| e.to_string())?; - Ok(TreeResponse::ExecOutput { - exit_code: output.status.code().unwrap_or(-1), - stdout: output.stdout, - stderr: output.stderr + Ok(TreeResponse::ExecOutput { + exit_code: output.status.code().unwrap_or(-1), + stdout: output.stdout, + stderr: output.stderr, }) } _ => Err("use stream for TTY".to_string()), } } - - /// Handle stream open - creates a new PTY session - fn on_stream_open(&mut self, stream_id: u16, _src_path: &str) -> Option { + + fn on_stream_open( + &mut self, + stream_id: u16, + _src_path: &str, + ) -> Option { self.open_pty(stream_id).ok().map(|_| stream_id) } - - /// Handle stream data - writes to PTY - fn on_stream_data(&mut self, stream_id: u16, data: &[u8]) -> bool { + + fn on_stream_data( + &mut self, + stream_id: u16, + data: &[u8], + ) -> bool { self.write_to_pty(stream_id, data).ok(); true } - - /// Handle stream close - closes PTY session - fn on_stream_close(&mut self, stream_id: u16) { - self.close_pty(stream_id); + + fn on_stream_close(&mut self, stream_id: u16) { + self.close_pty(stream_id); } - - /// Get endpoint type - fn endpoint_type(&self) -> EndpointType { - EndpointType::Stream + + fn endpoint_type(&self) -> EndpointType { + EndpointType::Stream } - - /// Get endpoint name - fn name(&self) -> &str { - &self.name + + fn name(&self) -> &str { + &self.name } } \ No newline at end of file diff --git a/ush-treetest/src/main.rs b/ush-treetest/src/main.rs index 9d125a2..74ded1b 100644 --- a/ush-treetest/src/main.rs +++ b/ush-treetest/src/main.rs @@ -1,57 +1,110 @@ //! # Unshell Tree Protocol Testbed -//! +//! //! This is a testbed implementation of a tree-based routing protocol for unshell. //! It supports serving and connecting to tree endpoints, with leaves for RemoteShell //! (command execution) and TTY (PTY streaming). +//! +//! # Commands +//! +//! - `serve [addr]` - Start a server +//! - `connect [addr]` - Connect to a server and run CLI +//! - `run ` - Run a single command locally +//! - (default) - Run interactive CLI with local tree +//! +//! # Example +//! +//! ```bash +//! # Start server +//! $ ush-treetest serve 0.0.0.0:8080 +//! +//! # Connect from another terminal +//! $ ush-treetest connect localhost:8080 +//! ``` mod cli; +mod client; mod leaves; mod protocol; +mod server; mod tree; -use crate::protocol::{FrameHeader, FrameType, TreeRequest, TreeResponse, make_response, make_handshake_ack, Transport}; -use crate::tree::Tree; -use crate::leaves::{RemoteShell, TTY}; -use crate::protocol::TcpTransport; +use crate::cli::{Cli, parse_and_execute}; use std::io::{self, Write}; -use std::sync::{Arc, Mutex}; use clap::{Parser, Subcommand}; +/// CLI argument parser. +/// +/// # Example +/// ``` +/// let args = Args::parse(); +/// match args.command { +/// Some(Command::Serve { addr }) => { ... } +/// Some(Command::Connect { addr }) => { ... } +/// _ => { ... } +/// } +/// ``` #[derive(Parser)] #[command(name = "ush-treetest")] #[command(about = "Unshell tree protocol testbed")] struct Args { #[command(subcommand)] command: Option, - + #[arg(short, long)] addr: Option, } +/// Subcommands for the CLI. +/// +/// # Variants +/// - `Serve` - Start a server +/// - `Connect` - Connect to a server +/// - `Run` - Run a single command locally +/// - `Cli` - Run interactive CLI (default) #[derive(Subcommand)] enum Command { + /// Start a server Serve { + /// Address to listen on #[arg(default_value = "0.0.0.0:8080")] addr: String, }, + /// Connect to a server Connect { + /// Server address to connect to #[arg(default_value = "localhost:8080")] addr: String, }, + /// Run interactive CLI Cli {}, + /// Run a single command locally Run { + /// Command to execute command: String, }, } +/// Main entry point. +/// +/// # Example +/// ``` +/// // Start server +/// $ ush-treetest serve +/// +/// // Connect to server +/// $ ush-treetest connect localhost:8080 +/// +/// // Run locally +/// $ ush-treetest run "exec /shell echo hello" +/// ``` fn main() { let _ = env_logger::try_init(); - + let args = Args::parse(); - + match args.command { Some(Command::Serve { addr }) => { - run_server(&addr); + server::run_server(&addr); } Some(Command::Connect { addr }) => { run_client(&addr); @@ -65,214 +118,26 @@ fn main() { } } -fn run_server(addr: &str) { - log::info!("Starting server on {}", addr); - - let tree = Arc::new(Mutex::new(Tree::new())); - { - let mut tree = tree.lock().unwrap(); - tree.add_endpoint("/shell", Box::new(RemoteShell::new("shell"))); - tree.add_endpoint("/tty", Box::new(TTY::new("tty"))); - } - - let listener = TcpTransport::listen(addr).expect("failed to bind"); - log::info!("Listening on {}", addr); - - loop { - match TcpTransport::accept(&listener) { - Ok(transport) => { - log::info!("New connection from {:?}", transport.peer_addr()); - let tree = Arc::clone(&tree); - std::thread::spawn(move || { - handle_connection(transport, tree); - }); - } - Err(e) => { - log::error!("accept error: {:?}", e); - } - } - } -} - -fn handle_connection(mut transport: TcpTransport, tree: Arc>) { - let (header, _payload) = match transport.recv_frame() { - Ok(h) => h, - Err(e) => { - log::error!("recv error: {:?}", e); - return; - } - }; - - if header.frame_type != FrameType::Handshake { - log::error!("expected handshake"); - return; - } - - log::info!("Client connected"); - - let (ack_header, ack_payload) = make_handshake_ack(true, "/client"); - transport.send_frame(&ack_header, Some(&ack_payload)).expect("send failed"); - - loop { - match transport.recv_frame() { - Ok((header, payload)) => { - let response = handle_frame(&header, &payload, &tree); - - if let Some(response) = response { - let (resp_header, resp_payload) = match response { - Ok((h, p)) => (h, p), - Err(e) => { - log::error!("handle error: {:?}", e); - break; - } - }; - transport.send_frame(&resp_header, Some(&resp_payload)).expect("send failed"); - } - - if header.frame_type == FrameType::StreamClose { - break; - } - } - Err(e) => { - log::error!("recv error: {:?}", e); - break; - } - } - } - - log::info!("Connection closed"); -} - -/// Handle a single frame and return an optional response -/// +/// Run the client with connection to a server. +/// /// # Arguments -/// * `header` - The frame header -/// * `payload` - The frame payload bytes -/// * `tree` - Shared access to the tree -/// -/// # Returns -/// Some(Ok((header, payload))) for a response to send, Some(Err(e)) for an error, None for no response -fn handle_frame(header: &FrameHeader, payload: &[u8], tree: &Arc>) -> Option), String>> { - match header.frame_type { - FrameType::Request => { - let request: TreeRequest = match TreeRequest::from_bytes(payload) { - Ok(r) => r, - Err(e) => return Some(Err(e.to_string())), - }; - - let dst_path = header.dst_path.as_deref().unwrap_or("/"); - - // Acquire lock for the entire request handling - let mut tree = match tree.lock() { - Ok(t) => t, - Err(e) => return Some(Err(format!("lock error: {}", e))), - }; - - let response = match request { - TreeRequest::ListNodes {} => { - let names = tree.list_nodes(dst_path).unwrap_or_default(); - TreeResponse::NodeList { names } - } - TreeRequest::ListEndpoints {} => { - let endpoints = tree.list_endpoints(dst_path).unwrap_or_default(); - TreeResponse::EndpointList { endpoints } - } - TreeRequest::ListLeaves {} => { - let leaves = tree.list_leaves(); - TreeResponse::LeafList { leaves } - } - TreeRequest::GetInfo { path } => { - match tree.get_info(&path) { - Ok(info) => TreeResponse::NodeInfo { info }, - Err(e) => return Some(Err(e)), - } - } - TreeRequest::Exec { ref cmd } => { - let (handler, matched_path) = match tree.find_handler(dst_path) { - Some(h) => h, - None => return Some(Err(format!("path not found: {}", dst_path))), - }; - // Lock the handler and make the request - let result = { - let mut handler = match handler.lock() { - Ok(h) => h, - Err(e) => return Some(Err(format!("lock error: {}", e))), - }; - handler.handle_request(&TreeRequest::Exec { cmd: cmd.clone() }, matched_path) - }; - match result { - Ok(resp) => resp, - Err(e) => return Some(Err(e)), - } - } - TreeRequest::StreamOpen { path } => { - match tree.open_stream(&path, &header.src_path) { - Ok(stream_id) => TreeResponse::StreamOpened { stream_id }, - Err(e) => return Some(Err(e)), - } - } - TreeRequest::Resize { .. } => { - return Some(Err("unsupported request: Resize".to_string())); - } - }; - - Some(Ok(make_response(&header.src_path, header.request_id.unwrap_or(0), &response))) - } - - FrameType::StreamOpen => { - let dst_path = header.dst_path.as_deref().unwrap_or("/"); - let mut tree = match tree.lock() { - Ok(t) => t, - Err(e) => return Some(Err(format!("lock error: {}", e))), - }; - match tree.open_stream(dst_path, &header.src_path) { - Ok(stream_id) => { - let response = TreeResponse::StreamOpened { stream_id }; - Some(Ok(make_response(&header.src_path, header.request_id.unwrap_or(0), &response))) - } - Err(e) => Some(Err(e)), - } - } - - FrameType::StreamData => { - let mut tree = match tree.lock() { - Ok(t) => t, - Err(e) => return Some(Err(format!("lock error: {}", e))), - }; - tree.route_stream_data(header, payload).ok(); - None - } - - FrameType::StreamClose => { - let mut tree = match tree.lock() { - Ok(t) => t, - Err(e) => return Some(Err(format!("lock error: {}", e))), - }; - if let Some(stream_id) = header.stream_id { - tree.close_stream(stream_id).ok(); - } - None - } - - _ => Some(Err("unsupported frame type".to_string())), - } -} - +/// * `addr` - Server address fn run_client(addr: &str) { - let mut cli = cli::Cli::new(); - + let mut cli = Cli::new(); + if let Err(e) = cli.connect(addr) { eprintln!("Failed to connect: {}", e); return; } - + println!("Connected to {}", addr); run_cli_loop(&mut cli); } +/// Run an interactive CLI with a local tree. fn run_interactive() { - let mut cli = cli::Cli::new(); - + let mut cli = Cli::new(); + println!("Unshell Tree Protocol Testbed"); println!("Type 'help' for commands\n"); println!("Local tree with endpoints:"); @@ -280,31 +145,35 @@ fn run_interactive() { println!(" {}", leaf); } println!(); - + run_cli_loop(&mut cli); } -fn run_cli_loop(cli: &mut cli::Cli) { +/// Run the CLI command loop. +/// +/// # Arguments +/// * `cli` - The CLI instance +fn run_cli_loop(cli: &mut Cli) { loop { print!("{}> ", cli.current_path()); io::stdout().flush().ok(); - + let mut line = String::new(); if io::stdin().read_line(&mut line).is_err() { break; } - + let line = line.trim(); - + if line.is_empty() { continue; } - + if line == "quit" || line == "exit" { break; } - - match cli::parse_and_execute(cli, line) { + + match parse_and_execute(cli, line) { Ok(output) => { if !output.is_empty() { println!("{}", output); @@ -317,10 +186,14 @@ fn run_cli_loop(cli: &mut cli::Cli) { } } +/// Run a single command locally. +/// +/// # Arguments +/// * `command` - The command to run fn run_single_command(command: &str) { - let mut cli = cli::Cli::new(); - - match cli::parse_and_execute(&mut cli, command) { + let mut cli = Cli::new(); + + match parse_and_execute(&mut cli, command) { Ok(output) => { if !output.is_empty() { println!("{}", output); diff --git a/ush-treetest/src/protocol/mod.rs b/ush-treetest/src/protocol/mod.rs index 26ffdce..aecc19d 100644 --- a/ush-treetest/src/protocol/mod.rs +++ b/ush-treetest/src/protocol/mod.rs @@ -1,4 +1,42 @@ //! # Protocol Module +//! +//! This module defines the protocol types and transport layer for the unshell tree protocol. +//! It provides serialization via rkyv and TCP transport for frame passing. +//! +//! # Frame Format +//! +//! Each frame consists of: +//! - 4-byte header length (little-endian u32) +//! - Serialized header bytes (using rkyv) +//! - 4-byte payload length (little-endian u32) +//! - Payload bytes (optional) +//! +//! # Usage +//! +//! ```no_run +//! use ush_treetest::protocol::{ +//! FrameType, FrameHeader, TreeRequest, TreeResponse, +//! TcpTransport, Transport, +//! }; +//! +//! // Connect to server +//! let mut transport = TcpTransport::connect("localhost:8080").unwrap(); +//! +//! // Send a request +//! let header = FrameHeader { +//! frame_type: FrameType::Request, +//! dst_path: Some("/shell".to_string()), +//! src_path: "/client".to_string(), +//! request_id: Some(1), +//! stream_id: None, +//! }; +//! let payload = TreeRequest::Exec { cmd: "echo hello".to_string() }.to_bytes(); +//! transport.send_frame(&header, Some(&payload)).unwrap(); +//! +//! // Receive response +//! let (header, payload) = transport.recv_frame().unwrap(); +//! let response = TreeResponse::from_bytes(&payload).unwrap(); +//! ``` pub mod types; pub mod transport; diff --git a/ush-treetest/src/protocol/transport.rs b/ush-treetest/src/protocol/transport.rs index 6e5d265..25dc1da 100644 --- a/ush-treetest/src/protocol/transport.rs +++ b/ush-treetest/src/protocol/transport.rs @@ -1,27 +1,85 @@ //! # Transport Layer //! //! This module provides the Transport trait and TCP implementation. -//! Uses a simple length-prefixed framing: [u32 header_len][header bytes][u32 payload_len][payload bytes] +//! Uses a simple length-prefixed framing: `[u32 header_len][header bytes][u32 payload_len][payload bytes]` +//! +//! # Frame Format +//! +//! Each frame is encoded as: +//! - 4 bytes: header length (little-endian u32) +//! - N bytes: serialized header +//! - 4 bytes: payload length (little-endian u32) +//! - M bytes: payload (optional) +//! +//! # Usage +//! +//! ```no_run +//! use ush_treetest::protocol::{TcpTransport, Transport, FrameHeader, FrameType}; +//! +//! // Connect to server +//! let mut transport = TcpTransport::connect("localhost:8080").unwrap(); +//! +//! // Send a frame +//! let header = FrameHeader { +//! frame_type: FrameType::Request, +//! dst_path: Some("/shell".to_string()), +//! src_path: "/client".to_string(), +//! request_id: Some(1), +//! stream_id: None, +//! }; +//! transport.send_frame(&header, Some(b"test payload")).unwrap(); +//! +//! // Receive a frame +//! let (header, payload) = transport.recv_frame().unwrap(); +//! ``` use crate::protocol::types::*; use std::net::{TcpStream, TcpListener}; use std::io::{Read, Write, Error}; +/// Transport trait - interface for sending and receiving frames. +/// +/// This trait defines the interface for all transport implementations. +/// Implementors must provide send_frame, recv_frame, and close methods. pub trait Transport: Sized { + /// Error type for this transport type Error: std::fmt::Debug; - /// Send a frame (header + optional payload) - fn send_frame(&mut self, header: &FrameHeader, payload: Option<&[u8]>) -> Result<(), Self::Error>; - /// Receive a frame + + /// Send a frame (header + optional payload). + /// + /// # Arguments + /// * `header` - The frame header + /// * `payload` - Optional payload bytes + fn send_frame( + &mut self, + header: &FrameHeader, + payload: Option<&[u8]>, + ) -> Result<(), Self::Error>; + + /// Receive a frame. + /// + /// # Returns + /// (header, payload) tuple fn recv_frame(&mut self) -> Result<(FrameHeader, Vec), Self::Error>; - /// Close the transport + + /// Close the transport. #[allow(dead_code)] fn close(&mut self) -> Result<(), Self::Error>; } +/// Transport-level errors. +/// +/// # Variants +/// * `ConnectionClosed` - The connection was closed +/// * `InvalidFrame` - The frame was invalid +/// * `Io` - I/O error #[derive(Debug)] pub enum TransportError { + /// Connection was closed ConnectionClosed, + /// Invalid frame format InvalidFrame(String), + /// I/O error Io(String), } @@ -36,52 +94,106 @@ impl std::fmt::Display for TransportError { } impl From for TransportError { - fn from(e: Error) -> Self { TransportError::Io(e.to_string()) } + fn from(e: Error) -> Self { + TransportError::Io(e.to_string()) + } } -/// TCP transport implementation +/// TCP transport implementation. +#[derive(Debug)] pub struct TcpTransport { stream: TcpStream, } impl TcpTransport { + /// Create a new TCP transport from an existing stream. + /// + /// Sets read/write timeouts to 30 seconds for safety. + /// + /// # Arguments + /// * `stream` - An existing TCP stream pub fn new(stream: TcpStream) -> Self { - // Set timeouts for safety - stream.set_read_timeout(Some(std::time::Duration::from_secs(30))).ok(); - stream.set_write_timeout(Some(std::time::Duration::from_secs(30))).ok(); + stream + .set_read_timeout(Some(std::time::Duration::from_secs(30))) + .ok(); + stream + .set_write_timeout(Some(std::time::Duration::from_secs(30))) + .ok(); Self { stream } } - - /// Connect to a remote address + + /// Connect to a remote address. + /// + /// # Arguments + /// * `addr` - The address to connect to (e.g., "localhost:8080") + /// + /// # Returns + /// Connected transport + /// + /// # Example + /// ``` + /// let transport = TcpTransport::connect("localhost:8080").unwrap(); + /// ``` pub fn connect(addr: &str) -> Result { let stream = TcpStream::connect(addr)?; Ok(Self::new(stream)) } - - /// Create a listening socket + + /// Create a listening socket. + /// + /// # Arguments + /// * `addr` - The address to listen on + /// + /// # Returns + /// TCP listener + /// + /// # Example + /// ``` + /// let listener = TcpTransport::listen("0.0.0.0:8080").unwrap(); + /// ``` pub fn listen(addr: &str) -> Result { let listener = TcpListener::bind(addr)?; listener.set_nonblocking(false)?; Ok(listener) } - - /// Accept an incoming connection + + /// Accept an incoming connection. + /// + /// # Arguments + /// * `listener` - The listening socket + /// + /// # Returns + /// New transport for the connection + /// + /// # Example + /// ``` + /// let listener = TcpTransport::listen("0.0.0.0:8080").unwrap(); + /// let transport = TcpTransport::accept(&listener).unwrap(); + /// ``` pub fn accept(listener: &std::net::TcpListener) -> Result { let stream = listener.accept()?.0; Ok(Self::new(stream)) } - - /// Get peer address + + /// Get peer address. + /// + /// # Returns + /// The peer's socket address pub fn peer_addr(&self) -> Result { self.stream.peer_addr() } - - /// Read exactly n bytes + + /// Read exactly n bytes. + /// + /// Will block until all bytes are read or an error occurs. fn read_exact(&mut self, mut n: usize) -> Result, TransportError> { let mut buf = Vec::with_capacity(n); while n > 0 { let mut chunk = vec![0u8; n]; - let read = self.stream.read(&mut chunk).map_err(|e| TransportError::Io(e.to_string()))?; + let read = + self.stream + .read(&mut chunk) + .map_err(|e| TransportError::Io(e.to_string()))?; if read == 0 { return Err(TransportError::ConnectionClosed); } @@ -94,63 +206,87 @@ impl TcpTransport { impl Transport for TcpTransport { type Error = TransportError; - - fn send_frame(&mut self, header: &FrameHeader, payload: Option<&[u8]>) -> Result<(), Self::Error> { - // Serialize header using rkyv + + fn send_frame( + &mut self, + header: &FrameHeader, + payload: Option<&[u8]>, + ) -> Result<(), Self::Error> { let header_bytes = header.to_bytes(); let header_len = header_bytes.len() as u32; - - // Get payload bytes + let payload_bytes = payload.unwrap_or(&[]); let payload_len = payload_bytes.len() as u32; - - // Build frame: [u32 header_len][header][u32 payload_len][payload] - let mut frame = Vec::with_capacity(4 + header_len as usize + 4 + payload_len as usize); + + let mut frame = + Vec::with_capacity(4 + header_len as usize + 4 + payload_len as usize); frame.extend_from_slice(&header_len.to_le_bytes()); frame.extend_from_slice(&header_bytes); frame.extend_from_slice(&payload_len.to_le_bytes()); frame.extend_from_slice(payload_bytes); - - self.stream.write_all(&frame).map_err(|e| TransportError::Io(e.to_string()))?; - self.stream.flush().map_err(|e| TransportError::Io(e.to_string()))?; + + self.stream + .write_all(&frame) + .map_err(|e| TransportError::Io(e.to_string()))?; + self.stream + .flush() + .map_err(|e| TransportError::Io(e.to_string()))?; Ok(()) } - + fn recv_frame(&mut self) -> Result<(FrameHeader, Vec), Self::Error> { - // Read header length let header_len_bytes = self.read_exact(4)?; let header_len = u32::from_le_bytes(header_len_bytes.try_into().unwrap()) as usize; - - // Read header + let header_bytes = self.read_exact(header_len)?; - let header = FrameHeader::from_bytes(&header_bytes).map_err(|e| TransportError::InvalidFrame(e))?; - - // Read payload length + let header = + FrameHeader::from_bytes(&header_bytes).map_err(|e| TransportError::InvalidFrame(e))?; + let payload_len_bytes = self.read_exact(4)?; - let payload_len = u32::from_le_bytes(payload_len_bytes.try_into().unwrap()) as usize; - - // Read payload + let payload_len = + u32::from_le_bytes(payload_len_bytes.try_into().unwrap()) as usize; + let payload = if payload_len > 0 { self.read_exact(payload_len)? } else { Vec::new() }; - + Ok((header, payload)) } - + fn close(&mut self) -> Result<(), Self::Error> { - self.stream.shutdown(std::net::Shutdown::Both).map_err(|e| TransportError::Io(e.to_string()))?; + self.stream + .shutdown(std::net::Shutdown::Both) + .map_err(|e| TransportError::Io(e.to_string()))?; Ok(()) } } -// ============================================================================= -// Frame builder functions -// ============================================================================= - -/// Create a request frame -pub fn make_request(dst_path: &str, src_path: &str, request_id: u64, request: &TreeRequest) -> (FrameHeader, Vec) { +/// Create a request frame. +/// +/// # Arguments +/// * `dst_path` - Destination path +/// * `src_path` - Source path +/// * `request_id` - Request ID +/// * `request` - The request payload +/// +/// # Returns +/// (header, payload) tuple +/// +/// # Example +/// ``` +/// use ush_treetest::protocol::{make_request, TreeRequest}; +/// +/// let request = TreeRequest::Exec { cmd: "echo hello".to_string() }; +/// let (header, payload) = make_request("/shell", "/client", 1, &request); +/// ``` +pub fn make_request( + dst_path: &str, + src_path: &str, + request_id: u64, + request: &TreeRequest, +) -> (FrameHeader, Vec) { let header = FrameHeader { frame_type: FrameType::Request, dst_path: Some(dst_path.to_string()), @@ -162,8 +298,20 @@ pub fn make_request(dst_path: &str, src_path: &str, request_id: u64, request: &T (header, payload) } -/// Create a response frame -pub fn make_response(src_path: &str, request_id: u64, response: &TreeResponse) -> (FrameHeader, Vec) { +/// Create a response frame. +/// +/// # Arguments +/// * `src_path` - Source path +/// * `request_id` - Request ID +/// * `response` - The response payload +/// +/// # Returns +/// (header, payload) tuple +pub fn make_response( + src_path: &str, + request_id: u64, + response: &TreeResponse, +) -> (FrameHeader, Vec) { let header = FrameHeader { frame_type: FrameType::Response, dst_path: None, @@ -175,7 +323,15 @@ pub fn make_response(src_path: &str, request_id: u64, response: &TreeResponse) - (header, payload) } -/// Create a stream open frame +/// Create a stream open frame. +/// +/// # Arguments +/// * `dst_path` - Destination path +/// * `src_path` - Source path +/// * `request_id` - Request ID +/// +/// # Returns +/// Frame header (no payload) pub fn make_stream_open(dst_path: &str, src_path: &str, request_id: u64) -> FrameHeader { FrameHeader { frame_type: FrameType::StreamOpen, @@ -186,7 +342,14 @@ pub fn make_stream_open(dst_path: &str, src_path: &str, request_id: u64) -> Fram } } -/// Create a stream data frame +/// Create a stream data frame. +/// +/// # Arguments +/// * `stream_id` - Stream ID +/// * `data` - Data to send +/// +/// # Returns +/// (header, payload) tuple pub fn make_stream_data(stream_id: u16, data: &[u8]) -> (FrameHeader, Vec) { let header = FrameHeader { frame_type: FrameType::StreamData, @@ -198,7 +361,13 @@ pub fn make_stream_data(stream_id: u16, data: &[u8]) -> (FrameHeader, Vec) { (header, data.to_vec()) } -/// Create a stream close frame +/// Create a stream close frame. +/// +/// # Arguments +/// * `stream_id` - Stream ID to close +/// +/// # Returns +/// Frame header (no payload) pub fn make_stream_close(stream_id: u16) -> FrameHeader { FrameHeader { frame_type: FrameType::StreamClose, @@ -209,9 +378,25 @@ pub fn make_stream_close(stream_id: u16) -> FrameHeader { } } -/// Create a handshake frame +/// Create a handshake frame. +/// +/// # Arguments +/// * `registered_paths` - Paths to register +/// +/// # Returns +/// (header, payload) tuple +/// +/// # Example +/// ``` +/// use ush_treetest::protocol::make_handshake; +/// +/// let paths = vec!["/client".to_string()]; +/// let (header, payload) = make_handshake(paths); +/// ``` pub fn make_handshake(registered_paths: Vec) -> (FrameHeader, Vec) { - let handshake = Handshake { registered_paths }; + let handshake = Handshake { + registered_paths, + }; let payload = handshake.to_bytes(); let header = FrameHeader { frame_type: FrameType::Handshake, @@ -223,11 +408,21 @@ pub fn make_handshake(registered_paths: Vec) -> (FrameHeader, Vec) { (header, payload) } -/// Create a handshake ack frame -pub fn make_handshake_ack(accepted: bool, assigned_base_path: &str) -> (FrameHeader, Vec) { - let ack = HandshakeAck { - accepted, - assigned_base_path: assigned_base_path.to_string() +/// Create a handshake ack frame. +/// +/// # Arguments +/// * `accepted` - Whether handshake was accepted +/// * `assigned_base_path` - Base path to assign +/// +/// # Returns +/// (header, payload) tuple +pub fn make_handshake_ack( + accepted: bool, + assigned_base_path: &str, +) -> (FrameHeader, Vec) { + let ack = HandshakeAck { + accepted, + assigned_base_path: assigned_base_path.to_string(), }; let payload = ack.to_bytes(); let header = FrameHeader { diff --git a/ush-treetest/src/protocol/types.rs b/ush-treetest/src/protocol/types.rs index e8acc90..cab676e 100644 --- a/ush-treetest/src/protocol/types.rs +++ b/ush-treetest/src/protocol/types.rs @@ -2,27 +2,86 @@ //! //! This module defines the core types for the UnShell protocol. //! Uses rkyv for zero-copy serialization. +//! +//! # Serialization +//! +//! All types implement `rkyv::Archive`, `rkyv::Serialize`, and `rkyv::Deserialize` +//! for efficient serialization without runtime type information. +//! +//! # Example +//! +//! ```no_run +//! use ush_treetest::protocol::{TreeRequest, TreeResponse}; +//! +//! // Serialize a request +//! let request = TreeRequest::Exec { cmd: "echo hello".to_string() }; +//! let bytes = request.to_bytes(); +//! +//! // Deserialize it back +//! let decoded = TreeRequest::from_bytes(&bytes).unwrap(); +//! ``` use rkyv::{Archive, Serialize, Deserialize}; use std::string::String; use std::vec::Vec; +/// Default buffer size for rkyv serialization. +/// +/// This value is chosen to accommodate typical protocol messages. const BUFFER_SIZE: usize = 4096; -/// Frame type enum - distinguishes between different frame kinds +/// Frame type enum - distinguishes between different frame kinds. +/// +/// Each frame type has a specific purpose in the protocol: +/// - `Request` / `Response`: Request-response pairs +/// - `StreamOpen` / `StreamData` / `StreamClose`: Streaming operations +/// - `Handshake` / `HandshakeAck`: Connection setup +/// +/// # Example +/// ``` +/// use ush_treetest::protocol::FrameType; +/// +/// let frame_type = FrameType::Request; +/// assert_eq!(frame_type as u8, 0x01); +/// ``` #[derive(Archive, Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)] #[repr(u8)] pub enum FrameType { + /// Request frame - client requesting an operation Request = 0x01, + /// Response frame - server responding to a request Response = 0x02, + /// Stream open frame - requesting a stream StreamOpen = 0x03, + /// Stream data frame - sending data on a stream StreamData = 0x04, + /// Stream close frame - closing a stream StreamClose = 0x05, + /// Handshake frame - connection initialization Handshake = 0x10, + /// Handshake acknowledgement - connection acceptance HandshakeAck = 0x11, } impl FrameType { + /// Convert a byte value to a FrameType. + /// + /// # Arguments + /// * `v` - The byte value to convert + /// + /// # Returns + /// Some(FrameType) if valid, None otherwise + /// + /// # Example + /// ``` + /// use ush_treetest::protocol::FrameType; + /// + /// let ft = FrameType::from_u8(0x01); + /// assert_eq!(ft, Some(FrameType::Request)); + /// + /// let invalid = FrameType::from_u8(0xFF); + /// assert_eq!(invalid, None); + /// ``` #[allow(dead_code)] pub fn from_u8(v: u8) -> Option { match v { @@ -38,124 +97,323 @@ impl FrameType { } } -/// Frame header - the metadata sent before each payload +/// Frame header - the metadata sent before each payload. +/// +/// The header contains routing information and identifies the frame type. +/// +/// # Fields +/// * `frame_type` - The type of frame +/// * `dst_path` - Optional destination path for routing +/// * `src_path` - Source path for the frame +/// * `request_id` - Optional request ID for correlation +/// * `stream_id` - Optional stream ID for streaming +/// +/// # Example +/// ``` +/// use ush_treetest::protocol::{FrameHeader, FrameType}; +/// +/// let header = FrameHeader { +/// frame_type: FrameType::Request, +/// dst_path: Some("/shell".to_string()), +/// src_path: "/client".to_string(), +/// request_id: Some(1), +/// stream_id: None, +/// }; +/// +/// // Serialize and deserialize +/// let bytes = header.to_bytes(); +/// let decoded = FrameHeader::from_bytes(&bytes).unwrap(); +/// assert_eq!(decoded.frame_type, FrameType::Request); +/// ``` #[derive(Archive, Serialize, Deserialize, Debug, Clone)] pub struct FrameHeader { + /// The type of this frame pub frame_type: FrameType, + /// Destination path for routing (None for responses) pub dst_path: Option, + /// Source path of the sender pub src_path: String, + /// Request ID for correlation (for request/response) pub request_id: Option, + /// Stream ID (for stream operations) pub stream_id: Option, } impl FrameHeader { + /// Serialize the header to bytes. + /// + /// # Returns + /// Serialized bytes pub fn to_bytes(&self) -> Vec { - rkyv::to_bytes::(self).unwrap().into_vec() + rkyv::to_bytes::(self) + .unwrap() + .into_vec() } - + + /// Deserialize header from bytes. + /// + /// # Arguments + /// * `bytes` - Serialized bytes + /// + /// # Returns + /// Deserialized header pub fn from_bytes(bytes: &[u8]) -> Result { unsafe { rkyv::from_bytes_unchecked(bytes) }.map_err(|e| e.to_string()) } } -/// Tree request - operations on the tree +/// Tree request - operations on the tree. +/// +/// These requests are sent from clients to servers to perform operations. +/// +/// # Example +/// ``` +/// use ush_treetest::protocol::TreeRequest; +/// +/// // Execute a command +/// let request = TreeRequest::Exec { cmd: "echo hello".to_string() }; +/// let bytes = request.to_bytes(); +/// let decoded = TreeRequest::from_bytes(&bytes).unwrap(); +/// ``` #[derive(Archive, Serialize, Deserialize, Debug, Clone)] pub enum TreeRequest { + /// List child nodes at a path ListNodes {}, + /// List endpoints at a path ListEndpoints {}, + /// List all leaf paths in the tree ListLeaves {}, + /// Get information about a node GetInfo { path: String }, + /// Execute a command Exec { cmd: String }, + /// Open a stream to a path StreamOpen { path: String }, + /// Resize a terminal Resize { rows: u16, cols: u16 }, } impl TreeRequest { + /// Serialize the request to bytes. pub fn to_bytes(&self) -> Vec { - rkyv::to_bytes::(self).unwrap().into_vec() + rkyv::to_bytes::(self) + .unwrap() + .into_vec() } - + + /// Deserialize request from bytes. + /// + /// # Arguments + /// * `bytes` - Serialized bytes pub fn from_bytes(bytes: &[u8]) -> Result { unsafe { rkyv::from_bytes_unchecked(bytes) }.map_err(|e| e.to_string()) } } -/// Tree response - results from tree operations +/// Tree response - results from tree operations. +/// +/// These responses are sent from servers to clients. +/// +/// # Example +/// ``` +/// use ush_treetest::protocol::TreeResponse; +/// +/// let response = TreeResponse::ExecOutput { +/// exit_code: 0, +/// stdout: b"hello".to_vec(), +/// stderr: b"".to_vec(), +/// }; +/// let bytes = response.to_bytes(); +/// ``` #[derive(Archive, Serialize, Deserialize, Debug, Clone)] pub enum TreeResponse { + /// List of child node names NodeList { names: Vec }, + /// List of endpoints EndpointList { endpoints: Vec }, + /// List of leaf paths LeafList { leaves: Vec }, + /// Node information NodeInfo { info: NodeInfo }, - ExecOutput { exit_code: i32, stdout: Vec, stderr: Vec }, + /// Command execution output + ExecOutput { + exit_code: i32, + stdout: Vec, + stderr: Vec, + }, + /// Stream opened confirmation StreamOpened { stream_id: u16 }, } impl TreeResponse { + /// Serialize the response to bytes. pub fn to_bytes(&self) -> Vec { - rkyv::to_bytes::(self).unwrap().into_vec() + rkyv::to_bytes::(self) + .unwrap() + .into_vec() } - + + /// Deserialize response from bytes. + /// + /// # Arguments + /// * `bytes` - Serialized bytes pub fn from_bytes(bytes: &[u8]) -> Result { unsafe { rkyv::from_bytes_unchecked(bytes) }.map_err(|e| e.to_string()) } } -/// Information about an endpoint +/// Information about an endpoint. +/// +/// # Fields +/// * `name` - The endpoint name +/// * `path` - The path where the endpoint is registered +/// * `endpoint_type` - The type of endpoint +/// +/// # Example +/// ``` +/// use ush_treetest::protocol::{EndpointInfo, EndpointType}; +/// +/// let info = EndpointInfo { +/// name: "shell".to_string(), +/// path: "/shell".to_string(), +/// endpoint_type: EndpointType::Leaf, +/// }; +/// ``` #[derive(Archive, Serialize, Deserialize, Debug, Clone)] pub struct EndpointInfo { + /// The endpoint name pub name: String, + /// The path where this endpoint is registered pub path: String, + /// The type of this endpoint pub endpoint_type: EndpointType, } -/// Type of endpoint +/// Type of endpoint. +/// +/// # Example +/// ``` +/// use ush_treetest::protocol::EndpointType; +/// +/// let leaf_type = EndpointType::Leaf; +/// assert!(matches!(leaf_type, EndpointType::Leaf)); +/// ``` #[derive(Archive, Serialize, Deserialize, Debug, Clone, Copy)] #[repr(u8)] pub enum EndpointType { + /// Leaf endpoint - executes commands Leaf = 0x01, + /// Proxy endpoint - routes to other endpoints Proxy = 0x02, + /// Stream endpoint - provides streaming Stream = 0x03, } -/// Information about a node in the tree +/// Information about a node in the tree. +/// +/// # Fields +/// * `path` - The node path +/// * `is_leaf` - Whether this is a leaf node +/// * `has_children` - Whether this node has children +/// * `endpoints` - List of endpoint names at this node +/// +/// # Example +/// ``` +/// use ush_treetest::protocol::NodeInfo; +/// +/// let info = NodeInfo { +/// path: "/shell".to_string(), +/// is_leaf: true, +/// has_children: false, +/// endpoints: vec!["shell".to_string()], +/// }; +/// assert!(info.is_leaf); +/// ``` #[derive(Archive, Serialize, Deserialize, Debug, Clone)] pub struct NodeInfo { + /// The node path pub path: String, + /// Whether this is a leaf node (endpoint with no children) pub is_leaf: bool, + /// Whether this node has children pub has_children: bool, + /// Names of endpoints at this node pub endpoints: Vec, } -/// Handshake message - sent when connecting +/// Handshake message - sent when connecting. +/// +/// The client sends registered paths during handshake. +/// +/// # Fields +/// * `registered_paths` - Paths the client wants to register +/// +/// # Example +/// ``` +/// use ush_treetest::protocol::Handshake; +/// +/// let handshake = Handshake { +/// registered_paths: vec!["/client".to_string()], +/// }; +/// let bytes = handshake.to_bytes(); +/// ``` #[derive(Archive, Serialize, Deserialize, Debug, Clone)] pub struct Handshake { + /// Paths the client wants to register pub registered_paths: Vec, } impl Handshake { + /// Serialize the handshake to bytes. pub fn to_bytes(&self) -> Vec { - rkyv::to_bytes::(self).unwrap().into_vec() + rkyv::to_bytes::(self) + .unwrap() + .into_vec() } - + + /// Deserialize handshake from bytes. #[allow(dead_code)] pub fn from_bytes(bytes: &[u8]) -> Result { unsafe { rkyv::from_bytes_unchecked(bytes) }.map_err(|e| e.to_string()) } } -/// Handshake acknowledgement - router's response to handshake +/// Handshake acknowledgement - router's response to handshake. +/// +/// # Fields +/// * `accepted` - Whether the handshake was accepted +/// * `assigned_base_path` - Base path assigned by the server +/// +/// # Example +/// ``` +/// use ush_treetest::protocol::HandshakeAck; +/// +/// let ack = HandshakeAck { +/// accepted: true, +/// assigned_base_path: "/client".to_string(), +/// }; +/// assert!(ack.accepted); +/// ``` #[derive(Archive, Serialize, Deserialize, Debug, Clone)] pub struct HandshakeAck { + /// Whether the handshake was accepted pub accepted: bool, + /// Base path assigned by the server pub assigned_base_path: String, } impl HandshakeAck { + /// Serialize the acknowledgement to bytes. pub fn to_bytes(&self) -> Vec { - rkyv::to_bytes::(self).unwrap().into_vec() + rkyv::to_bytes::(self) + .unwrap() + .into_vec() } - + + /// Deserialize acknowledgement from bytes. + /// + /// # Arguments + /// * `bytes` - Serialized bytes pub fn from_bytes(bytes: &[u8]) -> Result { unsafe { rkyv::from_bytes_unchecked(bytes) }.map_err(|e| e.to_string()) } diff --git a/ush-treetest/src/server.rs b/ush-treetest/src/server.rs new file mode 100644 index 0000000..fa0c39f --- /dev/null +++ b/ush-treetest/src/server.rs @@ -0,0 +1,264 @@ +//! # Server Implementation +//! +//! This module provides the server functionality for handling incoming connections. + +use crate::protocol::{ + FrameHeader, FrameType, TreeRequest, TreeResponse, TcpTransport, Transport, + make_response, make_handshake_ack, +}; +use crate::tree::Tree; +use crate::leaves::{RemoteShell, TTY}; +use std::sync::{Arc, Mutex}; + +/// Default listening address for the server. +/// +/// # Example +/// ``` +/// let addr = ush_treetest::server::DEFAULT_ADDR; +/// assert_eq!(addr, "0.0.0.0:8080"); +/// ``` +#[allow(dead_code)] +pub const DEFAULT_ADDR: &str = "0.0.0.0:8080"; + +/// Run the server with the given address. +/// +/// This function starts listening on the specified address and handles incoming +/// connections in separate threads. +/// +/// # Arguments +/// * `addr` - The address to listen on (e.g., "0.0.0.0:8080") +/// +/// # Example +/// ``` +/// run_server("0.0.0.0:8080"); +/// ``` +pub fn run_server(addr: &str) -> ! { + log::info!("Starting server on {}", addr); + + let tree = Arc::new(Mutex::new(Tree::new())); + { + let mut tree = tree.lock().unwrap(); + tree.add_endpoint("/shell", Box::new(RemoteShell::new("shell"))); + tree.add_endpoint("/tty", Box::new(TTY::new("tty"))); + } + + let listener = TcpTransport::listen(addr).expect("failed to bind"); + log::info!("Listening on {}", addr); + + loop { + match TcpTransport::accept(&listener) { + Ok(transport) => { + log::info!("New connection from {:?}", transport.peer_addr()); + let tree = Arc::clone(&tree); + std::thread::spawn(move || { + handle_connection(transport, tree); + }); + } + Err(e) => { + log::error!("accept error: {:?}", e); + } + } + } +} + +/// Handle a single connection. +/// +/// This function handles the handshake and then processes frames in a loop until +/// the connection is closed. +/// +/// # Arguments +/// * `transport` - The TCP transport for the connection +/// * `tree` - Shared access to the tree +pub fn handle_connection(mut transport: TcpTransport, tree: Arc>) { + let (header, _payload) = match transport.recv_frame() { + Ok(h) => h, + Err(e) => { + log::error!("recv error: {:?}", e); + return; + } + }; + + if header.frame_type != FrameType::Handshake { + log::error!("expected handshake"); + return; + } + + log::info!("Client connected"); + + let (ack_header, ack_payload) = make_handshake_ack(true, "/client"); + transport.send_frame(&ack_header, Some(&ack_payload)).expect("send failed"); + + loop { + match transport.recv_frame() { + Ok((header, payload)) => { + let response = handle_frame(&header, &payload, &tree); + + if let Some(response) = response { + let (resp_header, resp_payload) = match response { + Ok((h, p)) => (h, p), + Err(e) => { + log::error!("handle error: {:?}", e); + break; + } + }; + transport.send_frame(&resp_header, Some(&resp_payload)).expect("send failed"); + } + + if header.frame_type == FrameType::StreamClose { + break; + } + } + Err(e) => { + log::error!("recv error: {:?}", e); + break; + } + } + } + + log::info!("Connection closed"); +} + +/// Handle a single frame and return an optional response. +/// +/// # Arguments +/// * `header` - The frame header +/// * `payload` - The frame payload bytes +/// * `tree` - Shared access to the tree +/// +/// # Returns +/// * `Some(Ok((header, payload)))` for a response to send +/// * `Some(Err(e))` for an error +/// * `None` for no response (async handling) +/// +/// # Example +/// ``` +/// use ush_treetest::protocol::{FrameType, FrameHeader, TcpTransport}; +/// +/// let header = FrameHeader { +/// frame_type: FrameType::Request, +/// dst_path: Some("/shell".to_string()), +/// src_path: "/client".to_string(), +/// request_id: Some(1), +/// stream_id: None, +/// }; +/// let payload = vec![]; +/// +/// if let Some(result) = handle_frame(&header, &payload, &tree) { +/// // Handle response +/// } +/// ``` +pub fn handle_frame( + header: &FrameHeader, + payload: &[u8], + tree: &Arc>, +) -> Option), String>> { + match header.frame_type { + FrameType::Request => { + let request: TreeRequest = match TreeRequest::from_bytes(payload) { + Ok(r) => r, + Err(e) => return Some(Err(e.to_string())), + }; + + let dst_path = header.dst_path.as_deref().unwrap_or("/"); + + let mut tree = match tree.lock() { + Ok(t) => t, + Err(e) => return Some(Err(format!("lock error: {}", e))), + }; + + let response = match request { + TreeRequest::ListNodes {} => { + let names = tree.list_nodes(dst_path).unwrap_or_default(); + TreeResponse::NodeList { names } + } + TreeRequest::ListEndpoints {} => { + let endpoints = tree.list_endpoints(dst_path).unwrap_or_default(); + TreeResponse::EndpointList { endpoints } + } + TreeRequest::ListLeaves {} => { + let leaves = tree.list_leaves(); + TreeResponse::LeafList { leaves } + } + TreeRequest::GetInfo { path } => { + match tree.get_info(&path) { + Ok(info) => TreeResponse::NodeInfo { info }, + Err(e) => return Some(Err(e)), + } + } + TreeRequest::Exec { ref cmd } => { + let (handler, matched_path) = match tree.find_handler(dst_path) { + Some(h) => h, + None => return Some(Err(format!("path not found: {}", dst_path))), + }; + let result = { + let mut handler = match handler.lock() { + Ok(h) => h, + Err(e) => return Some(Err(format!("lock error: {}", e))), + }; + handler.handle_request(&TreeRequest::Exec { cmd: cmd.clone() }, matched_path) + }; + match result { + Ok(resp) => resp, + Err(e) => return Some(Err(e)), + } + } + TreeRequest::StreamOpen { path } => { + match tree.open_stream(&path, &header.src_path) { + Ok(stream_id) => TreeResponse::StreamOpened { stream_id }, + Err(e) => return Some(Err(e)), + } + } + TreeRequest::Resize { .. } => { + return Some(Err("unsupported request: Resize".to_string())); + } + }; + + Some(Ok(make_response( + &header.src_path, + header.request_id.unwrap_or(0), + &response, + ))) + } + + FrameType::StreamOpen => { + let dst_path = header.dst_path.as_deref().unwrap_or("/"); + let mut tree = match tree.lock() { + Ok(t) => t, + Err(e) => return Some(Err(format!("lock error: {}", e))), + }; + match tree.open_stream(dst_path, &header.src_path) { + Ok(stream_id) => { + let response = TreeResponse::StreamOpened { stream_id }; + Some(Ok(make_response( + &header.src_path, + header.request_id.unwrap_or(0), + &response, + ))) + } + Err(e) => Some(Err(e)), + } + } + + FrameType::StreamData => { + let mut tree = match tree.lock() { + Ok(t) => t, + Err(e) => return Some(Err(format!("lock error: {}", e))), + }; + tree.route_stream_data(header, payload).ok(); + None + } + + FrameType::StreamClose => { + let mut tree = match tree.lock() { + Ok(t) => t, + Err(e) => return Some(Err(format!("lock error: {}", e))), + }; + if let Some(stream_id) = header.stream_id { + tree.close_stream(stream_id).ok(); + } + None + } + + _ => Some(Err("unsupported frame type".to_string())), + } +} \ No newline at end of file diff --git a/ush-treetest/src/tree/endpoint.rs b/ush-treetest/src/tree/endpoint.rs index ff14154..29bcc1b 100644 --- a/ush-treetest/src/tree/endpoint.rs +++ b/ush-treetest/src/tree/endpoint.rs @@ -5,11 +5,12 @@ use crate::protocol::{TreeRequest, TreeResponse, EndpointType}; use std::string::String; +use std::fmt; /// Endpoint trait - implemented by all leaf handlers in the tree /// /// This trait is object-safe and must be Send + Sync to allow sharing across threads. -pub trait Endpoint: Send + Sync { +pub trait Endpoint: Send + Sync + fmt::Debug { /// Handle a request and return a response fn handle_request(&mut self, request: &TreeRequest, src_path: &str) -> Result; diff --git a/ush-treetest/src/tree/mod.rs b/ush-treetest/src/tree/mod.rs index 9673b66..8d2e810 100644 --- a/ush-treetest/src/tree/mod.rs +++ b/ush-treetest/src/tree/mod.rs @@ -14,16 +14,26 @@ use std::vec::Vec; use std::boxed::Box; use std::result::Result; use std::sync::{Arc, Mutex}; +use std::fmt; -/// A node in the tree - contains an optional endpoint and child nodes +/// A node in the tree - contains an optional endpoint and child nodes. pub struct Node { - endpoint: Option>>>, + endpoint: Option>>>, children: BTreeMap, streams: BTreeMap, next_stream_id: u16, path: String, } +impl fmt::Debug for Node { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Node") + .field("path", &self.path) + .field("children", &self.children.keys().cloned().collect::>()) + .finish() + } +} + impl Node { /// Create a new node with the given path pub fn new(path: &str) -> Self { @@ -108,11 +118,20 @@ impl Node { } } -/// Tree structure for routing - contains the root node +/// Tree structure for routing - contains the root node. +#[allow(dead_code)] pub struct Tree { root: Node, } +impl fmt::Debug for Tree { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Tree") + .field("root", &self.root.path) + .finish() + } +} + impl Tree { /// Create a new empty tree pub fn new() -> Self {