Add documentation to treetest

This commit is contained in:
Michael Mikovsky
2026-04-22 10:25:03 -06:00
parent 1af134104e
commit cd301dea67
14 changed files with 2216 additions and 769 deletions
+157 -1
View File
@@ -193,6 +193,12 @@ dependencies = [
"log", "log",
] ]
[[package]]
name = "equivalent"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f"
[[package]] [[package]]
name = "funty" name = "funty"
version = "2.0.0" version = "2.0.0"
@@ -210,6 +216,12 @@ dependencies = [
"wasi", "wasi",
] ]
[[package]]
name = "glob"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280"
[[package]] [[package]]
name = "hashbrown" name = "hashbrown"
version = "0.12.3" version = "0.12.3"
@@ -219,18 +231,40 @@ dependencies = [
"ahash", "ahash",
] ]
[[package]]
name = "hashbrown"
version = "0.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4f467dd6dccf739c208452f8014c75c18bb8301b050ad1cfb27153803edb0f51"
[[package]] [[package]]
name = "heck" name = "heck"
version = "0.5.0" version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
[[package]]
name = "indexmap"
version = "2.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d466e9454f08e4a911e14806c24e16fba1b4c121d1ea474396f396069cf949d9"
dependencies = [
"equivalent",
"hashbrown 0.17.0",
]
[[package]] [[package]]
name = "is_terminal_polyfill" name = "is_terminal_polyfill"
version = "1.70.2" version = "1.70.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695" checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695"
[[package]]
name = "itoa"
version = "1.0.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f42a60cbdf9a97f5d2305f08a87dc4e09308d1276d28c869c684d7777685682"
[[package]] [[package]]
name = "jiff" name = "jiff"
version = "0.2.23" version = "0.2.23"
@@ -401,7 +435,7 @@ dependencies = [
"bitvec", "bitvec",
"bytecheck", "bytecheck",
"bytes", "bytes",
"hashbrown", "hashbrown 0.12.3",
"ptr_meta", "ptr_meta",
"rend", "rend",
"rkyv_derive", "rkyv_derive",
@@ -433,6 +467,15 @@ version = "4.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1c107b6f4780854c8b126e228ea8869f4d7b71260f962fefb57b996b8959ba6b" checksum = "1c107b6f4780854c8b126e228ea8869f4d7b71260f962fefb57b996b8959ba6b"
[[package]]
name = "serde"
version = "1.0.228"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e"
dependencies = [
"serde_core",
]
[[package]] [[package]]
name = "serde_core" name = "serde_core"
version = "1.0.228" version = "1.0.228"
@@ -453,6 +496,28 @@ dependencies = [
"syn 2.0.117", "syn 2.0.117",
] ]
[[package]]
name = "serde_json"
version = "1.0.149"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "83fc039473c5595ace860d8c4fafa220ff474b3fc6bfdb4293327f1a37e94d86"
dependencies = [
"itoa",
"memchr",
"serde",
"serde_core",
"zmij",
]
[[package]]
name = "serde_spanned"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6662b5879511e06e8999a8a235d848113e942c9124f211511b16466ee2995f26"
dependencies = [
"serde_core",
]
[[package]] [[package]]
name = "simdutf8" name = "simdutf8"
version = "0.1.5" version = "0.1.5"
@@ -493,6 +558,21 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369"
[[package]]
name = "target-triple"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "591ef38edfb78ca4771ee32cf494cb8771944bee237a9b91fc9c1424ac4b777b"
[[package]]
name = "termcolor"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06794f8f6c5c898b3275aebefa6b8a1cb24cd2c6c79397ab15774837a0bc5755"
dependencies = [
"winapi-util",
]
[[package]] [[package]]
name = "tinyvec" name = "tinyvec"
version = "1.11.0" version = "1.11.0"
@@ -508,6 +588,60 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "toml"
version = "1.1.2+spec-1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81f3d15e84cbcd896376e6730314d59fb5a87f31e4b038454184435cd57defee"
dependencies = [
"indexmap",
"serde_core",
"serde_spanned",
"toml_datetime",
"toml_parser",
"toml_writer",
"winnow",
]
[[package]]
name = "toml_datetime"
version = "1.1.1+spec-1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3165f65f62e28e0115a00b2ebdd37eb6f3b641855f9d636d3cd4103767159ad7"
dependencies = [
"serde_core",
]
[[package]]
name = "toml_parser"
version = "1.1.2+spec-1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2abe9b86193656635d2411dc43050282ca48aa31c2451210f4202550afb7526"
dependencies = [
"winnow",
]
[[package]]
name = "toml_writer"
version = "1.1.1+spec-1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "756daf9b1013ebe47a8776667b466417e2d4c5679d441c26230efd9ef78692db"
[[package]]
name = "trybuild"
version = "1.0.116"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "47c635f0191bd3a2941013e5062667100969f8c4e9cd787c14f977265d73616e"
dependencies = [
"glob",
"serde",
"serde_derive",
"serde_json",
"target-triple",
"termcolor",
"toml",
]
[[package]] [[package]]
name = "unicode-ident" name = "unicode-ident"
version = "1.0.24" version = "1.0.24"
@@ -523,6 +657,7 @@ dependencies = [
"libc", "libc",
"log", "log",
"rkyv", "rkyv",
"trybuild",
] ]
[[package]] [[package]]
@@ -598,6 +733,15 @@ dependencies = [
"unicode-ident", "unicode-ident",
] ]
[[package]]
name = "winapi-util"
version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22"
dependencies = [
"windows-sys",
]
[[package]] [[package]]
name = "windows-link" name = "windows-link"
version = "0.2.1" version = "0.2.1"
@@ -613,6 +757,12 @@ dependencies = [
"windows-link", "windows-link",
] ]
[[package]]
name = "winnow"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2ee1708bef14716a11bae175f579062d4554d95be2c6829f518df847b7b3fdd0"
[[package]] [[package]]
name = "wyz" name = "wyz"
version = "0.5.1" version = "0.5.1"
@@ -621,3 +771,9 @@ checksum = "05f360fc0b24296329c78fda852a1e9ae82de9cf7b27dae4b7f62f118f77b9ed"
dependencies = [ dependencies = [
"tap", "tap",
] ]
[[package]]
name = "zmij"
version = "1.0.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8848ee67ecc8aedbaf3e4122217aff892639231befc6a1b58d29fff4c2cabaa"
+3
View File
@@ -20,6 +20,9 @@ libc = { version = "0.2", optional = true }
version = "4.5" version = "4.5"
features = ["derive", "env"] features = ["derive", "env"]
[dev-dependencies]
trybuild = "1.0"
[profile.release] [profile.release]
opt-level = 3 opt-level = 3
lto = true lto = true
+451
View File
@@ -0,0 +1,451 @@
//! # CLI Implementation
//!
//! This module provides the interactive CLI implementation for the unshell tree protocol testbed.
use crate::protocol::{
FrameType, TreeRequest, TreeResponse, TcpTransport, Transport,
make_request, make_stream_open, make_stream_data, make_stream_close,
make_handshake,
};
use crate::tree::Tree;
use crate::leaves::{RemoteShell, TTY};
use std::string::String;
use std::vec::Vec;
/// CLI state - manages connection and local tree.
///
/// # Example
/// ```
/// use ush_treetest::cli::Cli;
///
/// let mut cli = Cli::new();
/// println!("Leaves: {:?}", cli.list_leaves());
/// ```
///
/// # Fields
/// * `transport` - Optional TCP transport for remote connection
/// * `tree` - Local tree for local operations
/// * `current_path` - Current working path
/// * `request_id` - Next request ID to send
/// * `stream_id` - Next stream ID to allocate
/// * `streams` - Active streams
/// * `base_path` - Base path assigned by server
/// * `mode` - Operation mode (Local or Connected)
pub struct Cli {
transport: Option<TcpTransport>,
tree: Tree,
current_path: String,
request_id: u64,
#[allow(dead_code)]
stream_id: u16,
streams: Vec<StreamState>,
base_path: String,
mode: CliMode,
}
/// CLI operation mode.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CliMode {
/// Local-only mode
Local,
/// Connected to remote server
Connected,
}
/// State of an active stream.
///
/// # Fields
/// * `stream_id` - The stream identifier
/// * `path` - The path this stream is connected to
#[derive(Debug, Clone)]
#[allow(dead_code)]
struct StreamState {
stream_id: u16,
path: String,
}
impl Cli {
/// Create a new CLI with a local tree.
///
/// The local tree has `/shell` and `/tty` endpoints registered.
///
/// # Example
/// ```
/// let cli = Cli::new();
/// let leaves = cli.list_leaves();
/// assert!(leaves.contains(&"/shell".to_string()));
/// ```
pub fn new() -> Self {
let mut tree = Tree::new();
tree.add_endpoint("/shell", Box::new(RemoteShell::new("shell")));
tree.add_endpoint("/tty", Box::new(TTY::new("tty")));
Self {
transport: None,
tree,
current_path: String::from("/"),
request_id: 1,
stream_id: 1,
streams: Vec::new(),
base_path: String::from("/"),
mode: CliMode::Local,
}
}
/// Get next request ID.
fn next_request_id(&mut self) -> u64 {
let id = self.request_id;
self.request_id += 1;
id
}
/// Get next stream ID.
#[allow(dead_code)]
fn next_stream_id(&mut self) -> u16 {
let id = self.stream_id;
self.stream_id = self.stream_id.wrapping_add(1);
id
}
/// List nodes at a path.
///
/// # Arguments
/// * `path` - Optional path (defaults to current path)
///
/// # Returns
/// List of child node names
pub fn list_nodes(&self, path: Option<&str>) -> Result<Vec<String>, String> {
let path = path.unwrap_or(&self.current_path);
self.tree.list_nodes(path)
}
/// List endpoints at a path.
///
/// # Arguments
/// * `path` - Optional path (defaults to current path)
pub fn list_endpoints(
&self,
path: Option<&str>,
) -> Result<Vec<crate::protocol::EndpointInfo>, String> {
let path = path.unwrap_or(&self.current_path);
self.tree.list_endpoints(path)
}
/// List all leaf paths.
pub fn list_leaves(&self) -> Vec<String> {
self.tree.list_leaves()
}
/// Get info about a node.
pub fn get_info(&self, path: &str) -> Result<crate::protocol::NodeInfo, String> {
self.tree.get_info(path)
}
/// Execute a command locally on the tree.
pub fn exec_local(&mut self, path: &str, cmd: &str) -> Result<TreeResponse, String> {
let (handler, matched_path) = self
.tree
.find_handler(path)
.ok_or_else(|| format!("path not found: {}", path))?;
let request = TreeRequest::Exec {
cmd: cmd.to_string(),
};
let mut handler = handler.lock().map_err(|e| e.to_string())?;
handler.handle_request(&request, matched_path)
}
/// Connect to a remote server.
pub fn connect(&mut self, addr: &str) -> Result<(), String> {
let transport = TcpTransport::connect(addr).map_err(|e| e.to_string())?;
self.transport = Some(transport);
self.mode = CliMode::Connected;
self.do_handshake()
}
/// Perform handshake with remote server.
fn do_handshake(&mut self) -> Result<(), String> {
let transport = self.transport.as_mut().ok_or("not connected")?;
let (header, payload) = make_handshake(vec![self.current_path.clone()]);
transport
.send_frame(&header, Some(&payload))
.map_err(|e| e.to_string())?;
let (header, payload) = transport.recv_frame().map_err(|e| e.to_string())?;
if header.frame_type != FrameType::HandshakeAck {
return Err("unexpected response type".to_string());
}
let ack = crate::protocol::HandshakeAck::from_bytes(&payload)
.map_err(|e| e.to_string())?;
if !ack.accepted {
return Err("handshake rejected".to_string());
}
self.base_path = ack.assigned_base_path.clone();
Ok(())
}
/// Send a request to the remote server.
pub fn send_request(
&mut self,
dst_path: &str,
request: &TreeRequest,
) -> Result<TreeResponse, String> {
let request_id = self.next_request_id();
let transport = self.transport.as_mut().ok_or("not connected")?;
let full_path = if dst_path.starts_with('/') {
dst_path.to_string()
} else {
format!("{}/{}", self.current_path, dst_path)
};
let (header, payload) = make_request(&full_path, &self.base_path, request_id, request);
transport
.send_frame(&header, Some(&payload))
.map_err(|e| e.to_string())?;
let (header, payload) = transport.recv_frame().map_err(|e| e.to_string())?;
if header.frame_type != FrameType::Response {
return Err("unexpected response type".to_string());
}
let response = TreeResponse::from_bytes(&payload).map_err(|e| e.to_string())?;
Ok(response)
}
/// Open a stream to a remote path.
pub fn open_stream(&mut self, dst_path: &str) -> Result<u16, String> {
let request_id = self.next_request_id();
let transport = self.transport.as_mut().ok_or("not connected")?;
let full_path = if dst_path.starts_with('/') {
dst_path.to_string()
} else {
format!("{}/{}", self.current_path, dst_path)
};
let header = make_stream_open(&full_path, &self.base_path, request_id);
transport.send_frame(&header, None).map_err(|e| e.to_string())?;
let (header, payload) = transport.recv_frame().map_err(|e| e.to_string())?;
if header.frame_type != FrameType::Response {
return Err("unexpected response type".to_string());
}
let response = TreeResponse::from_bytes(&payload).map_err(|e| e.to_string())?;
match response {
TreeResponse::StreamOpened { stream_id } => {
self.streams.push(StreamState {
stream_id,
path: full_path,
});
Ok(stream_id)
}
_ => Err("expected StreamOpened".to_string()),
}
}
/// Send data on a stream.
pub fn send_stream_data(&mut self, stream_id: u16, data: &[u8]) -> Result<(), String> {
let transport = self.transport.as_mut().ok_or("not connected")?;
let (header, payload) = make_stream_data(stream_id, data);
transport
.send_frame(&header, Some(&payload))
.map_err(|e| e.to_string())
}
/// Close a stream.
pub fn close_stream(&mut self, stream_id: u16) -> Result<(), String> {
let transport = self.transport.as_mut().ok_or("not connected")?;
let header = make_stream_close(stream_id);
transport
.send_frame(&header, None)
.map_err(|e| e.to_string())?;
self.streams.retain(|s| s.stream_id != stream_id);
Ok(())
}
/// Check if connected to remote.
pub fn is_connected(&self) -> bool {
matches!(self.mode, CliMode::Connected)
}
/// Get current path.
pub fn current_path(&self) -> &str {
&self.current_path
}
/// Set current path.
pub fn set_path(&mut self, path: &str) {
self.current_path = path.to_string();
}
}
/// Parse and execute a CLI command.
///
/// # Arguments
/// * `cli` - The CLI state
/// * `line` - The command line to parse
///
/// # Returns
/// Ok(output) on success, Err(error) on failure
///
/// # Example
/// ```
/// use ush_treetest::cli::{Cli, parse_and_execute};
///
/// let mut cli = Cli::new();
/// let output = parse_and_execute(&mut cli, "leaves").unwrap();
/// assert!(output.contains("shell"));
/// ```
pub fn parse_and_execute(cli: &mut Cli, line: &str) -> Result<String, String> {
let parts: Vec<&str> = line.split_whitespace().collect();
if parts.is_empty() {
return Ok(String::new());
}
match parts[0] {
"ls" | "list" => {
let path = parts.get(1).copied();
let names = cli.list_nodes(path)?;
Ok(names.join("\n"))
}
"endpoints" => {
let path = parts.get(1).copied();
let eps = cli.list_endpoints(path)?;
let mut output = String::new();
for ep in &eps {
output.push_str(&format!("{} ({:?}) at {}\n", ep.name, ep.endpoint_type, ep.path));
}
Ok(output)
}
"leaves" => Ok(cli.list_leaves().join("\n")),
"info" => {
if parts.len() < 2 {
return Err("usage: info <path>".to_string());
}
let info = cli.get_info(parts[1])?;
Ok(format!("{:?}", info))
}
"exec" => {
if parts.len() < 3 {
return Err("usage: exec <path> <command>".to_string());
}
let path = parts[1];
let cmd = parts[2..].join(" ");
if cli.is_connected() {
let request = TreeRequest::Exec {
cmd: cmd.clone(),
};
let response = cli.send_request(path, &request)?;
format_response(response)
} else {
let response = cli.exec_local(path, &cmd)?;
format_response(response)
}
}
"cd" => {
if parts.len() < 2 {
return Err("usage: cd <path>".to_string());
}
let path = parts[1];
if cli.get_info(path).is_ok() {
cli.set_path(path);
Ok(format!("changed to {}", path))
} else {
Err(format!("path not found: {}", path))
}
}
"pwd" => Ok(cli.current_path().to_string()),
"connect" => {
if parts.len() < 2 {
return Err("usage: connect <host:port>".to_string());
}
cli.connect(parts[1])?;
Ok(format!("connected to {}", parts[1]))
}
"stream" => {
if parts.len() < 2 {
return Err("usage: stream <path>".to_string());
}
if !cli.is_connected() {
return Err("not connected".to_string());
}
let stream_id = cli.open_stream(parts[1])?;
Ok(format!("opened stream {} to {}", stream_id, parts[1]))
}
"close" => {
if parts.len() < 2 {
return Err("usage: close <stream_id>".to_string());
}
let stream_id: u16 = parts[1].parse().map_err(|_| "invalid stream id".to_string())?;
cli.close_stream(stream_id)?;
Ok(format!("closed stream {}", stream_id))
}
"send" => {
if parts.len() < 3 {
return Err("usage: send <stream_id> <data>".to_string());
}
let stream_id: u16 = parts[1].parse().map_err(|_| "invalid stream id".to_string())?;
let data = parts[2..].join(" ");
cli.send_stream_data(stream_id, data.as_bytes())?;
Ok("sent".to_string())
}
"help" => Ok(HELP_TEXT.to_string()),
_ => Err(format!("unknown command: {}", parts[0])),
}
}
/// Format a TreeResponse for display.
fn format_response(response: TreeResponse) -> Result<String, String> {
match response {
TreeResponse::NodeList { names } => Ok(names.join("\n")),
TreeResponse::EndpointList { endpoints } => {
let mut output = String::new();
for ep in endpoints {
output.push_str(&format!("{} ({:?})\n", ep.name, ep.endpoint_type));
}
Ok(output)
}
TreeResponse::LeafList { leaves } => Ok(leaves.join("\n")),
TreeResponse::NodeInfo { info } => Ok(format!(
"path: {}\nis_leaf: {}\nhas_children: {}\nendpoints: {:?}",
info.path, info.is_leaf, info.has_children, info.endpoints
)),
TreeResponse::ExecOutput {
exit_code,
stdout,
stderr,
} => {
let mut output = String::new();
output.push_str(&format!("exit code: {}\n", exit_code));
if !stdout.is_empty() {
output.push_str(&format!("stdout: {}\n", String::from_utf8_lossy(&stdout)));
}
if !stderr.is_empty() {
output.push_str(&format!("stderr: {}\n", String::from_utf8_lossy(&stderr)));
}
Ok(output)
}
TreeResponse::StreamOpened { stream_id } => Ok(format!("stream opened: {}", stream_id)),
}
}
/// Help text for CLI commands.
const HELP_TEXT: &str = r#"Commands:
ls [path] List child nodes
endpoints [path] List endpoints at path
leaves List all leaf paths
info <path> Get node info
exec <path> <cmd> Execute command at path
cd <path> Change current path
pwd Print working path
connect <host> Connect to remote server
stream <path> Open stream to path
send <id> <data> Send data on stream
close <id> Close stream
help Show this help
"#;
+12 -337
View File
@@ -2,342 +2,17 @@
//! //!
//! This module provides the interactive CLI for the unshell tree protocol testbed. //! This module provides the interactive CLI for the unshell tree protocol testbed.
//! It supports both local tree operations and remote connections. //! It supports both local tree operations and remote connections.
//!
//! # Usage
//!
//! ```no_run
//! use ush_treetest::cli::{Cli, parse_and_execute};
//!
//! let mut cli = Cli::new();
//! let output = parse_and_execute(&mut cli, "leaves").unwrap();
//! println!("{}", output);
//! ```
use crate::protocol::{ pub mod cli;
FrameType, TreeRequest, TreeResponse, TcpTransport, Transport,
make_request, make_stream_open, make_stream_data, make_stream_close,
make_handshake,
};
use crate::tree::Tree;
use crate::leaves::{RemoteShell, TTY};
use std::string::String;
use std::vec::Vec;
use std::boxed::Box;
use std::result::Result;
/// CLI state - manages connection and local tree pub use cli::{Cli, parse_and_execute};
pub struct Cli {
transport: Option<TcpTransport>,
tree: Tree,
current_path: String,
request_id: u64,
#[allow(dead_code)]
stream_id: u16,
streams: Vec<StreamState>,
base_path: String,
mode: CliMode,
}
/// CLI operation mode
#[derive(Debug, Clone, Copy)]
enum CliMode { Local, Connected }
/// State of an open stream
#[derive(Debug)]
#[allow(dead_code)]
struct StreamState { stream_id: u16, path: String }
impl Cli {
/// Create a new CLI with a local tree
pub fn new() -> Self {
let mut tree = Tree::new();
tree.add_endpoint("/shell", Box::new(RemoteShell::new("shell")));
tree.add_endpoint("/tty", Box::new(TTY::new("tty")));
Self {
transport: None,
tree,
current_path: String::from("/"),
request_id: 1,
stream_id: 1,
streams: Vec::new(),
base_path: String::from("/"),
mode: CliMode::Local
}
}
/// Get next request ID
fn next_request_id(&mut self) -> u64 {
let id = self.request_id;
self.request_id += 1;
id
}
/// Get next stream ID
#[allow(dead_code)]
fn next_stream_id(&mut self) -> u16 {
let id = self.stream_id;
self.stream_id = self.stream_id.wrapping_add(1);
id
}
/// List nodes at a path
pub fn list_nodes(&self, path: Option<&str>) -> Result<Vec<String>, String> {
let path = path.unwrap_or(&self.current_path);
self.tree.list_nodes(path)
}
/// List endpoints at a path
pub fn list_endpoints(&self, path: Option<&str>) -> Result<Vec<crate::protocol::EndpointInfo>, String> {
let path = path.unwrap_or(&self.current_path);
self.tree.list_endpoints(path)
}
/// List all leaf paths
pub fn list_leaves(&self) -> Vec<String> {
self.tree.list_leaves()
}
/// Get info about a node
pub fn get_info(&self, path: &str) -> Result<crate::protocol::NodeInfo, String> {
self.tree.get_info(path)
}
/// Execute a command locally on the tree
pub fn exec_local(&mut self, path: &str, cmd: &str) -> Result<TreeResponse, String> {
let (handler, matched_path) = self.tree.find_handler(path)
.ok_or_else(|| format!("path not found: {}", path))?;
let request = TreeRequest::Exec { cmd: cmd.to_string() };
// Lock the handler and make the request
let mut handler = handler.lock().map_err(|e| e.to_string())?;
handler.handle_request(&request, matched_path)
}
/// Connect to a remote server
pub fn connect(&mut self, addr: &str) -> Result<(), String> {
let transport = TcpTransport::connect(addr).map_err(|e| e.to_string())?;
self.transport = Some(transport);
self.mode = CliMode::Connected;
self.do_handshake()
}
/// Perform handshake with remote server
fn do_handshake(&mut self) -> Result<(), String> {
let transport = self.transport.as_mut().ok_or("not connected")?;
let (header, payload) = make_handshake(vec![self.current_path.clone()]);
transport.send_frame(&header, Some(&payload)).map_err(|e| e.to_string())?;
let (header, payload) = transport.recv_frame().map_err(|e| e.to_string())?;
if header.frame_type != FrameType::HandshakeAck { return Err("unexpected response type".to_string()); }
let ack = crate::protocol::HandshakeAck::from_bytes(&payload).map_err(|e| e.to_string())?;
if !ack.accepted { return Err("handshake rejected".to_string()); }
self.base_path = ack.assigned_base_path.clone();
Ok(())
}
/// Send a request to the remote server
pub fn send_request(&mut self, dst_path: &str, request: &TreeRequest) -> Result<TreeResponse, String> {
// Get request_id first to avoid borrow issues
let request_id = self.next_request_id();
let transport = self.transport.as_mut().ok_or("not connected")?;
let full_path = if dst_path.starts_with('/') {
dst_path.to_string()
} else {
format!("{}/{}", self.current_path, dst_path)
};
let (header, payload) = make_request(&full_path, &self.base_path, request_id, request);
transport.send_frame(&header, Some(&payload)).map_err(|e| e.to_string())?;
let (header, payload) = transport.recv_frame().map_err(|e| e.to_string())?;
if header.frame_type != FrameType::Response { return Err("unexpected response type".to_string()); }
let response = TreeResponse::from_bytes(&payload).map_err(|e| e.to_string())?;
Ok(response)
}
/// Open a stream to a remote path
pub fn open_stream(&mut self, dst_path: &str) -> Result<u16, String> {
// Get request_id first
let request_id = self.next_request_id();
let transport = self.transport.as_mut().ok_or("not connected")?;
let full_path = if dst_path.starts_with('/') {
dst_path.to_string()
} else {
format!("{}/{}", self.current_path, dst_path)
};
let header = make_stream_open(&full_path, &self.base_path, request_id);
transport.send_frame(&header, None).map_err(|e| e.to_string())?;
let (header, payload) = transport.recv_frame().map_err(|e| e.to_string())?;
if header.frame_type != FrameType::Response { return Err("unexpected response type".to_string()); }
let response = TreeResponse::from_bytes(&payload).map_err(|e| e.to_string())?;
match response {
TreeResponse::StreamOpened { stream_id } => {
self.streams.push(StreamState { stream_id, path: full_path });
Ok(stream_id)
}
_ => Err("expected StreamOpened".to_string())
}
}
/// Send data on a stream
pub fn send_stream_data(&mut self, stream_id: u16, data: &[u8]) -> Result<(), String> {
let transport = self.transport.as_mut().ok_or("not connected")?;
let (header, payload) = make_stream_data(stream_id, data);
transport.send_frame(&header, Some(&payload)).map_err(|e| e.to_string())
}
/// Close a stream
pub fn close_stream(&mut self, stream_id: u16) -> Result<(), String> {
let transport = self.transport.as_mut().ok_or("not connected")?;
let header = make_stream_close(stream_id);
transport.send_frame(&header, None).map_err(|e| e.to_string())?;
self.streams.retain(|s| s.stream_id != stream_id);
Ok(())
}
/// Check if connected to remote
pub fn is_connected(&self) -> bool {
matches!(self.mode, CliMode::Connected)
}
/// Get current path
pub fn current_path(&self) -> &str {
&self.current_path
}
/// Set current path
pub fn set_path(&mut self, path: &str) {
self.current_path = path.to_string();
}
}
/// Parse and execute a CLI command
///
/// # Arguments
/// * `cli` - The CLI state
/// * `line` - The command line to parse
///
/// # Returns
/// Ok(output) on success, Err(error) on failure
pub fn parse_and_execute(cli: &mut Cli, line: &str) -> Result<String, String> {
let parts: Vec<&str> = line.split_whitespace().collect();
if parts.is_empty() { return Ok(String::new()); }
match parts[0] {
"ls" | "list" => {
let path = parts.get(1).map(|s| *s);
let names = cli.list_nodes(path)?;
Ok(names.join("\n"))
}
"endpoints" => {
let path = parts.get(1).map(|s| *s);
let eps = cli.list_endpoints(path)?;
let mut output = String::new();
for ep in &eps {
output.push_str(&format!("{} ({:?}) at {}\n", ep.name, ep.endpoint_type, ep.path));
}
Ok(output)
}
"leaves" => {
Ok(cli.list_leaves().join("\n"))
}
"info" => {
if parts.len() < 2 { return Err("usage: info <path>".to_string()); }
let info = cli.get_info(parts[1])?;
Ok(format!("{:?}", info))
}
"exec" => {
if parts.len() < 3 { return Err("usage: exec <path> <command>".to_string()); }
let path = parts[1];
let cmd = parts[2..].join(" ");
if cli.is_connected() {
let request = TreeRequest::Exec { cmd: cmd.clone() };
let response = cli.send_request(path, &request)?;
format_response(response)
} else {
let response = cli.exec_local(path, &cmd)?;
format_response(response)
}
}
"cd" => {
if parts.len() < 2 { return Err("usage: cd <path>".to_string()); }
let path = parts[1];
if cli.get_info(path).is_ok() {
cli.set_path(path);
Ok(format!("changed to {}", path))
} else {
Err(format!("path not found: {}", path))
}
}
"pwd" => {
Ok(cli.current_path().to_string())
}
"connect" => {
if parts.len() < 2 { return Err("usage: connect <host:port>".to_string()); }
cli.connect(parts[1])?;
Ok(format!("connected to {}", parts[1]))
}
"stream" => {
if parts.len() < 2 { return Err("usage: stream <path>".to_string()); }
if !cli.is_connected() { return Err("not connected".to_string()); }
let stream_id = cli.open_stream(parts[1])?;
Ok(format!("opened stream {} to {}", stream_id, parts[1]))
}
"close" => {
if parts.len() < 2 { return Err("usage: close <stream_id>".to_string()); }
let stream_id: u16 = parts[1].parse().map_err(|_| "invalid stream id".to_string())?;
cli.close_stream(stream_id)?;
Ok(format!("closed stream {}", stream_id))
}
"send" => {
if parts.len() < 3 { return Err("usage: send <stream_id> <data>".to_string()); }
let stream_id: u16 = parts[1].parse().map_err(|_| "invalid stream id".to_string())?;
let data = parts[2..].join(" ");
cli.send_stream_data(stream_id, data.as_bytes())?;
Ok("sent".to_string())
}
"help" => {
Ok(HELP_TEXT.to_string())
}
_ => Err(format!("unknown command: {}", parts[0])),
}
}
/// Format a TreeResponse for display
fn format_response(response: TreeResponse) -> Result<String, String> {
match response {
TreeResponse::NodeList { names } => Ok(names.join("\n")),
TreeResponse::EndpointList { endpoints } => {
let mut output = String::new();
for ep in endpoints {
output.push_str(&format!("{} ({:?})\n", ep.name, ep.endpoint_type));
}
Ok(output)
}
TreeResponse::LeafList { leaves } => Ok(leaves.join("\n")),
TreeResponse::NodeInfo { info } => Ok(format!("path: {}\nis_leaf: {}\nhas_children: {}\nendpoints: {:?}", info.path, info.is_leaf, info.has_children, info.endpoints)),
TreeResponse::ExecOutput { exit_code, stdout, stderr } => {
let mut output = String::new();
output.push_str(&format!("exit code: {}\n", exit_code));
if !stdout.is_empty() { output.push_str(&format!("stdout: {}\n", String::from_utf8_lossy(&stdout))); }
if !stderr.is_empty() { output.push_str(&format!("stderr: {}\n", String::from_utf8_lossy(&stderr))); }
Ok(output)
}
TreeResponse::StreamOpened { stream_id } => Ok(format!("stream opened: {}", stream_id)),
}
}
/// Help text for CLI commands
const HELP_TEXT: &str = r#"Commands:
ls [path] List child nodes
endpoints [path] List endpoints at path
leaves List all leaf paths
info <path> Get node info
exec <path> <cmd> Execute command at path
cd <path> Change current path
pwd Print working path
connect <host> Connect to remote server
stream <path> Open stream to path
send <id> <data> Send data on stream
close <id> Close stream
help Show this help
"#;
+434
View File
@@ -0,0 +1,434 @@
//! # Client Implementation
//!
//! This module provides the client functionality for connecting to servers,
//! sending requests, and managing streams.
use crate::protocol::{
FrameType, TreeRequest, TreeResponse, TcpTransport, Transport,
make_request, make_stream_open, make_stream_data, make_stream_close,
make_handshake,
};
use crate::tree::Tree;
use crate::leaves::{RemoteShell, TTY};
use std::string::String;
use std::vec::Vec;
use std::fmt;
/// Client state - manages connection and local tree.
///
/// # Example
/// ```
/// use ush_treetest::client::Client;
///
/// // Start with local tree
/// let mut client = Client::new_local();
/// println!("Leaves: {:?}", client.list_leaves());
///
/// // Connect to remote server
/// client.connect("localhost:8080").unwrap();
/// ```
///
/// # Fields
/// * `transport` - Optional TCP transport for remote connection
/// * `tree` - Local tree for local operations
/// * `current_path` - Current working path
/// * `request_id` - Next request ID to send
/// * `stream_id` - Next stream ID to allocate
/// * `streams` - Active streams
/// * `base_path` - Base path assigned by server
/// * `mode` - Operation mode (Local or Connected)
#[allow(dead_code)]
pub struct Client {
transport: Option<TcpTransport>,
#[allow(dead_code)]
tree: Tree,
current_path: String,
request_id: u64,
#[allow(dead_code)]
stream_id: u16,
streams: Vec<StreamState>,
base_path: String,
mode: ClientMode,
}
impl fmt::Debug for Client {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Client")
.field("transport", &self.transport.is_some())
.field("current_path", &self.current_path)
.field("mode", &self.mode)
.finish()
}
}
/// Client operation mode.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[allow(dead_code)]
enum ClientMode {
/// Local-only mode (no remote connection)
Local,
/// Connected to remote server
Connected,
}
/// State of an open stream.
///
/// # Fields
/// * `stream_id` - The stream identifier
/// * `path` - The path this stream is connected to
#[derive(Debug, Clone)]
#[allow(dead_code)]
struct StreamState {
stream_id: u16,
path: String,
}
#[allow(dead_code)]
impl Client {
/// Create a new client with a local tree.
///
/// The local tree has `/shell` and `/tty` endpoints registered.
///
/// # Example
/// ```
/// let mut client = Client::new_local();
/// let leaves = client.list_leaves();
/// assert!(leaves.contains(&"/shell".to_string()));
/// ```
pub fn new_local() -> Self {
let mut tree = Tree::new();
tree.add_endpoint("/shell", Box::new(RemoteShell::new("shell")));
tree.add_endpoint("/tty", Box::new(TTY::new("tty")));
Self {
transport: None,
tree,
current_path: String::from("/"),
request_id: 1,
stream_id: 1,
streams: Vec::new(),
base_path: String::from("/"),
mode: ClientMode::Local,
}
}
/// Get the next request ID.
///
/// Each request gets a unique incrementing ID.
fn next_request_id(&mut self) -> u64 {
let id = self.request_id;
self.request_id += 1;
id
}
/// Get the next stream ID.
#[allow(dead_code)]
fn next_stream_id(&mut self) -> u16 {
let id = self.stream_id;
self.stream_id = self.stream_id.wrapping_add(1);
id
}
/// List nodes at a path.
///
/// # Arguments
/// * `path` - Optional path (defaults to current path)
///
/// # Returns
/// List of child node names
///
/// # Example
/// ```
/// let mut client = Client::new_local();
/// let nodes = client.list_nodes(None).unwrap();
/// ```
pub fn list_nodes(&self, path: Option<&str>) -> Result<Vec<String>, String> {
let path = path.unwrap_or(&self.current_path);
self.tree.list_nodes(path)
}
/// List endpoints at a path.
///
/// # Arguments
/// * `path` - Optional path (defaults to current path)
///
/// # Returns
/// List of endpoint information
pub fn list_endpoints(
&self,
path: Option<&str>,
) -> Result<Vec<crate::protocol::EndpointInfo>, String> {
let path = path.unwrap_or(&self.current_path);
self.tree.list_endpoints(path)
}
/// List all leaf paths.
///
/// # Returns
/// List of leaf node paths
///
/// # Example
/// ```
/// let client = Client::new_local();
/// let leaves = client.list_leaves();
/// assert!(!leaves.is_empty());
/// ```
pub fn list_leaves(&self) -> Vec<String> {
self.tree.list_leaves()
}
/// Get information about a node.
///
/// # Arguments
/// * `path` - The path to get info about
///
/// # Returns
/// Node information
///
/// # Example
/// ```
/// let client = Client::new_local();
/// let info = client.get_info("/shell").unwrap();
/// assert!(info.is_leaf);
/// ```
pub fn get_info(&self, path: &str) -> Result<crate::protocol::NodeInfo, String> {
self.tree.get_info(path)
}
/// Execute a command locally on the tree.
///
/// # Arguments
/// * `path` - The path to execute at
/// * `cmd` - The command to execute
///
/// # Returns
/// Execution response with exit code and output
pub fn exec_local(&mut self, path: &str, cmd: &str) -> Result<TreeResponse, String> {
let (handler, matched_path) = self
.tree
.find_handler(path)
.ok_or_else(|| format!("path not found: {}", path))?;
let request = TreeRequest::Exec {
cmd: cmd.to_string(),
};
let mut handler = handler.lock().map_err(|e| e.to_string())?;
handler.handle_request(&request, matched_path)
}
/// Connect to a remote server.
///
/// # Arguments
/// * `addr` - The server address (e.g., "localhost:8080")
///
/// # Returns
/// Ok(()) on success
///
/// # Example
/// ```
/// let mut client = Client::new_local();
/// client.connect("localhost:8080").unwrap();
/// ```
pub fn connect(&mut self, addr: &str) -> Result<(), String> {
let transport = TcpTransport::connect(addr).map_err(|e| e.to_string())?;
self.transport = Some(transport);
self.mode = ClientMode::Connected;
self.do_handshake()
}
/// Perform handshake with remote server.
fn do_handshake(&mut self) -> Result<(), String> {
let transport = self.transport.as_mut().ok_or("not connected")?;
let (header, payload) = make_handshake(vec![self.current_path.clone()]);
transport
.send_frame(&header, Some(&payload))
.map_err(|e| e.to_string())?;
let (header, payload) = transport.recv_frame().map_err(|e| e.to_string())?;
if header.frame_type != FrameType::HandshakeAck {
return Err("unexpected response type".to_string());
}
let ack = crate::protocol::HandshakeAck::from_bytes(&payload)
.map_err(|e| e.to_string())?;
if !ack.accepted {
return Err("handshake rejected".to_string());
}
self.base_path = ack.assigned_base_path.clone();
Ok(())
}
/// Send a request to the remote server.
///
/// # Arguments
/// * `dst_path` - The destination path
/// * `request` - The request to send
///
/// # Returns
/// The response from the server
///
/// # Example
/// ```
/// let mut client = Client::new_local();
/// client.connect("localhost:8080").unwrap();
///
/// let request = TreeRequest::Exec { cmd: "echo hello".to_string() };
/// let response = client.send_request("/shell", &request).unwrap();
/// ```
pub fn send_request(&mut self, dst_path: &str, request: &TreeRequest) -> Result<TreeResponse, String> {
let request_id = self.next_request_id();
let transport = self.transport.as_mut().ok_or("not connected")?;
let full_path = if dst_path.starts_with('/') {
dst_path.to_string()
} else {
format!("{}/{}", self.current_path, dst_path)
};
let (header, payload) = make_request(&full_path, &self.base_path, request_id, request);
transport
.send_frame(&header, Some(&payload))
.map_err(|e| e.to_string())?;
let (header, payload) = transport.recv_frame().map_err(|e| e.to_string())?;
if header.frame_type != FrameType::Response {
return Err("unexpected response type".to_string());
}
let response = TreeResponse::from_bytes(&payload).map_err(|e| e.to_string())?;
Ok(response)
}
/// Open a stream to a remote path.
///
/// # Arguments
/// * `dst_path` - The destination path
///
/// # Returns
/// The stream ID on success
///
/// # Example
/// ```
/// let mut client = Client::new_local();
/// client.connect("localhost:8080").unwrap();
/// let stream_id = client.open_stream("/tty").unwrap();
/// ```
pub fn open_stream(&mut self, dst_path: &str) -> Result<u16, String> {
let request_id = self.next_request_id();
let transport = self.transport.as_mut().ok_or("not connected")?;
let full_path = if dst_path.starts_with('/') {
dst_path.to_string()
} else {
format!("{}/{}", self.current_path, dst_path)
};
let header = make_stream_open(&full_path, &self.base_path, request_id);
transport.send_frame(&header, None).map_err(|e| e.to_string())?;
let (header, payload) = transport.recv_frame().map_err(|e| e.to_string())?;
if header.frame_type != FrameType::Response {
return Err("unexpected response type".to_string());
}
let response = TreeResponse::from_bytes(&payload).map_err(|e| e.to_string())?;
match response {
TreeResponse::StreamOpened { stream_id } => {
self.streams.push(StreamState {
stream_id,
path: full_path,
});
Ok(stream_id)
}
_ => Err("expected StreamOpened".to_string()),
}
}
/// Send data on a stream.
///
/// # Arguments
/// * `stream_id` - The stream ID
/// * `data` - The data to send
///
/// # Example
/// ```
/// let mut client = Client::new_local();
/// client.connect("localhost:8080").unwrap();
/// let stream_id = client.open_stream("/tty").unwrap();
/// client.send_stream_data(stream_id, b"hello").unwrap();
/// ```
pub fn send_stream_data(&mut self, stream_id: u16, data: &[u8]) -> Result<(), String> {
let transport = self.transport.as_mut().ok_or("not connected")?;
let (header, payload) = make_stream_data(stream_id, data);
transport
.send_frame(&header, Some(&payload))
.map_err(|e| e.to_string())
}
/// Close a stream.
///
/// # Arguments
/// * `stream_id` - The stream ID to close
///
/// # Example
/// ```
/// let mut client = Client::new_local();
/// client.connect("localhost:8080").unwrap();
/// let stream_id = client.open_stream("/tty").unwrap();
/// client.close_stream(stream_id).unwrap();
/// ```
pub fn close_stream(&mut self, stream_id: u16) -> Result<(), String> {
let transport = self.transport.as_mut().ok_or("not connected")?;
let header = make_stream_close(stream_id);
transport
.send_frame(&header, None)
.map_err(|e| e.to_string())?;
self.streams.retain(|s| s.stream_id != stream_id);
Ok(())
}
/// Check if connected to remote.
///
/// # Returns
/// True if connected to a remote server
///
/// # Example
/// ```
/// let client = Client::new_local();
/// assert!(!client.is_connected());
/// ```
pub fn is_connected(&self) -> bool {
matches!(self.mode, ClientMode::Connected)
}
/// Get current path.
///
/// # Returns
/// The current working path
///
/// # Example
/// ```
/// let client = Client::new_local();
/// assert_eq!(client.current_path(), "/");
/// ```
pub fn current_path(&self) -> &str {
&self.current_path
}
/// Set current path.
///
/// # Arguments
/// * `path` - The new current path
///
/// # Example
/// ```
/// let mut client = Client::new_local();
/// client.set_path("/shell");
/// assert_eq!(client.current_path(), "/shell");
/// ```
pub fn set_path(&mut self, path: &str) {
self.current_path = path.to_string();
}
}
+74 -10
View File
@@ -1,37 +1,101 @@
//! # RemoteShell Leaf //! # RemoteShell Leaf
//!
//! This module provides command execution functionality.
use crate::protocol::{TreeRequest, TreeResponse, EndpointType}; use crate::protocol::{TreeRequest, TreeResponse, EndpointType};
use crate::tree::Endpoint; use crate::tree::Endpoint;
use std::string::String; use std::string::String;
use std::vec::Vec; use std::vec::Vec;
use std::result::Result; use std::result::Result;
use std::fmt;
pub struct RemoteShell { name: String } /// RemoteShell - executes commands locally.
///
/// # Example
/// ```
/// use ush_treetest::leaves::RemoteShell;
///
/// let shell = RemoteShell::new("shell");
/// ```
pub struct RemoteShell {
name: String,
}
impl RemoteShell { impl RemoteShell {
pub fn new(name: &str) -> Self { Self { name: name.to_string() } } /// Create a new RemoteShell endpoint.
///
/// # Arguments
/// * `name` - The name for this endpoint
pub fn new(name: &str) -> Self {
Self {
name: name.to_string(),
}
}
fn execute(&self, cmd: &str) -> (i32, Vec<u8>, Vec<u8>) { fn execute(&self, cmd: &str) -> (i32, Vec<u8>, Vec<u8>) {
use std::process::{Command, Stdio}; use std::process::{Command, Stdio};
match Command::new("sh").args(["-c", cmd]).stdout(Stdio::piped()).stderr(Stdio::piped()).output() { match Command::new("sh")
.args(["-c", cmd])
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.output()
{
Ok(out) => (out.status.code().unwrap_or(-1), out.stdout, out.stderr), Ok(out) => (out.status.code().unwrap_or(-1), out.stdout, out.stderr),
Err(e) => (-1, Vec::new(), format!("{}\n", e).into_bytes()), Err(e) => (-1, Vec::new(), format!("{}\n", e).into_bytes()),
} }
} }
} }
impl fmt::Debug for RemoteShell {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RemoteShell")
.field("name", &self.name)
.finish()
}
}
impl Endpoint for RemoteShell { impl Endpoint for RemoteShell {
fn handle_request(&mut self, request: &TreeRequest, _src_path: &str) -> Result<TreeResponse, String> { fn handle_request(
&mut self,
request: &TreeRequest,
_src_path: &str,
) -> Result<TreeResponse, String> {
match request { match request {
TreeRequest::Exec { cmd } => { TreeRequest::Exec { cmd } => {
let (exit_code, stdout, stderr) = self.execute(cmd); let (exit_code, stdout, stderr) = self.execute(cmd);
Ok(TreeResponse::ExecOutput { exit_code, stdout, stderr }) Ok(TreeResponse::ExecOutput {
exit_code,
stdout,
stderr,
})
} }
_ => Err("unsupported request".to_string()), _ => Err("unsupported request".to_string()),
} }
} }
fn on_stream_open(&mut self, _stream_id: u16, _src_path: &str) -> Option<u16> { None }
fn on_stream_data(&mut self, _stream_id: u16, _data: &[u8]) -> bool { false } fn on_stream_open(
fn on_stream_close(&mut self, _stream_id: u16) {} &mut self,
fn endpoint_type(&self) -> EndpointType { EndpointType::Leaf } _stream_id: u16,
fn name(&self) -> &str { &self.name } _src_path: &str,
) -> Option<u16> {
None
}
fn on_stream_data(
&mut self,
_stream_id: u16,
_data: &[u8],
) -> bool {
false
}
fn on_stream_close(&mut self, _stream_id: u16) {}
fn endpoint_type(&self) -> EndpointType {
EndpointType::Leaf
}
fn name(&self) -> &str {
&self.name
}
} }
+62 -46
View File
@@ -8,8 +8,9 @@ use crate::tree::Endpoint;
use std::boxed::Box; use std::boxed::Box;
use std::result::Result; use std::result::Result;
use std::collections::HashMap; use std::collections::HashMap;
use std::fmt;
/// A PTY session - represents an active terminal session /// A PTY session - represents an active terminal session.
#[allow(dead_code)] #[allow(dead_code)]
pub struct PtySession { pub struct PtySession {
/// Stream ID for this session /// Stream ID for this session
@@ -17,10 +18,19 @@ pub struct PtySession {
/// Master file descriptor for the PTY /// Master file descriptor for the PTY
pub master: std::os::unix::io::RawFd, pub master: std::os::unix::io::RawFd,
/// Child process PID /// Child process PID
pub child_pid: u32 pub child_pid: u32,
} }
/// TTY endpoint - provides PTY streaming functionality impl fmt::Debug for PtySession {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PtySession")
.field("stream_id", &self.stream_id)
.field("child_pid", &self.child_pid)
.finish()
}
}
/// TTY endpoint - provides PTY streaming functionality.
pub struct TTY { pub struct TTY {
name: String, name: String,
sessions: HashMap<u16, Box<PtySession>>, sessions: HashMap<u16, Box<PtySession>>,
@@ -28,17 +38,29 @@ pub struct TTY {
next_id: u16, next_id: u16,
} }
impl fmt::Debug for TTY {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("TTY")
.field("name", &self.name)
.field("sessions", &self.sessions.len())
.finish()
}
}
impl TTY { impl TTY {
/// Create a new TTY endpoint /// Create a new TTY endpoint.
///
/// # Arguments
/// * `name` - The name for this endpoint
pub fn new(name: &str) -> Self { pub fn new(name: &str) -> Self {
Self { Self {
name: name.to_string(), name: name.to_string(),
sessions: HashMap::new(), sessions: HashMap::new(),
next_id: 1 next_id: 1,
} }
} }
/// Open a new PTY session /// Open a new PTY session.
/// ///
/// # Arguments /// # Arguments
/// * `stream_id` - The stream ID for this session /// * `stream_id` - The stream ID for this session
@@ -46,25 +68,21 @@ impl TTY {
/// # Returns /// # Returns
/// Ok(()) on success, Err(message) on failure /// Ok(()) on success, Err(message) on failure
fn open_pty(&mut self, stream_id: u16) -> Result<(), String> { fn open_pty(&mut self, stream_id: u16) -> Result<(), String> {
// Open PTY master - must be unsafe
let master = unsafe { libc::posix_openpt(libc::O_RDWR | libc::O_NOCTTY) }; let master = unsafe { libc::posix_openpt(libc::O_RDWR | libc::O_NOCTTY) };
if master < 0 { if master < 0 {
return Err("failed to open PTY".to_string()); return Err("failed to open PTY".to_string());
} }
// Grant PTY access - unsafe
if unsafe { libc::grantpt(master) } != 0 { if unsafe { libc::grantpt(master) } != 0 {
unsafe { libc::close(master); } unsafe { libc::close(master) };
return Err("failed to grant PTY".to_string()); return Err("failed to grant PTY".to_string());
} }
// Unlock PTY - unsafe
if unsafe { libc::unlockpt(master) } != 0 { if unsafe { libc::unlockpt(master) } != 0 {
unsafe { libc::close(master); } unsafe { libc::close(master) };
return Err("failed to unlock PTY".to_string()); return Err("failed to unlock PTY".to_string());
} }
// Get slave name - unsafe but returns pointer we need to check
let slave_name = unsafe { let slave_name = unsafe {
let ptr = libc::ptsname(master); let ptr = libc::ptsname(master);
if ptr.is_null() { if ptr.is_null() {
@@ -74,26 +92,24 @@ impl TTY {
std::ffi::CStr::from_ptr(ptr).to_string_lossy().into_owned() std::ffi::CStr::from_ptr(ptr).to_string_lossy().into_owned()
}; };
// Fork - unsafe
let pid = unsafe { libc::fork() }; let pid = unsafe { libc::fork() };
if pid < 0 { if pid < 0 {
unsafe { libc::close(master); } unsafe { libc::close(master) };
return Err("fork failed".to_string()); return Err("fork failed".to_string());
} }
if pid == 0 { if pid == 0 {
// Child process - set up slave PTY and exec shell unsafe { libc::close(master) };
unsafe { libc::close(master); }
let slave = unsafe { libc::open(slave_name.as_ptr() as *const libc::c_char, libc::O_RDWR) }; let slave = unsafe {
libc::open(slave_name.as_ptr() as *const libc::c_char, libc::O_RDWR)
};
if slave < 0 { if slave < 0 {
unsafe { libc::exit(1); } unsafe { libc::exit(1) };
} }
// Set controlling terminal - unsafe unsafe { libc::ioctl(slave, libc::TIOCSCTTY, 0) };
unsafe { libc::ioctl(slave, libc::TIOCSCTTY, 0); }
// Redirect stdio - unsafe
unsafe { unsafe {
libc::dup2(slave, libc::STDIN_FILENO); libc::dup2(slave, libc::STDIN_FILENO);
libc::dup2(slave, libc::STDOUT_FILENO); libc::dup2(slave, libc::STDOUT_FILENO);
@@ -101,29 +117,26 @@ impl TTY {
libc::close(slave); libc::close(slave);
} }
// Exec shell - unsafe
unsafe { unsafe {
libc::execl( libc::execl(
"/bin/sh\0".as_ptr() as *const libc::c_char, "/bin/sh\0".as_ptr() as *const libc::c_char,
"sh\0".as_ptr() as *const libc::c_char, "sh\0".as_ptr() as *const libc::c_char,
std::ptr::null::<libc::c_char>() std::ptr::null::<libc::c_char>(),
); );
} }
// If exec fails, exit unsafe { libc::exit(1) };
unsafe { libc::exit(1); }
} }
// Parent - store session
self.sessions.insert(stream_id, Box::new(PtySession { self.sessions.insert(stream_id, Box::new(PtySession {
stream_id, stream_id,
master, master,
child_pid: pid as u32 child_pid: pid as u32,
})); }));
Ok(()) Ok(())
} }
/// Write data to a PTY session /// Write data to a PTY session.
/// ///
/// # Arguments /// # Arguments
/// * `stream_id` - The stream ID /// * `stream_id` - The stream ID
@@ -137,7 +150,7 @@ impl TTY {
libc::write( libc::write(
session.master, session.master,
data.as_ptr() as *const libc::c_void, data.as_ptr() as *const libc::c_void,
data.len() data.len(),
) )
}; };
if written < 0 { if written < 0 {
@@ -146,28 +159,28 @@ impl TTY {
Ok(()) Ok(())
} }
/// Close a PTY session /// Close a PTY session.
/// ///
/// # Arguments /// # Arguments
/// * `stream_id` - The stream ID to close /// * `stream_id` - The stream ID to close
fn close_pty(&mut self, stream_id: u16) { fn close_pty(&mut self, stream_id: u16) {
if let Some(session) = self.sessions.remove(&stream_id) { if let Some(session) = self.sessions.remove(&stream_id) {
// Send SIGTERM to child - unsafe unsafe { libc::kill(session.child_pid as i32, libc::SIGTERM) };
unsafe { libc::kill(session.child_pid as i32, libc::SIGTERM); }
// Wait for child - unsafe
let mut status: libc::c_int = 0; let mut status: libc::c_int = 0;
unsafe { libc::waitpid(session.child_pid as i32, &mut status, 0); } unsafe { libc::waitpid(session.child_pid as i32, &mut status, 0) };
// Close master - unsafe unsafe { libc::close(session.master) };
unsafe { libc::close(session.master); }
} }
} }
} }
impl Endpoint for TTY { impl Endpoint for TTY {
/// Handle a request - TTY only supports exec for basic commands fn handle_request(
fn handle_request(&mut self, request: &TreeRequest, _src_path: &str) -> Result<TreeResponse, String> { &mut self,
request: &TreeRequest,
_src_path: &str,
) -> Result<TreeResponse, String> {
match request { match request {
TreeRequest::Exec { cmd } => { TreeRequest::Exec { cmd } => {
use std::process::{Command, Stdio}; use std::process::{Command, Stdio};
@@ -180,35 +193,38 @@ impl Endpoint for TTY {
Ok(TreeResponse::ExecOutput { Ok(TreeResponse::ExecOutput {
exit_code: output.status.code().unwrap_or(-1), exit_code: output.status.code().unwrap_or(-1),
stdout: output.stdout, stdout: output.stdout,
stderr: output.stderr stderr: output.stderr,
}) })
} }
_ => Err("use stream for TTY".to_string()), _ => Err("use stream for TTY".to_string()),
} }
} }
/// Handle stream open - creates a new PTY session fn on_stream_open(
fn on_stream_open(&mut self, stream_id: u16, _src_path: &str) -> Option<u16> { &mut self,
stream_id: u16,
_src_path: &str,
) -> Option<u16> {
self.open_pty(stream_id).ok().map(|_| stream_id) self.open_pty(stream_id).ok().map(|_| stream_id)
} }
/// Handle stream data - writes to PTY fn on_stream_data(
fn on_stream_data(&mut self, stream_id: u16, data: &[u8]) -> bool { &mut self,
stream_id: u16,
data: &[u8],
) -> bool {
self.write_to_pty(stream_id, data).ok(); self.write_to_pty(stream_id, data).ok();
true true
} }
/// Handle stream close - closes PTY session
fn on_stream_close(&mut self, stream_id: u16) { fn on_stream_close(&mut self, stream_id: u16) {
self.close_pty(stream_id); self.close_pty(stream_id);
} }
/// Get endpoint type
fn endpoint_type(&self) -> EndpointType { fn endpoint_type(&self) -> EndpointType {
EndpointType::Stream EndpointType::Stream
} }
/// Get endpoint name
fn name(&self) -> &str { fn name(&self) -> &str {
&self.name &self.name
} }
+76 -203
View File
@@ -3,20 +3,46 @@
//! This is a testbed implementation of a tree-based routing protocol for unshell. //! This is a testbed implementation of a tree-based routing protocol for unshell.
//! It supports serving and connecting to tree endpoints, with leaves for RemoteShell //! It supports serving and connecting to tree endpoints, with leaves for RemoteShell
//! (command execution) and TTY (PTY streaming). //! (command execution) and TTY (PTY streaming).
//!
//! # Commands
//!
//! - `serve [addr]` - Start a server
//! - `connect [addr]` - Connect to a server and run CLI
//! - `run <command>` - Run a single command locally
//! - (default) - Run interactive CLI with local tree
//!
//! # Example
//!
//! ```bash
//! # Start server
//! $ ush-treetest serve 0.0.0.0:8080
//!
//! # Connect from another terminal
//! $ ush-treetest connect localhost:8080
//! ```
mod cli; mod cli;
mod client;
mod leaves; mod leaves;
mod protocol; mod protocol;
mod server;
mod tree; mod tree;
use crate::protocol::{FrameHeader, FrameType, TreeRequest, TreeResponse, make_response, make_handshake_ack, Transport}; use crate::cli::{Cli, parse_and_execute};
use crate::tree::Tree;
use crate::leaves::{RemoteShell, TTY};
use crate::protocol::TcpTransport;
use std::io::{self, Write}; use std::io::{self, Write};
use std::sync::{Arc, Mutex};
use clap::{Parser, Subcommand}; use clap::{Parser, Subcommand};
/// CLI argument parser.
///
/// # Example
/// ```
/// let args = Args::parse();
/// match args.command {
/// Some(Command::Serve { addr }) => { ... }
/// Some(Command::Connect { addr }) => { ... }
/// _ => { ... }
/// }
/// ```
#[derive(Parser)] #[derive(Parser)]
#[command(name = "ush-treetest")] #[command(name = "ush-treetest")]
#[command(about = "Unshell tree protocol testbed")] #[command(about = "Unshell tree protocol testbed")]
@@ -28,22 +54,49 @@ struct Args {
addr: Option<String>, addr: Option<String>,
} }
/// Subcommands for the CLI.
///
/// # Variants
/// - `Serve` - Start a server
/// - `Connect` - Connect to a server
/// - `Run` - Run a single command locally
/// - `Cli` - Run interactive CLI (default)
#[derive(Subcommand)] #[derive(Subcommand)]
enum Command { enum Command {
/// Start a server
Serve { Serve {
/// Address to listen on
#[arg(default_value = "0.0.0.0:8080")] #[arg(default_value = "0.0.0.0:8080")]
addr: String, addr: String,
}, },
/// Connect to a server
Connect { Connect {
/// Server address to connect to
#[arg(default_value = "localhost:8080")] #[arg(default_value = "localhost:8080")]
addr: String, addr: String,
}, },
/// Run interactive CLI
Cli {}, Cli {},
/// Run a single command locally
Run { Run {
/// Command to execute
command: String, command: String,
}, },
} }
/// Main entry point.
///
/// # Example
/// ```
/// // Start server
/// $ ush-treetest serve
///
/// // Connect to server
/// $ ush-treetest connect localhost:8080
///
/// // Run locally
/// $ ush-treetest run "exec /shell echo hello"
/// ```
fn main() { fn main() {
let _ = env_logger::try_init(); let _ = env_logger::try_init();
@@ -51,7 +104,7 @@ fn main() {
match args.command { match args.command {
Some(Command::Serve { addr }) => { Some(Command::Serve { addr }) => {
run_server(&addr); server::run_server(&addr);
} }
Some(Command::Connect { addr }) => { Some(Command::Connect { addr }) => {
run_client(&addr); run_client(&addr);
@@ -65,201 +118,12 @@ fn main() {
} }
} }
fn run_server(addr: &str) { /// Run the client with connection to a server.
log::info!("Starting server on {}", addr);
let tree = Arc::new(Mutex::new(Tree::new()));
{
let mut tree = tree.lock().unwrap();
tree.add_endpoint("/shell", Box::new(RemoteShell::new("shell")));
tree.add_endpoint("/tty", Box::new(TTY::new("tty")));
}
let listener = TcpTransport::listen(addr).expect("failed to bind");
log::info!("Listening on {}", addr);
loop {
match TcpTransport::accept(&listener) {
Ok(transport) => {
log::info!("New connection from {:?}", transport.peer_addr());
let tree = Arc::clone(&tree);
std::thread::spawn(move || {
handle_connection(transport, tree);
});
}
Err(e) => {
log::error!("accept error: {:?}", e);
}
}
}
}
fn handle_connection(mut transport: TcpTransport, tree: Arc<Mutex<Tree>>) {
let (header, _payload) = match transport.recv_frame() {
Ok(h) => h,
Err(e) => {
log::error!("recv error: {:?}", e);
return;
}
};
if header.frame_type != FrameType::Handshake {
log::error!("expected handshake");
return;
}
log::info!("Client connected");
let (ack_header, ack_payload) = make_handshake_ack(true, "/client");
transport.send_frame(&ack_header, Some(&ack_payload)).expect("send failed");
loop {
match transport.recv_frame() {
Ok((header, payload)) => {
let response = handle_frame(&header, &payload, &tree);
if let Some(response) = response {
let (resp_header, resp_payload) = match response {
Ok((h, p)) => (h, p),
Err(e) => {
log::error!("handle error: {:?}", e);
break;
}
};
transport.send_frame(&resp_header, Some(&resp_payload)).expect("send failed");
}
if header.frame_type == FrameType::StreamClose {
break;
}
}
Err(e) => {
log::error!("recv error: {:?}", e);
break;
}
}
}
log::info!("Connection closed");
}
/// Handle a single frame and return an optional response
/// ///
/// # Arguments /// # Arguments
/// * `header` - The frame header /// * `addr` - Server address
/// * `payload` - The frame payload bytes
/// * `tree` - Shared access to the tree
///
/// # Returns
/// Some(Ok((header, payload))) for a response to send, Some(Err(e)) for an error, None for no response
fn handle_frame(header: &FrameHeader, payload: &[u8], tree: &Arc<Mutex<Tree>>) -> Option<Result<(FrameHeader, Vec<u8>), String>> {
match header.frame_type {
FrameType::Request => {
let request: TreeRequest = match TreeRequest::from_bytes(payload) {
Ok(r) => r,
Err(e) => return Some(Err(e.to_string())),
};
let dst_path = header.dst_path.as_deref().unwrap_or("/");
// Acquire lock for the entire request handling
let mut tree = match tree.lock() {
Ok(t) => t,
Err(e) => return Some(Err(format!("lock error: {}", e))),
};
let response = match request {
TreeRequest::ListNodes {} => {
let names = tree.list_nodes(dst_path).unwrap_or_default();
TreeResponse::NodeList { names }
}
TreeRequest::ListEndpoints {} => {
let endpoints = tree.list_endpoints(dst_path).unwrap_or_default();
TreeResponse::EndpointList { endpoints }
}
TreeRequest::ListLeaves {} => {
let leaves = tree.list_leaves();
TreeResponse::LeafList { leaves }
}
TreeRequest::GetInfo { path } => {
match tree.get_info(&path) {
Ok(info) => TreeResponse::NodeInfo { info },
Err(e) => return Some(Err(e)),
}
}
TreeRequest::Exec { ref cmd } => {
let (handler, matched_path) = match tree.find_handler(dst_path) {
Some(h) => h,
None => return Some(Err(format!("path not found: {}", dst_path))),
};
// Lock the handler and make the request
let result = {
let mut handler = match handler.lock() {
Ok(h) => h,
Err(e) => return Some(Err(format!("lock error: {}", e))),
};
handler.handle_request(&TreeRequest::Exec { cmd: cmd.clone() }, matched_path)
};
match result {
Ok(resp) => resp,
Err(e) => return Some(Err(e)),
}
}
TreeRequest::StreamOpen { path } => {
match tree.open_stream(&path, &header.src_path) {
Ok(stream_id) => TreeResponse::StreamOpened { stream_id },
Err(e) => return Some(Err(e)),
}
}
TreeRequest::Resize { .. } => {
return Some(Err("unsupported request: Resize".to_string()));
}
};
Some(Ok(make_response(&header.src_path, header.request_id.unwrap_or(0), &response)))
}
FrameType::StreamOpen => {
let dst_path = header.dst_path.as_deref().unwrap_or("/");
let mut tree = match tree.lock() {
Ok(t) => t,
Err(e) => return Some(Err(format!("lock error: {}", e))),
};
match tree.open_stream(dst_path, &header.src_path) {
Ok(stream_id) => {
let response = TreeResponse::StreamOpened { stream_id };
Some(Ok(make_response(&header.src_path, header.request_id.unwrap_or(0), &response)))
}
Err(e) => Some(Err(e)),
}
}
FrameType::StreamData => {
let mut tree = match tree.lock() {
Ok(t) => t,
Err(e) => return Some(Err(format!("lock error: {}", e))),
};
tree.route_stream_data(header, payload).ok();
None
}
FrameType::StreamClose => {
let mut tree = match tree.lock() {
Ok(t) => t,
Err(e) => return Some(Err(format!("lock error: {}", e))),
};
if let Some(stream_id) = header.stream_id {
tree.close_stream(stream_id).ok();
}
None
}
_ => Some(Err("unsupported frame type".to_string())),
}
}
fn run_client(addr: &str) { fn run_client(addr: &str) {
let mut cli = cli::Cli::new(); let mut cli = Cli::new();
if let Err(e) = cli.connect(addr) { if let Err(e) = cli.connect(addr) {
eprintln!("Failed to connect: {}", e); eprintln!("Failed to connect: {}", e);
@@ -270,8 +134,9 @@ fn run_client(addr: &str) {
run_cli_loop(&mut cli); run_cli_loop(&mut cli);
} }
/// Run an interactive CLI with a local tree.
fn run_interactive() { fn run_interactive() {
let mut cli = cli::Cli::new(); let mut cli = Cli::new();
println!("Unshell Tree Protocol Testbed"); println!("Unshell Tree Protocol Testbed");
println!("Type 'help' for commands\n"); println!("Type 'help' for commands\n");
@@ -284,7 +149,11 @@ fn run_interactive() {
run_cli_loop(&mut cli); run_cli_loop(&mut cli);
} }
fn run_cli_loop(cli: &mut cli::Cli) { /// Run the CLI command loop.
///
/// # Arguments
/// * `cli` - The CLI instance
fn run_cli_loop(cli: &mut Cli) {
loop { loop {
print!("{}> ", cli.current_path()); print!("{}> ", cli.current_path());
io::stdout().flush().ok(); io::stdout().flush().ok();
@@ -304,7 +173,7 @@ fn run_cli_loop(cli: &mut cli::Cli) {
break; break;
} }
match cli::parse_and_execute(cli, line) { match parse_and_execute(cli, line) {
Ok(output) => { Ok(output) => {
if !output.is_empty() { if !output.is_empty() {
println!("{}", output); println!("{}", output);
@@ -317,10 +186,14 @@ fn run_cli_loop(cli: &mut cli::Cli) {
} }
} }
/// Run a single command locally.
///
/// # Arguments
/// * `command` - The command to run
fn run_single_command(command: &str) { fn run_single_command(command: &str) {
let mut cli = cli::Cli::new(); let mut cli = Cli::new();
match cli::parse_and_execute(&mut cli, command) { match parse_and_execute(&mut cli, command) {
Ok(output) => { Ok(output) => {
if !output.is_empty() { if !output.is_empty() {
println!("{}", output); println!("{}", output);
+38
View File
@@ -1,4 +1,42 @@
//! # Protocol Module //! # Protocol Module
//!
//! This module defines the protocol types and transport layer for the unshell tree protocol.
//! It provides serialization via rkyv and TCP transport for frame passing.
//!
//! # Frame Format
//!
//! Each frame consists of:
//! - 4-byte header length (little-endian u32)
//! - Serialized header bytes (using rkyv)
//! - 4-byte payload length (little-endian u32)
//! - Payload bytes (optional)
//!
//! # Usage
//!
//! ```no_run
//! use ush_treetest::protocol::{
//! FrameType, FrameHeader, TreeRequest, TreeResponse,
//! TcpTransport, Transport,
//! };
//!
//! // Connect to server
//! let mut transport = TcpTransport::connect("localhost:8080").unwrap();
//!
//! // Send a request
//! let header = FrameHeader {
//! frame_type: FrameType::Request,
//! dst_path: Some("/shell".to_string()),
//! src_path: "/client".to_string(),
//! request_id: Some(1),
//! stream_id: None,
//! };
//! let payload = TreeRequest::Exec { cmd: "echo hello".to_string() }.to_bytes();
//! transport.send_frame(&header, Some(&payload)).unwrap();
//!
//! // Receive response
//! let (header, payload) = transport.recv_frame().unwrap();
//! let response = TreeResponse::from_bytes(&payload).unwrap();
//! ```
pub mod types; pub mod types;
pub mod transport; pub mod transport;
+241 -46
View File
@@ -1,27 +1,85 @@
//! # Transport Layer //! # Transport Layer
//! //!
//! This module provides the Transport trait and TCP implementation. //! This module provides the Transport trait and TCP implementation.
//! Uses a simple length-prefixed framing: [u32 header_len][header bytes][u32 payload_len][payload bytes] //! Uses a simple length-prefixed framing: `[u32 header_len][header bytes][u32 payload_len][payload bytes]`
//!
//! # Frame Format
//!
//! Each frame is encoded as:
//! - 4 bytes: header length (little-endian u32)
//! - N bytes: serialized header
//! - 4 bytes: payload length (little-endian u32)
//! - M bytes: payload (optional)
//!
//! # Usage
//!
//! ```no_run
//! use ush_treetest::protocol::{TcpTransport, Transport, FrameHeader, FrameType};
//!
//! // Connect to server
//! let mut transport = TcpTransport::connect("localhost:8080").unwrap();
//!
//! // Send a frame
//! let header = FrameHeader {
//! frame_type: FrameType::Request,
//! dst_path: Some("/shell".to_string()),
//! src_path: "/client".to_string(),
//! request_id: Some(1),
//! stream_id: None,
//! };
//! transport.send_frame(&header, Some(b"test payload")).unwrap();
//!
//! // Receive a frame
//! let (header, payload) = transport.recv_frame().unwrap();
//! ```
use crate::protocol::types::*; use crate::protocol::types::*;
use std::net::{TcpStream, TcpListener}; use std::net::{TcpStream, TcpListener};
use std::io::{Read, Write, Error}; use std::io::{Read, Write, Error};
/// Transport trait - interface for sending and receiving frames.
///
/// This trait defines the interface for all transport implementations.
/// Implementors must provide send_frame, recv_frame, and close methods.
pub trait Transport: Sized { pub trait Transport: Sized {
/// Error type for this transport
type Error: std::fmt::Debug; type Error: std::fmt::Debug;
/// Send a frame (header + optional payload)
fn send_frame(&mut self, header: &FrameHeader, payload: Option<&[u8]>) -> Result<(), Self::Error>; /// Send a frame (header + optional payload).
/// Receive a frame ///
/// # Arguments
/// * `header` - The frame header
/// * `payload` - Optional payload bytes
fn send_frame(
&mut self,
header: &FrameHeader,
payload: Option<&[u8]>,
) -> Result<(), Self::Error>;
/// Receive a frame.
///
/// # Returns
/// (header, payload) tuple
fn recv_frame(&mut self) -> Result<(FrameHeader, Vec<u8>), Self::Error>; fn recv_frame(&mut self) -> Result<(FrameHeader, Vec<u8>), Self::Error>;
/// Close the transport
/// Close the transport.
#[allow(dead_code)] #[allow(dead_code)]
fn close(&mut self) -> Result<(), Self::Error>; fn close(&mut self) -> Result<(), Self::Error>;
} }
/// Transport-level errors.
///
/// # Variants
/// * `ConnectionClosed` - The connection was closed
/// * `InvalidFrame` - The frame was invalid
/// * `Io` - I/O error
#[derive(Debug)] #[derive(Debug)]
pub enum TransportError { pub enum TransportError {
/// Connection was closed
ConnectionClosed, ConnectionClosed,
/// Invalid frame format
InvalidFrame(String), InvalidFrame(String),
/// I/O error
Io(String), Io(String),
} }
@@ -36,52 +94,106 @@ impl std::fmt::Display for TransportError {
} }
impl From<Error> for TransportError { impl From<Error> for TransportError {
fn from(e: Error) -> Self { TransportError::Io(e.to_string()) } fn from(e: Error) -> Self {
TransportError::Io(e.to_string())
}
} }
/// TCP transport implementation /// TCP transport implementation.
#[derive(Debug)]
pub struct TcpTransport { pub struct TcpTransport {
stream: TcpStream, stream: TcpStream,
} }
impl TcpTransport { impl TcpTransport {
/// Create a new TCP transport from an existing stream.
///
/// Sets read/write timeouts to 30 seconds for safety.
///
/// # Arguments
/// * `stream` - An existing TCP stream
pub fn new(stream: TcpStream) -> Self { pub fn new(stream: TcpStream) -> Self {
// Set timeouts for safety stream
stream.set_read_timeout(Some(std::time::Duration::from_secs(30))).ok(); .set_read_timeout(Some(std::time::Duration::from_secs(30)))
stream.set_write_timeout(Some(std::time::Duration::from_secs(30))).ok(); .ok();
stream
.set_write_timeout(Some(std::time::Duration::from_secs(30)))
.ok();
Self { stream } Self { stream }
} }
/// Connect to a remote address /// Connect to a remote address.
///
/// # Arguments
/// * `addr` - The address to connect to (e.g., "localhost:8080")
///
/// # Returns
/// Connected transport
///
/// # Example
/// ```
/// let transport = TcpTransport::connect("localhost:8080").unwrap();
/// ```
pub fn connect(addr: &str) -> Result<Self, TransportError> { pub fn connect(addr: &str) -> Result<Self, TransportError> {
let stream = TcpStream::connect(addr)?; let stream = TcpStream::connect(addr)?;
Ok(Self::new(stream)) Ok(Self::new(stream))
} }
/// Create a listening socket /// Create a listening socket.
///
/// # Arguments
/// * `addr` - The address to listen on
///
/// # Returns
/// TCP listener
///
/// # Example
/// ```
/// let listener = TcpTransport::listen("0.0.0.0:8080").unwrap();
/// ```
pub fn listen(addr: &str) -> Result<std::net::TcpListener, TransportError> { pub fn listen(addr: &str) -> Result<std::net::TcpListener, TransportError> {
let listener = TcpListener::bind(addr)?; let listener = TcpListener::bind(addr)?;
listener.set_nonblocking(false)?; listener.set_nonblocking(false)?;
Ok(listener) Ok(listener)
} }
/// Accept an incoming connection /// Accept an incoming connection.
///
/// # Arguments
/// * `listener` - The listening socket
///
/// # Returns
/// New transport for the connection
///
/// # Example
/// ```
/// let listener = TcpTransport::listen("0.0.0.0:8080").unwrap();
/// let transport = TcpTransport::accept(&listener).unwrap();
/// ```
pub fn accept(listener: &std::net::TcpListener) -> Result<Self, TransportError> { pub fn accept(listener: &std::net::TcpListener) -> Result<Self, TransportError> {
let stream = listener.accept()?.0; let stream = listener.accept()?.0;
Ok(Self::new(stream)) Ok(Self::new(stream))
} }
/// Get peer address /// Get peer address.
///
/// # Returns
/// The peer's socket address
pub fn peer_addr(&self) -> Result<std::net::SocketAddr, std::io::Error> { pub fn peer_addr(&self) -> Result<std::net::SocketAddr, std::io::Error> {
self.stream.peer_addr() self.stream.peer_addr()
} }
/// Read exactly n bytes /// Read exactly n bytes.
///
/// Will block until all bytes are read or an error occurs.
fn read_exact(&mut self, mut n: usize) -> Result<Vec<u8>, TransportError> { fn read_exact(&mut self, mut n: usize) -> Result<Vec<u8>, TransportError> {
let mut buf = Vec::with_capacity(n); let mut buf = Vec::with_capacity(n);
while n > 0 { while n > 0 {
let mut chunk = vec![0u8; n]; let mut chunk = vec![0u8; n];
let read = self.stream.read(&mut chunk).map_err(|e| TransportError::Io(e.to_string()))?; let read =
self.stream
.read(&mut chunk)
.map_err(|e| TransportError::Io(e.to_string()))?;
if read == 0 { if read == 0 {
return Err(TransportError::ConnectionClosed); return Err(TransportError::ConnectionClosed);
} }
@@ -95,41 +207,45 @@ impl TcpTransport {
impl Transport for TcpTransport { impl Transport for TcpTransport {
type Error = TransportError; type Error = TransportError;
fn send_frame(&mut self, header: &FrameHeader, payload: Option<&[u8]>) -> Result<(), Self::Error> { fn send_frame(
// Serialize header using rkyv &mut self,
header: &FrameHeader,
payload: Option<&[u8]>,
) -> Result<(), Self::Error> {
let header_bytes = header.to_bytes(); let header_bytes = header.to_bytes();
let header_len = header_bytes.len() as u32; let header_len = header_bytes.len() as u32;
// Get payload bytes
let payload_bytes = payload.unwrap_or(&[]); let payload_bytes = payload.unwrap_or(&[]);
let payload_len = payload_bytes.len() as u32; let payload_len = payload_bytes.len() as u32;
// Build frame: [u32 header_len][header][u32 payload_len][payload] let mut frame =
let mut frame = Vec::with_capacity(4 + header_len as usize + 4 + payload_len as usize); Vec::with_capacity(4 + header_len as usize + 4 + payload_len as usize);
frame.extend_from_slice(&header_len.to_le_bytes()); frame.extend_from_slice(&header_len.to_le_bytes());
frame.extend_from_slice(&header_bytes); frame.extend_from_slice(&header_bytes);
frame.extend_from_slice(&payload_len.to_le_bytes()); frame.extend_from_slice(&payload_len.to_le_bytes());
frame.extend_from_slice(payload_bytes); frame.extend_from_slice(payload_bytes);
self.stream.write_all(&frame).map_err(|e| TransportError::Io(e.to_string()))?; self.stream
self.stream.flush().map_err(|e| TransportError::Io(e.to_string()))?; .write_all(&frame)
.map_err(|e| TransportError::Io(e.to_string()))?;
self.stream
.flush()
.map_err(|e| TransportError::Io(e.to_string()))?;
Ok(()) Ok(())
} }
fn recv_frame(&mut self) -> Result<(FrameHeader, Vec<u8>), Self::Error> { fn recv_frame(&mut self) -> Result<(FrameHeader, Vec<u8>), Self::Error> {
// Read header length
let header_len_bytes = self.read_exact(4)?; let header_len_bytes = self.read_exact(4)?;
let header_len = u32::from_le_bytes(header_len_bytes.try_into().unwrap()) as usize; let header_len = u32::from_le_bytes(header_len_bytes.try_into().unwrap()) as usize;
// Read header
let header_bytes = self.read_exact(header_len)?; let header_bytes = self.read_exact(header_len)?;
let header = FrameHeader::from_bytes(&header_bytes).map_err(|e| TransportError::InvalidFrame(e))?; let header =
FrameHeader::from_bytes(&header_bytes).map_err(|e| TransportError::InvalidFrame(e))?;
// Read payload length
let payload_len_bytes = self.read_exact(4)?; let payload_len_bytes = self.read_exact(4)?;
let payload_len = u32::from_le_bytes(payload_len_bytes.try_into().unwrap()) as usize; let payload_len =
u32::from_le_bytes(payload_len_bytes.try_into().unwrap()) as usize;
// Read payload
let payload = if payload_len > 0 { let payload = if payload_len > 0 {
self.read_exact(payload_len)? self.read_exact(payload_len)?
} else { } else {
@@ -140,17 +256,37 @@ impl Transport for TcpTransport {
} }
fn close(&mut self) -> Result<(), Self::Error> { fn close(&mut self) -> Result<(), Self::Error> {
self.stream.shutdown(std::net::Shutdown::Both).map_err(|e| TransportError::Io(e.to_string()))?; self.stream
.shutdown(std::net::Shutdown::Both)
.map_err(|e| TransportError::Io(e.to_string()))?;
Ok(()) Ok(())
} }
} }
// ============================================================================= /// Create a request frame.
// Frame builder functions ///
// ============================================================================= /// # Arguments
/// * `dst_path` - Destination path
/// Create a request frame /// * `src_path` - Source path
pub fn make_request(dst_path: &str, src_path: &str, request_id: u64, request: &TreeRequest) -> (FrameHeader, Vec<u8>) { /// * `request_id` - Request ID
/// * `request` - The request payload
///
/// # Returns
/// (header, payload) tuple
///
/// # Example
/// ```
/// use ush_treetest::protocol::{make_request, TreeRequest};
///
/// let request = TreeRequest::Exec { cmd: "echo hello".to_string() };
/// let (header, payload) = make_request("/shell", "/client", 1, &request);
/// ```
pub fn make_request(
dst_path: &str,
src_path: &str,
request_id: u64,
request: &TreeRequest,
) -> (FrameHeader, Vec<u8>) {
let header = FrameHeader { let header = FrameHeader {
frame_type: FrameType::Request, frame_type: FrameType::Request,
dst_path: Some(dst_path.to_string()), dst_path: Some(dst_path.to_string()),
@@ -162,8 +298,20 @@ pub fn make_request(dst_path: &str, src_path: &str, request_id: u64, request: &T
(header, payload) (header, payload)
} }
/// Create a response frame /// Create a response frame.
pub fn make_response(src_path: &str, request_id: u64, response: &TreeResponse) -> (FrameHeader, Vec<u8>) { ///
/// # Arguments
/// * `src_path` - Source path
/// * `request_id` - Request ID
/// * `response` - The response payload
///
/// # Returns
/// (header, payload) tuple
pub fn make_response(
src_path: &str,
request_id: u64,
response: &TreeResponse,
) -> (FrameHeader, Vec<u8>) {
let header = FrameHeader { let header = FrameHeader {
frame_type: FrameType::Response, frame_type: FrameType::Response,
dst_path: None, dst_path: None,
@@ -175,7 +323,15 @@ pub fn make_response(src_path: &str, request_id: u64, response: &TreeResponse) -
(header, payload) (header, payload)
} }
/// Create a stream open frame /// Create a stream open frame.
///
/// # Arguments
/// * `dst_path` - Destination path
/// * `src_path` - Source path
/// * `request_id` - Request ID
///
/// # Returns
/// Frame header (no payload)
pub fn make_stream_open(dst_path: &str, src_path: &str, request_id: u64) -> FrameHeader { pub fn make_stream_open(dst_path: &str, src_path: &str, request_id: u64) -> FrameHeader {
FrameHeader { FrameHeader {
frame_type: FrameType::StreamOpen, frame_type: FrameType::StreamOpen,
@@ -186,7 +342,14 @@ pub fn make_stream_open(dst_path: &str, src_path: &str, request_id: u64) -> Fram
} }
} }
/// Create a stream data frame /// Create a stream data frame.
///
/// # Arguments
/// * `stream_id` - Stream ID
/// * `data` - Data to send
///
/// # Returns
/// (header, payload) tuple
pub fn make_stream_data(stream_id: u16, data: &[u8]) -> (FrameHeader, Vec<u8>) { pub fn make_stream_data(stream_id: u16, data: &[u8]) -> (FrameHeader, Vec<u8>) {
let header = FrameHeader { let header = FrameHeader {
frame_type: FrameType::StreamData, frame_type: FrameType::StreamData,
@@ -198,7 +361,13 @@ pub fn make_stream_data(stream_id: u16, data: &[u8]) -> (FrameHeader, Vec<u8>) {
(header, data.to_vec()) (header, data.to_vec())
} }
/// Create a stream close frame /// Create a stream close frame.
///
/// # Arguments
/// * `stream_id` - Stream ID to close
///
/// # Returns
/// Frame header (no payload)
pub fn make_stream_close(stream_id: u16) -> FrameHeader { pub fn make_stream_close(stream_id: u16) -> FrameHeader {
FrameHeader { FrameHeader {
frame_type: FrameType::StreamClose, frame_type: FrameType::StreamClose,
@@ -209,9 +378,25 @@ pub fn make_stream_close(stream_id: u16) -> FrameHeader {
} }
} }
/// Create a handshake frame /// Create a handshake frame.
///
/// # Arguments
/// * `registered_paths` - Paths to register
///
/// # Returns
/// (header, payload) tuple
///
/// # Example
/// ```
/// use ush_treetest::protocol::make_handshake;
///
/// let paths = vec!["/client".to_string()];
/// let (header, payload) = make_handshake(paths);
/// ```
pub fn make_handshake(registered_paths: Vec<String>) -> (FrameHeader, Vec<u8>) { pub fn make_handshake(registered_paths: Vec<String>) -> (FrameHeader, Vec<u8>) {
let handshake = Handshake { registered_paths }; let handshake = Handshake {
registered_paths,
};
let payload = handshake.to_bytes(); let payload = handshake.to_bytes();
let header = FrameHeader { let header = FrameHeader {
frame_type: FrameType::Handshake, frame_type: FrameType::Handshake,
@@ -223,11 +408,21 @@ pub fn make_handshake(registered_paths: Vec<String>) -> (FrameHeader, Vec<u8>) {
(header, payload) (header, payload)
} }
/// Create a handshake ack frame /// Create a handshake ack frame.
pub fn make_handshake_ack(accepted: bool, assigned_base_path: &str) -> (FrameHeader, Vec<u8>) { ///
/// # Arguments
/// * `accepted` - Whether handshake was accepted
/// * `assigned_base_path` - Base path to assign
///
/// # Returns
/// (header, payload) tuple
pub fn make_handshake_ack(
accepted: bool,
assigned_base_path: &str,
) -> (FrameHeader, Vec<u8>) {
let ack = HandshakeAck { let ack = HandshakeAck {
accepted, accepted,
assigned_base_path: assigned_base_path.to_string() assigned_base_path: assigned_base_path.to_string(),
}; };
let payload = ack.to_bytes(); let payload = ack.to_bytes();
let header = FrameHeader { let header = FrameHeader {
+273 -15
View File
@@ -2,27 +2,86 @@
//! //!
//! This module defines the core types for the UnShell protocol. //! This module defines the core types for the UnShell protocol.
//! Uses rkyv for zero-copy serialization. //! Uses rkyv for zero-copy serialization.
//!
//! # Serialization
//!
//! All types implement `rkyv::Archive`, `rkyv::Serialize`, and `rkyv::Deserialize`
//! for efficient serialization without runtime type information.
//!
//! # Example
//!
//! ```no_run
//! use ush_treetest::protocol::{TreeRequest, TreeResponse};
//!
//! // Serialize a request
//! let request = TreeRequest::Exec { cmd: "echo hello".to_string() };
//! let bytes = request.to_bytes();
//!
//! // Deserialize it back
//! let decoded = TreeRequest::from_bytes(&bytes).unwrap();
//! ```
use rkyv::{Archive, Serialize, Deserialize}; use rkyv::{Archive, Serialize, Deserialize};
use std::string::String; use std::string::String;
use std::vec::Vec; use std::vec::Vec;
/// Default buffer size for rkyv serialization.
///
/// This value is chosen to accommodate typical protocol messages.
const BUFFER_SIZE: usize = 4096; const BUFFER_SIZE: usize = 4096;
/// Frame type enum - distinguishes between different frame kinds /// Frame type enum - distinguishes between different frame kinds.
///
/// Each frame type has a specific purpose in the protocol:
/// - `Request` / `Response`: Request-response pairs
/// - `StreamOpen` / `StreamData` / `StreamClose`: Streaming operations
/// - `Handshake` / `HandshakeAck`: Connection setup
///
/// # Example
/// ```
/// use ush_treetest::protocol::FrameType;
///
/// let frame_type = FrameType::Request;
/// assert_eq!(frame_type as u8, 0x01);
/// ```
#[derive(Archive, Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)] #[derive(Archive, Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)] #[repr(u8)]
pub enum FrameType { pub enum FrameType {
/// Request frame - client requesting an operation
Request = 0x01, Request = 0x01,
/// Response frame - server responding to a request
Response = 0x02, Response = 0x02,
/// Stream open frame - requesting a stream
StreamOpen = 0x03, StreamOpen = 0x03,
/// Stream data frame - sending data on a stream
StreamData = 0x04, StreamData = 0x04,
/// Stream close frame - closing a stream
StreamClose = 0x05, StreamClose = 0x05,
/// Handshake frame - connection initialization
Handshake = 0x10, Handshake = 0x10,
/// Handshake acknowledgement - connection acceptance
HandshakeAck = 0x11, HandshakeAck = 0x11,
} }
impl FrameType { impl FrameType {
/// Convert a byte value to a FrameType.
///
/// # Arguments
/// * `v` - The byte value to convert
///
/// # Returns
/// Some(FrameType) if valid, None otherwise
///
/// # Example
/// ```
/// use ush_treetest::protocol::FrameType;
///
/// let ft = FrameType::from_u8(0x01);
/// assert_eq!(ft, Some(FrameType::Request));
///
/// let invalid = FrameType::from_u8(0xFF);
/// assert_eq!(invalid, None);
/// ```
#[allow(dead_code)] #[allow(dead_code)]
pub fn from_u8(v: u8) -> Option<Self> { pub fn from_u8(v: u8) -> Option<Self> {
match v { match v {
@@ -38,124 +97,323 @@ impl FrameType {
} }
} }
/// Frame header - the metadata sent before each payload /// Frame header - the metadata sent before each payload.
///
/// The header contains routing information and identifies the frame type.
///
/// # Fields
/// * `frame_type` - The type of frame
/// * `dst_path` - Optional destination path for routing
/// * `src_path` - Source path for the frame
/// * `request_id` - Optional request ID for correlation
/// * `stream_id` - Optional stream ID for streaming
///
/// # Example
/// ```
/// use ush_treetest::protocol::{FrameHeader, FrameType};
///
/// let header = FrameHeader {
/// frame_type: FrameType::Request,
/// dst_path: Some("/shell".to_string()),
/// src_path: "/client".to_string(),
/// request_id: Some(1),
/// stream_id: None,
/// };
///
/// // Serialize and deserialize
/// let bytes = header.to_bytes();
/// let decoded = FrameHeader::from_bytes(&bytes).unwrap();
/// assert_eq!(decoded.frame_type, FrameType::Request);
/// ```
#[derive(Archive, Serialize, Deserialize, Debug, Clone)] #[derive(Archive, Serialize, Deserialize, Debug, Clone)]
pub struct FrameHeader { pub struct FrameHeader {
/// The type of this frame
pub frame_type: FrameType, pub frame_type: FrameType,
/// Destination path for routing (None for responses)
pub dst_path: Option<String>, pub dst_path: Option<String>,
/// Source path of the sender
pub src_path: String, pub src_path: String,
/// Request ID for correlation (for request/response)
pub request_id: Option<u64>, pub request_id: Option<u64>,
/// Stream ID (for stream operations)
pub stream_id: Option<u16>, pub stream_id: Option<u16>,
} }
impl FrameHeader { impl FrameHeader {
/// Serialize the header to bytes.
///
/// # Returns
/// Serialized bytes
pub fn to_bytes(&self) -> Vec<u8> { pub fn to_bytes(&self) -> Vec<u8> {
rkyv::to_bytes::<FrameHeader, BUFFER_SIZE>(self).unwrap().into_vec() rkyv::to_bytes::<FrameHeader, BUFFER_SIZE>(self)
.unwrap()
.into_vec()
} }
/// Deserialize header from bytes.
///
/// # Arguments
/// * `bytes` - Serialized bytes
///
/// # Returns
/// Deserialized header
pub fn from_bytes(bytes: &[u8]) -> Result<Self, String> { pub fn from_bytes(bytes: &[u8]) -> Result<Self, String> {
unsafe { rkyv::from_bytes_unchecked(bytes) }.map_err(|e| e.to_string()) unsafe { rkyv::from_bytes_unchecked(bytes) }.map_err(|e| e.to_string())
} }
} }
/// Tree request - operations on the tree /// Tree request - operations on the tree.
///
/// These requests are sent from clients to servers to perform operations.
///
/// # Example
/// ```
/// use ush_treetest::protocol::TreeRequest;
///
/// // Execute a command
/// let request = TreeRequest::Exec { cmd: "echo hello".to_string() };
/// let bytes = request.to_bytes();
/// let decoded = TreeRequest::from_bytes(&bytes).unwrap();
/// ```
#[derive(Archive, Serialize, Deserialize, Debug, Clone)] #[derive(Archive, Serialize, Deserialize, Debug, Clone)]
pub enum TreeRequest { pub enum TreeRequest {
/// List child nodes at a path
ListNodes {}, ListNodes {},
/// List endpoints at a path
ListEndpoints {}, ListEndpoints {},
/// List all leaf paths in the tree
ListLeaves {}, ListLeaves {},
/// Get information about a node
GetInfo { path: String }, GetInfo { path: String },
/// Execute a command
Exec { cmd: String }, Exec { cmd: String },
/// Open a stream to a path
StreamOpen { path: String }, StreamOpen { path: String },
/// Resize a terminal
Resize { rows: u16, cols: u16 }, Resize { rows: u16, cols: u16 },
} }
impl TreeRequest { impl TreeRequest {
/// Serialize the request to bytes.
pub fn to_bytes(&self) -> Vec<u8> { pub fn to_bytes(&self) -> Vec<u8> {
rkyv::to_bytes::<TreeRequest, BUFFER_SIZE>(self).unwrap().into_vec() rkyv::to_bytes::<TreeRequest, BUFFER_SIZE>(self)
.unwrap()
.into_vec()
} }
/// Deserialize request from bytes.
///
/// # Arguments
/// * `bytes` - Serialized bytes
pub fn from_bytes(bytes: &[u8]) -> Result<Self, String> { pub fn from_bytes(bytes: &[u8]) -> Result<Self, String> {
unsafe { rkyv::from_bytes_unchecked(bytes) }.map_err(|e| e.to_string()) unsafe { rkyv::from_bytes_unchecked(bytes) }.map_err(|e| e.to_string())
} }
} }
/// Tree response - results from tree operations /// Tree response - results from tree operations.
///
/// These responses are sent from servers to clients.
///
/// # Example
/// ```
/// use ush_treetest::protocol::TreeResponse;
///
/// let response = TreeResponse::ExecOutput {
/// exit_code: 0,
/// stdout: b"hello".to_vec(),
/// stderr: b"".to_vec(),
/// };
/// let bytes = response.to_bytes();
/// ```
#[derive(Archive, Serialize, Deserialize, Debug, Clone)] #[derive(Archive, Serialize, Deserialize, Debug, Clone)]
pub enum TreeResponse { pub enum TreeResponse {
/// List of child node names
NodeList { names: Vec<String> }, NodeList { names: Vec<String> },
/// List of endpoints
EndpointList { endpoints: Vec<EndpointInfo> }, EndpointList { endpoints: Vec<EndpointInfo> },
/// List of leaf paths
LeafList { leaves: Vec<String> }, LeafList { leaves: Vec<String> },
/// Node information
NodeInfo { info: NodeInfo }, NodeInfo { info: NodeInfo },
ExecOutput { exit_code: i32, stdout: Vec<u8>, stderr: Vec<u8> }, /// Command execution output
ExecOutput {
exit_code: i32,
stdout: Vec<u8>,
stderr: Vec<u8>,
},
/// Stream opened confirmation
StreamOpened { stream_id: u16 }, StreamOpened { stream_id: u16 },
} }
impl TreeResponse { impl TreeResponse {
/// Serialize the response to bytes.
pub fn to_bytes(&self) -> Vec<u8> { pub fn to_bytes(&self) -> Vec<u8> {
rkyv::to_bytes::<TreeResponse, BUFFER_SIZE>(self).unwrap().into_vec() rkyv::to_bytes::<TreeResponse, BUFFER_SIZE>(self)
.unwrap()
.into_vec()
} }
/// Deserialize response from bytes.
///
/// # Arguments
/// * `bytes` - Serialized bytes
pub fn from_bytes(bytes: &[u8]) -> Result<Self, String> { pub fn from_bytes(bytes: &[u8]) -> Result<Self, String> {
unsafe { rkyv::from_bytes_unchecked(bytes) }.map_err(|e| e.to_string()) unsafe { rkyv::from_bytes_unchecked(bytes) }.map_err(|e| e.to_string())
} }
} }
/// Information about an endpoint /// Information about an endpoint.
///
/// # Fields
/// * `name` - The endpoint name
/// * `path` - The path where the endpoint is registered
/// * `endpoint_type` - The type of endpoint
///
/// # Example
/// ```
/// use ush_treetest::protocol::{EndpointInfo, EndpointType};
///
/// let info = EndpointInfo {
/// name: "shell".to_string(),
/// path: "/shell".to_string(),
/// endpoint_type: EndpointType::Leaf,
/// };
/// ```
#[derive(Archive, Serialize, Deserialize, Debug, Clone)] #[derive(Archive, Serialize, Deserialize, Debug, Clone)]
pub struct EndpointInfo { pub struct EndpointInfo {
/// The endpoint name
pub name: String, pub name: String,
/// The path where this endpoint is registered
pub path: String, pub path: String,
/// The type of this endpoint
pub endpoint_type: EndpointType, pub endpoint_type: EndpointType,
} }
/// Type of endpoint /// Type of endpoint.
///
/// # Example
/// ```
/// use ush_treetest::protocol::EndpointType;
///
/// let leaf_type = EndpointType::Leaf;
/// assert!(matches!(leaf_type, EndpointType::Leaf));
/// ```
#[derive(Archive, Serialize, Deserialize, Debug, Clone, Copy)] #[derive(Archive, Serialize, Deserialize, Debug, Clone, Copy)]
#[repr(u8)] #[repr(u8)]
pub enum EndpointType { pub enum EndpointType {
/// Leaf endpoint - executes commands
Leaf = 0x01, Leaf = 0x01,
/// Proxy endpoint - routes to other endpoints
Proxy = 0x02, Proxy = 0x02,
/// Stream endpoint - provides streaming
Stream = 0x03, Stream = 0x03,
} }
/// Information about a node in the tree /// Information about a node in the tree.
///
/// # Fields
/// * `path` - The node path
/// * `is_leaf` - Whether this is a leaf node
/// * `has_children` - Whether this node has children
/// * `endpoints` - List of endpoint names at this node
///
/// # Example
/// ```
/// use ush_treetest::protocol::NodeInfo;
///
/// let info = NodeInfo {
/// path: "/shell".to_string(),
/// is_leaf: true,
/// has_children: false,
/// endpoints: vec!["shell".to_string()],
/// };
/// assert!(info.is_leaf);
/// ```
#[derive(Archive, Serialize, Deserialize, Debug, Clone)] #[derive(Archive, Serialize, Deserialize, Debug, Clone)]
pub struct NodeInfo { pub struct NodeInfo {
/// The node path
pub path: String, pub path: String,
/// Whether this is a leaf node (endpoint with no children)
pub is_leaf: bool, pub is_leaf: bool,
/// Whether this node has children
pub has_children: bool, pub has_children: bool,
/// Names of endpoints at this node
pub endpoints: Vec<String>, pub endpoints: Vec<String>,
} }
/// Handshake message - sent when connecting /// Handshake message - sent when connecting.
///
/// The client sends registered paths during handshake.
///
/// # Fields
/// * `registered_paths` - Paths the client wants to register
///
/// # Example
/// ```
/// use ush_treetest::protocol::Handshake;
///
/// let handshake = Handshake {
/// registered_paths: vec!["/client".to_string()],
/// };
/// let bytes = handshake.to_bytes();
/// ```
#[derive(Archive, Serialize, Deserialize, Debug, Clone)] #[derive(Archive, Serialize, Deserialize, Debug, Clone)]
pub struct Handshake { pub struct Handshake {
/// Paths the client wants to register
pub registered_paths: Vec<String>, pub registered_paths: Vec<String>,
} }
impl Handshake { impl Handshake {
/// Serialize the handshake to bytes.
pub fn to_bytes(&self) -> Vec<u8> { pub fn to_bytes(&self) -> Vec<u8> {
rkyv::to_bytes::<Handshake, BUFFER_SIZE>(self).unwrap().into_vec() rkyv::to_bytes::<Handshake, BUFFER_SIZE>(self)
.unwrap()
.into_vec()
} }
/// Deserialize handshake from bytes.
#[allow(dead_code)] #[allow(dead_code)]
pub fn from_bytes(bytes: &[u8]) -> Result<Self, String> { pub fn from_bytes(bytes: &[u8]) -> Result<Self, String> {
unsafe { rkyv::from_bytes_unchecked(bytes) }.map_err(|e| e.to_string()) unsafe { rkyv::from_bytes_unchecked(bytes) }.map_err(|e| e.to_string())
} }
} }
/// Handshake acknowledgement - router's response to handshake /// Handshake acknowledgement - router's response to handshake.
///
/// # Fields
/// * `accepted` - Whether the handshake was accepted
/// * `assigned_base_path` - Base path assigned by the server
///
/// # Example
/// ```
/// use ush_treetest::protocol::HandshakeAck;
///
/// let ack = HandshakeAck {
/// accepted: true,
/// assigned_base_path: "/client".to_string(),
/// };
/// assert!(ack.accepted);
/// ```
#[derive(Archive, Serialize, Deserialize, Debug, Clone)] #[derive(Archive, Serialize, Deserialize, Debug, Clone)]
pub struct HandshakeAck { pub struct HandshakeAck {
/// Whether the handshake was accepted
pub accepted: bool, pub accepted: bool,
/// Base path assigned by the server
pub assigned_base_path: String, pub assigned_base_path: String,
} }
impl HandshakeAck { impl HandshakeAck {
/// Serialize the acknowledgement to bytes.
pub fn to_bytes(&self) -> Vec<u8> { pub fn to_bytes(&self) -> Vec<u8> {
rkyv::to_bytes::<HandshakeAck, BUFFER_SIZE>(self).unwrap().into_vec() rkyv::to_bytes::<HandshakeAck, BUFFER_SIZE>(self)
.unwrap()
.into_vec()
} }
/// Deserialize acknowledgement from bytes.
///
/// # Arguments
/// * `bytes` - Serialized bytes
pub fn from_bytes(bytes: &[u8]) -> Result<Self, String> { pub fn from_bytes(bytes: &[u8]) -> Result<Self, String> {
unsafe { rkyv::from_bytes_unchecked(bytes) }.map_err(|e| e.to_string()) unsafe { rkyv::from_bytes_unchecked(bytes) }.map_err(|e| e.to_string())
} }
+264
View File
@@ -0,0 +1,264 @@
//! # Server Implementation
//!
//! This module provides the server functionality for handling incoming connections.
use crate::protocol::{
FrameHeader, FrameType, TreeRequest, TreeResponse, TcpTransport, Transport,
make_response, make_handshake_ack,
};
use crate::tree::Tree;
use crate::leaves::{RemoteShell, TTY};
use std::sync::{Arc, Mutex};
/// Default listening address for the server.
///
/// # Example
/// ```
/// let addr = ush_treetest::server::DEFAULT_ADDR;
/// assert_eq!(addr, "0.0.0.0:8080");
/// ```
#[allow(dead_code)]
pub const DEFAULT_ADDR: &str = "0.0.0.0:8080";
/// Run the server with the given address.
///
/// This function starts listening on the specified address and handles incoming
/// connections in separate threads.
///
/// # Arguments
/// * `addr` - The address to listen on (e.g., "0.0.0.0:8080")
///
/// # Example
/// ```
/// run_server("0.0.0.0:8080");
/// ```
pub fn run_server(addr: &str) -> ! {
log::info!("Starting server on {}", addr);
let tree = Arc::new(Mutex::new(Tree::new()));
{
let mut tree = tree.lock().unwrap();
tree.add_endpoint("/shell", Box::new(RemoteShell::new("shell")));
tree.add_endpoint("/tty", Box::new(TTY::new("tty")));
}
let listener = TcpTransport::listen(addr).expect("failed to bind");
log::info!("Listening on {}", addr);
loop {
match TcpTransport::accept(&listener) {
Ok(transport) => {
log::info!("New connection from {:?}", transport.peer_addr());
let tree = Arc::clone(&tree);
std::thread::spawn(move || {
handle_connection(transport, tree);
});
}
Err(e) => {
log::error!("accept error: {:?}", e);
}
}
}
}
/// Handle a single connection.
///
/// This function handles the handshake and then processes frames in a loop until
/// the connection is closed.
///
/// # Arguments
/// * `transport` - The TCP transport for the connection
/// * `tree` - Shared access to the tree
pub fn handle_connection(mut transport: TcpTransport, tree: Arc<Mutex<Tree>>) {
let (header, _payload) = match transport.recv_frame() {
Ok(h) => h,
Err(e) => {
log::error!("recv error: {:?}", e);
return;
}
};
if header.frame_type != FrameType::Handshake {
log::error!("expected handshake");
return;
}
log::info!("Client connected");
let (ack_header, ack_payload) = make_handshake_ack(true, "/client");
transport.send_frame(&ack_header, Some(&ack_payload)).expect("send failed");
loop {
match transport.recv_frame() {
Ok((header, payload)) => {
let response = handle_frame(&header, &payload, &tree);
if let Some(response) = response {
let (resp_header, resp_payload) = match response {
Ok((h, p)) => (h, p),
Err(e) => {
log::error!("handle error: {:?}", e);
break;
}
};
transport.send_frame(&resp_header, Some(&resp_payload)).expect("send failed");
}
if header.frame_type == FrameType::StreamClose {
break;
}
}
Err(e) => {
log::error!("recv error: {:?}", e);
break;
}
}
}
log::info!("Connection closed");
}
/// Handle a single frame and return an optional response.
///
/// # Arguments
/// * `header` - The frame header
/// * `payload` - The frame payload bytes
/// * `tree` - Shared access to the tree
///
/// # Returns
/// * `Some(Ok((header, payload)))` for a response to send
/// * `Some(Err(e))` for an error
/// * `None` for no response (async handling)
///
/// # Example
/// ```
/// use ush_treetest::protocol::{FrameType, FrameHeader, TcpTransport};
///
/// let header = FrameHeader {
/// frame_type: FrameType::Request,
/// dst_path: Some("/shell".to_string()),
/// src_path: "/client".to_string(),
/// request_id: Some(1),
/// stream_id: None,
/// };
/// let payload = vec![];
///
/// if let Some(result) = handle_frame(&header, &payload, &tree) {
/// // Handle response
/// }
/// ```
pub fn handle_frame(
header: &FrameHeader,
payload: &[u8],
tree: &Arc<Mutex<Tree>>,
) -> Option<Result<(FrameHeader, Vec<u8>), String>> {
match header.frame_type {
FrameType::Request => {
let request: TreeRequest = match TreeRequest::from_bytes(payload) {
Ok(r) => r,
Err(e) => return Some(Err(e.to_string())),
};
let dst_path = header.dst_path.as_deref().unwrap_or("/");
let mut tree = match tree.lock() {
Ok(t) => t,
Err(e) => return Some(Err(format!("lock error: {}", e))),
};
let response = match request {
TreeRequest::ListNodes {} => {
let names = tree.list_nodes(dst_path).unwrap_or_default();
TreeResponse::NodeList { names }
}
TreeRequest::ListEndpoints {} => {
let endpoints = tree.list_endpoints(dst_path).unwrap_or_default();
TreeResponse::EndpointList { endpoints }
}
TreeRequest::ListLeaves {} => {
let leaves = tree.list_leaves();
TreeResponse::LeafList { leaves }
}
TreeRequest::GetInfo { path } => {
match tree.get_info(&path) {
Ok(info) => TreeResponse::NodeInfo { info },
Err(e) => return Some(Err(e)),
}
}
TreeRequest::Exec { ref cmd } => {
let (handler, matched_path) = match tree.find_handler(dst_path) {
Some(h) => h,
None => return Some(Err(format!("path not found: {}", dst_path))),
};
let result = {
let mut handler = match handler.lock() {
Ok(h) => h,
Err(e) => return Some(Err(format!("lock error: {}", e))),
};
handler.handle_request(&TreeRequest::Exec { cmd: cmd.clone() }, matched_path)
};
match result {
Ok(resp) => resp,
Err(e) => return Some(Err(e)),
}
}
TreeRequest::StreamOpen { path } => {
match tree.open_stream(&path, &header.src_path) {
Ok(stream_id) => TreeResponse::StreamOpened { stream_id },
Err(e) => return Some(Err(e)),
}
}
TreeRequest::Resize { .. } => {
return Some(Err("unsupported request: Resize".to_string()));
}
};
Some(Ok(make_response(
&header.src_path,
header.request_id.unwrap_or(0),
&response,
)))
}
FrameType::StreamOpen => {
let dst_path = header.dst_path.as_deref().unwrap_or("/");
let mut tree = match tree.lock() {
Ok(t) => t,
Err(e) => return Some(Err(format!("lock error: {}", e))),
};
match tree.open_stream(dst_path, &header.src_path) {
Ok(stream_id) => {
let response = TreeResponse::StreamOpened { stream_id };
Some(Ok(make_response(
&header.src_path,
header.request_id.unwrap_or(0),
&response,
)))
}
Err(e) => Some(Err(e)),
}
}
FrameType::StreamData => {
let mut tree = match tree.lock() {
Ok(t) => t,
Err(e) => return Some(Err(format!("lock error: {}", e))),
};
tree.route_stream_data(header, payload).ok();
None
}
FrameType::StreamClose => {
let mut tree = match tree.lock() {
Ok(t) => t,
Err(e) => return Some(Err(format!("lock error: {}", e))),
};
if let Some(stream_id) = header.stream_id {
tree.close_stream(stream_id).ok();
}
None
}
_ => Some(Err("unsupported frame type".to_string())),
}
}
+2 -1
View File
@@ -5,11 +5,12 @@
use crate::protocol::{TreeRequest, TreeResponse, EndpointType}; use crate::protocol::{TreeRequest, TreeResponse, EndpointType};
use std::string::String; use std::string::String;
use std::fmt;
/// Endpoint trait - implemented by all leaf handlers in the tree /// Endpoint trait - implemented by all leaf handlers in the tree
/// ///
/// This trait is object-safe and must be Send + Sync to allow sharing across threads. /// This trait is object-safe and must be Send + Sync to allow sharing across threads.
pub trait Endpoint: Send + Sync { pub trait Endpoint: Send + Sync + fmt::Debug {
/// Handle a request and return a response /// Handle a request and return a response
fn handle_request(&mut self, request: &TreeRequest, src_path: &str) -> Result<TreeResponse, String>; fn handle_request(&mut self, request: &TreeRequest, src_path: &str) -> Result<TreeResponse, String>;
+22 -3
View File
@@ -14,16 +14,26 @@ use std::vec::Vec;
use std::boxed::Box; use std::boxed::Box;
use std::result::Result; use std::result::Result;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::fmt;
/// A node in the tree - contains an optional endpoint and child nodes /// A node in the tree - contains an optional endpoint and child nodes.
pub struct Node { pub struct Node {
endpoint: Option<Arc<Mutex<Box<dyn Endpoint>>>>, endpoint: Option<Arc<Mutex<Box<dyn Endpoint + 'static>>>>,
children: BTreeMap<String, Node>, children: BTreeMap<String, Node>,
streams: BTreeMap<u16, Stream>, streams: BTreeMap<u16, Stream>,
next_stream_id: u16, next_stream_id: u16,
path: String, path: String,
} }
impl fmt::Debug for Node {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Node")
.field("path", &self.path)
.field("children", &self.children.keys().cloned().collect::<Vec<_>>())
.finish()
}
}
impl Node { impl Node {
/// Create a new node with the given path /// Create a new node with the given path
pub fn new(path: &str) -> Self { pub fn new(path: &str) -> Self {
@@ -108,11 +118,20 @@ impl Node {
} }
} }
/// Tree structure for routing - contains the root node /// Tree structure for routing - contains the root node.
#[allow(dead_code)]
pub struct Tree { pub struct Tree {
root: Node, root: Node,
} }
impl fmt::Debug for Tree {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Tree")
.field("root", &self.root.path)
.finish()
}
}
impl Tree { impl Tree {
/// Create a new empty tree /// Create a new empty tree
pub fn new() -> Self { pub fn new() -> Self {