Create the internet for malware

This commit is contained in:
Michael Mikovsky
2025-06-11 12:28:55 -06:00
parent c5f6e2920c
commit c88b9623c6
11 changed files with 714 additions and 316 deletions
+55 -56
View File
@@ -1,72 +1,71 @@
use std::{io::Write, net::SocketAddr, thread};
use unshell_rs_lib::{ use unshell_rs_lib::{
Error, Error,
connection::{PacketError, Packets}, connection::{ConnectionConfig, Node},
layers::build_client,
networkers::{ClientTrait, Connection, TCPClient},
}; };
pub struct Cli; pub struct Cli;
impl Cli { impl Cli {
pub fn connect(addr: SocketAddr) -> Result<(), Error> { pub fn connect(
let mut client = build_client(TCPClient::connect(&addr)?, vec![])?; id: String,
clients: Vec<ConnectionConfig>,
listeners: Vec<ConnectionConfig>,
) -> Result<(), Error> {
// let mut client = build_client(TCPClient::connect(&addr)?, vec![])?;
let stdin = std::io::stdin(); // let stdin = std::io::stdin();
let mut stdout = std::io::stdout(); // let mut stdout = std::io::stdout();
let mut client_clone = client.try_clone()?; Node::run_node(id, clients, listeners)
thread::spawn(move || {
// let data = client.read()?;
let packet = Packets::decode(client_clone.read().unwrap().as_str()).unwrap(); // let mut client_clone = client.try_clone()?;
// thread::spawn(move || {
// // let data = client.read()?;
match packet { // let packet = Packets::decode(client_clone.read().unwrap().as_str()).unwrap();
Packets::UpdateConnections(items) => {
for item in items {
println!("{}", item);
}
}
Packets::UpdateRoutes(items) => {
for item in items {
println!("{}", item);
}
}
_ => {
client_clone
.write(
Packets::Error(PacketError::UnsupportedType)
.encode()
.unwrap()
.as_str(),
)
.unwrap();
warn!("Invalid packet: {:?}", packet)
}
}
});
loop { // match packet {
print!("> "); // Packets::UpdateConnections(items) => {
stdout.flush()?; // for item in items {
// println!("{}", item);
// }
// }
// Packets::UpdateRoutes(items) => {
// for item in items {
// println!("{}", item);
// }
// }
// _ => {
// client_clone
// .write(
// Packets::Error(PacketError::UnsupportedType)
// .encode()
// .unwrap()
// .as_str(),
// )
// .unwrap();
// warn!("Invalid packet: {:?}", packet)
// }
// }
// });
let mut input = String::new(); // loop {
stdin.read_line(&mut input)?; // print!("> ");
let input = input.trim(); // stdout.flush()?;
match input.split(" ").nth(0).unwrap() { // let mut input = String::new();
"clients" => { // stdin.read_line(&mut input)?;
client.write(Packets::GetConnections.encode()?.as_str())?; // let input = input.trim();
}
"routes" => {
client.write(Packets::GetRoutes.encode()?.as_str())?;
}
_ => {
warn!("Invalid command!")
}
}
// client.write(input)?; // match input.split(" ").nth(0).unwrap() {
} // "ping" => {
// // client.write(Packets::GetConnections.encode()?.as_str())?;
// }
// _ => {
// warn!("Invalid command!")
// }
// }
// // client.write(input)?;
// }
} }
} }
View File
+2
View File
@@ -1,2 +1,4 @@
mod cli; mod cli;
mod client_node;
pub use cli::Cli; pub use cli::Cli;
+1 -1
View File
@@ -1,4 +1,4 @@
#[macro_use] // #[macro_use]
extern crate log; extern crate log;
mod client; mod client;
+84 -37
View File
@@ -8,10 +8,7 @@ use std::{
use clap::{Parser, Subcommand}; use clap::{Parser, Subcommand};
use log::error; use log::error;
use unshell_rs::Cli; use unshell_rs::Cli;
use unshell_rs_lib::{ use unshell_rs_lib::connection::ConnectionConfig;
connection::{ConnectionConfig, Node},
layers::LayerConfig,
};
pub static DEFAULT_CONFIG_FILEPATH: &'static str = "server_config.json"; pub static DEFAULT_CONFIG_FILEPATH: &'static str = "server_config.json";
@@ -33,10 +30,16 @@ struct Args {
#[derive(Debug, Subcommand)] #[derive(Debug, Subcommand)]
enum Commands { enum Commands {
Start, // Start,
Middle, // Middle,
End, // End,
//
Test1,
Test2,
Test3,
Test4,
Test5,
Test6,
// Run as a service, and potentially hosting a website // Run as a service, and potentially hosting a website
// #[command(arg_required_else_help = true)] // #[command(arg_required_else_help = true)]
// Relay { // Relay {
@@ -54,15 +57,15 @@ enum Commands {
// // #[arg(short, long, default_value_t = DEFAULT_SERVICE_PORT)] // // #[arg(short, long, default_value_t = DEFAULT_SERVICE_PORT)]
// // web_port: u16, // // web_port: u16,
// }, // },
/// Connect to remote server // /// Connect to remote server
Connect { // Connect {
/// Remote server to connect to // /// Remote server to connect to
host: String, // host: String,
/// Port listen to for command clients // /// Port listen to for command clients
#[arg(short, long, default_value_t = DEFAULT_SERVICE_PORT)] // #[arg(short, long, default_value_t = DEFAULT_SERVICE_PORT)]
port: u16, // port: u16,
}, // },
} }
fn main() -> Result<(), Box<dyn Error>> { fn main() -> Result<(), Box<dyn Error>> {
@@ -80,43 +83,87 @@ fn main() -> Result<(), Box<dyn Error>> {
// error!("{}", e); // error!("{}", e);
// } // }
// } // }
Commands::Start {} => Node::run_master( Commands::Test1 {} => Cli::connect(
ConnectionConfig { "Test1".to_string(),
socket: SocketAddr::from_str("127.0.0.1:13370")?, vec![],
layers: vec![],
},
vec![ConnectionConfig { vec![ConnectionConfig {
socket: SocketAddr::from_str("127.0.0.1:13371")?, socket: SocketAddr::from_str("127.0.0.1:13371")?,
layers: vec![], layers: vec![],
}], }],
), ),
Commands::Middle {} => Node::run_node( Commands::Test2 {} => Cli::connect(
ConnectionConfig { "Test2".to_string(),
vec![ConnectionConfig {
socket: SocketAddr::from_str("127.0.0.1:13371")?, socket: SocketAddr::from_str("127.0.0.1:13371")?,
layers: vec![], layers: vec![],
}, }],
vec![ConnectionConfig { vec![ConnectionConfig {
socket: SocketAddr::from_str("127.0.0.1:13372")?, socket: SocketAddr::from_str("127.0.0.1:13372")?,
layers: vec![LayerConfig::Base64], layers: vec![],
}], }],
), ),
Commands::End {} => Node::run_node( Commands::Test3 {} => Cli::connect(
"Test3".to_string(),
vec![ConnectionConfig {
socket: SocketAddr::from_str("127.0.0.1:13372")?,
layers: vec![],
}],
vec![ConnectionConfig {
socket: SocketAddr::from_str("127.0.0.1:13373")?,
layers: vec![],
}],
),
Commands::Test4 {} => Cli::connect(
"Test4".to_string(),
vec![ConnectionConfig {
socket: SocketAddr::from_str("127.0.0.1:13371")?,
layers: vec![],
}],
vec![ConnectionConfig {
socket: SocketAddr::from_str("127.0.0.1:13374")?,
layers: vec![],
}],
),
Commands::Test5 {} => Cli::connect(
"Test5".to_string(),
vec![
ConnectionConfig { ConnectionConfig {
socket: SocketAddr::from_str("127.0.0.1:13372")?, socket: SocketAddr::from_str("127.0.0.1:13372")?,
layers: vec![LayerConfig::Base64], layers: vec![],
}, },
ConnectionConfig {
socket: SocketAddr::from_str("127.0.0.1:13374")?,
layers: vec![],
},
],
vec![ConnectionConfig {
socket: SocketAddr::from_str("127.0.0.1:13375")?,
layers: vec![],
}],
),
Commands::Test6 {} => Cli::connect(
"Test6".to_string(),
vec![
ConnectionConfig {
socket: SocketAddr::from_str("127.0.0.1:13373")?,
layers: vec![],
},
ConnectionConfig {
socket: SocketAddr::from_str("127.0.0.1:13375")?,
layers: vec![],
},
],
vec![], vec![],
), ),
// Commands::Connect { host, port } => {
Commands::Connect { host, port } => { // let addr = SocketAddr::from_str(format!("{}:{}", host, port).as_str());
let addr = SocketAddr::from_str(format!("{}:{}", host, port).as_str()); // Cli::connect(if let Ok(addr) = addr {
Cli::connect(if let Ok(addr) = addr { // addr
addr // } else {
} else { // error!("Could not parse address!");
error!("Could not parse address!"); // return Ok(());
return Ok(()); // })
}) // }
}
} { } {
error!("{}", e); error!("{}", e);
}; };
+1 -1
View File
@@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize};
use crate::layers::LayerConfig; use crate::layers::LayerConfig;
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ConnectionConfig { pub struct ConnectionConfig {
pub socket: SocketAddr, pub socket: SocketAddr,
pub layers: Vec<LayerConfig>, pub layers: Vec<LayerConfig>,
+1 -1
View File
@@ -4,5 +4,5 @@ mod packets;
pub use listener::ConnectionConfig; pub use listener::ConnectionConfig;
pub use node::Node; pub use node::Node;
pub use packets::PacketError; // pub use packets::PacketError;
pub use packets::Packets; pub use packets::Packets;
+366
View File
@@ -0,0 +1,366 @@
use std::{
collections::HashMap,
f32::consts::PI,
sync::{Arc, Mutex, MutexGuard},
thread::{self, Thread},
time::Duration,
};
use uuid::Uuid;
use crate::{
Error,
connection::{listener::ConnectionConfig, packets::Packets},
layers::build_client,
networkers::{ClientTrait, Connection, ServerTrait, TCPClient, TCPServer, run_listener_state},
};
pub struct Node {
id: String,
connections: HashMap<String, Box<dyn Connection + Send>>,
map: HashMap<String, Vec<String>>,
}
fn read(c: &mut Box<dyn Connection + Send>) -> Result<Packets, Error> {
let a = Packets::decode(c.read()?.as_str());
info!("Data: {:?}", a);
a
}
fn write(c: &mut Box<dyn Connection + Send>, packet: Packets) -> Result<(), Error> {
info!("Wrote: {:?}", packet);
c.write(packet.encode()?.as_str())
}
impl Node {
pub fn run_node(
id: String,
clients: Vec<ConnectionConfig>,
listeners: Vec<ConnectionConfig>,
) -> Result<(), Error> {
// let mut parent = build_client(TCPClient::connect(&parent.socket)?, parent.layers)?;
let state = Arc::new(Mutex::new(Self {
id: id, //Uuid::new_v4().to_string(), //TODO: Calling an OS RNG can pose a problem for security;
connections: HashMap::new(),
map: HashMap::new(),
}));
for listener in listeners {
run_listener_state(
TCPServer::bind(&listener.socket)?,
listener.layers,
Self::on_listener_client,
Arc::clone(&state),
);
}
for client in clients {
let state = Arc::clone(&state);
thread::spawn(move || {
loop {
if let Err(e) = Self::run_client(client.clone(), &state) {
error!("{}", e);
}
thread::sleep(Duration::from_millis(1000));
}
});
}
thread::sleep(Duration::MAX);
Ok(())
}
fn run_client(client: ConnectionConfig, state: &Arc<Mutex<Node>>) -> Result<(), Error> {
Self::run_connection(
build_client(TCPClient::connect(&client.socket)?, client.layers)?,
state,
);
Ok(())
}
fn on_listener_client(
connection: Box<dyn Connection + Send + 'static>,
state: Arc<Mutex<Node>>,
) {
thread::spawn(move || {
Self::run_connection(connection, &state);
});
}
fn run_connection(connection: Box<dyn Connection + Send + 'static>, state: &Arc<Mutex<Node>>) {
let mut connection = connection;
let s = state.lock().unwrap();
let this_uuid = s.id.clone();
std::mem::drop(s);
write(
&mut connection,
Packets::UpdateRoutes(this_uuid, (&mut state.lock().unwrap()).get_known_clients()),
)
.unwrap();
let other_uuid = if let Ok(Packets::UpdateRoutes(src, routes)) = read(&mut connection) {
(&mut state.lock().unwrap()).extend_routes(src.clone(), routes);
src
} else {
error!("Could not get UUID!");
return;
};
// info!("Connection: {}", other_uuid);
(&mut state.lock().unwrap())
.connections
.insert(other_uuid.clone(), connection.try_clone().unwrap());
// (&mut state.lock().unwrap()).connect(other_uuid.clone(), None);
// let is_root = s.parent.is_none();
loop {
match read(&mut connection) {
Ok(packet) => {
let result: Result<(), Error> = match packet {
// Packets::UpdateRoutes(src, routes) => {
// Ok((&mut state.lock().unwrap()).extend_routes(src, routes))
// }
// Packets::Connect(id) => {
// let source = if other_uuid == id {
// None
// } else {
// Some(other_uuid.clone())
// };
// Ok((&mut state.lock().unwrap()).connect(id, source))
// }
Packets::Disconnect(id) => {
let direct = other_uuid == id;
Ok((&mut state.lock().unwrap()).disconnect(id, direct))
}
Packets::UpdateRoutes(src, routes) => {
Ok((&mut state.lock().unwrap()).extend_routes(src, routes))
}
_ => {
error!("Unsupported packet: {:?}", packet);
Ok(())
}
};
if let Err(e) = result {
error!("Got error: {}", e);
}
}
Err(e) => {
if !connection.is_alive() {
warn!("Connection {} Disconnected!", connection.get_info());
let state = &mut state.lock().unwrap();
state.connections.remove(&other_uuid);
state.disconnect(other_uuid, true);
break;
}
error!("Got error: {}", e);
}
}
}
}
// fn get_best_route(&self, uuid: &String) -> Result<(usize, String), Error> {
// let routes = self.map.get(uuid).ok_or("Route does not exist!")?;
// if routes.is_empty() {
// return Ok((0, uuid.clone()));
// }
// let mut min_hops = usize::MAX;
// let mut min_uuid = "".to_string();
// for (hops, uuid) in routes.iter() {
// if hops < &min_hops {
// min_hops = hops.clone();
// min_uuid = uuid.clone();
// }
// }
// Ok((min_hops, min_uuid))
// }
// fn route_get_index_of_uuid(routes: &Vec<(usize, String)>, uuid: &String) -> Option<usize> {
// for (i, route) in routes.iter().enumerate() {
// if &route.1 == uuid {
// return Some(i);
// }
// }
// return None;
// }
// fn route_inc_one(routes: Vec<(usize, String)>) -> Vec<(usize, String)> {
// routes.iter().map(|r| (r.0 + 1, *r.1)).collect()
// }
// fn get_known_routes(&self) -> Vec<(usize, String)> {
// let mut routes = self
// .map
// .keys()
// .map(|k| self.get_best_route(k).unwrap())
// .collect::<Vec<(usize, String)>>();
// routes.push((0, self.id.clone()));
// println!("Known routes: {:?}", routes);
// routes
// }
fn get_known_clients(&self) -> Vec<String> {
let mut clients = self.map.keys().map(|k| k.clone()).collect::<Vec<String>>();
clients.push(self.id.clone());
clients
}
fn knows_client(&self, id: &String) -> bool {
self.get_known_clients().contains(id)
// self.connections.contains_key(id)
}
fn broadcast(&mut self, data: Packets, disclude: Option<&String>) {
for (uuid, connection) in self.connections.iter_mut() {
if disclude.is_some() && disclude.unwrap() == uuid {
continue;
}
if let Err(e) = write(connection, data.clone()) {
error!("Failed to send packet to {}, {}", uuid, e);
}
}
}
fn broadcast_table(&mut self, disclude: Option<&String>) {
self.broadcast(
Packets::UpdateRoutes(self.id.clone(), self.get_known_clients()),
disclude,
);
}
// fn connect(&mut self, id: String, source: Option<String>) {
// if !self.knows_client(&id) && id != self.id {
// self.broadcast(Packets::Connect(id.clone()), Some(&id));
// if let Some(source) = source {
// // Is direct
// self.extend_routes(id.clone(), vec![(9999, source.clone())]);
// self.broadcast_table(None);
// } else {
// self.map.insert(id.clone(), Vec::new());
// self.broadcast_table(None);
// }
// self.print_map();
// }
// }
fn disconnect(&mut self, id: String, direct: bool) {
if self.knows_client(&id) {
self.broadcast(Packets::Disconnect(id.clone()), None);
if direct {
for (i, uuid) in self.get_known_clients().iter().enumerate() {
// if let Some(i) =
// Self::route_get_index_of_uuid(self.map.get(&uuid).unwrap(), &uuid)
// {
// self.map.get_mut(&uuid).unwrap().remove(i);
// self.broadcast(Packets::Disconnect(uuid.clone()), Some(&uuid));
// }
if self.map.contains_key(uuid) {
self.map.get_mut(uuid).unwrap().remove(i);
self.broadcast(Packets::Disconnect(uuid.clone()), Some(&uuid));
}
}
}
self.map.remove(&id);
// self.broadcast_table(Some(&id));
self.print_map();
}
}
fn extend_routes(&mut self, src: String, routes: Vec<String>) {
let mut updated = false;
println!("{:?}", routes);
for route in routes {
if route == self.id {
continue;
}
if self.map.contains_key(&route) {
// println!("{:?}", route);
// let update = if let Some(i) =
// Self::route_get_index_of_uuid(self.map.get(&route).unwrap(), &route)
// {
// println!(
// "contains, {}, {:?} {:?}",
// i,
// self.map.get(&route.1).unwrap(),
// route
// );
// if self.map.get(&route.1).unwrap()[i].0 != route.0 {
// let _ = self.map.get_mut(&route.1).unwrap().remove(i);
// println!("true");
// true
// } else {
// println!("false");
// false
// }
// // println!("{:?}", self.map.get(&route.1).unwrap());
// // true
// } else {
// true
// };
// if update {
// self.map
// .get_mut(&route.1)
// .unwrap()
// .push((route.0 + 1, src.clone()));
// updated = true;
// }
if !self.map.get(&route).unwrap().contains(&route) {
self.map.get_mut(&route).unwrap().push(src.clone());
updated = true;
}
} else {
self.map.insert(route.clone(), vec![route]);
updated = true;
}
}
if updated {
self.broadcast_table(None);
self.print_map();
}
}
fn print_map(&self) {
info!("\n\n");
info!("Local addr: {}", self.id);
info!("Table: ");
for (uuid, route) in self.map.iter() {
info!("{} -> [ {:?} ]", uuid, route);
}
}
}
+163 -168
View File
@@ -1,12 +1,10 @@
use std::{ use std::{
f32::consts::PI, collections::HashMap,
sync::{Arc, Mutex}, sync::{Arc, Mutex},
thread, thread,
time::Duration, time::Duration,
}; };
use uuid::Uuid;
use crate::{ use crate::{
Error, Error,
connection::{listener::ConnectionConfig, packets::Packets}, connection::{listener::ConnectionConfig, packets::Packets},
@@ -15,189 +13,125 @@ use crate::{
}; };
pub struct Node { pub struct Node {
// parent: Box<dyn Connection + Send + Sync>, id: String,
clients: Vec<Client>, connections: HashMap<String, Box<dyn Connection + Send>>,
} map: HashMap<String, Vec<String>>,
pub struct Client {
connection: Box<dyn Connection + Send>,
uuid: String,
route: Vec<String>,
}
impl Client {
pub fn get_info(&self) -> String {
format!("{} ({})", self.uuid, self.route.join("->"))
}
} }
fn read(c: &mut Box<dyn Connection + Send>) -> Result<Packets, Error> { fn read(c: &mut Box<dyn Connection + Send>) -> Result<Packets, Error> {
Packets::decode(c.read()?.as_str()) let a = Packets::decode(c.read()?.as_str());
info!("Data: {:?}", a);
a
} }
fn write(c: &mut Box<dyn Connection + Send>, packet: Packets) -> Result<(), Error> { fn write(c: &mut Box<dyn Connection + Send>, packet: Packets) -> Result<(), Error> {
info!("Wrote: {:?}", packet);
c.write(packet.encode()?.as_str()) c.write(packet.encode()?.as_str())
} }
impl Node { impl Node {
fn run_listeners(
state: &Arc<Mutex<Self>>,
listeners: Vec<ConnectionConfig>,
) -> Result<(), Error> {
// Start server listeners
for listener in listeners {
run_listener_state(
TCPServer::bind(&listener.socket)?,
listener.layers,
Self::on_listener_client,
Arc::clone(state),
);
}
Ok(())
}
pub fn run_node( pub fn run_node(
parent: ConnectionConfig, id: String,
listeners: Vec<ConnectionConfig>, clients: Vec<ConnectionConfig>,
) -> Result<(), Error> {
let mut parent = build_client(TCPClient::connect(&parent.socket)?, parent.layers)?;
let state = Arc::new(Mutex::new(Self {
// parent: parent_clone,
clients: Vec::new(),
}));
Self::run_listeners(&state, listeners)?;
while parent.is_alive() {
match read(&mut parent) {
Ok(packet) => match packet {
Packets::GetRoutes => write(
&mut parent,
Packets::UpdateRoutes(state.lock().unwrap().get_routes()),
)?,
_ => {}
},
Err(e) => {
error!("Error: {}", e)
}
}
}
Ok(())
}
pub fn run_master(
server: ConnectionConfig,
listeners: Vec<ConnectionConfig>, listeners: Vec<ConnectionConfig>,
) -> Result<(), Error> { ) -> Result<(), Error> {
// let mut parent = build_client(TCPClient::connect(&parent.socket)?, parent.layers)?; // let mut parent = build_client(TCPClient::connect(&parent.socket)?, parent.layers)?;
let state = Arc::new(Mutex::new(Self { let state = Arc::new(Mutex::new(Self {
// parent: parent_clone, id: id, //Uuid::new_v4().to_string(), //TODO: Calling an OS RNG can pose a problem for security;
clients: Vec::new(), connections: HashMap::new(),
map: HashMap::new(),
})); }));
for listener in listeners {
run_listener_state( run_listener_state(
TCPServer::bind(&server.socket)?, TCPServer::bind(&listener.socket)?,
server.layers, listener.layers,
Self::on_command_client, Self::on_listener_client,
Arc::clone(&state), Arc::clone(&state),
); );
}
Self::run_listeners(&state, listeners)?; for client in clients {
let state = Arc::clone(&state);
thread::spawn(move || {
loop {
if let Err(e) = Self::run_client(client.clone(), &state) {
error!("{}", e);
}
thread::sleep(Duration::from_millis(1000));
}
});
}
thread::sleep(Duration::MAX); thread::sleep(Duration::MAX);
Ok(()) Ok(())
} }
fn on_command_client( fn run_client(client: ConnectionConfig, state: &Arc<Mutex<Node>>) -> Result<(), Error> {
connection: Box<dyn Connection + Send + 'static>, Self::run_connection(
state: Arc<Mutex<Node>>, build_client(TCPClient::connect(&client.socket)?, client.layers)?,
) { state,
thread::spawn(move || { )?;
let mut connection = connection;
loop {
match read(&mut connection) {
Ok(packet) => {
let result = match packet {
Packets::GetConnections => write(
&mut connection,
Packets::UpdateConnections(state.lock().unwrap().get_clients()),
),
Packets::GetRoutes => write(
&mut connection,
Packets::UpdateRoutes(state.lock().unwrap().get_routes()),
),
_ => {
error!("Unsupported packet: {:?}", packet);
Ok(()) Ok(())
} }
};
if let Err(e) = result {
error!("Got error: {}", e);
}
}
Err(e) => {
if !connection.is_alive() {
warn!("Connection {} disconnected!", connection.get_info());
break;
} else {
error!("Got error: {}", e);
}
}
}
}
});
}
fn on_listener_client( fn on_listener_client(
connection: Box<dyn Connection + Send + 'static>, connection: Box<dyn Connection + Send + 'static>,
state: Arc<Mutex<Node>>, state: Arc<Mutex<Node>>,
) { ) {
thread::spawn(move || { thread::spawn(move || {
let mut connection = connection; if let Err(e) = Self::run_connection(connection, &state) {
let mut s = state.lock().unwrap(); error!("{}", e);
let index = s.clients.len(); }
let uuid = Uuid::new_v4().to_string(); //TODO: Calling an OS RNG can pose a problem for security;
s.clients.push(Client {
uuid: uuid.clone(),
connection: connection.try_clone().unwrap(),
route: vec![uuid],
}); });
}
fn run_connection(
connection: Box<dyn Connection + Send + 'static>,
state: &Arc<Mutex<Node>>,
) -> Result<(), Error> {
let mut connection = connection;
let s = state.lock().unwrap();
let this_uuid = s.id.clone();
std::mem::drop(s);
write( write(
&mut connection, &mut connection,
Packets::OnClientConnect { Packets::UpdateRoutes(
id: s.clients.last().unwrap().uuid.clone(), this_uuid,
route: s.clients.last().unwrap().route.clone(), (&mut state.lock().unwrap()).get_self_and_known_clients(),
}, ),
) )?;
.unwrap();
std::mem::drop(s); let other_uuid = if let Packets::UpdateRoutes(src, routes) = read(&mut connection)? {
(&mut state.lock().unwrap()).extend_routes(src.clone(), routes);
src
} else {
return Err("Could not get UUID!".into());
};
// let is_root = s.parent.is_none(); // info!("Connection: {}", other_uuid);
(&mut state.lock().unwrap())
.connections
.insert(other_uuid.clone(), connection.try_clone()?);
loop { loop {
match read(&mut connection) { match read(&mut connection) {
Ok(packet) => { Ok(packet) => {
let result = match packet { let result: Result<(), Error> = match packet {
Packets::GetConnections => write( Packets::Disconnect(id) => {
&mut connection, let direct = other_uuid == id;
Packets::UpdateConnections(state.lock().unwrap().get_clients()), Ok((&mut state.lock().unwrap()).disconnect(id, direct))
), }
Packets::GetRoutes => write( Packets::UpdateRoutes(src, routes) => {
&mut connection, Ok((&mut state.lock().unwrap()).extend_routes(src, routes))
Packets::UpdateRoutes(state.lock().unwrap().get_routes()), }
),
Packets::OnClientConnect { id, route } => Ok(()),
_ => { _ => {
error!("Unsupported packet: {:?}", packet); error!("Unsupported packet: {:?}", packet);
@@ -211,8 +145,11 @@ impl Node {
} }
Err(e) => { Err(e) => {
if !connection.is_alive() { if !connection.is_alive() {
(&mut state.lock().unwrap()).clients.remove(index);
warn!("Connection {} Disconnected!", connection.get_info()); warn!("Connection {} Disconnected!", connection.get_info());
let state = &mut state.lock().unwrap();
state.connections.remove(&other_uuid);
state.disconnect(other_uuid, true);
break; break;
} }
@@ -220,44 +157,102 @@ impl Node {
} }
} }
} }
});
Ok(())
} }
fn get_clients(&self) -> Vec<String> { fn get_known_clients(&self) -> Vec<String> {
self.clients self.map.keys().map(|k| k.clone()).collect::<Vec<String>>()
.iter()
.map(|c| format!("Client {}", c.get_info()))
.collect()
} }
fn get_routes(&mut self) -> Vec<String> { fn get_self_and_known_clients(&self) -> Vec<String> {
let mut routes = Vec::new(); let mut clients = self.get_known_clients();
clients.push(self.id.clone());
for client in &mut self.clients { clients
let prefix = client.connection.get_info();
routes.push(prefix.clone());
if let Err(e) = write(&mut client.connection, Packets::GetRoutes) {
error!("Failed to send packet: {}", e);
} }
if let Ok(Packets::UpdateRoutes(new_routes)) = read(&mut client.connection) { fn knows_client(&self, id: &String) -> bool {
routes.append( self.get_known_clients().contains(id)
new_routes }
.iter()
.map(|c| format!("{} -> {}", prefix, c)) fn broadcast(&mut self, data: Packets, disclude: Option<&String>) {
.collect::<Vec<String>>() for (uuid, connection) in self.connections.iter_mut() {
.as_mut(), if disclude.is_some() && disclude.unwrap() == uuid {
continue;
}
if let Err(e) = write(connection, data.clone()) {
error!("Failed to send packet to {}, {}", uuid, e);
}
}
}
fn broadcast_table(&mut self, disclude: Option<&String>) {
self.broadcast(
Packets::UpdateRoutes(self.id.clone(), self.get_self_and_known_clients()),
disclude,
); );
} }
fn disconnect(&mut self, id: String, direct: bool) {
if self.knows_client(&id) {
self.broadcast(Packets::Disconnect(id.clone()), None);
if direct {
for uuid in self.get_known_clients() {
if self.map.get(&uuid).unwrap().contains(&id) {
let index = self
.map
.get_mut(&uuid)
.unwrap()
.iter()
.position(|r| r == &id)
.unwrap();
self.map.get_mut(&uuid).unwrap().remove(index);
// self.broadcast(Packets::Disconnect(uuid.clone()), Some(&uuid));
}
}
} }
routes self.map.remove(&id);
// self.clients self.print_map();
// .iter() }
// .map(|c| format!("Client {}", c.get_info())) }
// .collect()
fn extend_routes(&mut self, src: String, routes: Vec<String>) {
let mut updated = false;
println!("{:?}", routes);
for route in routes {
if route == self.id {
continue;
}
if self.map.contains_key(&route) {
if !self.map.get(&route).unwrap().contains(&src) {
self.map.get_mut(&route).unwrap().push(src.clone());
updated = true;
}
} else {
self.map.insert(route.clone(), vec![src.clone()]);
updated = true;
}
}
if updated {
self.broadcast_table(None);
self.print_map();
}
}
fn print_map(&self) {
info!("\n\n");
info!("Local addr: {}", self.id);
info!("Table: ");
for (uuid, route) in self.map.iter() {
info!("{} -> [ {:?} ]", uuid, route);
}
} }
} }
+5 -16
View File
@@ -2,23 +2,12 @@ use serde::{Deserialize, Serialize};
use crate::Error; use crate::Error;
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize, Clone)]
pub enum Packets { pub enum Packets {
GetConnections, UpdateRoutes(String, Vec<String>),
UpdateConnections(Vec<String>), Connect(String),
Disconnect(String),
GetRoutes, Data { source: String, data: String },
UpdateRoutes(Vec<String>),
OnClientConnect { id: String, route: Vec<String> },
OnClientDisconnect { id: String },
Error(PacketError),
}
#[derive(Debug, Serialize, Deserialize)]
pub enum PacketError {
UnsupportedType,
} }
impl Packets { impl Packets {
+1 -1
View File
@@ -1,7 +1,7 @@
use serde::Deserialize; use serde::Deserialize;
use serde::Serialize; use serde::Serialize;
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug, Clone)]
pub enum LayerConfig { pub enum LayerConfig {
Base64, Base64,
Handshake, Handshake,