diff --git a/unshell-rs-lib/src/connection/node-test b/unshell-rs-lib/src/connection/node-test deleted file mode 100644 index b64b9d0..0000000 --- a/unshell-rs-lib/src/connection/node-test +++ /dev/null @@ -1,366 +0,0 @@ -use std::{ - collections::HashMap, - f32::consts::PI, - sync::{Arc, Mutex, MutexGuard}, - thread::{self, Thread}, - time::Duration, -}; - -use uuid::Uuid; - -use crate::{ - Error, - connection::{listener::ConnectionConfig, packets::Packets}, - layers::build_client, - networkers::{ClientTrait, Connection, ServerTrait, TCPClient, TCPServer, run_listener_state}, -}; - -pub struct Node { - id: String, - connections: HashMap>, - map: HashMap>, -} - -fn read(c: &mut Box) -> Result { - let a = Packets::decode(c.read()?.as_str()); - info!("Data: {:?}", a); - a -} - -fn write(c: &mut Box, packet: Packets) -> Result<(), Error> { - info!("Wrote: {:?}", packet); - c.write(packet.encode()?.as_str()) -} - -impl Node { - pub fn run_node( - id: String, - clients: Vec, - listeners: Vec, - ) -> Result<(), Error> { - // let mut parent = build_client(TCPClient::connect(&parent.socket)?, parent.layers)?; - - let state = Arc::new(Mutex::new(Self { - id: id, //Uuid::new_v4().to_string(), //TODO: Calling an OS RNG can pose a problem for security; - connections: HashMap::new(), - map: HashMap::new(), - })); - - for listener in listeners { - run_listener_state( - TCPServer::bind(&listener.socket)?, - listener.layers, - Self::on_listener_client, - Arc::clone(&state), - ); - } - - for client in clients { - let state = Arc::clone(&state); - thread::spawn(move || { - loop { - if let Err(e) = Self::run_client(client.clone(), &state) { - error!("{}", e); - } - - thread::sleep(Duration::from_millis(1000)); - } - }); - } - - thread::sleep(Duration::MAX); - - Ok(()) - } - - fn run_client(client: ConnectionConfig, state: &Arc>) -> Result<(), Error> { - Self::run_connection( - build_client(TCPClient::connect(&client.socket)?, client.layers)?, - state, - ); - - Ok(()) - } - - fn on_listener_client( - connection: Box, - state: Arc>, - ) { - thread::spawn(move || { - Self::run_connection(connection, &state); - }); - } - - fn run_connection(connection: Box, state: &Arc>) { - let mut connection = connection; - let s = state.lock().unwrap(); - - let this_uuid = s.id.clone(); - std::mem::drop(s); - - write( - &mut connection, - Packets::UpdateRoutes(this_uuid, (&mut state.lock().unwrap()).get_known_clients()), - ) - .unwrap(); - - let other_uuid = if let Ok(Packets::UpdateRoutes(src, routes)) = read(&mut connection) { - (&mut state.lock().unwrap()).extend_routes(src.clone(), routes); - src - } else { - error!("Could not get UUID!"); - return; - }; - - // info!("Connection: {}", other_uuid); - - (&mut state.lock().unwrap()) - .connections - .insert(other_uuid.clone(), connection.try_clone().unwrap()); - - // (&mut state.lock().unwrap()).connect(other_uuid.clone(), None); - - // let is_root = s.parent.is_none(); - - loop { - match read(&mut connection) { - Ok(packet) => { - let result: Result<(), Error> = match packet { - // Packets::UpdateRoutes(src, routes) => { - // Ok((&mut state.lock().unwrap()).extend_routes(src, routes)) - // } - // Packets::Connect(id) => { - // let source = if other_uuid == id { - // None - // } else { - // Some(other_uuid.clone()) - // }; - - // Ok((&mut state.lock().unwrap()).connect(id, source)) - // } - Packets::Disconnect(id) => { - let direct = other_uuid == id; - Ok((&mut state.lock().unwrap()).disconnect(id, direct)) - } - Packets::UpdateRoutes(src, routes) => { - Ok((&mut state.lock().unwrap()).extend_routes(src, routes)) - } - _ => { - error!("Unsupported packet: {:?}", packet); - - Ok(()) - } - }; - - if let Err(e) = result { - error!("Got error: {}", e); - } - } - Err(e) => { - if !connection.is_alive() { - warn!("Connection {} Disconnected!", connection.get_info()); - let state = &mut state.lock().unwrap(); - state.connections.remove(&other_uuid); - state.disconnect(other_uuid, true); - - break; - } - - error!("Got error: {}", e); - } - } - } - } - - // fn get_best_route(&self, uuid: &String) -> Result<(usize, String), Error> { - // let routes = self.map.get(uuid).ok_or("Route does not exist!")?; - - // if routes.is_empty() { - // return Ok((0, uuid.clone())); - // } - - // let mut min_hops = usize::MAX; - // let mut min_uuid = "".to_string(); - - // for (hops, uuid) in routes.iter() { - // if hops < &min_hops { - // min_hops = hops.clone(); - // min_uuid = uuid.clone(); - // } - // } - - // Ok((min_hops, min_uuid)) - // } - - // fn route_get_index_of_uuid(routes: &Vec<(usize, String)>, uuid: &String) -> Option { - // for (i, route) in routes.iter().enumerate() { - // if &route.1 == uuid { - // return Some(i); - // } - // } - - // return None; - // } - - // fn route_inc_one(routes: Vec<(usize, String)>) -> Vec<(usize, String)> { - // routes.iter().map(|r| (r.0 + 1, *r.1)).collect() - // } - - // fn get_known_routes(&self) -> Vec<(usize, String)> { - // let mut routes = self - // .map - // .keys() - // .map(|k| self.get_best_route(k).unwrap()) - // .collect::>(); - - // routes.push((0, self.id.clone())); - - // println!("Known routes: {:?}", routes); - - // routes - // } - - fn get_known_clients(&self) -> Vec { - let mut clients = self.map.keys().map(|k| k.clone()).collect::>(); - - clients.push(self.id.clone()); - - clients - } - - fn knows_client(&self, id: &String) -> bool { - self.get_known_clients().contains(id) - // self.connections.contains_key(id) - } - - fn broadcast(&mut self, data: Packets, disclude: Option<&String>) { - for (uuid, connection) in self.connections.iter_mut() { - if disclude.is_some() && disclude.unwrap() == uuid { - continue; - } - if let Err(e) = write(connection, data.clone()) { - error!("Failed to send packet to {}, {}", uuid, e); - } - } - } - - fn broadcast_table(&mut self, disclude: Option<&String>) { - self.broadcast( - Packets::UpdateRoutes(self.id.clone(), self.get_known_clients()), - disclude, - ); - } - - // fn connect(&mut self, id: String, source: Option) { - // if !self.knows_client(&id) && id != self.id { - // self.broadcast(Packets::Connect(id.clone()), Some(&id)); - // if let Some(source) = source { - // // Is direct - // self.extend_routes(id.clone(), vec![(9999, source.clone())]); - // self.broadcast_table(None); - // } else { - // self.map.insert(id.clone(), Vec::new()); - // self.broadcast_table(None); - // } - // self.print_map(); - // } - // } - - fn disconnect(&mut self, id: String, direct: bool) { - if self.knows_client(&id) { - self.broadcast(Packets::Disconnect(id.clone()), None); - - if direct { - for (i, uuid) in self.get_known_clients().iter().enumerate() { - // if let Some(i) = - // Self::route_get_index_of_uuid(self.map.get(&uuid).unwrap(), &uuid) - // { - // self.map.get_mut(&uuid).unwrap().remove(i); - - // self.broadcast(Packets::Disconnect(uuid.clone()), Some(&uuid)); - // } - - if self.map.contains_key(uuid) { - self.map.get_mut(uuid).unwrap().remove(i); - - self.broadcast(Packets::Disconnect(uuid.clone()), Some(&uuid)); - } - } - } - - self.map.remove(&id); - - // self.broadcast_table(Some(&id)); - self.print_map(); - } - } - - fn extend_routes(&mut self, src: String, routes: Vec) { - let mut updated = false; - - println!("{:?}", routes); - for route in routes { - if route == self.id { - continue; - } - - if self.map.contains_key(&route) { - // println!("{:?}", route); - - // let update = if let Some(i) = - // Self::route_get_index_of_uuid(self.map.get(&route).unwrap(), &route) - // { - // println!( - // "contains, {}, {:?} {:?}", - // i, - // self.map.get(&route.1).unwrap(), - // route - // ); - // if self.map.get(&route.1).unwrap()[i].0 != route.0 { - // let _ = self.map.get_mut(&route.1).unwrap().remove(i); - - // println!("true"); - - // true - // } else { - // println!("false"); - // false - // } - // // println!("{:?}", self.map.get(&route.1).unwrap()); - // // true - // } else { - // true - // }; - - // if update { - // self.map - // .get_mut(&route.1) - // .unwrap() - // .push((route.0 + 1, src.clone())); - // updated = true; - // } - if !self.map.get(&route).unwrap().contains(&route) { - self.map.get_mut(&route).unwrap().push(src.clone()); - updated = true; - } - } else { - self.map.insert(route.clone(), vec![route]); - updated = true; - } - } - - if updated { - self.broadcast_table(None); - self.print_map(); - } - } - - fn print_map(&self) { - info!("\n\n"); - info!("Local addr: {}", self.id); - info!("Table: "); - for (uuid, route) in self.map.iter() { - info!("{} -> [ {:?} ]", uuid, route); - } - } -} diff --git a/unshell-rs-lib/src/connection/node.rs b/unshell-rs-lib/src/connection/node.rs index 628029e..88f2914 100644 --- a/unshell-rs-lib/src/connection/node.rs +++ b/unshell-rs-lib/src/connection/node.rs @@ -100,44 +100,42 @@ impl Node { let this_uuid = s.id.clone(); std::mem::drop(s); - write( - &mut connection, - Packets::UpdateRoutes( - this_uuid, - (&mut state.lock().unwrap()).get_self_and_known_clients(), - ), - )?; + // Send UUID to new connection + write(&mut connection, Packets::SyncUUID(this_uuid.clone()))?; - let other_uuid = if let Packets::UpdateRoutes(src, routes) = read(&mut connection)? { - (&mut state.lock().unwrap()).extend_routes(src.clone(), routes); - src + // Recieve UUID + let other_uuid = if let Packets::SyncUUID(source) = read(&mut connection)? { + source } else { return Err("Could not get UUID!".into()); }; - // info!("Connection: {}", other_uuid); + info!("Connection from {} to {}", this_uuid, other_uuid); + // Add connection (&mut state.lock().unwrap()) .connections .insert(other_uuid.clone(), connection.try_clone()?); + // Update direct connections and the new connections with the new table + (&mut state.lock().unwrap()).broadcast_table(None); + loop { match read(&mut connection) { Ok(packet) => { - let result: Result<(), Error> = match packet { - Packets::Disconnect(id) => { - let direct = other_uuid == id; - Ok((&mut state.lock().unwrap()).disconnect(id, direct)) - } - Packets::UpdateRoutes(src, routes) => { - Ok((&mut state.lock().unwrap()).extend_routes(src, routes)) - } - _ => { - error!("Unsupported packet: {:?}", packet); + let result: Result<(), Error> = + match packet { + Packets::Disconnect { routes } => Ok( + (&mut state.lock().unwrap()).disconnect(&other_uuid, routes, false) + ), + Packets::Update { routes } => Ok((&mut state.lock().unwrap()) + .extend_routes(other_uuid.clone(), routes)), + _ => { + error!("Unsupported packet: {:?}", packet); - Ok(()) - } - }; + Ok(()) + } + }; if let Err(e) = result { error!("Got error: {}", e); @@ -148,7 +146,7 @@ impl Node { warn!("Connection {} Disconnected!", connection.get_info()); let state = &mut state.lock().unwrap(); state.connections.remove(&other_uuid); - state.disconnect(other_uuid, true); + state.disconnect(&this_uuid, vec![other_uuid.clone()], true); break; } @@ -165,16 +163,21 @@ impl Node { self.map.keys().map(|k| k.clone()).collect::>() } - fn get_self_and_known_clients(&self) -> Vec { - let mut clients = self.get_known_clients(); - clients.push(self.id.clone()); - clients + fn get_direct_connections(&self) -> Vec { + self.connections + .keys() + .map(|k| k.clone()) + .collect::>() } fn knows_client(&self, id: &String) -> bool { self.get_known_clients().contains(id) } + fn remove_null_nodes(&mut self) { + self.map.retain(|_, routes| !routes.is_empty()); + } + fn broadcast(&mut self, data: Packets, disclude: Option<&String>) { for (uuid, connection) in self.connections.iter_mut() { if disclude.is_some() && disclude.unwrap() == uuid { @@ -186,65 +189,149 @@ impl Node { } } - fn broadcast_table(&mut self, disclude: Option<&String>) { - self.broadcast( - Packets::UpdateRoutes(self.id.clone(), self.get_self_and_known_clients()), - disclude, - ); + fn get_routes_to(&self, recv_uuid: &String) -> Vec { + let mut tx_routes: Vec = Vec::new(); + + for (map_uuid, routes) in self.map.iter() { + // Do not transmit a route, which bounces directly back to the sender + if routes.len() == 1 && &routes[0] == recv_uuid { + continue; + } + + tx_routes.push(map_uuid.clone()); + } + + tx_routes.append(&mut self.get_direct_connections()); + + tx_routes } - fn disconnect(&mut self, id: String, direct: bool) { - if self.knows_client(&id) { - self.broadcast(Packets::Disconnect(id.clone()), None); + fn broadcast_table(&mut self, disclude: Option<&String>) { + let packets = self + .connections + .iter() + .map(|(recv_uuid, _)| self.get_routes_to(&recv_uuid)) + .collect::>>(); - if direct { - for uuid in self.get_known_clients() { - if self.map.get(&uuid).unwrap().contains(&id) { - let index = self - .map - .get_mut(&uuid) - .unwrap() - .iter() - .position(|r| r == &id) - .unwrap(); - - self.map.get_mut(&uuid).unwrap().remove(index); - - // self.broadcast(Packets::Disconnect(uuid.clone()), Some(&uuid)); - } + for (i, (recv_uuid, connection)) in self.connections.iter_mut().enumerate() { + if let Some(disclude) = disclude { + if disclude == recv_uuid { + continue; } } - self.map.remove(&id); - - self.print_map(); + if let Err(e) = write( + connection, + Packets::Update { + routes: packets[i].clone(), + }, + ) { + error!("Failed to send packet to {}, {}", recv_uuid, e); + } } } + fn disconnect(&mut self, source: &String, routes: Vec, direct: bool) { + let mut resend_table = false; + let mut remove_uuids = Vec::new(); + + for remove_uuid in routes { + // Sanity check, in case the current client is still connected + if self.get_direct_connections().contains(&remove_uuid) { + resend_table = true; + continue; + } + + // Check if client still exists, or if it was a direct connection + // Prevents infinite network loops + if direct || self.knows_client(&remove_uuid) { + self.map.remove(&remove_uuid); + remove_uuids.push(remove_uuid.clone()); + + for (uuid, route) in self.map.iter_mut() { + if route.contains(&remove_uuid) { + let index = route.iter().position(|r| r == &remove_uuid).unwrap(); + route.remove(index); + remove_uuids.push(uuid.clone()); + } + } + + self.remove_null_nodes(); + } + + // for uuid in remove_uuids { + } + + if !remove_uuids.is_empty() { + self.broadcast( + Packets::Disconnect { + routes: remove_uuids, + }, + Some(source), + ); + } + + if resend_table { + self.broadcast_table(None); + } + + // } + + self.print_map(); + } + fn extend_routes(&mut self, src: String, routes: Vec) { let mut updated = false; - println!("{:?}", routes); + // Quick sanity check + if !self.get_direct_connections().contains(&src) { + return; + } + + // Loop through all of the routes in the new recieved route map for route in routes { + // If the route loops back to self, disregard. if route == self.id { continue; } + // If the connection is already established directly, disregard + if self.get_direct_connections().contains(&route) { + continue; + } + + // If there is already an entry created for this route if self.map.contains_key(&route) { + // If the route does not already contain the new one if !self.map.get(&route).unwrap().contains(&src) { + // If the neighbor can be acessed directly, disregard self.map.get_mut(&route).unwrap().push(src.clone()); updated = true; + } else { + // Else, do nothing + continue; } } else { + // Else, create the new route entry self.map.insert(route.clone(), vec![src.clone()]); updated = true; } } - if updated { - self.broadcast_table(None); - self.print_map(); + // Solves the case that if a remote node has said that a neighbor has connected before itself has + let direct_connections = self.get_direct_connections(); + for connection in direct_connections { + if self.map.contains_key(&connection) { + self.map.remove(&connection); + } } + + // If something has updated, rebroadcast + // Prevents infinite network loops + if updated { + self.broadcast_table(Some(&src)); + } + self.print_map(); } fn print_map(&self) { @@ -254,5 +341,6 @@ impl Node { for (uuid, route) in self.map.iter() { info!("{} -> [ {:?} ]", uuid, route); } + info!("Direct: {:?}", self.get_direct_connections()); } } diff --git a/unshell-rs-lib/src/connection/packets.rs b/unshell-rs-lib/src/connection/packets.rs index b7d3c91..22a820f 100644 --- a/unshell-rs-lib/src/connection/packets.rs +++ b/unshell-rs-lib/src/connection/packets.rs @@ -4,9 +4,9 @@ use crate::Error; #[derive(Debug, Serialize, Deserialize, Clone)] pub enum Packets { - UpdateRoutes(String, Vec), - Connect(String), - Disconnect(String), + SyncUUID(String), + Update { routes: Vec }, + Disconnect { routes: Vec }, Data { source: String, data: String }, }