Move main project to new repository

This commit is contained in:
Michael Mikovsky
2025-08-24 11:04:33 -06:00
parent 7bf1ef9419
commit 2bb86fb67c
48 changed files with 0 additions and 3497 deletions
+11
View File
@@ -0,0 +1,11 @@
[package]
name = "unshell-rs-lib"
edition = "2024"
[dependencies]
base64 = "0.22.1"
bincode = "2.0.1"
crossbeam-channel = "0.5.15"
log = "0.4.27"
rand = "0.9.1"
uuid = { version = "1.17.0", features = ["v4"] }
+42
View File
@@ -0,0 +1,42 @@
use crate::{
Error,
networkers::{Connection, ProtocolLayer},
};
use base64::{Engine as _, engine::general_purpose};
pub struct Base64Layer {
inner: Box<dyn Connection>,
}
impl Connection for Base64Layer {
fn get_info(&self) -> String {
format!("b64->{}", self.inner.get_info())
}
fn is_alive(&self) -> bool {
self.inner.is_alive()
}
fn read(&mut self) -> Result<Vec<u8>, Error> {
Ok(general_purpose::STANDARD
.decode(&self.inner.read()?)
.unwrap())
}
fn write(&mut self, data: &[u8]) -> Result<(), Error> {
self.inner
.write(general_purpose::STANDARD.encode(data).as_bytes())
}
fn try_clone(&self) -> Result<Box<dyn Connection + Send + Sync>, Error> {
Ok(Box::new(Self {
inner: self.inner.try_clone()?,
}))
}
}
impl ProtocolLayer for Base64Layer {
fn new(inner: Box<dyn Connection>) -> Result<Self, Error> {
Ok(Base64Layer { inner })
}
}
+76
View File
@@ -0,0 +1,76 @@
use crate::{
Error,
layers::{Base64Layer, HandshakeLayer, LayerConfig},
networkers::{Connection, ProtocolLayer},
};
impl Connection for Box<dyn Connection + Send + Sync> {
fn get_info(&self) -> String {
(**self).get_info()
}
fn is_alive(&self) -> bool {
(**self).is_alive()
}
fn read(&mut self) -> Result<Vec<u8>, Error> {
(**self).read()
}
fn write(&mut self, data: &[u8]) -> Result<(), Error> {
(**self).write(data)
}
fn try_clone(&self) -> Result<Box<dyn Connection + Send + Sync>, Error> {
Ok(Box::new((**self).try_clone()?))
}
}
pub fn build_client<C>(
base_conn: C,
layers: Vec<LayerConfig>,
) -> Result<Box<dyn Connection + Send>, Error>
where
C: Connection + 'static,
{
let mut current_conn: Box<dyn Connection + Send> = Box::new(base_conn);
for layer_config in &layers {
current_conn = match layer_config {
LayerConfig::Base64 => Box::new(Base64Layer::new(current_conn)?),
LayerConfig::Handshake => {
let mut handshake_layer = HandshakeLayer::new(current_conn)?;
handshake_layer.initialize_client()?;
Box::new(handshake_layer)
}
};
}
Ok(current_conn)
}
pub fn create_server_builder<C>(
layers: Vec<LayerConfig>,
) -> Result<Box<dyn Fn(C) -> Result<Box<dyn Connection + Send>, Error>>, Error>
where
C: Connection + 'static,
{
Ok(Box::new(
move |base_conn: C| -> Result<Box<dyn Connection + Send>, Error> {
let mut current_conn: Box<dyn Connection + Send> = Box::new(base_conn);
for layer_config in &layers {
current_conn = match layer_config {
LayerConfig::Base64 => Box::new(Base64Layer::new(current_conn)?),
LayerConfig::Handshake => {
let mut handshake_layer = HandshakeLayer::new(current_conn)?;
handshake_layer.initialize_server()?;
Box::new(handshake_layer)
}
};
}
Ok(current_conn)
},
))
}
+102
View File
@@ -0,0 +1,102 @@
use std::sync::{
Arc,
atomic::{AtomicBool, Ordering},
};
use crate::networkers::{Connection, ProtocolLayer};
type Error = Box<dyn std::error::Error>;
// 4-Way Handshake Layer
pub struct HandshakeLayer {
inner: Box<dyn Connection>,
finished_handshake: Arc<AtomicBool>,
}
impl Connection for HandshakeLayer {
fn get_info(&self) -> String {
format!("handshake->{}", self.inner.get_info())
}
fn is_alive(&self) -> bool {
self.inner.is_alive()
}
fn read(&mut self) -> Result<Vec<u8>, Error> {
if !self.finished_handshake.load(Ordering::Relaxed) {
return Err("NotComplete".into());
}
self.inner.read()
}
fn write(&mut self, data: &[u8]) -> Result<(), Error> {
if !self.finished_handshake.load(Ordering::Relaxed) {
return Err("NotComplete".into());
}
self.inner.write(data)
}
fn try_clone(&self) -> Result<Box<dyn Connection + Send + Sync>, crate::Error> {
Ok(Box::new(Self {
inner: self.inner.try_clone()?,
finished_handshake: Arc::clone(&self.finished_handshake.clone()),
}))
}
}
impl ProtocolLayer for HandshakeLayer {
fn new(inner: Box<dyn Connection>) -> Result<Self, Error> {
Ok(HandshakeLayer {
inner,
finished_handshake: Arc::new(AtomicBool::new(false)),
})
}
fn initialize_client(&mut self) -> Result<(), Error> {
// Step 1: Client sends SYN
self.inner.write("SYN".as_bytes())?;
// Step 2: Client receives SYN-ACK
let response = self.inner.read()?;
if response != "SYN-ACK".as_bytes() {
return Err(format!("Expected SYN-ACK, got: {:?}", response).into());
}
// Step 3: Client sends ACK
self.inner.write("ACK".as_bytes())?;
// Step 4: Client receives FIN (final confirmation)
let response = self.inner.read()?;
if response != "FIN".as_bytes() {
return Err(format!("Expected FIN, got: {:?}", response).into());
}
info!("Handshake complete!");
self.finished_handshake.swap(true, Ordering::Relaxed);
Ok(())
}
fn initialize_server(&mut self) -> Result<(), Error> {
// Step 1: Server receives SYN
let request = self.inner.read()?;
if request != "SYN".as_bytes() {
return Err(format!("Expected SYN, got: {:?}", request).into());
}
// Step 2: Server sends SYN-ACK
self.inner.write("SYN-ACK".as_bytes())?;
// Step 3: Server receives ACK
let response = self.inner.read()?;
if response != "ACK".as_bytes() {
return Err(format!("Expected ACK, got: {:?}", response).into());
}
// Step 4: Server sends FIN (final confirmation)
self.inner.write("FIN".as_bytes())?;
info!("Handshake complete!");
self.finished_handshake.swap(true, Ordering::Relaxed);
Ok(())
}
}
+17
View File
@@ -0,0 +1,17 @@
use bincode::{Decode, Encode};
#[derive(Encode, Decode, Debug, Clone)]
pub enum LayerConfig {
Base64,
Handshake,
}
mod base64;
mod builder;
mod handshake;
pub use base64::Base64Layer;
pub use handshake::HandshakeLayer;
pub use builder::build_client;
pub use builder::create_server_builder;
+13
View File
@@ -0,0 +1,13 @@
#[macro_use]
extern crate log;
pub type Error = Box<dyn std::error::Error>;
static BINCODE_CONFIG: bincode::config::Configuration = bincode::config::standard();
pub mod layers;
pub mod networkers;
pub mod nodes;
mod packets;
pub use packets::C2Packet;
+16
View File
@@ -0,0 +1,16 @@
mod server;
mod tcp;
mod traits;
pub use tcp::TCPClient;
pub use tcp::TCPConnection;
pub use tcp::TCPServer;
// pub use traits::AsyncConnection;
pub use traits::ClientTrait;
pub use traits::Connection;
pub use traits::ProtocolLayer;
pub use traits::ServerTrait;
pub use server::run_listener;
pub use server::run_listener_state;
+78
View File
@@ -0,0 +1,78 @@
use std::{sync::Arc, thread};
use crate::{
layers::{LayerConfig, create_server_builder},
networkers::{Connection, ServerTrait},
};
#[allow(dead_code)]
pub fn run_listener_state<S, C, R, A>(
server: S,
layers: Vec<LayerConfig>,
on_connect_callback: R,
state: Arc<A>,
)
/*-> Arc<Mutex<Vec<C>>>*/
where
S: ServerTrait<C> + Sync + Send + 'static,
C: Connection + 'static,
R: Fn(Box<dyn Connection + Send + 'static>, Arc<A>) + Sync + Send + 'static,
A: Sync + Send + 'static,
{
thread::spawn(move || {
let layer_builder = create_server_builder::<C>(layers).unwrap();
info!("Started listener {}", server.get_info());
loop {
match server.accept() {
Ok(conn) => match layer_builder(conn) {
Ok(conn) => {
info!("New connection ({})", conn.get_info());
on_connect_callback(conn, Arc::clone(&state));
}
Err(e) => {
error!("Failed to create layers: {:?}", e);
}
},
Err(e) => {
error!("Failed to accept connection: {:?}", e);
}
}
}
});
}
#[allow(dead_code)]
pub fn run_listener<S, C, R>(server: S, layers: Vec<LayerConfig>, on_connect_callback: R)
/*-> Arc<Mutex<Vec<C>>>*/
where
S: ServerTrait<C> + Sync + Send + 'static,
C: Connection + 'static,
R: Fn(Box<dyn Connection + Send + 'static>) + Sync + Send + 'static,
{
// let clients: Arc<Mutex<Vec<C>>> = Arc::new(Mutex::new(Vec::new()));
// let clients_clone = Arc::clone(&clients);
thread::spawn(move || {
let layer_builder = create_server_builder::<C>(layers).unwrap();
info!("Started listener {}", server.get_info());
loop {
match server.accept() {
Ok(conn) => match layer_builder(conn) {
Ok(conn) => {
let con_info = conn.get_info();
info!("New connection ({})", con_info);
on_connect_callback(conn);
}
Err(e) => {
error!("Failed to create layers: {:?}", e);
}
},
Err(e) => {
error!("Failed to accept connection: {:?}", e);
}
}
}
});
}
+131
View File
@@ -0,0 +1,131 @@
use std::{
io::{BufReader, Read, Write},
net::{SocketAddr, TcpListener, TcpStream},
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
};
use crate::{
Error,
networkers::{ClientTrait, Connection, ServerTrait},
};
pub struct TCPConnection {
stream: TcpStream,
reader: BufReader<TcpStream>,
is_alive: Arc<AtomicBool>,
}
impl Connection for TCPConnection {
fn get_info(&self) -> String {
format!(
"tcp://{}",
if let Ok(addr) = &self.stream.peer_addr() {
addr.to_string()
} else {
"ERROR".to_string()
}
)
}
fn is_alive(&self) -> bool {
self.is_alive.load(Ordering::Relaxed)
}
fn read(&mut self) -> Result<Vec<u8>, Error> {
let mut len_bytes = [0u8; 4];
if let Err(e) = self.reader.read_exact(&mut len_bytes) {
self.is_alive.swap(false, Ordering::Relaxed);
return Err(format!("Stream disconnected! ({})", e).into());
}
let len = u32::from_be_bytes(len_bytes) as usize;
let mut buffer = vec![0u8; len];
// In case the
match self.reader.read_exact(&mut buffer) {
Ok(()) => Ok(buffer.to_vec()),
Err(e) => {
self.is_alive.swap(false, Ordering::Relaxed);
Err(format!("Stream disconnected! ({})", e).into())
}
}
// let mut buf = Vec::new();
// let n = self.reader.read(&mut buf)?;
// Stream sends a null buffer if it is disconnected
// if n == 0 {
// self.is_alive.swap(false, Ordering::Relaxed);
// }
// println!("Recieved: {}", line.trim_end().to_string());
}
fn write(&mut self, data: &[u8]) -> Result<(), Error> {
let len = data.len() as u32;
self.stream.write_all(&len.to_be_bytes())?;
self.stream.write_all(data)?;
self.stream.flush()?;
Ok(())
}
fn try_clone(&self) -> Result<Box<dyn Connection + Send + Sync>, Error> {
Ok(Box::new(Self {
stream: self.stream.try_clone()?,
reader: BufReader::new(self.stream.try_clone()?),
is_alive: Arc::clone(&self.is_alive),
}))
}
}
pub struct TCPServer {
listener: TcpListener,
}
impl ServerTrait<TCPConnection> for TCPServer {
fn get_info(&self) -> String {
format!(
"tcp://{}",
if let Ok(addr) = &self.listener.local_addr() {
addr.to_string()
} else {
"ERROR".to_string()
}
)
}
fn accept(&self) -> Result<TCPConnection, Error> {
let (stream, _) = self.listener.accept()?;
let reader = BufReader::new(stream.try_clone()?);
Ok(TCPConnection {
stream,
reader,
is_alive: Arc::new(AtomicBool::new(true)),
})
}
fn bind(address: &SocketAddr) -> Result<Self, Error> {
let listener = TcpListener::bind(address)?;
Ok(Self { listener })
}
}
pub struct TCPClient;
impl ClientTrait<TCPConnection> for TCPClient {
fn connect(address: &SocketAddr) -> Result<TCPConnection, Error> {
let stream = TcpStream::connect(address)?;
let reader = BufReader::new(stream.try_clone()?);
let conn = TCPConnection {
stream,
reader,
is_alive: Arc::new(AtomicBool::new(true)),
};
Ok(conn)
}
}
+39
View File
@@ -0,0 +1,39 @@
use std::net::SocketAddr;
use crate::Error;
// This is the data transmission type
pub trait Connection: Send + Sync {
fn get_info(&self) -> String;
fn is_alive(&self) -> bool;
fn read(&mut self) -> Result<Vec<u8>, Error>;
fn write(&mut self, data: &[u8]) -> Result<(), Error>;
fn try_clone(&self) -> Result<Box<dyn Connection + Send + Sync>, Error>;
}
// Trait for protocol layers that can be initialized
pub trait ProtocolLayer: Connection {
fn new(inner: Box<dyn Connection>) -> Result<Self, Error>
where
Self: Sized;
fn initialize_client(&mut self) -> Result<(), Error> {
Ok(())
}
fn initialize_server(&mut self) -> Result<(), Error> {
Ok(())
}
}
pub trait ServerTrait<C: Connection> {
fn get_info(&self) -> String;
fn accept(&self) -> Result<C, Error>;
fn bind(address: &SocketAddr) -> Result<Self, Error>
where
Self: Sized;
}
pub trait ClientTrait<C: Connection + Sized> {
fn connect(address: &SocketAddr) -> Result<C, Error>;
}
+11
View File
@@ -0,0 +1,11 @@
use std::net::SocketAddr;
use bincode::{Decode, Encode};
use crate::layers::LayerConfig;
#[derive(Encode, Decode, Debug, Clone)]
pub struct ConnectionConfig {
pub socket: SocketAddr,
pub layers: Vec<LayerConfig>,
}
+10
View File
@@ -0,0 +1,10 @@
mod listener;
mod node;
mod node_container;
mod packets;
mod stream;
pub use listener::ConnectionConfig;
pub use node::Node;
pub use node_container::NodeContainer;
pub use stream::Stream;
+483
View File
@@ -0,0 +1,483 @@
use std::{
collections::HashMap,
fmt::Debug,
sync::{Arc, Mutex, MutexGuard},
thread,
time::Duration,
};
use bincode::{Decode, Encode};
use crossbeam_channel::{Receiver, RecvError, Sender};
// use std:::{Receiver, Sender};
#[allow(deprecated)]
use rand::{seq::IndexedRandom, thread_rng};
use crate::{
Error,
layers::build_client,
networkers::{ClientTrait, Connection, ServerTrait, TCPClient, TCPServer, run_listener_state},
nodes::{
listener::ConnectionConfig,
packets::{Packets, decode_vec, encode_vec},
},
};
fn read(c: &mut Box<dyn Connection + Send>) -> Result<Packets, Error> {
Packets::decode(c.read()?.as_slice())
}
fn write(c: &mut Box<dyn Connection + Send>, packet: Packets) -> Result<(), Error> {
c.write(&(packet.encode()?))
}
pub struct Node<P>
where
P: Encode + Decode<()> + Debug + Clone + 'static,
{
pub state: Arc<Mutex<NodeState<P>>>,
rx: Receiver<(String, P)>,
disconnect_rx: Receiver<String>,
}
impl<P> Node<P>
where
P: Encode + Decode<()> + Debug + Clone + Send + 'static,
{
pub fn run_node(
id: String,
clients: Vec<ConnectionConfig>,
listeners: Vec<ConnectionConfig>,
) -> Result<Self, Error>
where
P: Encode + Decode<()> + Debug + Clone + 'static,
{
// let mut parent = build_client(TCPClient::connect(&parent.socket)?, parent.layers)?;
let (tx, rx) = crossbeam_channel::unbounded();
let (disconnect_tx, disconnect_rx) = crossbeam_channel::unbounded();
let state = Arc::new(Mutex::new(NodeState::<P> {
id: id, //Uuid::new_v4().to_string(), //TODO: Calling an OS RNG can pose a problem for security;
connections: HashMap::new(),
map: HashMap::new(),
packet_listener: tx,
disconnect_listener: disconnect_tx,
}));
for listener in listeners {
run_listener_state(
TCPServer::bind(&listener.socket)?,
listener.layers,
Self::on_listener_client,
Arc::clone(&state),
);
}
for client in clients {
let state = Arc::clone(&state);
thread::spawn(move || {
loop {
if let Err(e) = Self::run_client(client.clone(), &state) {
error!("Could not connect to server; {:?}", e);
}
thread::sleep(Duration::from_millis(1000));
}
});
}
Ok(Self {
state,
rx,
disconnect_rx,
})
}
fn run_client(client: ConnectionConfig, state: &Arc<Mutex<NodeState<P>>>) -> Result<(), Error> {
Self::run_connection(
build_client(TCPClient::connect(&client.socket)?, client.layers)?,
state,
)?;
Ok(())
}
fn on_listener_client(
connection: Box<dyn Connection + Send + 'static>,
state: Arc<Mutex<NodeState<P>>>,
) {
thread::spawn(move || {
if let Err(e) = Self::run_connection(connection, &state) {
error!("Could not connect; {}", e);
}
});
}
fn run_connection(
connection: Box<dyn Connection + Send + 'static>,
state: &Arc<Mutex<NodeState<P>>>,
) -> Result<(), Error> {
let mut connection = connection;
let s = state.lock().unwrap();
let this_uuid = s.id.clone();
std::mem::drop(s);
// Send UUID to new connection
write(&mut connection, Packets::SyncUUID(this_uuid.clone()))?;
// Recieve UUID
let uuid_result = read(&mut connection)?;
let other_uuid = if let Packets::SyncUUID(source) = uuid_result {
source
} else {
return Err(format!("Could not get UUID! Got {:?}", uuid_result).into());
};
if (&mut state.lock().unwrap()).knows_client(&other_uuid) {
write(&mut connection, Packets::ErrorNameExists)?;
return Err(format!(
"Attempted to accept connection from node {} which already exists!",
other_uuid
)
.into());
}
info!("New Node! {} (direct)", other_uuid);
// Add connection
(&mut state.lock().unwrap())
.connections
.insert(other_uuid.clone(), connection.try_clone()?);
// Update direct connections and the new connections with the new table
(&mut state.lock().unwrap()).broadcast_table(None);
loop {
match read(&mut connection) {
Ok(packet) => {
let result: Result<(), Error> =
match packet {
Packets::Disconnect { routes } => Ok(
(&mut state.lock().unwrap()).disconnect(&other_uuid, routes, false)
),
Packets::Update { routes } => Ok((&mut state.lock().unwrap())
.extend_routes(other_uuid.clone(), routes)),
Packets::DataUnrouted {
src: source,
dest,
data,
} => (&mut state.lock().unwrap()).route_packet(source, dest, data),
_ => {
error!("Unsupported packet: {:?}", packet);
Ok(())
}
};
if let Err(e) = result {
error!("Could not parse; {}", e);
}
}
Err(e) => {
if !connection.is_alive() {
warn!("Connection {} Disconnected!", connection.get_info());
let state = &mut state.lock().unwrap();
state.connections.remove(&other_uuid);
state.disconnect(&this_uuid, vec![other_uuid.clone()], true);
break;
}
error!("Could not read; {}", e);
}
}
}
Ok(())
}
pub fn recv(&self) -> Result<(String, P), RecvError> {
self.rx.recv()
}
pub fn state(&self) -> MutexGuard<'_, NodeState<P>> {
self.state.lock().unwrap()
}
pub fn try_clone(&self) -> Result<Self, Error> {
Ok(Self {
state: Arc::clone(&self.state),
rx: self.rx.clone(),
disconnect_rx: self.disconnect_rx.clone(),
})
}
pub fn send_unrouted(&self, dest: String, data: &P) -> Result<(), Error> {
self.state().send_unrouted(dest, data)
}
pub fn get_disconnect_rx(&self) -> Receiver<String> {
self.disconnect_rx.clone()
}
}
pub struct NodeState<P>
where
P: Encode + Decode<()> + Debug + Clone + 'static,
{
id: String,
connections: HashMap<String, Box<dyn Connection + Send>>,
map: HashMap<String, Vec<String>>,
packet_listener: Sender<(String, P)>,
disconnect_listener: Sender<String>,
}
impl<P> NodeState<P>
where
P: Encode + Decode<()> + Debug + Clone + Send + 'static,
{
// Get list of all nodes in map
fn get_known_nodes(&self) -> Vec<String> {
self.map.keys().map(|k| k.clone()).collect::<Vec<String>>()
}
// Get list of node UUIDs that are directly connected to this node
fn get_direct_nodes(&self) -> Vec<String> {
self.connections
.keys()
.map(|k| k.clone())
.collect::<Vec<String>>()
}
fn knows_client(&self, id: &String) -> bool {
self.get_all_nodes().contains(id)
}
// Remove all nodes where the routes are empty
fn remove_null_nodes(&mut self) {
self.map.retain(|_, routes| !routes.is_empty());
}
// Send packet to all directly connected nodes, except maybe one
fn broadcast(&mut self, data: Packets, disclude: Option<&String>) {
for (uuid, connection) in self.connections.iter_mut() {
if disclude.is_some() && disclude.unwrap() == uuid {
continue;
}
if let Err(e) = write(connection, data.clone()) {
error!("Failed to send packet to {}, {}", uuid, e);
}
}
}
// Get list of nodes to send to another as known routes
fn get_routes_to(&self, recv_uuid: &String) -> Vec<String> {
let mut tx_routes: Vec<String> = Vec::new();
// Append
for (map_uuid, routes) in self.map.iter() {
// Do not transmit a route, which bounces directly back to the sender
if routes.len() == 1 && &routes[0] == recv_uuid {
continue;
}
tx_routes.push(map_uuid.clone());
}
// Append directly connected nodes
tx_routes.append(&mut self.get_direct_nodes());
tx_routes
}
fn broadcast_table(&mut self, disclude: Option<&String>) {
let packets = self
.connections
.iter()
.map(|(recv_uuid, _)| self.get_routes_to(&recv_uuid))
.collect::<Vec<Vec<String>>>();
for (i, (recv_uuid, connection)) in self.connections.iter_mut().enumerate() {
if let Some(disclude) = disclude {
if disclude == recv_uuid {
continue;
}
}
if let Err(e) = write(
connection,
Packets::Update {
routes: packets[i].clone(),
},
) {
error!("Failed to send packet to {}, {}", recv_uuid, e);
}
}
}
fn disconnect(&mut self, source: &String, routes: Vec<String>, direct: bool) {
let mut resend_table = false;
let mut remove_uuids = Vec::new();
for remove_uuid in routes {
// Sanity check, in case the current client is still connected
if self.get_direct_nodes().contains(&remove_uuid) {
resend_table = true;
continue;
}
// Check if client still exists, or if it was a direct connection
// Prevents infinite network loops
if direct || self.knows_client(&remove_uuid) {
self.map.remove(&remove_uuid);
remove_uuids.push(remove_uuid.clone());
info!(
"Node disconnected! {} ({})",
remove_uuid,
if direct { "direct" } else { "indirect" }
);
self.disconnect_listener.send(remove_uuid.clone()).unwrap();
for (uuid, route) in self.map.iter_mut() {
if route.contains(&remove_uuid) {
let index = route.iter().position(|r| r == &remove_uuid).unwrap();
route.remove(index);
remove_uuids.push(uuid.clone());
}
}
self.remove_null_nodes();
}
}
if !remove_uuids.is_empty() {
self.broadcast(
Packets::Disconnect {
routes: remove_uuids,
},
Some(source),
);
}
if resend_table {
self.broadcast_table(None);
}
// self.print_map();
}
fn extend_routes(&mut self, src: String, routes: Vec<String>) {
let mut updated = false;
// Quick sanity check
if !self.get_direct_nodes().contains(&src) {
return;
}
// Loop through all of the routes in the new recieved route map
for route in routes {
// If the route loops back to self, disregard.
if route == self.id {
continue;
}
// If the connection is already established directly, disregard
if self.get_direct_nodes().contains(&route) {
continue;
}
// If there is already an entry created for this route
if self.map.contains_key(&route) {
// If the route does not already contain the new one
if !self.map.get(&route).unwrap().contains(&src) {
// If the neighbor can be acessed directly, disregard
self.map.get_mut(&route).unwrap().push(src.clone());
info!("Node update: {} (indirect)", src);
updated = true;
} else {
// Else, do nothing
continue;
}
} else {
// Else, create the new route entry
self.map.insert(route.clone(), vec![src.clone()]);
info!("Node update: {} (indirect)", src);
updated = true;
}
}
// Solves the case that if a remote node has said that a neighbor has connected before itself has
let direct_connections = self.get_direct_nodes();
for connection in direct_connections {
if self.map.contains_key(&connection) {
self.map.remove(&connection);
}
}
// If something has updated, rebroadcast
// Prevents infinite network loops
if updated {
self.broadcast_table(Some(&src));
}
// self.print_map();
}
fn route_packet(&mut self, src: String, dest: String, data: Vec<u8>) -> Result<(), Error> {
if dest == self.id {
self.packet_listener.send((src, decode_vec::<P>(&data)?))?;
Ok(())
} else {
if self.connections.contains_key(&dest) {
write(
self.connections.get_mut(&dest).unwrap(),
Packets::DataUnrouted { src, dest, data },
)?;
Ok(())
} else if self.map.contains_key(&dest) {
#[allow(deprecated)]
let next_uuid = self
.map
.get(&dest)
.unwrap()
.choose(&mut thread_rng())
.unwrap()
.clone();
write(
self.connections.get_mut(&next_uuid).unwrap(),
Packets::DataUnrouted { src, dest, data },
)?;
Ok(())
} else {
Err::<(), Error>(format!("Could not find route from {} to {}!", src, dest).into())
}
}
}
pub fn send_unrouted(&mut self, dest: String, data: &P) -> Result<(), Error> {
self.route_packet(self.id.clone(), dest, encode_vec(data)?)
}
pub fn get_all_nodes(&self) -> Vec<String> {
let mut uuids = self.get_known_nodes();
uuids.append(&mut self.get_direct_nodes());
uuids
}
#[allow(dead_code)]
fn print_map(&self) {
info!("\n\n");
info!("Local addr: {}", self.id);
info!("Table: ");
for (uuid, route) in self.map.iter() {
info!("{} -> [ {:?} ]", uuid, route);
}
info!("Direct: {:?}", self.get_direct_nodes());
}
}
+183
View File
@@ -0,0 +1,183 @@
use std::{
collections::HashMap,
sync::{Arc, Mutex},
thread,
};
use crossbeam_channel::{Receiver, Sender};
use crate::{
Error,
nodes::{ConnectionConfig, Node, Stream, stream::StreamHandle},
packets::TransportLayerPacket,
};
type Streams = Arc<Mutex<HashMap<(usize, String), StreamHandle>>>;
pub struct NodeContainer {
streams: Streams,
node: Node<TransportLayerPacket>,
on_stream_rx: Receiver<Stream>,
// state: Arc<Mutex<NodeState<TransportLayerPacket>>>,
// spontanious_rx: Receiver<(String, C2Packet)>,
}
impl NodeContainer {
pub fn connect(
id: String,
clients: Vec<ConnectionConfig>,
listeners: Vec<ConnectionConfig>,
) -> Result<Self, Error> {
let node = Node::<TransportLayerPacket>::run_node(id, clients, listeners)?;
let streams = Arc::new(Mutex::new(HashMap::new()));
// let (spontanious_tx, spontanious_rx) = crossbeam_channel::unbounded();
let (on_stream_tx, on_stream_rx) = crossbeam_channel::unbounded();
let s = Self {
streams: Arc::clone(&streams),
node: node.try_clone()?,
on_stream_rx,
};
let close_rx = node.get_disconnect_rx();
let stream_clone = Arc::clone(&streams);
thread::spawn(move || {
loop {
let close_uuid = close_rx.recv().unwrap();
let stream = stream_clone.lock().unwrap();
let keys = stream.keys();
for key in keys {
if key.1 == close_uuid {
warn!("Stream ({}, {}) disconnected!", key.0, key.1);
let handle = (&mut stream_clone.lock().unwrap()).remove(key).unwrap();
handle.close();
}
}
}
});
// Start node listening thread
thread::spawn(move || {
loop {
if let Err(e) = Self::node_listening_thread(&node, &streams, &on_stream_tx) {
error!("Got error: {}", e);
}
}
});
Ok(s)
}
fn node_listening_thread(
node: &Node<TransportLayerPacket>,
streams: &Streams,
on_stream_tx: &Sender<Stream>, // spontanious_tx: &Sender<(String, C2Packet)>,
) -> Result<(), Error> {
// info!("Loop");
let (src, packet) = node.recv()?;
info!("Packet: {:?}", packet);
match packet {
TransportLayerPacket::RequestStreamUnrouted {
stream_id: remote_stream_id,
} => {
// Create stream ID
let local_stream_id = streams.lock().unwrap().keys().len();
// Send response to server including local id and remoe ID
Stream::respond_create(src.clone(), local_stream_id, remote_stream_id, node)?;
Self::create_handle_thread(
on_stream_tx,
streams,
node,
src,
local_stream_id,
remote_stream_id,
)?;
Ok(())
}
TransportLayerPacket::AckStreamUnrouted {
ack_stream_id,
stream_id,
} => {
Self::create_handle_thread(
on_stream_tx,
streams,
node,
src,
ack_stream_id,
stream_id,
)?;
Ok(())
}
TransportLayerPacket::StreamDataUnrouted { stream_id, data } => {
match streams.lock().unwrap().get(&(stream_id, src.clone())) {
Some(handle) => Ok(handle.send(data).unwrap()),
// Some(_) => Err(format!(
// "Stream {}, {} has not been initilized!",
// stream_id, src
// )
// .into()),
None => Err(format!("Stream {}, {} does not exist!", stream_id, src).into()),
}
} // _ => Err(format!("Unsupported packet: {:?}", packet).into()),
}
}
fn create_handle_thread(
on_stream_tx: &Sender<Stream>,
streams: &Streams,
node: &Node<TransportLayerPacket>,
src: String,
local_stream_id: usize,
remote_stream_id: usize,
) -> Result<(), Error> {
info!("Local: {}, Remote: {}", local_stream_id, remote_stream_id);
// Create stream from local and remote stream handles
let (stream, handle) =
Stream::create_handle(src.clone(), local_stream_id, remote_stream_id)?;
on_stream_tx.send(stream)?;
// Add the local stream to map
streams
.lock()
.unwrap()
.insert((local_stream_id, src.clone()), handle.clone()?);
let node_clone = node.try_clone()?;
thread::spawn(move || {
loop {
let data = handle.recv().unwrap();
if let Err(e) = node_clone.state().send_unrouted(
src.clone(),
&TransportLayerPacket::StreamDataUnrouted {
stream_id: remote_stream_id,
data,
},
) {
error!("Got error: {}", e);
break;
}
}
});
Ok(())
}
pub fn create_stream_block(&self, dest: String) -> Result<Stream, Error> {
let local_stream_id = self.streams.lock().unwrap().keys().len();
Stream::ask_create(dest.clone(), local_stream_id, &self.node)?;
Ok(self.on_stream_rx.recv()?)
}
pub fn recv_stream(&self) -> Result<Stream, Error> {
Ok(self.on_stream_rx.recv()?)
}
pub fn get_nodes(&self) -> Vec<String> {
self.node.state().get_all_nodes()
// self.state.lock().unwrap().get_all_nodes()
}
}
+60
View File
@@ -0,0 +1,60 @@
use std::fmt::Debug;
use bincode::{Decode, Encode, config::Configuration};
use crate::Error;
#[derive(Debug, Encode, Decode, Clone)]
pub enum Packets {
SyncUUID(String),
Update {
routes: Vec<String>,
},
Disconnect {
routes: Vec<String>,
},
// Send single data packet without routing details
DataUnrouted {
src: String,
dest: String,
data: Vec<u8>,
},
// Send single data packet with routing details
DataRouted {
path: Vec<String>,
data: Vec<u8>,
},
// DataStreamRouted {
// path: Vec<String>,
// data: Vec<u8>,
// },
ErrorNameExists,
}
impl Packets {
pub fn encode(&self) -> Result<Vec<u8>, Error> {
encode_vec(self)
}
pub fn decode(data: &[u8]) -> Result<Self, Error> {
decode_vec(data)
}
}
pub fn encode_vec<P>(object: &P) -> Result<Vec<u8>, Error>
where
P: Encode + Decode<()> + Debug + Clone + 'static,
{
Ok(bincode::encode_to_vec(object, crate::BINCODE_CONFIG)?)
}
pub fn decode_vec<P>(data: &[u8]) -> Result<P, Error>
where
P: Encode + Decode<()> + Debug + Clone + 'static,
{
let (decoded, _) =
bincode::decode_from_slice::<P, Configuration>(&data[..], crate::BINCODE_CONFIG)?;
Ok(decoded)
}
+155
View File
@@ -0,0 +1,155 @@
use std::sync::{
Arc,
atomic::{AtomicBool, Ordering},
};
use crossbeam_channel::{Receiver, Sender};
use crate::{Error, networkers::Connection, nodes::Node, packets::TransportLayerPacket};
pub struct Stream {
tx: Sender<Vec<u8>>,
rx: Receiver<Vec<u8>>,
closed: Arc<AtomicBool>,
}
impl Connection for Stream {
fn get_info(&self) -> String {
"unrouted".to_string()
}
fn is_alive(&self) -> bool {
self.closed.load(Ordering::Relaxed)
}
fn read(&mut self) -> Result<Vec<u8>, crate::Error> {
if self.closed.load(Ordering::Relaxed) {
Err("Connection closed".into())
} else {
Ok(self.rx.recv()?)
}
}
fn write(&mut self, data: &[u8]) -> Result<(), crate::Error> {
if self.closed.load(Ordering::Relaxed) {
Err("Connection closed".into())
} else {
Ok(self.tx.send(data.to_vec())?)
}
}
fn try_clone(&self) -> Result<Box<dyn Connection + Send + Sync>, crate::Error> {
Ok(Box::new(Self {
tx: self.tx.clone(),
rx: self.rx.clone(),
closed: Arc::clone(&self.closed),
}))
}
}
impl Stream {
pub fn ask_create(
dest: String,
local_stream_id: usize,
node: &Node<TransportLayerPacket>,
) -> Result<(), Error> {
info!("Sent to {}", dest);
node.send_unrouted(
dest.clone(),
&TransportLayerPacket::RequestStreamUnrouted {
stream_id: local_stream_id,
},
)?;
Ok(())
// Self::create_handle(dest, local_stream_id, remote_stream_id, node)
}
pub fn respond_create(
dest: String,
local_stream_id: usize,
remote_stream_id: usize,
node: &Node<TransportLayerPacket>,
) -> Result<(), Error> {
node.send_unrouted(
dest.clone(),
&&TransportLayerPacket::AckStreamUnrouted {
ack_stream_id: remote_stream_id,
stream_id: local_stream_id,
},
)?;
Ok(())
}
pub fn create_handle(
dest: String,
local_stream_id: usize,
remote_stream_id: usize,
) -> Result<(Self, StreamHandle), Error> {
let (recv_tx, recv_rx) = crossbeam_channel::unbounded();
let (send_tx, send_rx) = crossbeam_channel::unbounded();
let closed = Arc::new(AtomicBool::new(false));
let handle = StreamHandle {
dest,
tx: recv_tx,
rx: send_rx,
local_stream_id,
remote_stream_id,
closed: Arc::clone(&closed),
};
Ok((
Self {
tx: send_tx,
rx: recv_rx,
closed,
},
handle,
))
}
// pub fn new(tx: Sender<Vec<u8>>, rx: Receiver<Vec<u8>>) -> Self {
// Self { tx, rx }
// }
}
pub struct StreamHandle {
pub dest: String,
pub local_stream_id: usize,
pub remote_stream_id: usize,
tx: Sender<Vec<u8>>,
rx: Receiver<Vec<u8>>,
closed: Arc<AtomicBool>,
}
impl StreamHandle {
pub fn send(&self, data: Vec<u8>) -> Result<(), Error> {
Ok(self.tx.send(data)?)
}
pub fn has_content(&self) -> bool {
self.rx.len() > 0
}
pub fn recv(&self) -> Result<Vec<u8>, Error> {
Ok(self.rx.recv()?)
}
pub fn clone(&self) -> Result<Self, Error> {
Ok(Self {
dest: self.dest.clone(),
local_stream_id: self.local_stream_id.clone(),
remote_stream_id: self.remote_stream_id.clone(),
tx: self.tx.clone(),
rx: self.rx.clone(),
closed: Arc::clone(&self.closed),
})
}
pub fn close(self) {
drop(self.tx);
drop(self.rx);
self.closed.store(false, Ordering::Relaxed);
}
}
+29
View File
@@ -0,0 +1,29 @@
use bincode::{Decode, Encode};
use std::fmt::Debug;
#[derive(Debug, Encode, Decode, Clone)]
pub enum TransportLayerPacket {
RequestStreamUnrouted {
stream_id: usize,
},
AckStreamUnrouted {
ack_stream_id: usize,
stream_id: usize,
},
StreamDataUnrouted {
stream_id: usize,
data: Vec<u8>,
},
// SpontaniousDataUnrouted {
// data: Vec<u8>,
// },
}
#[derive(Debug, Encode, Decode, Clone)]
pub enum C2Packet {
Ping,
Pong,
CreatePTY { width: usize, height: usize },
PTYData,
}