mirror of
https://github.com/Astatin3/unshell-nodes-rs.git
synced 2026-06-09 00:28:00 -06:00
Reformat, add syscall streaming
This commit is contained in:
@@ -0,0 +1,11 @@
|
||||
[package]
|
||||
name = "unshell-rs-lib"
|
||||
edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
base64 = "0.22.1"
|
||||
bincode = "2.0.1"
|
||||
crossbeam-channel = "0.5.15"
|
||||
log = "0.4.27"
|
||||
rand = "0.9.1"
|
||||
uuid = { version = "1.17.0", features = ["v4"] }
|
||||
@@ -0,0 +1,42 @@
|
||||
use crate::{
|
||||
Error,
|
||||
networkers::{Connection, ProtocolLayer},
|
||||
};
|
||||
use base64::{Engine as _, engine::general_purpose};
|
||||
|
||||
pub struct Base64Layer {
|
||||
inner: Box<dyn Connection>,
|
||||
}
|
||||
|
||||
impl Connection for Base64Layer {
|
||||
fn get_info(&self) -> String {
|
||||
format!("b64->{}", self.inner.get_info())
|
||||
}
|
||||
|
||||
fn is_alive(&self) -> bool {
|
||||
self.inner.is_alive()
|
||||
}
|
||||
|
||||
fn read(&mut self) -> Result<Vec<u8>, Error> {
|
||||
Ok(general_purpose::STANDARD
|
||||
.decode(&self.inner.read()?)
|
||||
.unwrap())
|
||||
}
|
||||
|
||||
fn write(&mut self, data: &[u8]) -> Result<(), Error> {
|
||||
self.inner
|
||||
.write(general_purpose::STANDARD.encode(data).as_bytes())
|
||||
}
|
||||
|
||||
fn try_clone(&self) -> Result<Box<dyn Connection + Send + Sync>, Error> {
|
||||
Ok(Box::new(Self {
|
||||
inner: self.inner.try_clone()?,
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
impl ProtocolLayer for Base64Layer {
|
||||
fn new(inner: Box<dyn Connection>) -> Result<Self, Error> {
|
||||
Ok(Base64Layer { inner })
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,76 @@
|
||||
use crate::{
|
||||
Error,
|
||||
layers::{Base64Layer, HandshakeLayer, LayerConfig},
|
||||
networkers::{Connection, ProtocolLayer},
|
||||
};
|
||||
|
||||
impl Connection for Box<dyn Connection + Send + Sync> {
|
||||
fn get_info(&self) -> String {
|
||||
(**self).get_info()
|
||||
}
|
||||
|
||||
fn is_alive(&self) -> bool {
|
||||
(**self).is_alive()
|
||||
}
|
||||
|
||||
fn read(&mut self) -> Result<Vec<u8>, Error> {
|
||||
(**self).read()
|
||||
}
|
||||
|
||||
fn write(&mut self, data: &[u8]) -> Result<(), Error> {
|
||||
(**self).write(data)
|
||||
}
|
||||
|
||||
fn try_clone(&self) -> Result<Box<dyn Connection + Send + Sync>, Error> {
|
||||
Ok(Box::new((**self).try_clone()?))
|
||||
}
|
||||
}
|
||||
|
||||
pub fn build_client<C>(
|
||||
base_conn: C,
|
||||
layers: Vec<LayerConfig>,
|
||||
) -> Result<Box<dyn Connection + Send>, Error>
|
||||
where
|
||||
C: Connection + 'static,
|
||||
{
|
||||
let mut current_conn: Box<dyn Connection + Send> = Box::new(base_conn);
|
||||
|
||||
for layer_config in &layers {
|
||||
current_conn = match layer_config {
|
||||
LayerConfig::Base64 => Box::new(Base64Layer::new(current_conn)?),
|
||||
LayerConfig::Handshake => {
|
||||
let mut handshake_layer = HandshakeLayer::new(current_conn)?;
|
||||
handshake_layer.initialize_client()?;
|
||||
Box::new(handshake_layer)
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
Ok(current_conn)
|
||||
}
|
||||
|
||||
pub fn create_server_builder<C>(
|
||||
layers: Vec<LayerConfig>,
|
||||
) -> Result<Box<dyn Fn(C) -> Result<Box<dyn Connection + Send>, Error>>, Error>
|
||||
where
|
||||
C: Connection + 'static,
|
||||
{
|
||||
Ok(Box::new(
|
||||
move |base_conn: C| -> Result<Box<dyn Connection + Send>, Error> {
|
||||
let mut current_conn: Box<dyn Connection + Send> = Box::new(base_conn);
|
||||
|
||||
for layer_config in &layers {
|
||||
current_conn = match layer_config {
|
||||
LayerConfig::Base64 => Box::new(Base64Layer::new(current_conn)?),
|
||||
LayerConfig::Handshake => {
|
||||
let mut handshake_layer = HandshakeLayer::new(current_conn)?;
|
||||
handshake_layer.initialize_server()?;
|
||||
Box::new(handshake_layer)
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
Ok(current_conn)
|
||||
},
|
||||
))
|
||||
}
|
||||
@@ -0,0 +1,102 @@
|
||||
use std::sync::{
|
||||
Arc,
|
||||
atomic::{AtomicBool, Ordering},
|
||||
};
|
||||
|
||||
use crate::networkers::{Connection, ProtocolLayer};
|
||||
|
||||
type Error = Box<dyn std::error::Error>;
|
||||
|
||||
// 4-Way Handshake Layer
|
||||
pub struct HandshakeLayer {
|
||||
inner: Box<dyn Connection>,
|
||||
finished_handshake: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
impl Connection for HandshakeLayer {
|
||||
fn get_info(&self) -> String {
|
||||
format!("handshake->{}", self.inner.get_info())
|
||||
}
|
||||
|
||||
fn is_alive(&self) -> bool {
|
||||
self.inner.is_alive()
|
||||
}
|
||||
|
||||
fn read(&mut self) -> Result<Vec<u8>, Error> {
|
||||
if !self.finished_handshake.load(Ordering::Relaxed) {
|
||||
return Err("NotComplete".into());
|
||||
}
|
||||
self.inner.read()
|
||||
}
|
||||
|
||||
fn write(&mut self, data: &[u8]) -> Result<(), Error> {
|
||||
if !self.finished_handshake.load(Ordering::Relaxed) {
|
||||
return Err("NotComplete".into());
|
||||
}
|
||||
self.inner.write(data)
|
||||
}
|
||||
|
||||
fn try_clone(&self) -> Result<Box<dyn Connection + Send + Sync>, crate::Error> {
|
||||
Ok(Box::new(Self {
|
||||
inner: self.inner.try_clone()?,
|
||||
finished_handshake: Arc::clone(&self.finished_handshake.clone()),
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
impl ProtocolLayer for HandshakeLayer {
|
||||
fn new(inner: Box<dyn Connection>) -> Result<Self, Error> {
|
||||
Ok(HandshakeLayer {
|
||||
inner,
|
||||
finished_handshake: Arc::new(AtomicBool::new(false)),
|
||||
})
|
||||
}
|
||||
|
||||
fn initialize_client(&mut self) -> Result<(), Error> {
|
||||
// Step 1: Client sends SYN
|
||||
self.inner.write("SYN".as_bytes())?;
|
||||
|
||||
// Step 2: Client receives SYN-ACK
|
||||
let response = self.inner.read()?;
|
||||
if response != "SYN-ACK".as_bytes() {
|
||||
return Err(format!("Expected SYN-ACK, got: {:?}", response).into());
|
||||
}
|
||||
|
||||
// Step 3: Client sends ACK
|
||||
self.inner.write("ACK".as_bytes())?;
|
||||
|
||||
// Step 4: Client receives FIN (final confirmation)
|
||||
let response = self.inner.read()?;
|
||||
if response != "FIN".as_bytes() {
|
||||
return Err(format!("Expected FIN, got: {:?}", response).into());
|
||||
}
|
||||
|
||||
info!("Handshake complete!");
|
||||
|
||||
self.finished_handshake.swap(true, Ordering::Relaxed);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn initialize_server(&mut self) -> Result<(), Error> {
|
||||
// Step 1: Server receives SYN
|
||||
let request = self.inner.read()?;
|
||||
if request != "SYN".as_bytes() {
|
||||
return Err(format!("Expected SYN, got: {:?}", request).into());
|
||||
}
|
||||
// Step 2: Server sends SYN-ACK
|
||||
self.inner.write("SYN-ACK".as_bytes())?;
|
||||
|
||||
// Step 3: Server receives ACK
|
||||
let response = self.inner.read()?;
|
||||
if response != "ACK".as_bytes() {
|
||||
return Err(format!("Expected ACK, got: {:?}", response).into());
|
||||
}
|
||||
|
||||
// Step 4: Server sends FIN (final confirmation)
|
||||
self.inner.write("FIN".as_bytes())?;
|
||||
info!("Handshake complete!");
|
||||
|
||||
self.finished_handshake.swap(true, Ordering::Relaxed);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,17 @@
|
||||
use bincode::{Decode, Encode};
|
||||
|
||||
#[derive(Encode, Decode, Debug, Clone)]
|
||||
pub enum LayerConfig {
|
||||
Base64,
|
||||
Handshake,
|
||||
}
|
||||
|
||||
mod base64;
|
||||
mod builder;
|
||||
mod handshake;
|
||||
|
||||
pub use base64::Base64Layer;
|
||||
pub use handshake::HandshakeLayer;
|
||||
|
||||
pub use builder::build_client;
|
||||
pub use builder::create_server_builder;
|
||||
@@ -0,0 +1,13 @@
|
||||
#[macro_use]
|
||||
extern crate log;
|
||||
|
||||
pub type Error = Box<dyn std::error::Error>;
|
||||
|
||||
static BINCODE_CONFIG: bincode::config::Configuration = bincode::config::standard();
|
||||
|
||||
pub mod layers;
|
||||
pub mod networkers;
|
||||
pub mod nodes;
|
||||
mod packets;
|
||||
|
||||
pub use packets::C2Packet;
|
||||
@@ -0,0 +1,16 @@
|
||||
mod server;
|
||||
mod tcp;
|
||||
mod traits;
|
||||
|
||||
pub use tcp::TCPClient;
|
||||
pub use tcp::TCPConnection;
|
||||
pub use tcp::TCPServer;
|
||||
|
||||
// pub use traits::AsyncConnection;
|
||||
pub use traits::ClientTrait;
|
||||
pub use traits::Connection;
|
||||
pub use traits::ProtocolLayer;
|
||||
pub use traits::ServerTrait;
|
||||
|
||||
pub use server::run_listener;
|
||||
pub use server::run_listener_state;
|
||||
@@ -0,0 +1,78 @@
|
||||
use std::{sync::Arc, thread};
|
||||
|
||||
use crate::{
|
||||
layers::{LayerConfig, create_server_builder},
|
||||
networkers::{Connection, ServerTrait},
|
||||
};
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub fn run_listener_state<S, C, R, A>(
|
||||
server: S,
|
||||
layers: Vec<LayerConfig>,
|
||||
on_connect_callback: R,
|
||||
state: Arc<A>,
|
||||
)
|
||||
/*-> Arc<Mutex<Vec<C>>>*/
|
||||
where
|
||||
S: ServerTrait<C> + Sync + Send + 'static,
|
||||
C: Connection + 'static,
|
||||
R: Fn(Box<dyn Connection + Send + 'static>, Arc<A>) + Sync + Send + 'static,
|
||||
A: Sync + Send + 'static,
|
||||
{
|
||||
thread::spawn(move || {
|
||||
let layer_builder = create_server_builder::<C>(layers).unwrap();
|
||||
info!("Started listener {}", server.get_info());
|
||||
loop {
|
||||
match server.accept() {
|
||||
Ok(conn) => match layer_builder(conn) {
|
||||
Ok(conn) => {
|
||||
info!("New connection ({})", conn.get_info());
|
||||
on_connect_callback(conn, Arc::clone(&state));
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to create layers: {:?}", e);
|
||||
}
|
||||
},
|
||||
|
||||
Err(e) => {
|
||||
error!("Failed to accept connection: {:?}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub fn run_listener<S, C, R>(server: S, layers: Vec<LayerConfig>, on_connect_callback: R)
|
||||
/*-> Arc<Mutex<Vec<C>>>*/
|
||||
where
|
||||
S: ServerTrait<C> + Sync + Send + 'static,
|
||||
C: Connection + 'static,
|
||||
R: Fn(Box<dyn Connection + Send + 'static>) + Sync + Send + 'static,
|
||||
{
|
||||
// let clients: Arc<Mutex<Vec<C>>> = Arc::new(Mutex::new(Vec::new()));
|
||||
// let clients_clone = Arc::clone(&clients);
|
||||
|
||||
thread::spawn(move || {
|
||||
let layer_builder = create_server_builder::<C>(layers).unwrap();
|
||||
|
||||
info!("Started listener {}", server.get_info());
|
||||
loop {
|
||||
match server.accept() {
|
||||
Ok(conn) => match layer_builder(conn) {
|
||||
Ok(conn) => {
|
||||
let con_info = conn.get_info();
|
||||
info!("New connection ({})", con_info);
|
||||
on_connect_callback(conn);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to create layers: {:?}", e);
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
error!("Failed to accept connection: {:?}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -0,0 +1,131 @@
|
||||
use std::{
|
||||
io::{BufReader, Read, Write},
|
||||
net::{SocketAddr, TcpListener, TcpStream},
|
||||
sync::{
|
||||
Arc,
|
||||
atomic::{AtomicBool, Ordering},
|
||||
},
|
||||
};
|
||||
|
||||
use crate::{
|
||||
Error,
|
||||
networkers::{ClientTrait, Connection, ServerTrait},
|
||||
};
|
||||
|
||||
pub struct TCPConnection {
|
||||
stream: TcpStream,
|
||||
reader: BufReader<TcpStream>,
|
||||
is_alive: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
impl Connection for TCPConnection {
|
||||
fn get_info(&self) -> String {
|
||||
format!(
|
||||
"tcp://{}",
|
||||
if let Ok(addr) = &self.stream.peer_addr() {
|
||||
addr.to_string()
|
||||
} else {
|
||||
"ERROR".to_string()
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
fn is_alive(&self) -> bool {
|
||||
self.is_alive.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
fn read(&mut self) -> Result<Vec<u8>, Error> {
|
||||
let mut len_bytes = [0u8; 4];
|
||||
|
||||
if let Err(e) = self.reader.read_exact(&mut len_bytes) {
|
||||
self.is_alive.swap(false, Ordering::Relaxed);
|
||||
return Err(format!("Stream disconnected! ({})", e).into());
|
||||
}
|
||||
|
||||
let len = u32::from_be_bytes(len_bytes) as usize;
|
||||
|
||||
let mut buffer = vec![0u8; len];
|
||||
|
||||
// In case the
|
||||
match self.reader.read_exact(&mut buffer) {
|
||||
Ok(()) => Ok(buffer.to_vec()),
|
||||
Err(e) => {
|
||||
self.is_alive.swap(false, Ordering::Relaxed);
|
||||
Err(format!("Stream disconnected! ({})", e).into())
|
||||
}
|
||||
}
|
||||
|
||||
// let mut buf = Vec::new();
|
||||
// let n = self.reader.read(&mut buf)?;
|
||||
|
||||
// Stream sends a null buffer if it is disconnected
|
||||
// if n == 0 {
|
||||
// self.is_alive.swap(false, Ordering::Relaxed);
|
||||
// }
|
||||
|
||||
// println!("Recieved: {}", line.trim_end().to_string());
|
||||
}
|
||||
|
||||
fn write(&mut self, data: &[u8]) -> Result<(), Error> {
|
||||
let len = data.len() as u32;
|
||||
self.stream.write_all(&len.to_be_bytes())?;
|
||||
self.stream.write_all(data)?;
|
||||
self.stream.flush()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn try_clone(&self) -> Result<Box<dyn Connection + Send + Sync>, Error> {
|
||||
Ok(Box::new(Self {
|
||||
stream: self.stream.try_clone()?,
|
||||
reader: BufReader::new(self.stream.try_clone()?),
|
||||
is_alive: Arc::clone(&self.is_alive),
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
pub struct TCPServer {
|
||||
listener: TcpListener,
|
||||
}
|
||||
|
||||
impl ServerTrait<TCPConnection> for TCPServer {
|
||||
fn get_info(&self) -> String {
|
||||
format!(
|
||||
"tcp://{}",
|
||||
if let Ok(addr) = &self.listener.local_addr() {
|
||||
addr.to_string()
|
||||
} else {
|
||||
"ERROR".to_string()
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
fn accept(&self) -> Result<TCPConnection, Error> {
|
||||
let (stream, _) = self.listener.accept()?;
|
||||
let reader = BufReader::new(stream.try_clone()?);
|
||||
Ok(TCPConnection {
|
||||
stream,
|
||||
reader,
|
||||
is_alive: Arc::new(AtomicBool::new(true)),
|
||||
})
|
||||
}
|
||||
|
||||
fn bind(address: &SocketAddr) -> Result<Self, Error> {
|
||||
let listener = TcpListener::bind(address)?;
|
||||
Ok(Self { listener })
|
||||
}
|
||||
}
|
||||
|
||||
pub struct TCPClient;
|
||||
|
||||
impl ClientTrait<TCPConnection> for TCPClient {
|
||||
fn connect(address: &SocketAddr) -> Result<TCPConnection, Error> {
|
||||
let stream = TcpStream::connect(address)?;
|
||||
let reader = BufReader::new(stream.try_clone()?);
|
||||
let conn = TCPConnection {
|
||||
stream,
|
||||
reader,
|
||||
is_alive: Arc::new(AtomicBool::new(true)),
|
||||
};
|
||||
Ok(conn)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,39 @@
|
||||
use std::net::SocketAddr;
|
||||
|
||||
use crate::Error;
|
||||
|
||||
// This is the data transmission type
|
||||
pub trait Connection: Send + Sync {
|
||||
fn get_info(&self) -> String;
|
||||
fn is_alive(&self) -> bool;
|
||||
|
||||
fn read(&mut self) -> Result<Vec<u8>, Error>;
|
||||
fn write(&mut self, data: &[u8]) -> Result<(), Error>;
|
||||
|
||||
fn try_clone(&self) -> Result<Box<dyn Connection + Send + Sync>, Error>;
|
||||
}
|
||||
|
||||
// Trait for protocol layers that can be initialized
|
||||
pub trait ProtocolLayer: Connection {
|
||||
fn new(inner: Box<dyn Connection>) -> Result<Self, Error>
|
||||
where
|
||||
Self: Sized;
|
||||
fn initialize_client(&mut self) -> Result<(), Error> {
|
||||
Ok(())
|
||||
}
|
||||
fn initialize_server(&mut self) -> Result<(), Error> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub trait ServerTrait<C: Connection> {
|
||||
fn get_info(&self) -> String;
|
||||
fn accept(&self) -> Result<C, Error>;
|
||||
fn bind(address: &SocketAddr) -> Result<Self, Error>
|
||||
where
|
||||
Self: Sized;
|
||||
}
|
||||
|
||||
pub trait ClientTrait<C: Connection + Sized> {
|
||||
fn connect(address: &SocketAddr) -> Result<C, Error>;
|
||||
}
|
||||
@@ -0,0 +1,11 @@
|
||||
use std::net::SocketAddr;
|
||||
|
||||
use bincode::{Decode, Encode};
|
||||
|
||||
use crate::layers::LayerConfig;
|
||||
|
||||
#[derive(Encode, Decode, Debug, Clone)]
|
||||
pub struct ConnectionConfig {
|
||||
pub socket: SocketAddr,
|
||||
pub layers: Vec<LayerConfig>,
|
||||
}
|
||||
@@ -0,0 +1,10 @@
|
||||
mod listener;
|
||||
mod node;
|
||||
mod node_container;
|
||||
mod packets;
|
||||
mod stream;
|
||||
|
||||
pub use listener::ConnectionConfig;
|
||||
pub use node::Node;
|
||||
pub use node_container::NodeContainer;
|
||||
pub use stream::Stream;
|
||||
@@ -0,0 +1,483 @@
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
fmt::Debug,
|
||||
sync::{Arc, Mutex, MutexGuard},
|
||||
thread,
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use bincode::{Decode, Encode};
|
||||
use crossbeam_channel::{Receiver, RecvError, Sender};
|
||||
// use std:::{Receiver, Sender};
|
||||
#[allow(deprecated)]
|
||||
use rand::{seq::IndexedRandom, thread_rng};
|
||||
|
||||
use crate::{
|
||||
Error,
|
||||
layers::build_client,
|
||||
networkers::{ClientTrait, Connection, ServerTrait, TCPClient, TCPServer, run_listener_state},
|
||||
nodes::{
|
||||
listener::ConnectionConfig,
|
||||
packets::{Packets, decode_vec, encode_vec},
|
||||
},
|
||||
};
|
||||
|
||||
fn read(c: &mut Box<dyn Connection + Send>) -> Result<Packets, Error> {
|
||||
Packets::decode(c.read()?.as_slice())
|
||||
}
|
||||
|
||||
fn write(c: &mut Box<dyn Connection + Send>, packet: Packets) -> Result<(), Error> {
|
||||
c.write(&(packet.encode()?))
|
||||
}
|
||||
|
||||
pub struct Node<P>
|
||||
where
|
||||
P: Encode + Decode<()> + Debug + Clone + 'static,
|
||||
{
|
||||
pub state: Arc<Mutex<NodeState<P>>>,
|
||||
rx: Receiver<(String, P)>,
|
||||
disconnect_rx: Receiver<String>,
|
||||
}
|
||||
|
||||
impl<P> Node<P>
|
||||
where
|
||||
P: Encode + Decode<()> + Debug + Clone + Send + 'static,
|
||||
{
|
||||
pub fn run_node(
|
||||
id: String,
|
||||
clients: Vec<ConnectionConfig>,
|
||||
listeners: Vec<ConnectionConfig>,
|
||||
) -> Result<Self, Error>
|
||||
where
|
||||
P: Encode + Decode<()> + Debug + Clone + 'static,
|
||||
{
|
||||
// let mut parent = build_client(TCPClient::connect(&parent.socket)?, parent.layers)?;
|
||||
|
||||
let (tx, rx) = crossbeam_channel::unbounded();
|
||||
let (disconnect_tx, disconnect_rx) = crossbeam_channel::unbounded();
|
||||
|
||||
let state = Arc::new(Mutex::new(NodeState::<P> {
|
||||
id: id, //Uuid::new_v4().to_string(), //TODO: Calling an OS RNG can pose a problem for security;
|
||||
connections: HashMap::new(),
|
||||
map: HashMap::new(),
|
||||
packet_listener: tx,
|
||||
disconnect_listener: disconnect_tx,
|
||||
}));
|
||||
|
||||
for listener in listeners {
|
||||
run_listener_state(
|
||||
TCPServer::bind(&listener.socket)?,
|
||||
listener.layers,
|
||||
Self::on_listener_client,
|
||||
Arc::clone(&state),
|
||||
);
|
||||
}
|
||||
|
||||
for client in clients {
|
||||
let state = Arc::clone(&state);
|
||||
thread::spawn(move || {
|
||||
loop {
|
||||
if let Err(e) = Self::run_client(client.clone(), &state) {
|
||||
error!("Could not connect to server; {:?}", e);
|
||||
}
|
||||
|
||||
thread::sleep(Duration::from_millis(1000));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
Ok(Self {
|
||||
state,
|
||||
rx,
|
||||
disconnect_rx,
|
||||
})
|
||||
}
|
||||
|
||||
fn run_client(client: ConnectionConfig, state: &Arc<Mutex<NodeState<P>>>) -> Result<(), Error> {
|
||||
Self::run_connection(
|
||||
build_client(TCPClient::connect(&client.socket)?, client.layers)?,
|
||||
state,
|
||||
)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn on_listener_client(
|
||||
connection: Box<dyn Connection + Send + 'static>,
|
||||
state: Arc<Mutex<NodeState<P>>>,
|
||||
) {
|
||||
thread::spawn(move || {
|
||||
if let Err(e) = Self::run_connection(connection, &state) {
|
||||
error!("Could not connect; {}", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
fn run_connection(
|
||||
connection: Box<dyn Connection + Send + 'static>,
|
||||
state: &Arc<Mutex<NodeState<P>>>,
|
||||
) -> Result<(), Error> {
|
||||
let mut connection = connection;
|
||||
let s = state.lock().unwrap();
|
||||
|
||||
let this_uuid = s.id.clone();
|
||||
std::mem::drop(s);
|
||||
|
||||
// Send UUID to new connection
|
||||
write(&mut connection, Packets::SyncUUID(this_uuid.clone()))?;
|
||||
|
||||
// Recieve UUID
|
||||
let uuid_result = read(&mut connection)?;
|
||||
let other_uuid = if let Packets::SyncUUID(source) = uuid_result {
|
||||
source
|
||||
} else {
|
||||
return Err(format!("Could not get UUID! Got {:?}", uuid_result).into());
|
||||
};
|
||||
|
||||
if (&mut state.lock().unwrap()).knows_client(&other_uuid) {
|
||||
write(&mut connection, Packets::ErrorNameExists)?;
|
||||
return Err(format!(
|
||||
"Attempted to accept connection from node {} which already exists!",
|
||||
other_uuid
|
||||
)
|
||||
.into());
|
||||
}
|
||||
|
||||
info!("New Node! {} (direct)", other_uuid);
|
||||
|
||||
// Add connection
|
||||
(&mut state.lock().unwrap())
|
||||
.connections
|
||||
.insert(other_uuid.clone(), connection.try_clone()?);
|
||||
|
||||
// Update direct connections and the new connections with the new table
|
||||
(&mut state.lock().unwrap()).broadcast_table(None);
|
||||
|
||||
loop {
|
||||
match read(&mut connection) {
|
||||
Ok(packet) => {
|
||||
let result: Result<(), Error> =
|
||||
match packet {
|
||||
Packets::Disconnect { routes } => Ok(
|
||||
(&mut state.lock().unwrap()).disconnect(&other_uuid, routes, false)
|
||||
),
|
||||
Packets::Update { routes } => Ok((&mut state.lock().unwrap())
|
||||
.extend_routes(other_uuid.clone(), routes)),
|
||||
Packets::DataUnrouted {
|
||||
src: source,
|
||||
dest,
|
||||
data,
|
||||
} => (&mut state.lock().unwrap()).route_packet(source, dest, data),
|
||||
_ => {
|
||||
error!("Unsupported packet: {:?}", packet);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(e) = result {
|
||||
error!("Could not parse; {}", e);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
if !connection.is_alive() {
|
||||
warn!("Connection {} Disconnected!", connection.get_info());
|
||||
let state = &mut state.lock().unwrap();
|
||||
state.connections.remove(&other_uuid);
|
||||
state.disconnect(&this_uuid, vec![other_uuid.clone()], true);
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
error!("Could not read; {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn recv(&self) -> Result<(String, P), RecvError> {
|
||||
self.rx.recv()
|
||||
}
|
||||
|
||||
pub fn state(&self) -> MutexGuard<'_, NodeState<P>> {
|
||||
self.state.lock().unwrap()
|
||||
}
|
||||
|
||||
pub fn try_clone(&self) -> Result<Self, Error> {
|
||||
Ok(Self {
|
||||
state: Arc::clone(&self.state),
|
||||
rx: self.rx.clone(),
|
||||
disconnect_rx: self.disconnect_rx.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn send_unrouted(&self, dest: String, data: &P) -> Result<(), Error> {
|
||||
self.state().send_unrouted(dest, data)
|
||||
}
|
||||
|
||||
pub fn get_disconnect_rx(&self) -> Receiver<String> {
|
||||
self.disconnect_rx.clone()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct NodeState<P>
|
||||
where
|
||||
P: Encode + Decode<()> + Debug + Clone + 'static,
|
||||
{
|
||||
id: String,
|
||||
connections: HashMap<String, Box<dyn Connection + Send>>,
|
||||
map: HashMap<String, Vec<String>>,
|
||||
packet_listener: Sender<(String, P)>,
|
||||
disconnect_listener: Sender<String>,
|
||||
}
|
||||
|
||||
impl<P> NodeState<P>
|
||||
where
|
||||
P: Encode + Decode<()> + Debug + Clone + Send + 'static,
|
||||
{
|
||||
// Get list of all nodes in map
|
||||
fn get_known_nodes(&self) -> Vec<String> {
|
||||
self.map.keys().map(|k| k.clone()).collect::<Vec<String>>()
|
||||
}
|
||||
|
||||
// Get list of node UUIDs that are directly connected to this node
|
||||
fn get_direct_nodes(&self) -> Vec<String> {
|
||||
self.connections
|
||||
.keys()
|
||||
.map(|k| k.clone())
|
||||
.collect::<Vec<String>>()
|
||||
}
|
||||
|
||||
fn knows_client(&self, id: &String) -> bool {
|
||||
self.get_all_nodes().contains(id)
|
||||
}
|
||||
|
||||
// Remove all nodes where the routes are empty
|
||||
fn remove_null_nodes(&mut self) {
|
||||
self.map.retain(|_, routes| !routes.is_empty());
|
||||
}
|
||||
|
||||
// Send packet to all directly connected nodes, except maybe one
|
||||
fn broadcast(&mut self, data: Packets, disclude: Option<&String>) {
|
||||
for (uuid, connection) in self.connections.iter_mut() {
|
||||
if disclude.is_some() && disclude.unwrap() == uuid {
|
||||
continue;
|
||||
}
|
||||
if let Err(e) = write(connection, data.clone()) {
|
||||
error!("Failed to send packet to {}, {}", uuid, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Get list of nodes to send to another as known routes
|
||||
fn get_routes_to(&self, recv_uuid: &String) -> Vec<String> {
|
||||
let mut tx_routes: Vec<String> = Vec::new();
|
||||
|
||||
// Append
|
||||
for (map_uuid, routes) in self.map.iter() {
|
||||
// Do not transmit a route, which bounces directly back to the sender
|
||||
if routes.len() == 1 && &routes[0] == recv_uuid {
|
||||
continue;
|
||||
}
|
||||
|
||||
tx_routes.push(map_uuid.clone());
|
||||
}
|
||||
|
||||
// Append directly connected nodes
|
||||
tx_routes.append(&mut self.get_direct_nodes());
|
||||
|
||||
tx_routes
|
||||
}
|
||||
|
||||
fn broadcast_table(&mut self, disclude: Option<&String>) {
|
||||
let packets = self
|
||||
.connections
|
||||
.iter()
|
||||
.map(|(recv_uuid, _)| self.get_routes_to(&recv_uuid))
|
||||
.collect::<Vec<Vec<String>>>();
|
||||
|
||||
for (i, (recv_uuid, connection)) in self.connections.iter_mut().enumerate() {
|
||||
if let Some(disclude) = disclude {
|
||||
if disclude == recv_uuid {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if let Err(e) = write(
|
||||
connection,
|
||||
Packets::Update {
|
||||
routes: packets[i].clone(),
|
||||
},
|
||||
) {
|
||||
error!("Failed to send packet to {}, {}", recv_uuid, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn disconnect(&mut self, source: &String, routes: Vec<String>, direct: bool) {
|
||||
let mut resend_table = false;
|
||||
let mut remove_uuids = Vec::new();
|
||||
|
||||
for remove_uuid in routes {
|
||||
// Sanity check, in case the current client is still connected
|
||||
if self.get_direct_nodes().contains(&remove_uuid) {
|
||||
resend_table = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
// Check if client still exists, or if it was a direct connection
|
||||
// Prevents infinite network loops
|
||||
if direct || self.knows_client(&remove_uuid) {
|
||||
self.map.remove(&remove_uuid);
|
||||
remove_uuids.push(remove_uuid.clone());
|
||||
|
||||
info!(
|
||||
"Node disconnected! {} ({})",
|
||||
remove_uuid,
|
||||
if direct { "direct" } else { "indirect" }
|
||||
);
|
||||
|
||||
self.disconnect_listener.send(remove_uuid.clone()).unwrap();
|
||||
|
||||
for (uuid, route) in self.map.iter_mut() {
|
||||
if route.contains(&remove_uuid) {
|
||||
let index = route.iter().position(|r| r == &remove_uuid).unwrap();
|
||||
route.remove(index);
|
||||
remove_uuids.push(uuid.clone());
|
||||
}
|
||||
}
|
||||
|
||||
self.remove_null_nodes();
|
||||
}
|
||||
}
|
||||
|
||||
if !remove_uuids.is_empty() {
|
||||
self.broadcast(
|
||||
Packets::Disconnect {
|
||||
routes: remove_uuids,
|
||||
},
|
||||
Some(source),
|
||||
);
|
||||
}
|
||||
|
||||
if resend_table {
|
||||
self.broadcast_table(None);
|
||||
}
|
||||
|
||||
// self.print_map();
|
||||
}
|
||||
|
||||
fn extend_routes(&mut self, src: String, routes: Vec<String>) {
|
||||
let mut updated = false;
|
||||
|
||||
// Quick sanity check
|
||||
if !self.get_direct_nodes().contains(&src) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Loop through all of the routes in the new recieved route map
|
||||
for route in routes {
|
||||
// If the route loops back to self, disregard.
|
||||
if route == self.id {
|
||||
continue;
|
||||
}
|
||||
|
||||
// If the connection is already established directly, disregard
|
||||
if self.get_direct_nodes().contains(&route) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// If there is already an entry created for this route
|
||||
if self.map.contains_key(&route) {
|
||||
// If the route does not already contain the new one
|
||||
if !self.map.get(&route).unwrap().contains(&src) {
|
||||
// If the neighbor can be acessed directly, disregard
|
||||
self.map.get_mut(&route).unwrap().push(src.clone());
|
||||
info!("Node update: {} (indirect)", src);
|
||||
updated = true;
|
||||
} else {
|
||||
// Else, do nothing
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
// Else, create the new route entry
|
||||
self.map.insert(route.clone(), vec![src.clone()]);
|
||||
info!("Node update: {} (indirect)", src);
|
||||
updated = true;
|
||||
}
|
||||
}
|
||||
|
||||
// Solves the case that if a remote node has said that a neighbor has connected before itself has
|
||||
let direct_connections = self.get_direct_nodes();
|
||||
for connection in direct_connections {
|
||||
if self.map.contains_key(&connection) {
|
||||
self.map.remove(&connection);
|
||||
}
|
||||
}
|
||||
|
||||
// If something has updated, rebroadcast
|
||||
// Prevents infinite network loops
|
||||
if updated {
|
||||
self.broadcast_table(Some(&src));
|
||||
}
|
||||
// self.print_map();
|
||||
}
|
||||
|
||||
fn route_packet(&mut self, src: String, dest: String, data: Vec<u8>) -> Result<(), Error> {
|
||||
if dest == self.id {
|
||||
self.packet_listener.send((src, decode_vec::<P>(&data)?))?;
|
||||
|
||||
Ok(())
|
||||
} else {
|
||||
if self.connections.contains_key(&dest) {
|
||||
write(
|
||||
self.connections.get_mut(&dest).unwrap(),
|
||||
Packets::DataUnrouted { src, dest, data },
|
||||
)?;
|
||||
|
||||
Ok(())
|
||||
} else if self.map.contains_key(&dest) {
|
||||
#[allow(deprecated)]
|
||||
let next_uuid = self
|
||||
.map
|
||||
.get(&dest)
|
||||
.unwrap()
|
||||
.choose(&mut thread_rng())
|
||||
.unwrap()
|
||||
.clone();
|
||||
|
||||
write(
|
||||
self.connections.get_mut(&next_uuid).unwrap(),
|
||||
Packets::DataUnrouted { src, dest, data },
|
||||
)?;
|
||||
|
||||
Ok(())
|
||||
} else {
|
||||
Err::<(), Error>(format!("Could not find route from {} to {}!", src, dest).into())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn send_unrouted(&mut self, dest: String, data: &P) -> Result<(), Error> {
|
||||
self.route_packet(self.id.clone(), dest, encode_vec(data)?)
|
||||
}
|
||||
|
||||
pub fn get_all_nodes(&self) -> Vec<String> {
|
||||
let mut uuids = self.get_known_nodes();
|
||||
uuids.append(&mut self.get_direct_nodes());
|
||||
uuids
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
fn print_map(&self) {
|
||||
info!("\n\n");
|
||||
info!("Local addr: {}", self.id);
|
||||
info!("Table: ");
|
||||
for (uuid, route) in self.map.iter() {
|
||||
info!("{} -> [ {:?} ]", uuid, route);
|
||||
}
|
||||
info!("Direct: {:?}", self.get_direct_nodes());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,183 @@
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
sync::{Arc, Mutex},
|
||||
thread,
|
||||
};
|
||||
|
||||
use crossbeam_channel::{Receiver, Sender};
|
||||
|
||||
use crate::{
|
||||
Error,
|
||||
nodes::{ConnectionConfig, Node, Stream, stream::StreamHandle},
|
||||
packets::TransportLayerPacket,
|
||||
};
|
||||
|
||||
type Streams = Arc<Mutex<HashMap<(usize, String), StreamHandle>>>;
|
||||
|
||||
pub struct NodeContainer {
|
||||
streams: Streams,
|
||||
node: Node<TransportLayerPacket>,
|
||||
on_stream_rx: Receiver<Stream>,
|
||||
// state: Arc<Mutex<NodeState<TransportLayerPacket>>>,
|
||||
// spontanious_rx: Receiver<(String, C2Packet)>,
|
||||
}
|
||||
|
||||
impl NodeContainer {
|
||||
pub fn connect(
|
||||
id: String,
|
||||
clients: Vec<ConnectionConfig>,
|
||||
listeners: Vec<ConnectionConfig>,
|
||||
) -> Result<Self, Error> {
|
||||
let node = Node::<TransportLayerPacket>::run_node(id, clients, listeners)?;
|
||||
let streams = Arc::new(Mutex::new(HashMap::new()));
|
||||
// let (spontanious_tx, spontanious_rx) = crossbeam_channel::unbounded();
|
||||
let (on_stream_tx, on_stream_rx) = crossbeam_channel::unbounded();
|
||||
|
||||
let s = Self {
|
||||
streams: Arc::clone(&streams),
|
||||
node: node.try_clone()?,
|
||||
on_stream_rx,
|
||||
};
|
||||
|
||||
let close_rx = node.get_disconnect_rx();
|
||||
let stream_clone = Arc::clone(&streams);
|
||||
thread::spawn(move || {
|
||||
loop {
|
||||
let close_uuid = close_rx.recv().unwrap();
|
||||
let stream = stream_clone.lock().unwrap();
|
||||
let keys = stream.keys();
|
||||
for key in keys {
|
||||
if key.1 == close_uuid {
|
||||
warn!("Stream ({}, {}) disconnected!", key.0, key.1);
|
||||
let handle = (&mut stream_clone.lock().unwrap()).remove(key).unwrap();
|
||||
handle.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Start node listening thread
|
||||
thread::spawn(move || {
|
||||
loop {
|
||||
if let Err(e) = Self::node_listening_thread(&node, &streams, &on_stream_tx) {
|
||||
error!("Got error: {}", e);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(s)
|
||||
}
|
||||
|
||||
fn node_listening_thread(
|
||||
node: &Node<TransportLayerPacket>,
|
||||
streams: &Streams,
|
||||
on_stream_tx: &Sender<Stream>, // spontanious_tx: &Sender<(String, C2Packet)>,
|
||||
) -> Result<(), Error> {
|
||||
// info!("Loop");
|
||||
let (src, packet) = node.recv()?;
|
||||
info!("Packet: {:?}", packet);
|
||||
|
||||
match packet {
|
||||
TransportLayerPacket::RequestStreamUnrouted {
|
||||
stream_id: remote_stream_id,
|
||||
} => {
|
||||
// Create stream ID
|
||||
let local_stream_id = streams.lock().unwrap().keys().len();
|
||||
// Send response to server including local id and remoe ID
|
||||
Stream::respond_create(src.clone(), local_stream_id, remote_stream_id, node)?;
|
||||
|
||||
Self::create_handle_thread(
|
||||
on_stream_tx,
|
||||
streams,
|
||||
node,
|
||||
src,
|
||||
local_stream_id,
|
||||
remote_stream_id,
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
TransportLayerPacket::AckStreamUnrouted {
|
||||
ack_stream_id,
|
||||
stream_id,
|
||||
} => {
|
||||
Self::create_handle_thread(
|
||||
on_stream_tx,
|
||||
streams,
|
||||
node,
|
||||
src,
|
||||
ack_stream_id,
|
||||
stream_id,
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
TransportLayerPacket::StreamDataUnrouted { stream_id, data } => {
|
||||
match streams.lock().unwrap().get(&(stream_id, src.clone())) {
|
||||
Some(handle) => Ok(handle.send(data).unwrap()),
|
||||
// Some(_) => Err(format!(
|
||||
// "Stream {}, {} has not been initilized!",
|
||||
// stream_id, src
|
||||
// )
|
||||
// .into()),
|
||||
None => Err(format!("Stream {}, {} does not exist!", stream_id, src).into()),
|
||||
}
|
||||
} // _ => Err(format!("Unsupported packet: {:?}", packet).into()),
|
||||
}
|
||||
}
|
||||
|
||||
fn create_handle_thread(
|
||||
on_stream_tx: &Sender<Stream>,
|
||||
streams: &Streams,
|
||||
node: &Node<TransportLayerPacket>,
|
||||
src: String,
|
||||
local_stream_id: usize,
|
||||
remote_stream_id: usize,
|
||||
) -> Result<(), Error> {
|
||||
info!("Local: {}, Remote: {}", local_stream_id, remote_stream_id);
|
||||
|
||||
// Create stream from local and remote stream handles
|
||||
let (stream, handle) =
|
||||
Stream::create_handle(src.clone(), local_stream_id, remote_stream_id)?;
|
||||
|
||||
on_stream_tx.send(stream)?;
|
||||
|
||||
// Add the local stream to map
|
||||
streams
|
||||
.lock()
|
||||
.unwrap()
|
||||
.insert((local_stream_id, src.clone()), handle.clone()?);
|
||||
|
||||
let node_clone = node.try_clone()?;
|
||||
thread::spawn(move || {
|
||||
loop {
|
||||
let data = handle.recv().unwrap();
|
||||
if let Err(e) = node_clone.state().send_unrouted(
|
||||
src.clone(),
|
||||
&TransportLayerPacket::StreamDataUnrouted {
|
||||
stream_id: remote_stream_id,
|
||||
data,
|
||||
},
|
||||
) {
|
||||
error!("Got error: {}", e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn create_stream_block(&self, dest: String) -> Result<Stream, Error> {
|
||||
let local_stream_id = self.streams.lock().unwrap().keys().len();
|
||||
Stream::ask_create(dest.clone(), local_stream_id, &self.node)?;
|
||||
Ok(self.on_stream_rx.recv()?)
|
||||
}
|
||||
|
||||
pub fn recv_stream(&self) -> Result<Stream, Error> {
|
||||
Ok(self.on_stream_rx.recv()?)
|
||||
}
|
||||
|
||||
pub fn get_nodes(&self) -> Vec<String> {
|
||||
self.node.state().get_all_nodes()
|
||||
// self.state.lock().unwrap().get_all_nodes()
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,60 @@
|
||||
use std::fmt::Debug;
|
||||
|
||||
use bincode::{Decode, Encode, config::Configuration};
|
||||
|
||||
use crate::Error;
|
||||
|
||||
#[derive(Debug, Encode, Decode, Clone)]
|
||||
pub enum Packets {
|
||||
SyncUUID(String),
|
||||
Update {
|
||||
routes: Vec<String>,
|
||||
},
|
||||
Disconnect {
|
||||
routes: Vec<String>,
|
||||
},
|
||||
|
||||
// Send single data packet without routing details
|
||||
DataUnrouted {
|
||||
src: String,
|
||||
dest: String,
|
||||
data: Vec<u8>,
|
||||
},
|
||||
// Send single data packet with routing details
|
||||
DataRouted {
|
||||
path: Vec<String>,
|
||||
data: Vec<u8>,
|
||||
},
|
||||
|
||||
// DataStreamRouted {
|
||||
// path: Vec<String>,
|
||||
// data: Vec<u8>,
|
||||
// },
|
||||
ErrorNameExists,
|
||||
}
|
||||
|
||||
impl Packets {
|
||||
pub fn encode(&self) -> Result<Vec<u8>, Error> {
|
||||
encode_vec(self)
|
||||
}
|
||||
pub fn decode(data: &[u8]) -> Result<Self, Error> {
|
||||
decode_vec(data)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn encode_vec<P>(object: &P) -> Result<Vec<u8>, Error>
|
||||
where
|
||||
P: Encode + Decode<()> + Debug + Clone + 'static,
|
||||
{
|
||||
Ok(bincode::encode_to_vec(object, crate::BINCODE_CONFIG)?)
|
||||
}
|
||||
|
||||
pub fn decode_vec<P>(data: &[u8]) -> Result<P, Error>
|
||||
where
|
||||
P: Encode + Decode<()> + Debug + Clone + 'static,
|
||||
{
|
||||
let (decoded, _) =
|
||||
bincode::decode_from_slice::<P, Configuration>(&data[..], crate::BINCODE_CONFIG)?;
|
||||
|
||||
Ok(decoded)
|
||||
}
|
||||
@@ -0,0 +1,155 @@
|
||||
use std::sync::{
|
||||
Arc,
|
||||
atomic::{AtomicBool, Ordering},
|
||||
};
|
||||
|
||||
use crossbeam_channel::{Receiver, Sender};
|
||||
|
||||
use crate::{Error, networkers::Connection, nodes::Node, packets::TransportLayerPacket};
|
||||
|
||||
pub struct Stream {
|
||||
tx: Sender<Vec<u8>>,
|
||||
rx: Receiver<Vec<u8>>,
|
||||
closed: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
impl Connection for Stream {
|
||||
fn get_info(&self) -> String {
|
||||
"unrouted".to_string()
|
||||
}
|
||||
|
||||
fn is_alive(&self) -> bool {
|
||||
self.closed.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
fn read(&mut self) -> Result<Vec<u8>, crate::Error> {
|
||||
if self.closed.load(Ordering::Relaxed) {
|
||||
Err("Connection closed".into())
|
||||
} else {
|
||||
Ok(self.rx.recv()?)
|
||||
}
|
||||
}
|
||||
|
||||
fn write(&mut self, data: &[u8]) -> Result<(), crate::Error> {
|
||||
if self.closed.load(Ordering::Relaxed) {
|
||||
Err("Connection closed".into())
|
||||
} else {
|
||||
Ok(self.tx.send(data.to_vec())?)
|
||||
}
|
||||
}
|
||||
|
||||
fn try_clone(&self) -> Result<Box<dyn Connection + Send + Sync>, crate::Error> {
|
||||
Ok(Box::new(Self {
|
||||
tx: self.tx.clone(),
|
||||
rx: self.rx.clone(),
|
||||
closed: Arc::clone(&self.closed),
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream {
|
||||
pub fn ask_create(
|
||||
dest: String,
|
||||
local_stream_id: usize,
|
||||
node: &Node<TransportLayerPacket>,
|
||||
) -> Result<(), Error> {
|
||||
info!("Sent to {}", dest);
|
||||
node.send_unrouted(
|
||||
dest.clone(),
|
||||
&TransportLayerPacket::RequestStreamUnrouted {
|
||||
stream_id: local_stream_id,
|
||||
},
|
||||
)?;
|
||||
|
||||
Ok(())
|
||||
|
||||
// Self::create_handle(dest, local_stream_id, remote_stream_id, node)
|
||||
}
|
||||
|
||||
pub fn respond_create(
|
||||
dest: String,
|
||||
local_stream_id: usize,
|
||||
remote_stream_id: usize,
|
||||
node: &Node<TransportLayerPacket>,
|
||||
) -> Result<(), Error> {
|
||||
node.send_unrouted(
|
||||
dest.clone(),
|
||||
&&TransportLayerPacket::AckStreamUnrouted {
|
||||
ack_stream_id: remote_stream_id,
|
||||
stream_id: local_stream_id,
|
||||
},
|
||||
)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn create_handle(
|
||||
dest: String,
|
||||
local_stream_id: usize,
|
||||
remote_stream_id: usize,
|
||||
) -> Result<(Self, StreamHandle), Error> {
|
||||
let (recv_tx, recv_rx) = crossbeam_channel::unbounded();
|
||||
let (send_tx, send_rx) = crossbeam_channel::unbounded();
|
||||
|
||||
let closed = Arc::new(AtomicBool::new(false));
|
||||
|
||||
let handle = StreamHandle {
|
||||
dest,
|
||||
tx: recv_tx,
|
||||
rx: send_rx,
|
||||
local_stream_id,
|
||||
remote_stream_id,
|
||||
closed: Arc::clone(&closed),
|
||||
};
|
||||
|
||||
Ok((
|
||||
Self {
|
||||
tx: send_tx,
|
||||
rx: recv_rx,
|
||||
closed,
|
||||
},
|
||||
handle,
|
||||
))
|
||||
}
|
||||
|
||||
// pub fn new(tx: Sender<Vec<u8>>, rx: Receiver<Vec<u8>>) -> Self {
|
||||
// Self { tx, rx }
|
||||
// }
|
||||
}
|
||||
|
||||
pub struct StreamHandle {
|
||||
pub dest: String,
|
||||
pub local_stream_id: usize,
|
||||
pub remote_stream_id: usize,
|
||||
|
||||
tx: Sender<Vec<u8>>,
|
||||
rx: Receiver<Vec<u8>>,
|
||||
closed: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
impl StreamHandle {
|
||||
pub fn send(&self, data: Vec<u8>) -> Result<(), Error> {
|
||||
Ok(self.tx.send(data)?)
|
||||
}
|
||||
pub fn has_content(&self) -> bool {
|
||||
self.rx.len() > 0
|
||||
}
|
||||
pub fn recv(&self) -> Result<Vec<u8>, Error> {
|
||||
Ok(self.rx.recv()?)
|
||||
}
|
||||
pub fn clone(&self) -> Result<Self, Error> {
|
||||
Ok(Self {
|
||||
dest: self.dest.clone(),
|
||||
local_stream_id: self.local_stream_id.clone(),
|
||||
remote_stream_id: self.remote_stream_id.clone(),
|
||||
tx: self.tx.clone(),
|
||||
rx: self.rx.clone(),
|
||||
closed: Arc::clone(&self.closed),
|
||||
})
|
||||
}
|
||||
pub fn close(self) {
|
||||
drop(self.tx);
|
||||
drop(self.rx);
|
||||
self.closed.store(false, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,29 @@
|
||||
use bincode::{Decode, Encode};
|
||||
use std::fmt::Debug;
|
||||
|
||||
#[derive(Debug, Encode, Decode, Clone)]
|
||||
pub enum TransportLayerPacket {
|
||||
RequestStreamUnrouted {
|
||||
stream_id: usize,
|
||||
},
|
||||
AckStreamUnrouted {
|
||||
ack_stream_id: usize,
|
||||
stream_id: usize,
|
||||
},
|
||||
StreamDataUnrouted {
|
||||
stream_id: usize,
|
||||
data: Vec<u8>,
|
||||
},
|
||||
// SpontaniousDataUnrouted {
|
||||
// data: Vec<u8>,
|
||||
// },
|
||||
}
|
||||
|
||||
#[derive(Debug, Encode, Decode, Clone)]
|
||||
pub enum C2Packet {
|
||||
Ping,
|
||||
Pong,
|
||||
|
||||
CreatePTY { width: usize, height: usize },
|
||||
PTYData,
|
||||
}
|
||||
Reference in New Issue
Block a user