From d7f350bd408d8d8335aea6500ab332adebafa936 Mon Sep 17 00:00:00 2001 From: Michael Mikovsky <77305074+Astatin3@users.noreply.github.com> Date: Thu, 12 Jun 2025 05:44:54 -0600 Subject: [PATCH] binary data transfer, begin CLI, packet routing --- Cargo.toml | 1 + src/client/cli.rs | 80 ++++-- src/endpoint/endpoint.rs | 33 +++ src/endpoint/mod.rs | 3 + src/lib.rs | 8 +- src/main.rs | 229 +++++++++--------- src/mod.rs | 3 - src/packets.rs | 8 + unshell-rs-lib/Cargo.toml | 4 +- unshell-rs-lib/src/connection/packets.rs | 20 -- unshell-rs-lib/src/layers/base64.rs | 17 +- unshell-rs-lib/src/layers/builder.rs | 4 +- unshell-rs-lib/src/layers/handshake.rs | 28 +-- unshell-rs-lib/src/layers/mod.rs | 5 +- unshell-rs-lib/src/lib.rs | 5 +- unshell-rs-lib/src/networkers/tcp.rs | 41 +++- unshell-rs-lib/src/networkers/traits.rs | 15 +- .../src/{connection => nodes}/listener.rs | 4 +- .../src/{connection => nodes}/mod.rs | 0 .../src/{connection => nodes}/node.rs | 158 +++++++++--- unshell-rs-lib/src/nodes/packets.rs | 51 ++++ 21 files changed, 457 insertions(+), 260 deletions(-) create mode 100644 src/endpoint/endpoint.rs create mode 100644 src/endpoint/mod.rs delete mode 100644 src/mod.rs create mode 100644 src/packets.rs delete mode 100644 unshell-rs-lib/src/connection/packets.rs rename unshell-rs-lib/src/{connection => nodes}/listener.rs (64%) rename unshell-rs-lib/src/{connection => nodes}/mod.rs (100%) rename unshell-rs-lib/src/{connection => nodes}/node.rs (68%) create mode 100644 unshell-rs-lib/src/nodes/packets.rs diff --git a/Cargo.toml b/Cargo.toml index 7296f01..323e3dd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,7 @@ version = "0.1.0" edition = "2024" [dependencies] +bincode = "2.0.1" clap = { version = "4.5.39", features = ["derive"] } crossbeam-channel = "0.5.15" lazy_static = "1.5.0" diff --git a/src/client/cli.rs b/src/client/cli.rs index 3b17a06..3293dfd 100644 --- a/src/client/cli.rs +++ b/src/client/cli.rs @@ -1,21 +1,29 @@ +use std::{io::Write, net::SocketAddr}; + use unshell_rs_lib::{ Error, - connection::{ConnectionConfig, Node}, + nodes::{ConnectionConfig, Node}, }; + +use crate::C2Packet; + pub struct Cli; impl Cli { - pub fn connect( - id: String, - clients: Vec, - listeners: Vec, - ) -> Result<(), Error> { + pub fn connect(socket: SocketAddr) -> Result<(), Error> { // let mut client = build_client(TCPClient::connect(&addr)?, vec![])?; - // let stdin = std::io::stdin(); - // let mut stdout = std::io::stdout(); + let stdin = std::io::stdin(); + let mut stdout = std::io::stdout(); - Node::run_node(id, clients, listeners) + let node = Node::::run_node( + "Client".to_string(), + vec![ConnectionConfig { + socket, + layers: vec![], + }], + vec![], + )?; // let mut client_clone = client.try_clone()?; // thread::spawn(move || { @@ -48,24 +56,46 @@ impl Cli { // } // }); - // loop { - // print!("> "); - // stdout.flush()?; + let selected_node: Option = None; - // let mut input = String::new(); - // stdin.read_line(&mut input)?; - // let input = input.trim(); + loop { + print!("> "); + stdout.flush()?; - // match input.split(" ").nth(0).unwrap() { - // "ping" => { - // // client.write(Packets::GetConnections.encode()?.as_str())?; - // } - // _ => { - // warn!("Invalid command!") - // } - // } + let mut input = String::new(); + stdin.read_line(&mut input)?; + let input = input.trim(); - // // client.write(input)?; - // } + let mut node_state = node.state.lock().unwrap(); + + let mut split = input.split(" "); + + match split.next().unwrap() { + "nodes" => { + for (i, node) in node_state.get_all_nodes().iter().enumerate() { + println!("{} -> {}", i, node); + } + } + "ping" => { + // if split.count().clone() <= 1 { + // warn!("You must specify an option"); + // continue; + // } + + if let Ok(i) = str::parse::(split.next().unwrap()) { + let nodes = node_state.get_all_nodes(); + let node = nodes.get(i).unwrap().clone(); + node_state.send_unrouted(node, &C2Packet::Aa).unwrap(); + } else { + println!(""); + } + } + _ => { + warn!("Invalid command!") + } + } + + // client.write(input)?; + } } } diff --git a/src/endpoint/endpoint.rs b/src/endpoint/endpoint.rs new file mode 100644 index 0000000..94a6ac6 --- /dev/null +++ b/src/endpoint/endpoint.rs @@ -0,0 +1,33 @@ +use std::net::SocketAddr; + +use unshell_rs_lib::{ + Error, + nodes::{ConnectionConfig, Node}, +}; + +use crate::C2Packet; + +pub fn run_endpoint(socket: SocketAddr) -> Result<(), Error> { + let node = Node::::run_node( + "Server".to_string(), + vec![], + vec![ConnectionConfig { + socket, + layers: vec![], + }], + )?; + + loop { + match node.rx.recv()? { + C2Packet::Aa => { + info!("1"); + } + C2Packet::Bb => { + info!("2"); + } + C2Packet::Cc => { + info!("3"); + } + } + } +} diff --git a/src/endpoint/mod.rs b/src/endpoint/mod.rs new file mode 100644 index 0000000..32f1d84 --- /dev/null +++ b/src/endpoint/mod.rs @@ -0,0 +1,3 @@ +mod endpoint; + +pub use endpoint::run_endpoint; diff --git a/src/lib.rs b/src/lib.rs index 8576980..2f080b2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,11 +1,15 @@ -// #[macro_use] +#[macro_use] extern crate log; mod client; -// mod server; +mod endpoint; +mod packets; pub use client::Cli; +pub use endpoint::run_endpoint; +pub use packets::C2Packet; + // pub use client::UnshellClient; // pub use client::UnshellGui; // pub use server::UnshellServer; diff --git a/src/main.rs b/src/main.rs index 8d27ead..fd24f83 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,8 +7,8 @@ use std::{ use clap::{Parser, Subcommand}; use log::error; -use unshell_rs::Cli; -use unshell_rs_lib::connection::ConnectionConfig; +use unshell_rs::{Cli, run_endpoint}; +use unshell_rs_lib::nodes::ConnectionConfig; pub static DEFAULT_CONFIG_FILEPATH: &'static str = "server_config.json"; @@ -30,42 +30,32 @@ struct Args { #[derive(Debug, Subcommand)] enum Commands { - // Start, - // Middle, - // End, - // - Test1, - Test2, - Test3, - Test4, - Test5, - Test6, // Run as a service, and potentially hosting a website - // #[command(arg_required_else_help = true)] - // Relay { - // /// IPv4 to listen for clients on. - // host: String, + Relay { + /// IPv4 to listen for clients on. + #[arg(short, long, default_value_t = ("0.0.0.0".to_string()))] + host: String, - // /// Port listen to for command clients - // #[arg(short, long, default_value_t = DEFAULT_SERVICE_PORT)] - // port: u16, + /// Port listen to for command clients + #[arg(short, long, default_value_t = DEFAULT_SERVICE_PORT)] + port: u16, - // /// Json file to store config - // #[arg(short, long, default_value_t = DEFAULT_CONFIG_FILEPATH.to_string())] - // config_filepath: String, - // // /// Port to listen for website traffic (0 is disabled) - // // #[arg(short, long, default_value_t = DEFAULT_SERVICE_PORT)] - // // web_port: u16, - // }, - // /// Connect to remote server - // Connect { - // /// Remote server to connect to - // host: String, + /// Json file to store config + #[arg(short, long, default_value_t = DEFAULT_CONFIG_FILEPATH.to_string())] + config_filepath: String, + // /// Port to listen for website traffic (0 is disabled) + // #[arg(short, long, default_value_t = DEFAULT_SERVICE_PORT)] + // web_port: u16, + }, + /// Connect to remote server + Connect { + /// Remote server to connect on + host: String, - // /// Port listen to for command clients - // #[arg(short, long, default_value_t = DEFAULT_SERVICE_PORT)] - // port: u16, - // }, + #[arg(short, long, default_value_t = DEFAULT_SERVICE_PORT)] + /// Port listen to for command clients + port: u16, + }, } fn main() -> Result<(), Box> { @@ -83,87 +73,96 @@ fn main() -> Result<(), Box> { // error!("{}", e); // } // } - Commands::Test1 {} => Cli::connect( - "Test1".to_string(), - vec![], - vec![ConnectionConfig { - socket: SocketAddr::from_str("127.0.0.1:13371")?, - layers: vec![], - }], - ), - Commands::Test2 {} => Cli::connect( - "Test2".to_string(), - vec![ConnectionConfig { - socket: SocketAddr::from_str("127.0.0.1:13371")?, - layers: vec![], - }], - vec![ConnectionConfig { - socket: SocketAddr::from_str("127.0.0.1:13372")?, - layers: vec![], - }], - ), - Commands::Test3 {} => Cli::connect( - "Test3".to_string(), - vec![ConnectionConfig { - socket: SocketAddr::from_str("127.0.0.1:13372")?, - layers: vec![], - }], - vec![ConnectionConfig { - socket: SocketAddr::from_str("127.0.0.1:13373")?, - layers: vec![], - }], - ), - Commands::Test4 {} => Cli::connect( - "Test4".to_string(), - vec![ConnectionConfig { - socket: SocketAddr::from_str("127.0.0.1:13371")?, - layers: vec![], - }], - vec![ConnectionConfig { - socket: SocketAddr::from_str("127.0.0.1:13374")?, - layers: vec![], - }], - ), - Commands::Test5 {} => Cli::connect( - "Test5".to_string(), - vec![ - ConnectionConfig { - socket: SocketAddr::from_str("127.0.0.1:13372")?, - layers: vec![], - }, - ConnectionConfig { - socket: SocketAddr::from_str("127.0.0.1:13374")?, - layers: vec![], - }, - ], - vec![ConnectionConfig { - socket: SocketAddr::from_str("127.0.0.1:13375")?, - layers: vec![], - }], - ), - Commands::Test6 {} => Cli::connect( - "Test6".to_string(), - vec![ - ConnectionConfig { - socket: SocketAddr::from_str("127.0.0.1:13373")?, - layers: vec![], - }, - ConnectionConfig { - socket: SocketAddr::from_str("127.0.0.1:13375")?, - layers: vec![], - }, - ], - vec![], - ), - // Commands::Connect { host, port } => { - // let addr = SocketAddr::from_str(format!("{}:{}", host, port).as_str()); - // Cli::connect(if let Ok(addr) = addr { - // addr - // } else { - // error!("Could not parse address!"); - // return Ok(()); - // }) - // } + // Commands::Test1 {} => Cli::connect( + // "Test1".to_string(), + // vec![], + // vec![ConnectionConfig { + // socket: SocketAddr::from_str("127.0.0.1:13371")?, + // layers: vec![], + // }], + // ), + // Commands::Test2 {} => Cli::connect( + // "Test2".to_string(), + // vec![ConnectionConfig { + // socket: SocketAddr::from_str("127.0.0.1:13371")?, + // layers: vec![], + // }], + // vec![ConnectionConfig { + // socket: SocketAddr::from_str("127.0.0.1:13372")?, + // layers: vec![], + // }], + // ), + // Commands::Test3 {} => Cli::connect( + // "Test3".to_string(), + // vec![ConnectionConfig { + // socket: SocketAddr::from_str("127.0.0.1:13372")?, + // layers: vec![], + // }], + // vec![], + // ), // Commands::Test4 {} => Cli::connect( + // "Test4".to_string(), + // vec![ConnectionConfig { + // socket: SocketAddr::from_str("127.0.0.1:13371")?, + // layers: vec![], + // }], + // vec![ConnectionConfig { + // socket: SocketAddr::from_str("127.0.0.1:13374")?, + // layers: vec![], + // }], + // ), + // Commands::Test5 {} => Cli::connect( + // "Test5".to_string(), + // vec![ + // ConnectionConfig { + // socket: SocketAddr::from_str("127.0.0.1:13372")?, + // layers: vec![], + // }, + // ConnectionConfig { + // socket: SocketAddr::from_str("127.0.0.1:13374")?, + // layers: vec![], + // }, + // ], + // vec![ConnectionConfig { + // socket: SocketAddr::from_str("127.0.0.1:13375")?, + // layers: vec![], + // }], + // ), + // Commands::Test6 {} => Cli::connect( + // "Test6".to_string(), + // vec![ + // ConnectionConfig { + // socket: SocketAddr::from_str("127.0.0.1:13373")?, + // layers: vec![], + // }, + // ConnectionConfig { + // socket: SocketAddr::from_str("127.0.0.1:13375")?, + // layers: vec![], + // }, + // ], + // vec![], + // ), + Commands::Connect { host, port } => { + let addr = SocketAddr::from_str(format!("{}:{}", host, port).as_str()); + Cli::connect(if let Ok(addr) = addr { + addr + } else { + error!("Could not parse address!"); + return Ok(()); + }) + } + Commands::Relay { + host, + port, + config_filepath, + } => { + let addr = SocketAddr::from_str(format!("{}:{}", host, port).as_str()); + run_endpoint(if let Ok(addr) = addr { + addr + } else { + error!("Could not parse address!"); + return Ok(()); + }) + } } { error!("{}", e); }; diff --git a/src/mod.rs b/src/mod.rs deleted file mode 100644 index 2c4c24b..0000000 --- a/src/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -mod cli; - -pub use cli::Cli; diff --git a/src/packets.rs b/src/packets.rs new file mode 100644 index 0000000..05d8635 --- /dev/null +++ b/src/packets.rs @@ -0,0 +1,8 @@ +use bincode::{Decode, Encode}; + +#[derive(Debug, Encode, Decode, Clone)] +pub enum C2Packet { + Aa, + Bb, + Cc, +} diff --git a/unshell-rs-lib/Cargo.toml b/unshell-rs-lib/Cargo.toml index 3d57035..ad8268f 100644 --- a/unshell-rs-lib/Cargo.toml +++ b/unshell-rs-lib/Cargo.toml @@ -4,8 +4,8 @@ edition = "2024" [dependencies] base64 = "0.22.1" +bincode = "2.0.1" crossbeam-channel = "0.5.15" log = "0.4.27" -serde = { version = "1.0.219", features = ["derive"] } -serde_json = "1.0.140" +rand = "0.9.1" uuid = { version = "1.17.0", features = ["v4"] } diff --git a/unshell-rs-lib/src/connection/packets.rs b/unshell-rs-lib/src/connection/packets.rs deleted file mode 100644 index 22a820f..0000000 --- a/unshell-rs-lib/src/connection/packets.rs +++ /dev/null @@ -1,20 +0,0 @@ -use serde::{Deserialize, Serialize}; - -use crate::Error; - -#[derive(Debug, Serialize, Deserialize, Clone)] -pub enum Packets { - SyncUUID(String), - Update { routes: Vec }, - Disconnect { routes: Vec }, - Data { source: String, data: String }, -} - -impl Packets { - pub fn encode(&self) -> Result { - Ok(serde_json::to_string(self)?) - } - pub fn decode(string: &str) -> Result { - Ok(serde_json::from_str::(string)?) - } -} diff --git a/unshell-rs-lib/src/layers/base64.rs b/unshell-rs-lib/src/layers/base64.rs index 5cdd8ac..ee1231c 100644 --- a/unshell-rs-lib/src/layers/base64.rs +++ b/unshell-rs-lib/src/layers/base64.rs @@ -17,18 +17,15 @@ impl Connection for Base64Layer { self.inner.is_alive() } - fn read(&mut self) -> Result { - Ok(str::from_utf8( - &general_purpose::STANDARD - .decode(&self.inner.read()?) - .unwrap(), - ) - .unwrap() - .to_string()) + fn read(&mut self) -> Result, Error> { + Ok(general_purpose::STANDARD + .decode(&self.inner.read()?) + .unwrap()) } - fn write(&mut self, data: &str) -> Result<(), Error> { - self.inner.write(&general_purpose::STANDARD.encode(data)) + fn write(&mut self, data: &[u8]) -> Result<(), Error> { + self.inner + .write(general_purpose::STANDARD.encode(data).as_bytes()) } fn try_clone(&self) -> Result, Error> { diff --git a/unshell-rs-lib/src/layers/builder.rs b/unshell-rs-lib/src/layers/builder.rs index 7b5d88f..d377c74 100644 --- a/unshell-rs-lib/src/layers/builder.rs +++ b/unshell-rs-lib/src/layers/builder.rs @@ -13,11 +13,11 @@ impl Connection for Box { (**self).is_alive() } - fn read(&mut self) -> Result { + fn read(&mut self) -> Result, Error> { (**self).read() } - fn write(&mut self, data: &str) -> Result<(), Error> { + fn write(&mut self, data: &[u8]) -> Result<(), Error> { (**self).write(data) } diff --git a/unshell-rs-lib/src/layers/handshake.rs b/unshell-rs-lib/src/layers/handshake.rs index bad2450..9b0b112 100644 --- a/unshell-rs-lib/src/layers/handshake.rs +++ b/unshell-rs-lib/src/layers/handshake.rs @@ -22,14 +22,14 @@ impl Connection for HandshakeLayer { self.inner.is_alive() } - fn read(&mut self) -> Result { + fn read(&mut self) -> Result, Error> { if !self.finished_handshake.load(Ordering::Relaxed) { return Err("NotComplete".into()); } self.inner.read() } - fn write(&mut self, data: &str) -> Result<(), Error> { + fn write(&mut self, data: &[u8]) -> Result<(), Error> { if !self.finished_handshake.load(Ordering::Relaxed) { return Err("NotComplete".into()); } @@ -54,21 +54,21 @@ impl ProtocolLayer for HandshakeLayer { fn initialize_client(&mut self) -> Result<(), Error> { // Step 1: Client sends SYN - self.inner.write("SYN")?; + self.inner.write("SYN".as_bytes())?; // Step 2: Client receives SYN-ACK let response = self.inner.read()?; - if response != "SYN-ACK" { - return Err(format!("Expected SYN-ACK, got: {}", response).into()); + if response != "SYN-ACK".as_bytes() { + return Err(format!("Expected SYN-ACK, got: {:?}", response).into()); } // Step 3: Client sends ACK - self.inner.write("ACK")?; + self.inner.write("ACK".as_bytes())?; // Step 4: Client receives FIN (final confirmation) let response = self.inner.read()?; - if response != "FIN" { - return Err(format!("Expected FIN, got: {}", response).into()); + if response != "FIN".as_bytes() { + return Err(format!("Expected FIN, got: {:?}", response).into()); } info!("Handshake complete!"); @@ -80,20 +80,20 @@ impl ProtocolLayer for HandshakeLayer { fn initialize_server(&mut self) -> Result<(), Error> { // Step 1: Server receives SYN let request = self.inner.read()?; - if request != "SYN" { - return Err(format!("Expected SYN, got: {}", request).into()); + if request != "SYN".as_bytes() { + return Err(format!("Expected SYN, got: {:?}", request).into()); } // Step 2: Server sends SYN-ACK - self.inner.write("SYN-ACK")?; + self.inner.write("SYN-ACK".as_bytes())?; // Step 3: Server receives ACK let response = self.inner.read()?; - if response != "ACK" { - return Err(format!("Expected ACK, got: {}", response).into()); + if response != "ACK".as_bytes() { + return Err(format!("Expected ACK, got: {:?}", response).into()); } // Step 4: Server sends FIN (final confirmation) - self.inner.write("FIN")?; + self.inner.write("FIN".as_bytes())?; info!("Handshake complete!"); self.finished_handshake.swap(true, Ordering::Relaxed); diff --git a/unshell-rs-lib/src/layers/mod.rs b/unshell-rs-lib/src/layers/mod.rs index e4d3656..5f095c6 100644 --- a/unshell-rs-lib/src/layers/mod.rs +++ b/unshell-rs-lib/src/layers/mod.rs @@ -1,7 +1,6 @@ -use serde::Deserialize; -use serde::Serialize; +use bincode::{Decode, Encode}; -#[derive(Serialize, Deserialize, Debug, Clone)] +#[derive(Encode, Decode, Debug, Clone)] pub enum LayerConfig { Base64, Handshake, diff --git a/unshell-rs-lib/src/lib.rs b/unshell-rs-lib/src/lib.rs index d902651..89b232e 100644 --- a/unshell-rs-lib/src/lib.rs +++ b/unshell-rs-lib/src/lib.rs @@ -3,7 +3,8 @@ extern crate log; pub type Error = Box; -// pub mod config; -pub mod connection; +static BINCODE_CONFIG: bincode::config::Configuration = bincode::config::standard(); + pub mod layers; pub mod networkers; +pub mod nodes; diff --git a/unshell-rs-lib/src/networkers/tcp.rs b/unshell-rs-lib/src/networkers/tcp.rs index 900e3a5..a942731 100644 --- a/unshell-rs-lib/src/networkers/tcp.rs +++ b/unshell-rs-lib/src/networkers/tcp.rs @@ -1,5 +1,5 @@ use std::{ - io::{BufRead, BufReader, Write}, + io::{BufReader, Read, Write}, net::{SocketAddr, TcpListener, TcpStream}, sync::{ Arc, @@ -34,23 +34,42 @@ impl Connection for TCPConnection { self.is_alive.load(Ordering::Relaxed) } - fn read(&mut self) -> Result { - let mut line = String::new(); - let n = self.reader.read_line(&mut line)?; + fn read(&mut self) -> Result, Error> { + let mut len_bytes = [0u8; 4]; - // Stream sends a null buffer if it is disconnected - if n == 0 { + if let Err(e) = self.reader.read_exact(&mut len_bytes) { self.is_alive.swap(false, Ordering::Relaxed); + return Err(format!("Stream disconnected! ({})", e).into()); } - // println!("Recieved: {}", line.trim_end().to_string()); + let len = u32::from_be_bytes(len_bytes) as usize; - Ok(line.trim_end().to_string()) + let mut buffer = vec![0u8; len]; + + // In case the + match self.reader.read_exact(&mut buffer) { + Ok(()) => Ok(buffer.to_vec()), + Err(e) => { + self.is_alive.swap(false, Ordering::Relaxed); + Err(format!("Stream disconnected! ({})", e).into()) + } + } + + // let mut buf = Vec::new(); + // let n = self.reader.read(&mut buf)?; + + // Stream sends a null buffer if it is disconnected + // if n == 0 { + // self.is_alive.swap(false, Ordering::Relaxed); + // } + + // println!("Recieved: {}", line.trim_end().to_string()); } - fn write(&mut self, data: &str) -> Result<(), Error> { - // println!("Recsent: {}", data); - writeln!(self.stream, "{}", data)?; + fn write(&mut self, data: &[u8]) -> Result<(), Error> { + let len = data.len() as u32; + self.stream.write_all(&len.to_be_bytes())?; + self.stream.write_all(data)?; self.stream.flush()?; Ok(()) } diff --git a/unshell-rs-lib/src/networkers/traits.rs b/unshell-rs-lib/src/networkers/traits.rs index 93d1d2a..5a1c0bd 100644 --- a/unshell-rs-lib/src/networkers/traits.rs +++ b/unshell-rs-lib/src/networkers/traits.rs @@ -7,8 +7,8 @@ pub trait Connection: Send + Sync { fn get_info(&self) -> String; fn is_alive(&self) -> bool; - fn read(&mut self) -> Result; - fn write(&mut self, data: &str) -> Result<(), Error>; + fn read(&mut self) -> Result, Error>; + fn write(&mut self, data: &[u8]) -> Result<(), Error>; fn try_clone(&self) -> Result, Error>; } @@ -26,17 +26,6 @@ pub trait ProtocolLayer: Connection { } } -// impl Sized for dyn Connection {} - -// pub trait AsyncConnection -// where -// C: Connection, -// { -// fn as_async( -// connection: C, -// ) -> (Sender, Receiver); -// } - pub trait ServerTrait { fn get_info(&self) -> String; fn accept(&self) -> Result; diff --git a/unshell-rs-lib/src/connection/listener.rs b/unshell-rs-lib/src/nodes/listener.rs similarity index 64% rename from unshell-rs-lib/src/connection/listener.rs rename to unshell-rs-lib/src/nodes/listener.rs index 2923e01..885949f 100644 --- a/unshell-rs-lib/src/connection/listener.rs +++ b/unshell-rs-lib/src/nodes/listener.rs @@ -1,10 +1,10 @@ use std::net::SocketAddr; -use serde::{Deserialize, Serialize}; +use bincode::{Decode, Encode}; use crate::layers::LayerConfig; -#[derive(Serialize, Deserialize, Debug, Clone)] +#[derive(Encode, Decode, Debug, Clone)] pub struct ConnectionConfig { pub socket: SocketAddr, pub layers: Vec, diff --git a/unshell-rs-lib/src/connection/mod.rs b/unshell-rs-lib/src/nodes/mod.rs similarity index 100% rename from unshell-rs-lib/src/connection/mod.rs rename to unshell-rs-lib/src/nodes/mod.rs diff --git a/unshell-rs-lib/src/connection/node.rs b/unshell-rs-lib/src/nodes/node.rs similarity index 68% rename from unshell-rs-lib/src/connection/node.rs rename to unshell-rs-lib/src/nodes/node.rs index 88f2914..859878f 100644 --- a/unshell-rs-lib/src/connection/node.rs +++ b/unshell-rs-lib/src/nodes/node.rs @@ -1,46 +1,72 @@ use std::{ collections::HashMap, + fmt::Debug, sync::{Arc, Mutex}, thread, time::Duration, }; +use bincode::{Decode, Encode}; +use crossbeam_channel::{Receiver, Sender}; +use rand::{seq::IndexedRandom, thread_rng}; + use crate::{ Error, - connection::{listener::ConnectionConfig, packets::Packets}, layers::build_client, networkers::{ClientTrait, Connection, ServerTrait, TCPClient, TCPServer, run_listener_state}, + nodes::{ + listener::ConnectionConfig, + packets::{Packets, decode_vec, encode_vec}, + }, }; -pub struct Node { +pub struct NodeState

+where + P: Encode + Decode<()> + Debug + Clone + 'static, +{ id: String, connections: HashMap>, map: HashMap>, + packet_listener: Sender

, } fn read(c: &mut Box) -> Result { - let a = Packets::decode(c.read()?.as_str()); - info!("Data: {:?}", a); - a + Packets::decode(c.read()?.as_slice()) } fn write(c: &mut Box, packet: Packets) -> Result<(), Error> { - info!("Wrote: {:?}", packet); - c.write(packet.encode()?.as_str()) + c.write(&(packet.encode()?)) } -impl Node { +pub struct Node

+where + P: Encode + Decode<()> + Debug + Clone + 'static, +{ + pub state: Arc>>, + pub rx: Receiver

, +} + +impl

Node

+where + P: Encode + Decode<()> + Debug + Clone + Send + 'static, +{ pub fn run_node( id: String, clients: Vec, listeners: Vec, - ) -> Result<(), Error> { + ) -> Result + where + P: Encode + Decode<()> + Debug + Clone + 'static, + { // let mut parent = build_client(TCPClient::connect(&parent.socket)?, parent.layers)?; - let state = Arc::new(Mutex::new(Self { + let (tx, rx) = crossbeam_channel::unbounded(); + + let state = Arc::new(Mutex::new(NodeState::

{ id: id, //Uuid::new_v4().to_string(), //TODO: Calling an OS RNG can pose a problem for security; connections: HashMap::new(), map: HashMap::new(), + packet_listener: tx, })); for listener in listeners { @@ -57,7 +83,7 @@ impl Node { thread::spawn(move || { loop { if let Err(e) = Self::run_client(client.clone(), &state) { - error!("{}", e); + error!("Could not connect to server; {:?}", e); } thread::sleep(Duration::from_millis(1000)); @@ -65,12 +91,10 @@ impl Node { }); } - thread::sleep(Duration::MAX); - - Ok(()) + Ok(Self { state, rx }) } - fn run_client(client: ConnectionConfig, state: &Arc>) -> Result<(), Error> { + fn run_client(client: ConnectionConfig, state: &Arc>>) -> Result<(), Error> { Self::run_connection( build_client(TCPClient::connect(&client.socket)?, client.layers)?, state, @@ -81,18 +105,18 @@ impl Node { fn on_listener_client( connection: Box, - state: Arc>, + state: Arc>>, ) { thread::spawn(move || { if let Err(e) = Self::run_connection(connection, &state) { - error!("{}", e); + error!("Could not connect; {}", e); } }); } fn run_connection( connection: Box, - state: &Arc>, + state: &Arc>>, ) -> Result<(), Error> { let mut connection = connection; let s = state.lock().unwrap(); @@ -110,7 +134,7 @@ impl Node { return Err("Could not get UUID!".into()); }; - info!("Connection from {} to {}", this_uuid, other_uuid); + info!("New Node! {} (direct)", other_uuid); // Add connection (&mut state.lock().unwrap()) @@ -130,6 +154,11 @@ impl Node { ), Packets::Update { routes } => Ok((&mut state.lock().unwrap()) .extend_routes(other_uuid.clone(), routes)), + Packets::DataUnrouted { + src: source, + dest, + data, + } => (&mut state.lock().unwrap()).route_packet(source, dest, data), _ => { error!("Unsupported packet: {:?}", packet); @@ -138,7 +167,7 @@ impl Node { }; if let Err(e) = result { - error!("Got error: {}", e); + error!("Could not parse; {}", e); } } Err(e) => { @@ -151,19 +180,26 @@ impl Node { break; } - error!("Got error: {}", e); + error!("Could not read; {}", e); } } } Ok(()) } +} - fn get_known_clients(&self) -> Vec { +impl

NodeState

+where + P: Encode + Decode<()> + Debug + Clone + Send + 'static, +{ + // Get list of all nodes in map + fn get_known_nodes(&self) -> Vec { self.map.keys().map(|k| k.clone()).collect::>() } - fn get_direct_connections(&self) -> Vec { + // Get list of node UUIDs that are directly connected to this node + fn get_direct_nodes(&self) -> Vec { self.connections .keys() .map(|k| k.clone()) @@ -171,13 +207,15 @@ impl Node { } fn knows_client(&self, id: &String) -> bool { - self.get_known_clients().contains(id) + self.get_known_nodes().contains(id) } + // Remove all nodes where the routes are empty fn remove_null_nodes(&mut self) { self.map.retain(|_, routes| !routes.is_empty()); } + // Send packet to all directly connected nodes, except maybe one fn broadcast(&mut self, data: Packets, disclude: Option<&String>) { for (uuid, connection) in self.connections.iter_mut() { if disclude.is_some() && disclude.unwrap() == uuid { @@ -189,9 +227,11 @@ impl Node { } } + // Get list of nodes to send to another as known routes fn get_routes_to(&self, recv_uuid: &String) -> Vec { let mut tx_routes: Vec = Vec::new(); + // Append for (map_uuid, routes) in self.map.iter() { // Do not transmit a route, which bounces directly back to the sender if routes.len() == 1 && &routes[0] == recv_uuid { @@ -201,7 +241,8 @@ impl Node { tx_routes.push(map_uuid.clone()); } - tx_routes.append(&mut self.get_direct_connections()); + // Append directly connected nodes + tx_routes.append(&mut self.get_direct_nodes()); tx_routes } @@ -237,7 +278,7 @@ impl Node { for remove_uuid in routes { // Sanity check, in case the current client is still connected - if self.get_direct_connections().contains(&remove_uuid) { + if self.get_direct_nodes().contains(&remove_uuid) { resend_table = true; continue; } @@ -248,6 +289,12 @@ impl Node { self.map.remove(&remove_uuid); remove_uuids.push(remove_uuid.clone()); + info!( + "Node disconnected! {} ({})", + remove_uuid, + if direct { "direct" } else { "indirect" } + ); + for (uuid, route) in self.map.iter_mut() { if route.contains(&remove_uuid) { let index = route.iter().position(|r| r == &remove_uuid).unwrap(); @@ -258,8 +305,6 @@ impl Node { self.remove_null_nodes(); } - - // for uuid in remove_uuids { } if !remove_uuids.is_empty() { @@ -275,16 +320,14 @@ impl Node { self.broadcast_table(None); } - // } - - self.print_map(); + // self.print_map(); } fn extend_routes(&mut self, src: String, routes: Vec) { let mut updated = false; // Quick sanity check - if !self.get_direct_connections().contains(&src) { + if !self.get_direct_nodes().contains(&src) { return; } @@ -296,7 +339,7 @@ impl Node { } // If the connection is already established directly, disregard - if self.get_direct_connections().contains(&route) { + if self.get_direct_nodes().contains(&route) { continue; } @@ -306,6 +349,7 @@ impl Node { if !self.map.get(&route).unwrap().contains(&src) { // If the neighbor can be acessed directly, disregard self.map.get_mut(&route).unwrap().push(src.clone()); + info!("Node update: {} (indirect)", src); updated = true; } else { // Else, do nothing @@ -314,12 +358,13 @@ impl Node { } else { // Else, create the new route entry self.map.insert(route.clone(), vec![src.clone()]); + info!("Node update: {} (indirect)", src); updated = true; } } // Solves the case that if a remote node has said that a neighbor has connected before itself has - let direct_connections = self.get_direct_connections(); + let direct_connections = self.get_direct_nodes(); for connection in direct_connections { if self.map.contains_key(&connection) { self.map.remove(&connection); @@ -331,9 +376,50 @@ impl Node { if updated { self.broadcast_table(Some(&src)); } - self.print_map(); + // self.print_map(); } + fn route_packet(&mut self, src: String, dest: String, data: Vec) -> Result<(), Error> { + if dest == self.id { + self.packet_listener.send(decode_vec::

(&data)?)?; + } else { + if self.connections.contains_key(&dest) { + write( + self.connections.get_mut(&dest).unwrap(), + Packets::DataUnrouted { src, dest, data }, + )?; + } else if self.map.contains_key(&dest) { + let next_uuid = self + .map + .get(&dest) + .unwrap() + .choose(&mut thread_rng()) + .unwrap() + .clone(); + + write( + self.connections.get_mut(&next_uuid).unwrap(), + Packets::DataUnrouted { src, dest, data }, + )?; + } else { + error!("Could not find route from {} to {}!", src, dest); + } + } + + Ok(()) + } + + pub fn send_unrouted(&mut self, dest: String, data: &P) -> Result<(), Error> { + self.route_packet(self.id.clone(), dest, encode_vec(data)?) + } + + pub fn get_all_nodes(&self) -> Vec { + let mut uuids = self.get_known_nodes(); + uuids.append(&mut self.get_direct_nodes()); + uuids + } + + #[allow(dead_code)] fn print_map(&self) { info!("\n\n"); info!("Local addr: {}", self.id); @@ -341,6 +427,6 @@ impl Node { for (uuid, route) in self.map.iter() { info!("{} -> [ {:?} ]", uuid, route); } - info!("Direct: {:?}", self.get_direct_connections()); + info!("Direct: {:?}", self.get_direct_nodes()); } } diff --git a/unshell-rs-lib/src/nodes/packets.rs b/unshell-rs-lib/src/nodes/packets.rs new file mode 100644 index 0000000..9bf7e62 --- /dev/null +++ b/unshell-rs-lib/src/nodes/packets.rs @@ -0,0 +1,51 @@ +use std::fmt::Debug; + +use bincode::{Decode, Encode, config::Configuration}; + +use crate::Error; + +#[derive(Debug, Encode, Decode, Clone)] +pub enum Packets { + SyncUUID(String), + Update { + routes: Vec, + }, + Disconnect { + routes: Vec, + }, + DataUnrouted { + src: String, + dest: String, + data: Vec, + }, + DataRouted { + path: Vec, + data: Vec, + }, +} + +impl Packets { + pub fn encode(&self) -> Result, Error> { + encode_vec(self) + } + pub fn decode(data: &[u8]) -> Result { + decode_vec(data) + } +} + +pub fn encode_vec

(object: &P) -> Result, Error> +where + P: Encode + Decode<()> + Debug + Clone + 'static, +{ + Ok(bincode::encode_to_vec(object, crate::BINCODE_CONFIG)?) +} + +pub fn decode_vec

(data: &[u8]) -> Result +where + P: Encode + Decode<()> + Debug + Clone + 'static, +{ + let (decoded, _) = + bincode::decode_from_slice::(&data[..], crate::BINCODE_CONFIG)?; + + Ok(decoded) +}