diff --git a/src/client/cli.rs b/src/client/cli.rs index 71f57f5..3b17a06 100644 --- a/src/client/cli.rs +++ b/src/client/cli.rs @@ -1,72 +1,71 @@ -use std::{io::Write, net::SocketAddr, thread}; - use unshell_rs_lib::{ Error, - connection::{PacketError, Packets}, - layers::build_client, - networkers::{ClientTrait, Connection, TCPClient}, + connection::{ConnectionConfig, Node}, }; pub struct Cli; impl Cli { - pub fn connect(addr: SocketAddr) -> Result<(), Error> { - let mut client = build_client(TCPClient::connect(&addr)?, vec![])?; + pub fn connect( + id: String, + clients: Vec, + listeners: Vec, + ) -> Result<(), Error> { + // let mut client = build_client(TCPClient::connect(&addr)?, vec![])?; - let stdin = std::io::stdin(); - let mut stdout = std::io::stdout(); + // let stdin = std::io::stdin(); + // let mut stdout = std::io::stdout(); - let mut client_clone = client.try_clone()?; - thread::spawn(move || { - // let data = client.read()?; + Node::run_node(id, clients, listeners) - let packet = Packets::decode(client_clone.read().unwrap().as_str()).unwrap(); + // let mut client_clone = client.try_clone()?; + // thread::spawn(move || { + // // let data = client.read()?; - match packet { - Packets::UpdateConnections(items) => { - for item in items { - println!("{}", item); - } - } - Packets::UpdateRoutes(items) => { - for item in items { - println!("{}", item); - } - } - _ => { - client_clone - .write( - Packets::Error(PacketError::UnsupportedType) - .encode() - .unwrap() - .as_str(), - ) - .unwrap(); - warn!("Invalid packet: {:?}", packet) - } - } - }); + // let packet = Packets::decode(client_clone.read().unwrap().as_str()).unwrap(); - loop { - print!("> "); - stdout.flush()?; + // match packet { + // Packets::UpdateConnections(items) => { + // for item in items { + // println!("{}", item); + // } + // } + // Packets::UpdateRoutes(items) => { + // for item in items { + // println!("{}", item); + // } + // } + // _ => { + // client_clone + // .write( + // Packets::Error(PacketError::UnsupportedType) + // .encode() + // .unwrap() + // .as_str(), + // ) + // .unwrap(); + // warn!("Invalid packet: {:?}", packet) + // } + // } + // }); - let mut input = String::new(); - stdin.read_line(&mut input)?; - let input = input.trim(); + // loop { + // print!("> "); + // stdout.flush()?; - match input.split(" ").nth(0).unwrap() { - "clients" => { - client.write(Packets::GetConnections.encode()?.as_str())?; - } - "routes" => { - client.write(Packets::GetRoutes.encode()?.as_str())?; - } - _ => { - warn!("Invalid command!") - } - } + // let mut input = String::new(); + // stdin.read_line(&mut input)?; + // let input = input.trim(); - // client.write(input)?; - } + // match input.split(" ").nth(0).unwrap() { + // "ping" => { + // // client.write(Packets::GetConnections.encode()?.as_str())?; + // } + // _ => { + // warn!("Invalid command!") + // } + // } + + // // client.write(input)?; + // } } } diff --git a/src/client/client_node.rs b/src/client/client_node.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/client/mod.rs b/src/client/mod.rs index cd57d57..24ae707 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -1,2 +1,4 @@ mod cli; +mod client_node; + pub use cli::Cli; diff --git a/src/lib.rs b/src/lib.rs index 76788b7..8576980 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,4 @@ -#[macro_use] +// #[macro_use] extern crate log; mod client; diff --git a/src/main.rs b/src/main.rs index afc6f0f..8d27ead 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,10 +8,7 @@ use std::{ use clap::{Parser, Subcommand}; use log::error; use unshell_rs::Cli; -use unshell_rs_lib::{ - connection::{ConnectionConfig, Node}, - layers::LayerConfig, -}; +use unshell_rs_lib::connection::ConnectionConfig; pub static DEFAULT_CONFIG_FILEPATH: &'static str = "server_config.json"; @@ -33,10 +30,16 @@ struct Args { #[derive(Debug, Subcommand)] enum Commands { - Start, - Middle, - End, - + // Start, + // Middle, + // End, + // + Test1, + Test2, + Test3, + Test4, + Test5, + Test6, // Run as a service, and potentially hosting a website // #[command(arg_required_else_help = true)] // Relay { @@ -54,15 +57,15 @@ enum Commands { // // #[arg(short, long, default_value_t = DEFAULT_SERVICE_PORT)] // // web_port: u16, // }, - /// Connect to remote server - Connect { - /// Remote server to connect to - host: String, + // /// Connect to remote server + // Connect { + // /// Remote server to connect to + // host: String, - /// Port listen to for command clients - #[arg(short, long, default_value_t = DEFAULT_SERVICE_PORT)] - port: u16, - }, + // /// Port listen to for command clients + // #[arg(short, long, default_value_t = DEFAULT_SERVICE_PORT)] + // port: u16, + // }, } fn main() -> Result<(), Box> { @@ -80,43 +83,87 @@ fn main() -> Result<(), Box> { // error!("{}", e); // } // } - Commands::Start {} => Node::run_master( - ConnectionConfig { - socket: SocketAddr::from_str("127.0.0.1:13370")?, - layers: vec![], - }, + Commands::Test1 {} => Cli::connect( + "Test1".to_string(), + vec![], vec![ConnectionConfig { socket: SocketAddr::from_str("127.0.0.1:13371")?, layers: vec![], }], ), - Commands::Middle {} => Node::run_node( - ConnectionConfig { + Commands::Test2 {} => Cli::connect( + "Test2".to_string(), + vec![ConnectionConfig { socket: SocketAddr::from_str("127.0.0.1:13371")?, layers: vec![], - }, + }], vec![ConnectionConfig { socket: SocketAddr::from_str("127.0.0.1:13372")?, - layers: vec![LayerConfig::Base64], + layers: vec![], }], ), - Commands::End {} => Node::run_node( - ConnectionConfig { + Commands::Test3 {} => Cli::connect( + "Test3".to_string(), + vec![ConnectionConfig { socket: SocketAddr::from_str("127.0.0.1:13372")?, - layers: vec![LayerConfig::Base64], - }, + layers: vec![], + }], + vec![ConnectionConfig { + socket: SocketAddr::from_str("127.0.0.1:13373")?, + layers: vec![], + }], + ), + Commands::Test4 {} => Cli::connect( + "Test4".to_string(), + vec![ConnectionConfig { + socket: SocketAddr::from_str("127.0.0.1:13371")?, + layers: vec![], + }], + vec![ConnectionConfig { + socket: SocketAddr::from_str("127.0.0.1:13374")?, + layers: vec![], + }], + ), + Commands::Test5 {} => Cli::connect( + "Test5".to_string(), + vec![ + ConnectionConfig { + socket: SocketAddr::from_str("127.0.0.1:13372")?, + layers: vec![], + }, + ConnectionConfig { + socket: SocketAddr::from_str("127.0.0.1:13374")?, + layers: vec![], + }, + ], + vec![ConnectionConfig { + socket: SocketAddr::from_str("127.0.0.1:13375")?, + layers: vec![], + }], + ), + Commands::Test6 {} => Cli::connect( + "Test6".to_string(), + vec![ + ConnectionConfig { + socket: SocketAddr::from_str("127.0.0.1:13373")?, + layers: vec![], + }, + ConnectionConfig { + socket: SocketAddr::from_str("127.0.0.1:13375")?, + layers: vec![], + }, + ], vec![], ), - - Commands::Connect { host, port } => { - let addr = SocketAddr::from_str(format!("{}:{}", host, port).as_str()); - Cli::connect(if let Ok(addr) = addr { - addr - } else { - error!("Could not parse address!"); - return Ok(()); - }) - } + // Commands::Connect { host, port } => { + // let addr = SocketAddr::from_str(format!("{}:{}", host, port).as_str()); + // Cli::connect(if let Ok(addr) = addr { + // addr + // } else { + // error!("Could not parse address!"); + // return Ok(()); + // }) + // } } { error!("{}", e); }; diff --git a/unshell-rs-lib/src/connection/listener.rs b/unshell-rs-lib/src/connection/listener.rs index ab0b11c..2923e01 100644 --- a/unshell-rs-lib/src/connection/listener.rs +++ b/unshell-rs-lib/src/connection/listener.rs @@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize}; use crate::layers::LayerConfig; -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct ConnectionConfig { pub socket: SocketAddr, pub layers: Vec, diff --git a/unshell-rs-lib/src/connection/mod.rs b/unshell-rs-lib/src/connection/mod.rs index 018ac35..0358d70 100644 --- a/unshell-rs-lib/src/connection/mod.rs +++ b/unshell-rs-lib/src/connection/mod.rs @@ -4,5 +4,5 @@ mod packets; pub use listener::ConnectionConfig; pub use node::Node; -pub use packets::PacketError; +// pub use packets::PacketError; pub use packets::Packets; diff --git a/unshell-rs-lib/src/connection/node-test b/unshell-rs-lib/src/connection/node-test new file mode 100644 index 0000000..b64b9d0 --- /dev/null +++ b/unshell-rs-lib/src/connection/node-test @@ -0,0 +1,366 @@ +use std::{ + collections::HashMap, + f32::consts::PI, + sync::{Arc, Mutex, MutexGuard}, + thread::{self, Thread}, + time::Duration, +}; + +use uuid::Uuid; + +use crate::{ + Error, + connection::{listener::ConnectionConfig, packets::Packets}, + layers::build_client, + networkers::{ClientTrait, Connection, ServerTrait, TCPClient, TCPServer, run_listener_state}, +}; + +pub struct Node { + id: String, + connections: HashMap>, + map: HashMap>, +} + +fn read(c: &mut Box) -> Result { + let a = Packets::decode(c.read()?.as_str()); + info!("Data: {:?}", a); + a +} + +fn write(c: &mut Box, packet: Packets) -> Result<(), Error> { + info!("Wrote: {:?}", packet); + c.write(packet.encode()?.as_str()) +} + +impl Node { + pub fn run_node( + id: String, + clients: Vec, + listeners: Vec, + ) -> Result<(), Error> { + // let mut parent = build_client(TCPClient::connect(&parent.socket)?, parent.layers)?; + + let state = Arc::new(Mutex::new(Self { + id: id, //Uuid::new_v4().to_string(), //TODO: Calling an OS RNG can pose a problem for security; + connections: HashMap::new(), + map: HashMap::new(), + })); + + for listener in listeners { + run_listener_state( + TCPServer::bind(&listener.socket)?, + listener.layers, + Self::on_listener_client, + Arc::clone(&state), + ); + } + + for client in clients { + let state = Arc::clone(&state); + thread::spawn(move || { + loop { + if let Err(e) = Self::run_client(client.clone(), &state) { + error!("{}", e); + } + + thread::sleep(Duration::from_millis(1000)); + } + }); + } + + thread::sleep(Duration::MAX); + + Ok(()) + } + + fn run_client(client: ConnectionConfig, state: &Arc>) -> Result<(), Error> { + Self::run_connection( + build_client(TCPClient::connect(&client.socket)?, client.layers)?, + state, + ); + + Ok(()) + } + + fn on_listener_client( + connection: Box, + state: Arc>, + ) { + thread::spawn(move || { + Self::run_connection(connection, &state); + }); + } + + fn run_connection(connection: Box, state: &Arc>) { + let mut connection = connection; + let s = state.lock().unwrap(); + + let this_uuid = s.id.clone(); + std::mem::drop(s); + + write( + &mut connection, + Packets::UpdateRoutes(this_uuid, (&mut state.lock().unwrap()).get_known_clients()), + ) + .unwrap(); + + let other_uuid = if let Ok(Packets::UpdateRoutes(src, routes)) = read(&mut connection) { + (&mut state.lock().unwrap()).extend_routes(src.clone(), routes); + src + } else { + error!("Could not get UUID!"); + return; + }; + + // info!("Connection: {}", other_uuid); + + (&mut state.lock().unwrap()) + .connections + .insert(other_uuid.clone(), connection.try_clone().unwrap()); + + // (&mut state.lock().unwrap()).connect(other_uuid.clone(), None); + + // let is_root = s.parent.is_none(); + + loop { + match read(&mut connection) { + Ok(packet) => { + let result: Result<(), Error> = match packet { + // Packets::UpdateRoutes(src, routes) => { + // Ok((&mut state.lock().unwrap()).extend_routes(src, routes)) + // } + // Packets::Connect(id) => { + // let source = if other_uuid == id { + // None + // } else { + // Some(other_uuid.clone()) + // }; + + // Ok((&mut state.lock().unwrap()).connect(id, source)) + // } + Packets::Disconnect(id) => { + let direct = other_uuid == id; + Ok((&mut state.lock().unwrap()).disconnect(id, direct)) + } + Packets::UpdateRoutes(src, routes) => { + Ok((&mut state.lock().unwrap()).extend_routes(src, routes)) + } + _ => { + error!("Unsupported packet: {:?}", packet); + + Ok(()) + } + }; + + if let Err(e) = result { + error!("Got error: {}", e); + } + } + Err(e) => { + if !connection.is_alive() { + warn!("Connection {} Disconnected!", connection.get_info()); + let state = &mut state.lock().unwrap(); + state.connections.remove(&other_uuid); + state.disconnect(other_uuid, true); + + break; + } + + error!("Got error: {}", e); + } + } + } + } + + // fn get_best_route(&self, uuid: &String) -> Result<(usize, String), Error> { + // let routes = self.map.get(uuid).ok_or("Route does not exist!")?; + + // if routes.is_empty() { + // return Ok((0, uuid.clone())); + // } + + // let mut min_hops = usize::MAX; + // let mut min_uuid = "".to_string(); + + // for (hops, uuid) in routes.iter() { + // if hops < &min_hops { + // min_hops = hops.clone(); + // min_uuid = uuid.clone(); + // } + // } + + // Ok((min_hops, min_uuid)) + // } + + // fn route_get_index_of_uuid(routes: &Vec<(usize, String)>, uuid: &String) -> Option { + // for (i, route) in routes.iter().enumerate() { + // if &route.1 == uuid { + // return Some(i); + // } + // } + + // return None; + // } + + // fn route_inc_one(routes: Vec<(usize, String)>) -> Vec<(usize, String)> { + // routes.iter().map(|r| (r.0 + 1, *r.1)).collect() + // } + + // fn get_known_routes(&self) -> Vec<(usize, String)> { + // let mut routes = self + // .map + // .keys() + // .map(|k| self.get_best_route(k).unwrap()) + // .collect::>(); + + // routes.push((0, self.id.clone())); + + // println!("Known routes: {:?}", routes); + + // routes + // } + + fn get_known_clients(&self) -> Vec { + let mut clients = self.map.keys().map(|k| k.clone()).collect::>(); + + clients.push(self.id.clone()); + + clients + } + + fn knows_client(&self, id: &String) -> bool { + self.get_known_clients().contains(id) + // self.connections.contains_key(id) + } + + fn broadcast(&mut self, data: Packets, disclude: Option<&String>) { + for (uuid, connection) in self.connections.iter_mut() { + if disclude.is_some() && disclude.unwrap() == uuid { + continue; + } + if let Err(e) = write(connection, data.clone()) { + error!("Failed to send packet to {}, {}", uuid, e); + } + } + } + + fn broadcast_table(&mut self, disclude: Option<&String>) { + self.broadcast( + Packets::UpdateRoutes(self.id.clone(), self.get_known_clients()), + disclude, + ); + } + + // fn connect(&mut self, id: String, source: Option) { + // if !self.knows_client(&id) && id != self.id { + // self.broadcast(Packets::Connect(id.clone()), Some(&id)); + // if let Some(source) = source { + // // Is direct + // self.extend_routes(id.clone(), vec![(9999, source.clone())]); + // self.broadcast_table(None); + // } else { + // self.map.insert(id.clone(), Vec::new()); + // self.broadcast_table(None); + // } + // self.print_map(); + // } + // } + + fn disconnect(&mut self, id: String, direct: bool) { + if self.knows_client(&id) { + self.broadcast(Packets::Disconnect(id.clone()), None); + + if direct { + for (i, uuid) in self.get_known_clients().iter().enumerate() { + // if let Some(i) = + // Self::route_get_index_of_uuid(self.map.get(&uuid).unwrap(), &uuid) + // { + // self.map.get_mut(&uuid).unwrap().remove(i); + + // self.broadcast(Packets::Disconnect(uuid.clone()), Some(&uuid)); + // } + + if self.map.contains_key(uuid) { + self.map.get_mut(uuid).unwrap().remove(i); + + self.broadcast(Packets::Disconnect(uuid.clone()), Some(&uuid)); + } + } + } + + self.map.remove(&id); + + // self.broadcast_table(Some(&id)); + self.print_map(); + } + } + + fn extend_routes(&mut self, src: String, routes: Vec) { + let mut updated = false; + + println!("{:?}", routes); + for route in routes { + if route == self.id { + continue; + } + + if self.map.contains_key(&route) { + // println!("{:?}", route); + + // let update = if let Some(i) = + // Self::route_get_index_of_uuid(self.map.get(&route).unwrap(), &route) + // { + // println!( + // "contains, {}, {:?} {:?}", + // i, + // self.map.get(&route.1).unwrap(), + // route + // ); + // if self.map.get(&route.1).unwrap()[i].0 != route.0 { + // let _ = self.map.get_mut(&route.1).unwrap().remove(i); + + // println!("true"); + + // true + // } else { + // println!("false"); + // false + // } + // // println!("{:?}", self.map.get(&route.1).unwrap()); + // // true + // } else { + // true + // }; + + // if update { + // self.map + // .get_mut(&route.1) + // .unwrap() + // .push((route.0 + 1, src.clone())); + // updated = true; + // } + if !self.map.get(&route).unwrap().contains(&route) { + self.map.get_mut(&route).unwrap().push(src.clone()); + updated = true; + } + } else { + self.map.insert(route.clone(), vec![route]); + updated = true; + } + } + + if updated { + self.broadcast_table(None); + self.print_map(); + } + } + + fn print_map(&self) { + info!("\n\n"); + info!("Local addr: {}", self.id); + info!("Table: "); + for (uuid, route) in self.map.iter() { + info!("{} -> [ {:?} ]", uuid, route); + } + } +} diff --git a/unshell-rs-lib/src/connection/node.rs b/unshell-rs-lib/src/connection/node.rs index ccb65fd..628029e 100644 --- a/unshell-rs-lib/src/connection/node.rs +++ b/unshell-rs-lib/src/connection/node.rs @@ -1,12 +1,10 @@ use std::{ - f32::consts::PI, + collections::HashMap, sync::{Arc, Mutex}, thread, time::Duration, }; -use uuid::Uuid; - use crate::{ Error, connection::{listener::ConnectionConfig, packets::Packets}, @@ -15,144 +13,70 @@ use crate::{ }; pub struct Node { - // parent: Box, - clients: Vec, -} - -pub struct Client { - connection: Box, - uuid: String, - route: Vec, -} - -impl Client { - pub fn get_info(&self) -> String { - format!("{} ({})", self.uuid, self.route.join("->")) - } + id: String, + connections: HashMap>, + map: HashMap>, } fn read(c: &mut Box) -> Result { - Packets::decode(c.read()?.as_str()) + let a = Packets::decode(c.read()?.as_str()); + info!("Data: {:?}", a); + a } fn write(c: &mut Box, packet: Packets) -> Result<(), Error> { + info!("Wrote: {:?}", packet); c.write(packet.encode()?.as_str()) } impl Node { - fn run_listeners( - state: &Arc>, - listeners: Vec, - ) -> Result<(), Error> { - // Start server listeners - for listener in listeners { - run_listener_state( - TCPServer::bind(&listener.socket)?, - listener.layers, - Self::on_listener_client, - Arc::clone(state), - ); - } - - Ok(()) - } - pub fn run_node( - parent: ConnectionConfig, - listeners: Vec, - ) -> Result<(), Error> { - let mut parent = build_client(TCPClient::connect(&parent.socket)?, parent.layers)?; - - let state = Arc::new(Mutex::new(Self { - // parent: parent_clone, - clients: Vec::new(), - })); - - Self::run_listeners(&state, listeners)?; - - while parent.is_alive() { - match read(&mut parent) { - Ok(packet) => match packet { - Packets::GetRoutes => write( - &mut parent, - Packets::UpdateRoutes(state.lock().unwrap().get_routes()), - )?, - _ => {} - }, - Err(e) => { - error!("Error: {}", e) - } - } - } - - Ok(()) - } - - pub fn run_master( - server: ConnectionConfig, + id: String, + clients: Vec, listeners: Vec, ) -> Result<(), Error> { // let mut parent = build_client(TCPClient::connect(&parent.socket)?, parent.layers)?; let state = Arc::new(Mutex::new(Self { - // parent: parent_clone, - clients: Vec::new(), + id: id, //Uuid::new_v4().to_string(), //TODO: Calling an OS RNG can pose a problem for security; + connections: HashMap::new(), + map: HashMap::new(), })); - run_listener_state( - TCPServer::bind(&server.socket)?, - server.layers, - Self::on_command_client, - Arc::clone(&state), - ); + for listener in listeners { + run_listener_state( + TCPServer::bind(&listener.socket)?, + listener.layers, + Self::on_listener_client, + Arc::clone(&state), + ); + } - Self::run_listeners(&state, listeners)?; + for client in clients { + let state = Arc::clone(&state); + thread::spawn(move || { + loop { + if let Err(e) = Self::run_client(client.clone(), &state) { + error!("{}", e); + } + + thread::sleep(Duration::from_millis(1000)); + } + }); + } thread::sleep(Duration::MAX); Ok(()) } - fn on_command_client( - connection: Box, - state: Arc>, - ) { - thread::spawn(move || { - let mut connection = connection; - loop { - match read(&mut connection) { - Ok(packet) => { - let result = match packet { - Packets::GetConnections => write( - &mut connection, - Packets::UpdateConnections(state.lock().unwrap().get_clients()), - ), - Packets::GetRoutes => write( - &mut connection, - Packets::UpdateRoutes(state.lock().unwrap().get_routes()), - ), - _ => { - error!("Unsupported packet: {:?}", packet); + fn run_client(client: ConnectionConfig, state: &Arc>) -> Result<(), Error> { + Self::run_connection( + build_client(TCPClient::connect(&client.socket)?, client.layers)?, + state, + )?; - Ok(()) - } - }; - - if let Err(e) = result { - error!("Got error: {}", e); - } - } - Err(e) => { - if !connection.is_alive() { - warn!("Connection {} disconnected!", connection.get_info()); - break; - } else { - error!("Got error: {}", e); - } - } - } - } - }); + Ok(()) } fn on_listener_client( @@ -160,104 +84,175 @@ impl Node { state: Arc>, ) { thread::spawn(move || { - let mut connection = connection; - let mut s = state.lock().unwrap(); - let index = s.clients.len(); - - let uuid = Uuid::new_v4().to_string(); //TODO: Calling an OS RNG can pose a problem for security; - - s.clients.push(Client { - uuid: uuid.clone(), - connection: connection.try_clone().unwrap(), - route: vec![uuid], - }); - - write( - &mut connection, - Packets::OnClientConnect { - id: s.clients.last().unwrap().uuid.clone(), - route: s.clients.last().unwrap().route.clone(), - }, - ) - .unwrap(); - - std::mem::drop(s); - - // let is_root = s.parent.is_none(); - - loop { - match read(&mut connection) { - Ok(packet) => { - let result = match packet { - Packets::GetConnections => write( - &mut connection, - Packets::UpdateConnections(state.lock().unwrap().get_clients()), - ), - Packets::GetRoutes => write( - &mut connection, - Packets::UpdateRoutes(state.lock().unwrap().get_routes()), - ), - Packets::OnClientConnect { id, route } => Ok(()), - _ => { - error!("Unsupported packet: {:?}", packet); - - Ok(()) - } - }; - - if let Err(e) = result { - error!("Got error: {}", e); - } - } - Err(e) => { - if !connection.is_alive() { - (&mut state.lock().unwrap()).clients.remove(index); - warn!("Connection {} Disconnected!", connection.get_info()); - break; - } - - error!("Got error: {}", e); - } - } + if let Err(e) = Self::run_connection(connection, &state) { + error!("{}", e); } }); } - fn get_clients(&self) -> Vec { - self.clients - .iter() - .map(|c| format!("Client {}", c.get_info())) - .collect() - } + fn run_connection( + connection: Box, + state: &Arc>, + ) -> Result<(), Error> { + let mut connection = connection; + let s = state.lock().unwrap(); - fn get_routes(&mut self) -> Vec { - let mut routes = Vec::new(); + let this_uuid = s.id.clone(); + std::mem::drop(s); - for client in &mut self.clients { - let prefix = client.connection.get_info(); + write( + &mut connection, + Packets::UpdateRoutes( + this_uuid, + (&mut state.lock().unwrap()).get_self_and_known_clients(), + ), + )?; - routes.push(prefix.clone()); + let other_uuid = if let Packets::UpdateRoutes(src, routes) = read(&mut connection)? { + (&mut state.lock().unwrap()).extend_routes(src.clone(), routes); + src + } else { + return Err("Could not get UUID!".into()); + }; - if let Err(e) = write(&mut client.connection, Packets::GetRoutes) { - error!("Failed to send packet: {}", e); - } + // info!("Connection: {}", other_uuid); - if let Ok(Packets::UpdateRoutes(new_routes)) = read(&mut client.connection) { - routes.append( - new_routes - .iter() - .map(|c| format!("{} -> {}", prefix, c)) - .collect::>() - .as_mut(), - ); + (&mut state.lock().unwrap()) + .connections + .insert(other_uuid.clone(), connection.try_clone()?); + + loop { + match read(&mut connection) { + Ok(packet) => { + let result: Result<(), Error> = match packet { + Packets::Disconnect(id) => { + let direct = other_uuid == id; + Ok((&mut state.lock().unwrap()).disconnect(id, direct)) + } + Packets::UpdateRoutes(src, routes) => { + Ok((&mut state.lock().unwrap()).extend_routes(src, routes)) + } + _ => { + error!("Unsupported packet: {:?}", packet); + + Ok(()) + } + }; + + if let Err(e) = result { + error!("Got error: {}", e); + } + } + Err(e) => { + if !connection.is_alive() { + warn!("Connection {} Disconnected!", connection.get_info()); + let state = &mut state.lock().unwrap(); + state.connections.remove(&other_uuid); + state.disconnect(other_uuid, true); + + break; + } + + error!("Got error: {}", e); + } } } - routes + Ok(()) + } - // self.clients - // .iter() - // .map(|c| format!("Client {}", c.get_info())) - // .collect() + fn get_known_clients(&self) -> Vec { + self.map.keys().map(|k| k.clone()).collect::>() + } + + fn get_self_and_known_clients(&self) -> Vec { + let mut clients = self.get_known_clients(); + clients.push(self.id.clone()); + clients + } + + fn knows_client(&self, id: &String) -> bool { + self.get_known_clients().contains(id) + } + + fn broadcast(&mut self, data: Packets, disclude: Option<&String>) { + for (uuid, connection) in self.connections.iter_mut() { + if disclude.is_some() && disclude.unwrap() == uuid { + continue; + } + if let Err(e) = write(connection, data.clone()) { + error!("Failed to send packet to {}, {}", uuid, e); + } + } + } + + fn broadcast_table(&mut self, disclude: Option<&String>) { + self.broadcast( + Packets::UpdateRoutes(self.id.clone(), self.get_self_and_known_clients()), + disclude, + ); + } + + fn disconnect(&mut self, id: String, direct: bool) { + if self.knows_client(&id) { + self.broadcast(Packets::Disconnect(id.clone()), None); + + if direct { + for uuid in self.get_known_clients() { + if self.map.get(&uuid).unwrap().contains(&id) { + let index = self + .map + .get_mut(&uuid) + .unwrap() + .iter() + .position(|r| r == &id) + .unwrap(); + + self.map.get_mut(&uuid).unwrap().remove(index); + + // self.broadcast(Packets::Disconnect(uuid.clone()), Some(&uuid)); + } + } + } + + self.map.remove(&id); + + self.print_map(); + } + } + + fn extend_routes(&mut self, src: String, routes: Vec) { + let mut updated = false; + + println!("{:?}", routes); + for route in routes { + if route == self.id { + continue; + } + + if self.map.contains_key(&route) { + if !self.map.get(&route).unwrap().contains(&src) { + self.map.get_mut(&route).unwrap().push(src.clone()); + updated = true; + } + } else { + self.map.insert(route.clone(), vec![src.clone()]); + updated = true; + } + } + + if updated { + self.broadcast_table(None); + self.print_map(); + } + } + + fn print_map(&self) { + info!("\n\n"); + info!("Local addr: {}", self.id); + info!("Table: "); + for (uuid, route) in self.map.iter() { + info!("{} -> [ {:?} ]", uuid, route); + } } } diff --git a/unshell-rs-lib/src/connection/packets.rs b/unshell-rs-lib/src/connection/packets.rs index 36e0e7d..b7d3c91 100644 --- a/unshell-rs-lib/src/connection/packets.rs +++ b/unshell-rs-lib/src/connection/packets.rs @@ -2,23 +2,12 @@ use serde::{Deserialize, Serialize}; use crate::Error; -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Clone)] pub enum Packets { - GetConnections, - UpdateConnections(Vec), - - GetRoutes, - UpdateRoutes(Vec), - - OnClientConnect { id: String, route: Vec }, - OnClientDisconnect { id: String }, - - Error(PacketError), -} - -#[derive(Debug, Serialize, Deserialize)] -pub enum PacketError { - UnsupportedType, + UpdateRoutes(String, Vec), + Connect(String), + Disconnect(String), + Data { source: String, data: String }, } impl Packets { diff --git a/unshell-rs-lib/src/layers/mod.rs b/unshell-rs-lib/src/layers/mod.rs index a306ec8..e4d3656 100644 --- a/unshell-rs-lib/src/layers/mod.rs +++ b/unshell-rs-lib/src/layers/mod.rs @@ -1,7 +1,7 @@ use serde::Deserialize; use serde::Serialize; -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub enum LayerConfig { Base64, Handshake,