Work on tree and routing

This commit is contained in:
Michael Mikovsky
2025-06-10 06:12:18 -06:00
parent a148e4e0a8
commit c5f6e2920c
18 changed files with 543 additions and 238 deletions
+1
View File
@@ -13,6 +13,7 @@ serde = { version = "1.0.219", features = ["derive"] }
serde_json = "1.0.140" serde_json = "1.0.140"
# slint = "1.11.0" # slint = "1.11.0"
unshell-rs-lib = { path = "./unshell-rs-lib" } unshell-rs-lib = { path = "./unshell-rs-lib" }
uuid = { version = "1.17.0", features = ["v4"] }
# [build-dependencies] # [build-dependencies]
+50 -11
View File
@@ -1,24 +1,51 @@
use std::{error::Error, io::Write, net::SocketAddr}; use std::{io::Write, net::SocketAddr, thread};
use unshell_rs_lib::{ use unshell_rs_lib::{
layers::{LayerConfig, build_client}, Error,
connection::{PacketError, Packets},
layers::build_client,
networkers::{ClientTrait, Connection, TCPClient}, networkers::{ClientTrait, Connection, TCPClient},
}; };
use crate::client;
pub struct Cli; pub struct Cli;
impl Cli { impl Cli {
pub fn connect(addr: SocketAddr) -> Result<(), Box<dyn Error>> { pub fn connect(addr: SocketAddr) -> Result<(), Error> {
let mut client = build_client( let mut client = build_client(TCPClient::connect(&addr)?, vec![])?;
TCPClient::connect(&addr)?,
vec![LayerConfig::Handshake, LayerConfig::Base64],
)?;
let stdin = std::io::stdin(); let stdin = std::io::stdin();
let mut stdout = std::io::stdout(); let mut stdout = std::io::stdout();
let mut client_clone = client.try_clone()?;
thread::spawn(move || {
// let data = client.read()?;
let packet = Packets::decode(client_clone.read().unwrap().as_str()).unwrap();
match packet {
Packets::UpdateConnections(items) => {
for item in items {
println!("{}", item);
}
}
Packets::UpdateRoutes(items) => {
for item in items {
println!("{}", item);
}
}
_ => {
client_clone
.write(
Packets::Error(PacketError::UnsupportedType)
.encode()
.unwrap()
.as_str(),
)
.unwrap();
warn!("Invalid packet: {:?}", packet)
}
}
});
loop { loop {
print!("> "); print!("> ");
stdout.flush()?; stdout.flush()?;
@@ -27,7 +54,19 @@ impl Cli {
stdin.read_line(&mut input)?; stdin.read_line(&mut input)?;
let input = input.trim(); let input = input.trim();
client.write(input)?; match input.split(" ").nth(0).unwrap() {
"clients" => {
client.write(Packets::GetConnections.encode()?.as_str())?;
}
"routes" => {
client.write(Packets::GetRoutes.encode()?.as_str())?;
}
_ => {
warn!("Invalid command!")
}
}
// client.write(input)?;
} }
} }
} }
-1
View File
@@ -1,3 +1,2 @@
mod cli; mod cli;
pub use cli::Cli; pub use cli::Cli;
+61 -37
View File
@@ -8,9 +8,10 @@ use std::{
use clap::{Parser, Subcommand}; use clap::{Parser, Subcommand};
use log::error; use log::error;
use unshell_rs::Cli; use unshell_rs::Cli;
use unshell_rs_lib::connection::Node; use unshell_rs_lib::{
// use unshell_rs::{UnshellClient, UnshellGui, UnshellServer}; connection::{ConnectionConfig, Node},
// use unshell_rs layers::LayerConfig,
};
pub static DEFAULT_CONFIG_FILEPATH: &'static str = "server_config.json"; pub static DEFAULT_CONFIG_FILEPATH: &'static str = "server_config.json";
@@ -32,23 +33,27 @@ struct Args {
#[derive(Debug, Subcommand)] #[derive(Debug, Subcommand)]
enum Commands { enum Commands {
/// Run as a service, and potentially hosting a website Start,
#[command(arg_required_else_help = true)] Middle,
Relay { End,
/// IPv4 to listen for clients on.
host: String,
/// Port listen to for command clients // Run as a service, and potentially hosting a website
#[arg(short, long, default_value_t = DEFAULT_SERVICE_PORT)] // #[command(arg_required_else_help = true)]
port: u16, // Relay {
// /// IPv4 to listen for clients on.
// host: String,
/// Json file to store config // /// Port listen to for command clients
#[arg(short, long, default_value_t = DEFAULT_CONFIG_FILEPATH.to_string())]
config_filepath: String,
// /// Port to listen for website traffic (0 is disabled)
// #[arg(short, long, default_value_t = DEFAULT_SERVICE_PORT)] // #[arg(short, long, default_value_t = DEFAULT_SERVICE_PORT)]
// web_port: u16, // port: u16,
},
// /// Json file to store config
// #[arg(short, long, default_value_t = DEFAULT_CONFIG_FILEPATH.to_string())]
// config_filepath: String,
// // /// Port to listen for website traffic (0 is disabled)
// // #[arg(short, long, default_value_t = DEFAULT_SERVICE_PORT)]
// // web_port: u16,
// },
/// Connect to remote server /// Connect to remote server
Connect { Connect {
/// Remote server to connect to /// Remote server to connect to
@@ -68,33 +73,52 @@ fn main() -> Result<(), Box<dyn Error>> {
pretty_env_logger::init(); pretty_env_logger::init();
let args = Args::parse(); let args = Args::parse();
match args.command { if let Err(e) = match args.command {
Commands::Relay { // Commands::Relay { host, port, .. } => {
host, // let addr = SocketAddr::from_str(format!("{}:{}", host, port).as_str());
port, // if let Err(e) = Node::run() {
config_filepath, // error!("{}", e);
} => { // }
let addr = SocketAddr::from_str(format!("{}:{}", host, port).as_str()); // }
if let Err(e) = Node::run(if let Ok(addr) = addr { Commands::Start {} => Node::run_master(
addr ConnectionConfig {
} else { socket: SocketAddr::from_str("127.0.0.1:13370")?,
error!("Could not parse address!"); layers: vec![],
return Ok(()); },
}) { vec![ConnectionConfig {
error!("{}", e); socket: SocketAddr::from_str("127.0.0.1:13371")?,
} layers: vec![],
} }],
),
Commands::Middle {} => Node::run_node(
ConnectionConfig {
socket: SocketAddr::from_str("127.0.0.1:13371")?,
layers: vec![],
},
vec![ConnectionConfig {
socket: SocketAddr::from_str("127.0.0.1:13372")?,
layers: vec![LayerConfig::Base64],
}],
),
Commands::End {} => Node::run_node(
ConnectionConfig {
socket: SocketAddr::from_str("127.0.0.1:13372")?,
layers: vec![LayerConfig::Base64],
},
vec![],
),
Commands::Connect { host, port } => { Commands::Connect { host, port } => {
let addr = SocketAddr::from_str(format!("{}:{}", host, port).as_str()); let addr = SocketAddr::from_str(format!("{}:{}", host, port).as_str());
if let Err(e) = Cli::connect(if let Ok(addr) = addr { Cli::connect(if let Ok(addr) = addr {
addr addr
} else { } else {
error!("Could not parse address!"); error!("Could not parse address!");
return Ok(()); return Ok(());
}) { })
}
} {
error!("{}", e); error!("{}", e);
}
}
}; };
Ok(()) Ok(())
+3
View File
@@ -0,0 +1,3 @@
mod cli;
pub use cli::Cli;
+1
View File
@@ -8,3 +8,4 @@ crossbeam-channel = "0.5.15"
log = "0.4.27" log = "0.4.27"
serde = { version = "1.0.219", features = ["derive"] } serde = { version = "1.0.219", features = ["derive"] }
serde_json = "1.0.140" serde_json = "1.0.140"
uuid = { version = "1.17.0", features = ["v4"] }
+11
View File
@@ -0,0 +1,11 @@
use std::net::SocketAddr;
use serde::{Deserialize, Serialize};
use crate::layers::LayerConfig;
#[derive(Serialize, Deserialize, Debug)]
pub struct ConnectionConfig {
pub socket: SocketAddr,
pub layers: Vec<LayerConfig>,
}
+6
View File
@@ -1,2 +1,8 @@
mod listener;
mod node; mod node;
mod packets;
pub use listener::ConnectionConfig;
pub use node::Node; pub use node::Node;
pub use packets::PacketError;
pub use packets::Packets;
+251 -25
View File
@@ -1,37 +1,263 @@
use std::{net::SocketAddr, thread}; use std::{
f32::consts::PI,
sync::{Arc, Mutex},
thread,
time::Duration,
};
use uuid::Uuid;
use crate::{ use crate::{
Error, Error,
layers::LayerConfig, connection::{listener::ConnectionConfig, packets::Packets},
networkers::{Connection, ServerTrait, TCPConnection, TCPServer, run_listener}, layers::build_client,
networkers::{ClientTrait, Connection, ServerTrait, TCPClient, TCPServer, run_listener_state},
}; };
pub struct Node; pub struct Node {
// parent: Box<dyn Connection + Send + Sync>,
clients: Vec<Client>,
}
pub struct Client {
connection: Box<dyn Connection + Send>,
uuid: String,
route: Vec<String>,
}
impl Client {
pub fn get_info(&self) -> String {
format!("{} ({})", self.uuid, self.route.join("->"))
}
}
fn read(c: &mut Box<dyn Connection + Send>) -> Result<Packets, Error> {
Packets::decode(c.read()?.as_str())
}
fn write(c: &mut Box<dyn Connection + Send>, packet: Packets) -> Result<(), Error> {
c.write(packet.encode()?.as_str())
}
impl Node { impl Node {
pub fn run(addr: SocketAddr) -> Result<(), Error> { fn run_listeners(
let layers = vec![LayerConfig::Handshake, LayerConfig::Base64]; state: &Arc<Mutex<Self>>,
listeners: Vec<ConnectionConfig>,
run_listener( ) -> Result<(), Error> {
TCPServer::bind(&addr)?, // Start server listeners
layers, for listener in listeners {
|connection: Box<dyn Connection + Send + 'static>| { run_listener_state(
thread::spawn(move || { TCPServer::bind(&listener.socket)?,
let mut connection = connection; listener.layers,
Self::on_listener_client,
loop { Arc::clone(state),
if let Ok(data) = connection.read() {
if !connection.is_alive() {
warn!("{} Disconnected!", connection.get_info());
break;
}
println!("Data: {}", data);
}
}
});
},
); );
}
Ok(()) Ok(())
} }
pub fn run_node(
parent: ConnectionConfig,
listeners: Vec<ConnectionConfig>,
) -> Result<(), Error> {
let mut parent = build_client(TCPClient::connect(&parent.socket)?, parent.layers)?;
let state = Arc::new(Mutex::new(Self {
// parent: parent_clone,
clients: Vec::new(),
}));
Self::run_listeners(&state, listeners)?;
while parent.is_alive() {
match read(&mut parent) {
Ok(packet) => match packet {
Packets::GetRoutes => write(
&mut parent,
Packets::UpdateRoutes(state.lock().unwrap().get_routes()),
)?,
_ => {}
},
Err(e) => {
error!("Error: {}", e)
}
}
}
Ok(())
}
pub fn run_master(
server: ConnectionConfig,
listeners: Vec<ConnectionConfig>,
) -> Result<(), Error> {
// let mut parent = build_client(TCPClient::connect(&parent.socket)?, parent.layers)?;
let state = Arc::new(Mutex::new(Self {
// parent: parent_clone,
clients: Vec::new(),
}));
run_listener_state(
TCPServer::bind(&server.socket)?,
server.layers,
Self::on_command_client,
Arc::clone(&state),
);
Self::run_listeners(&state, listeners)?;
thread::sleep(Duration::MAX);
Ok(())
}
fn on_command_client(
connection: Box<dyn Connection + Send + 'static>,
state: Arc<Mutex<Node>>,
) {
thread::spawn(move || {
let mut connection = connection;
loop {
match read(&mut connection) {
Ok(packet) => {
let result = match packet {
Packets::GetConnections => write(
&mut connection,
Packets::UpdateConnections(state.lock().unwrap().get_clients()),
),
Packets::GetRoutes => write(
&mut connection,
Packets::UpdateRoutes(state.lock().unwrap().get_routes()),
),
_ => {
error!("Unsupported packet: {:?}", packet);
Ok(())
}
};
if let Err(e) = result {
error!("Got error: {}", e);
}
}
Err(e) => {
if !connection.is_alive() {
warn!("Connection {} disconnected!", connection.get_info());
break;
} else {
error!("Got error: {}", e);
}
}
}
}
});
}
fn on_listener_client(
connection: Box<dyn Connection + Send + 'static>,
state: Arc<Mutex<Node>>,
) {
thread::spawn(move || {
let mut connection = connection;
let mut s = state.lock().unwrap();
let index = s.clients.len();
let uuid = Uuid::new_v4().to_string(); //TODO: Calling an OS RNG can pose a problem for security;
s.clients.push(Client {
uuid: uuid.clone(),
connection: connection.try_clone().unwrap(),
route: vec![uuid],
});
write(
&mut connection,
Packets::OnClientConnect {
id: s.clients.last().unwrap().uuid.clone(),
route: s.clients.last().unwrap().route.clone(),
},
)
.unwrap();
std::mem::drop(s);
// let is_root = s.parent.is_none();
loop {
match read(&mut connection) {
Ok(packet) => {
let result = match packet {
Packets::GetConnections => write(
&mut connection,
Packets::UpdateConnections(state.lock().unwrap().get_clients()),
),
Packets::GetRoutes => write(
&mut connection,
Packets::UpdateRoutes(state.lock().unwrap().get_routes()),
),
Packets::OnClientConnect { id, route } => Ok(()),
_ => {
error!("Unsupported packet: {:?}", packet);
Ok(())
}
};
if let Err(e) = result {
error!("Got error: {}", e);
}
}
Err(e) => {
if !connection.is_alive() {
(&mut state.lock().unwrap()).clients.remove(index);
warn!("Connection {} Disconnected!", connection.get_info());
break;
}
error!("Got error: {}", e);
}
}
}
});
}
fn get_clients(&self) -> Vec<String> {
self.clients
.iter()
.map(|c| format!("Client {}", c.get_info()))
.collect()
}
fn get_routes(&mut self) -> Vec<String> {
let mut routes = Vec::new();
for client in &mut self.clients {
let prefix = client.connection.get_info();
routes.push(prefix.clone());
if let Err(e) = write(&mut client.connection, Packets::GetRoutes) {
error!("Failed to send packet: {}", e);
}
if let Ok(Packets::UpdateRoutes(new_routes)) = read(&mut client.connection) {
routes.append(
new_routes
.iter()
.map(|c| format!("{} -> {}", prefix, c))
.collect::<Vec<String>>()
.as_mut(),
);
}
}
routes
// self.clients
// .iter()
// .map(|c| format!("Client {}", c.get_info()))
// .collect()
}
} }
+31
View File
@@ -0,0 +1,31 @@
use serde::{Deserialize, Serialize};
use crate::Error;
#[derive(Debug, Serialize, Deserialize)]
pub enum Packets {
GetConnections,
UpdateConnections(Vec<String>),
GetRoutes,
UpdateRoutes(Vec<String>),
OnClientConnect { id: String, route: Vec<String> },
OnClientDisconnect { id: String },
Error(PacketError),
}
#[derive(Debug, Serialize, Deserialize)]
pub enum PacketError {
UnsupportedType,
}
impl Packets {
pub fn encode(&self) -> Result<String, Error> {
Ok(serde_json::to_string(self)?)
}
pub fn decode(string: &str) -> Result<Self, Error> {
Ok(serde_json::from_str::<Self>(string)?)
}
}
+11 -10
View File
@@ -3,13 +3,12 @@ use crate::{
networkers::{Connection, ProtocolLayer}, networkers::{Connection, ProtocolLayer},
}; };
use base64::{Engine as _, engine::general_purpose}; use base64::{Engine as _, engine::general_purpose};
use serde::{Deserialize, Serialize};
pub struct Base64Layer<C: Connection> { pub struct Base64Layer {
inner: C, inner: Box<dyn Connection>,
} }
impl<C: Connection> Connection for Base64Layer<C> { impl Connection for Base64Layer {
fn get_info(&self) -> String { fn get_info(&self) -> String {
format!("b64->{}", self.inner.get_info()) format!("b64->{}", self.inner.get_info())
} }
@@ -29,16 +28,18 @@ impl<C: Connection> Connection for Base64Layer<C> {
} }
fn write(&mut self, data: &str) -> Result<(), Error> { fn write(&mut self, data: &str) -> Result<(), Error> {
info!("Bsae"); self.inner.write(&general_purpose::STANDARD.encode(data))
}
self.inner.write(&general_purpose::STANDARD.encode(data))?; fn try_clone(&self) -> Result<Box<dyn Connection + Send + Sync>, Error> {
Ok(Box::new(Self {
Ok(()) inner: self.inner.try_clone()?,
}))
} }
} }
impl<C: Connection> ProtocolLayer<C> for Base64Layer<C> { impl ProtocolLayer for Base64Layer {
fn new(inner: C) -> Result<Self, Error> { fn new(inner: Box<dyn Connection>) -> Result<Self, Error> {
Ok(Base64Layer { inner }) Ok(Base64Layer { inner })
} }
} }
+9 -2
View File
@@ -4,7 +4,7 @@ use crate::{
networkers::{Connection, ProtocolLayer}, networkers::{Connection, ProtocolLayer},
}; };
impl Connection for Box<dyn Connection + Send> { impl Connection for Box<dyn Connection + Send + Sync> {
fn get_info(&self) -> String { fn get_info(&self) -> String {
(**self).get_info() (**self).get_info()
} }
@@ -20,9 +20,16 @@ impl Connection for Box<dyn Connection + Send> {
fn write(&mut self, data: &str) -> Result<(), Error> { fn write(&mut self, data: &str) -> Result<(), Error> {
(**self).write(data) (**self).write(data)
} }
fn try_clone(&self) -> Result<Box<dyn Connection + Send + Sync>, Error> {
Ok(Box::new((**self).try_clone()?))
}
} }
pub fn build_client<C>(base_conn: C, layers: Vec<LayerConfig>) -> Result<Box<dyn Connection>, Error> pub fn build_client<C>(
base_conn: C,
layers: Vec<LayerConfig>,
) -> Result<Box<dyn Connection + Send>, Error>
where where
C: Connection + 'static, C: Connection + 'static,
{ {
+26 -27
View File
@@ -1,17 +1,19 @@
use crate::{ use std::sync::{
layers::Base64Layer, Arc,
networkers::{Connection, ProtocolLayer}, atomic::{AtomicBool, Ordering},
}; };
use crate::networkers::{Connection, ProtocolLayer};
type Error = Box<dyn std::error::Error>; type Error = Box<dyn std::error::Error>;
// 4-Way Handshake Layer // 4-Way Handshake Layer
pub struct HandshakeLayer<C: Connection> { pub struct HandshakeLayer {
inner: C, inner: Box<dyn Connection>,
finished_handshake: bool, finished_handshake: Arc<AtomicBool>,
} }
impl<C: Connection> Connection for HandshakeLayer<C> { impl Connection for HandshakeLayer {
fn get_info(&self) -> String { fn get_info(&self) -> String {
format!("handshake->{}", self.inner.get_info()) format!("handshake->{}", self.inner.get_info())
} }
@@ -21,83 +23,80 @@ impl<C: Connection> Connection for HandshakeLayer<C> {
} }
fn read(&mut self) -> Result<String, Error> { fn read(&mut self) -> Result<String, Error> {
if !self.finished_handshake { if !self.finished_handshake.load(Ordering::Relaxed) {
return Err("NotComplete".into()); return Err("NotComplete".into());
} }
self.inner.read() self.inner.read()
} }
fn write(&mut self, data: &str) -> Result<(), Error> { fn write(&mut self, data: &str) -> Result<(), Error> {
if !self.finished_handshake { if !self.finished_handshake.load(Ordering::Relaxed) {
return Err("NotComplete".into()); return Err("NotComplete".into());
} }
self.inner.write(data) self.inner.write(data)
} }
fn try_clone(&self) -> Result<Box<dyn Connection + Send + Sync>, crate::Error> {
Ok(Box::new(Self {
inner: self.inner.try_clone()?,
finished_handshake: Arc::clone(&self.finished_handshake.clone()),
}))
}
} }
impl<C: Connection + 'static> ProtocolLayer<C> for HandshakeLayer<C> { impl ProtocolLayer for HandshakeLayer {
fn new(inner: C) -> Result<Self, Error> { fn new(inner: Box<dyn Connection>) -> Result<Self, Error> {
Ok(HandshakeLayer { Ok(HandshakeLayer {
inner, inner,
finished_handshake: false, finished_handshake: Arc::new(AtomicBool::new(false)),
}) })
} }
fn initialize_client(&mut self) -> Result<(), Error> { fn initialize_client(&mut self) -> Result<(), Error> {
println!("Starting client handshake...");
// Step 1: Client sends SYN // Step 1: Client sends SYN
self.inner.write("SYN")?; self.inner.write("SYN")?;
println!("Client: Sent SYN");
// Step 2: Client receives SYN-ACK // Step 2: Client receives SYN-ACK
let response = self.inner.read()?; let response = self.inner.read()?;
if response != "SYN-ACK" { if response != "SYN-ACK" {
return Err(format!("Expected SYN-ACK, got: {}", response).into()); return Err(format!("Expected SYN-ACK, got: {}", response).into());
} }
println!("Client: Received SYN-ACK");
// Step 3: Client sends ACK // Step 3: Client sends ACK
self.inner.write("ACK")?; self.inner.write("ACK")?;
println!("Client: Sent ACK");
// Step 4: Client receives FIN (final confirmation) // Step 4: Client receives FIN (final confirmation)
let response = self.inner.read()?; let response = self.inner.read()?;
if response != "FIN" { if response != "FIN" {
return Err(format!("Expected FIN, got: {}", response).into()); return Err(format!("Expected FIN, got: {}", response).into());
} }
println!("Client: Received FIN - Handshake complete!");
self.finished_handshake = true; info!("Handshake complete!");
self.finished_handshake.swap(true, Ordering::Relaxed);
Ok(()) Ok(())
} }
fn initialize_server(&mut self) -> Result<(), Error> { fn initialize_server(&mut self) -> Result<(), Error> {
println!("Starting server handshake...");
// Step 1: Server receives SYN // Step 1: Server receives SYN
let request = self.inner.read()?; let request = self.inner.read()?;
if request != "SYN" { if request != "SYN" {
return Err(format!("Expected SYN, got: {}", request).into()); return Err(format!("Expected SYN, got: {}", request).into());
} }
println!("Server: Received SYN");
// Step 2: Server sends SYN-ACK // Step 2: Server sends SYN-ACK
self.inner.write("SYN-ACK")?; self.inner.write("SYN-ACK")?;
println!("Server: Sent SYN-ACK");
// Step 3: Server receives ACK // Step 3: Server receives ACK
let response = self.inner.read()?; let response = self.inner.read()?;
if response != "ACK" { if response != "ACK" {
return Err(format!("Expected ACK, got: {}", response).into()); return Err(format!("Expected ACK, got: {}", response).into());
} }
println!("Server: Received ACK");
// Step 4: Server sends FIN (final confirmation) // Step 4: Server sends FIN (final confirmation)
self.inner.write("FIN")?; self.inner.write("FIN")?;
println!("Server: Sent FIN - Handshake complete!"); info!("Handshake complete!");
self.finished_handshake = true; self.finished_handshake.swap(true, Ordering::Relaxed);
Ok(()) Ok(())
} }
} }
+6 -2
View File
@@ -1,11 +1,15 @@
use serde::Deserialize;
use serde::Serialize;
#[derive(Serialize, Deserialize, Debug)]
pub enum LayerConfig { pub enum LayerConfig {
Base64, Base64,
Handshake, Handshake,
} }
pub mod base64; mod base64;
mod builder; mod builder;
pub mod handshake; mod handshake;
pub use base64::Base64Layer; pub use base64::Base64Layer;
pub use handshake::HandshakeLayer; pub use handshake::HandshakeLayer;
+1
View File
@@ -13,3 +13,4 @@ pub use traits::ProtocolLayer;
pub use traits::ServerTrait; pub use traits::ServerTrait;
pub use server::run_listener; pub use server::run_listener;
pub use server::run_listener_state;
+26 -29
View File
@@ -1,54 +1,48 @@
use std::sync::Arc; use std::{sync::Arc, thread};
use std::thread;
use crate::{ use crate::{
layers::{LayerConfig, create_server_builder}, layers::{LayerConfig, create_server_builder},
networkers::{Connection, ServerTrait}, networkers::{Connection, ServerTrait},
}; };
// Helper macros for building layered connections #[allow(dead_code)]
macro_rules! build_layered_connection { pub fn run_listener_state<S, C, R, A>(
($base:expr) => { server: S,
$base layers: Vec<LayerConfig>,
}; on_connect_callback: R,
($base:expr, $layer:ty) => { state: Arc<A>,
<$layer>::new($base)? )
};
($base:expr, $layer:ty, $($layers:ty),+) => {
build_layered_connection!(<$layer>::new($base)?, $($layers),+)
};
}
pub fn run_listener_state<S, C, R, A>(server: S, on_connect_callback: R, state: Arc<A>)
/*-> Arc<Mutex<Vec<C>>>*/ /*-> Arc<Mutex<Vec<C>>>*/
where where
S: ServerTrait<C> + Sync + Send + 'static, S: ServerTrait<C> + Sync + Send + 'static,
C: Connection + 'static, C: Connection + 'static,
R: Fn(C, Arc<A>) + Sync + Send + 'static, R: Fn(Box<dyn Connection + Send + 'static>, Arc<A>) + Sync + Send + 'static,
A: Sync + Send + 'static, A: Sync + Send + 'static,
{ {
thread::spawn(move || {
let layer_builder = create_server_builder::<C>(layers).unwrap();
info!("Started listener {}", server.get_info()); info!("Started listener {}", server.get_info());
// let clients: Arc<Mutex<Vec<C>>> = Arc::new(Mutex::new(Vec::new()));
// let clients_clone = Arc::clone(&clients);
loop { loop {
match server.accept() { match server.accept() {
Ok(conn) => match layer_builder(conn) {
Ok(conn) => { Ok(conn) => {
info!("New connection ({})", conn.get_info()); info!("New connection ({})", conn.get_info());
on_connect_callback(conn, Arc::clone(&state)); on_connect_callback(conn, Arc::clone(&state));
// OnConnectCallback::on_connect(&mut on_connect_callback, conn);
// let mut clients_lock = clients_clone.lock().unwrap();
// clients_lock.push(conn);
} }
Err(e) => {
error!("Failed to create layers: {:?}", e);
}
},
Err(e) => { Err(e) => {
error!("Failed to accept connection: {:?}", e); error!("Failed to accept connection: {:?}", e);
} }
} }
} }
});
} }
#[allow(dead_code)]
pub fn run_listener<S, C, R>(server: S, layers: Vec<LayerConfig>, on_connect_callback: R) pub fn run_listener<S, C, R>(server: S, layers: Vec<LayerConfig>, on_connect_callback: R)
/*-> Arc<Mutex<Vec<C>>>*/ /*-> Arc<Mutex<Vec<C>>>*/
where where
@@ -56,17 +50,19 @@ where
C: Connection + 'static, C: Connection + 'static,
R: Fn(Box<dyn Connection + Send + 'static>) + Sync + Send + 'static, R: Fn(Box<dyn Connection + Send + 'static>) + Sync + Send + 'static,
{ {
let layer_builder = create_server_builder::<C>(layers).unwrap();
info!("Started listener {}", server.get_info());
// let clients: Arc<Mutex<Vec<C>>> = Arc::new(Mutex::new(Vec::new())); // let clients: Arc<Mutex<Vec<C>>> = Arc::new(Mutex::new(Vec::new()));
// let clients_clone = Arc::clone(&clients); // let clients_clone = Arc::clone(&clients);
thread::spawn(move || {
let layer_builder = create_server_builder::<C>(layers).unwrap();
info!("Started listener {}", server.get_info());
loop { loop {
match server.accept() { match server.accept() {
Ok(conn) => match layer_builder(conn) { Ok(conn) => match layer_builder(conn) {
Ok(conn) => { Ok(conn) => {
info!("New connection ({})", conn.get_info()); let con_info = conn.get_info();
info!("New connection ({})", con_info);
on_connect_callback(conn); on_connect_callback(conn);
} }
Err(e) => { Err(e) => {
@@ -78,4 +74,5 @@ where
} }
} }
} }
});
} }
+21 -66
View File
@@ -1,6 +1,10 @@
use std::{ use std::{
io::{self, BufRead, BufReader, Write}, io::{BufRead, BufReader, Write},
net::{SocketAddr, TcpListener, TcpStream}, net::{SocketAddr, TcpListener, TcpStream},
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
}; };
use crate::{ use crate::{
@@ -11,7 +15,7 @@ use crate::{
pub struct TCPConnection { pub struct TCPConnection {
stream: TcpStream, stream: TcpStream,
reader: BufReader<TcpStream>, reader: BufReader<TcpStream>,
is_alive: bool, is_alive: Arc<AtomicBool>,
} }
impl Connection for TCPConnection { impl Connection for TCPConnection {
@@ -27,7 +31,7 @@ impl Connection for TCPConnection {
} }
fn is_alive(&self) -> bool { fn is_alive(&self) -> bool {
self.is_alive self.is_alive.load(Ordering::Relaxed)
} }
fn read(&mut self) -> Result<String, Error> { fn read(&mut self) -> Result<String, Error> {
@@ -36,78 +40,29 @@ impl Connection for TCPConnection {
// Stream sends a null buffer if it is disconnected // Stream sends a null buffer if it is disconnected
if n == 0 { if n == 0 {
self.is_alive = false; self.is_alive.swap(false, Ordering::Relaxed);
} }
// println!("Recieved: {}", line.trim_end().to_string());
Ok(line.trim_end().to_string()) Ok(line.trim_end().to_string())
} }
fn write(&mut self, data: &str) -> Result<(), Error> { fn write(&mut self, data: &str) -> Result<(), Error> {
info!("Sent: {}", data); // println!("Recsent: {}", data);
writeln!(self.stream, "{}", data)?; writeln!(self.stream, "{}", data)?;
self.stream.flush()?; self.stream.flush()?;
Ok(()) Ok(())
} }
fn try_clone(&self) -> Result<Box<dyn Connection + Send + Sync>, Error> {
Ok(Box::new(Self {
stream: self.stream.try_clone()?,
reader: BufReader::new(self.stream.try_clone()?),
is_alive: Arc::clone(&self.is_alive),
}))
}
} }
// impl AsyncConnection<TCPConnection> for TCPConnection {
// type Error = io::Error;
// fn as_async<T: Serialize + DeserializeOwned + Send + 'static>(
// connection: TCPConnection,
// ) -> (Sender<T>, Receiver<T>) {
// let (send_tx, send_rx) = crossbeam_channel::unbounded::<T>();
// let (recv_tx, recv_rx) = crossbeam_channel::unbounded::<T>();
// thread::spawn(move || {
// let mut reader = connection.reader;
// let mut read = || -> Result<String, Self::Error> {
// let mut line = String::new();
// let _ = reader.read_line(&mut line)?;
// Ok(line.trim_end().to_string())
// };
// loop {
// if let Ok(data) = read() {
// if data.is_empty() {
// break;
// }
// info!("Got {}", data);
// if let Ok(decoded) = serde_json::from_str::<T>(&data) {
// if let Err(e) = send_tx.send(decoded) {
// error!("Got error: {}", e);
// }
// }
// }
// }
// });
// thread::spawn(move || {
// let mut stream = connection.stream;
// let mut write = |data: String| -> Result<(), Self::Error> {
// writeln!(stream, "{}", data)?;
// stream.flush()?;
// Ok(())
// };
// loop {
// if let Ok(data) = recv_rx.recv() {
// if let Ok(encoded) = serde_json::to_string(&data) {
// info!("Write {}", encoded);
// if let Err(e) = write(encoded) {
// error!("Got error: {}", e);
// }
// }
// }
// }
// });
// (recv_tx, send_rx)
// }
// }
pub struct TCPServer { pub struct TCPServer {
listener: TcpListener, listener: TcpListener,
@@ -131,7 +86,7 @@ impl ServerTrait<TCPConnection> for TCPServer {
Ok(TCPConnection { Ok(TCPConnection {
stream, stream,
reader, reader,
is_alive: true, is_alive: Arc::new(AtomicBool::new(true)),
}) })
} }
@@ -150,7 +105,7 @@ impl ClientTrait<TCPConnection> for TCPClient {
let conn = TCPConnection { let conn = TCPConnection {
stream, stream,
reader, reader,
is_alive: true, is_alive: Arc::new(AtomicBool::new(true)),
}; };
Ok(conn) Ok(conn)
} }
+6 -6
View File
@@ -1,21 +1,21 @@
use std::net::SocketAddr; use std::net::SocketAddr;
use std::ops::Deref;
use std::ops::DerefMut;
use crate::Error; use crate::Error;
// This is the lowset-level data transmission type // This is the data transmission type
pub trait Connection: Send { pub trait Connection: Send + Sync {
fn get_info(&self) -> String; fn get_info(&self) -> String;
fn is_alive(&self) -> bool; fn is_alive(&self) -> bool;
fn read(&mut self) -> Result<String, Error>; fn read(&mut self) -> Result<String, Error>;
fn write(&mut self, data: &str) -> Result<(), Error>; fn write(&mut self, data: &str) -> Result<(), Error>;
fn try_clone(&self) -> Result<Box<dyn Connection + Send + Sync>, Error>;
} }
// Trait for protocol layers that can be initialized // Trait for protocol layers that can be initialized
pub trait ProtocolLayer<C: Connection>: Connection { pub trait ProtocolLayer: Connection {
fn new(inner: C) -> Result<Self, Error> fn new(inner: Box<dyn Connection>) -> Result<Self, Error>
where where
Self: Sized; Self: Sized;
fn initialize_client(&mut self) -> Result<(), Error> { fn initialize_client(&mut self) -> Result<(), Error> {