Work on,making streams

This commit is contained in:
Michael Mikovsky
2025-06-16 13:20:36 -06:00
parent f26b739d43
commit 1ed6ff2d9a
6 changed files with 455 additions and 154 deletions
+46 -10
View File
@@ -1,15 +1,13 @@
use std::{
collections::HashMap,
fmt::Debug,
sync::{
Arc, Mutex,
mpsc::{self, Receiver, Sender},
},
sync::{Arc, Mutex, MutexGuard},
thread,
time::Duration,
};
use bincode::{Decode, Encode};
use crossbeam_channel::{Receiver, RecvError, Sender};
// use std:::{Receiver, Sender};
#[allow(deprecated)]
use rand::{seq::IndexedRandom, thread_rng};
@@ -37,7 +35,8 @@ where
P: Encode + Decode<()> + Debug + Clone + 'static,
{
pub state: Arc<Mutex<NodeState<P>>>,
pub rx: Receiver<(String, P)>,
rx: Receiver<(String, P)>,
disconnect_rx: Receiver<String>,
}
impl<P> Node<P>
@@ -54,13 +53,15 @@ where
{
// let mut parent = build_client(TCPClient::connect(&parent.socket)?, parent.layers)?;
let (tx, rx) = mpsc::channel();
let (tx, rx) = crossbeam_channel::unbounded();
let (disconnect_tx, disconnect_rx) = crossbeam_channel::unbounded();
let state = Arc::new(Mutex::new(NodeState::<P> {
id: id, //Uuid::new_v4().to_string(), //TODO: Calling an OS RNG can pose a problem for security;
connections: HashMap::new(),
map: HashMap::new(),
packet_listener: tx,
disconnect_listener: disconnect_tx,
}));
for listener in listeners {
@@ -85,7 +86,11 @@ where
});
}
Ok(Self { state, rx })
Ok(Self {
state,
rx,
disconnect_rx,
})
}
fn run_client(client: ConnectionConfig, state: &Arc<Mutex<NodeState<P>>>) -> Result<(), Error> {
@@ -191,6 +196,30 @@ where
Ok(())
}
pub fn recv(&self) -> Result<(String, P), RecvError> {
self.rx.recv()
}
pub fn state(&self) -> MutexGuard<'_, NodeState<P>> {
self.state.lock().unwrap()
}
pub fn try_clone(&self) -> Result<Self, Error> {
Ok(Self {
state: Arc::clone(&self.state),
rx: self.rx.clone(),
disconnect_rx: self.disconnect_rx.clone(),
})
}
pub fn send_unrouted(&self, dest: String, data: &P) -> Result<(), Error> {
self.state().send_unrouted(dest, data)
}
pub fn get_disconnect_rx(&self) -> Receiver<String> {
self.disconnect_rx.clone()
}
}
pub struct NodeState<P>
@@ -201,6 +230,7 @@ where
connections: HashMap<String, Box<dyn Connection + Send>>,
map: HashMap<String, Vec<String>>,
packet_listener: Sender<(String, P)>,
disconnect_listener: Sender<String>,
}
impl<P> NodeState<P>
@@ -309,6 +339,8 @@ where
if direct { "direct" } else { "indirect" }
);
self.disconnect_listener.send(remove_uuid.clone()).unwrap();
for (uuid, route) in self.map.iter_mut() {
if route.contains(&remove_uuid) {
let index = route.iter().position(|r| r == &remove_uuid).unwrap();
@@ -396,12 +428,16 @@ where
fn route_packet(&mut self, src: String, dest: String, data: Vec<u8>) -> Result<(), Error> {
if dest == self.id {
self.packet_listener.send((src, decode_vec::<P>(&data)?))?;
Ok(())
} else {
if self.connections.contains_key(&dest) {
write(
self.connections.get_mut(&dest).unwrap(),
Packets::DataUnrouted { src, dest, data },
)?;
Ok(())
} else if self.map.contains_key(&dest) {
#[allow(deprecated)]
let next_uuid = self
@@ -416,12 +452,12 @@ where
self.connections.get_mut(&next_uuid).unwrap(),
Packets::DataUnrouted { src, dest, data },
)?;
Ok(())
} else {
error!("Could not find route from {} to {}!", src, dest);
Err::<(), Error>(format!("Could not find route from {} to {}!", src, dest).into())
}
}
Ok(())
}
pub fn send_unrouted(&mut self, dest: String, data: &P) -> Result<(), Error> {
+110 -78
View File
@@ -7,21 +7,19 @@ use std::{
use crossbeam_channel::{Receiver, Sender};
use crate::{
C2Packet, Error,
nodes::{
ConnectionConfig, Node, Stream,
node::NodeState,
packets::{decode_vec, encode_vec},
},
Error,
nodes::{ConnectionConfig, Node, Stream, stream::StreamHandle},
packets::TransportLayerPacket,
};
type Streams = Arc<Mutex<HashMap<(usize, String), Option<(Stream, Sender<Vec<u8>>)>>>>;
type Streams = Arc<Mutex<HashMap<(usize, String), StreamHandle>>>;
pub struct NodeContainer {
streams: Streams,
state: Arc<Mutex<NodeState<TransportLayerPacket>>>,
spontanious_rx: Receiver<(String, C2Packet)>,
node: Node<TransportLayerPacket>,
on_stream_rx: Receiver<Stream>,
// state: Arc<Mutex<NodeState<TransportLayerPacket>>>,
// spontanious_rx: Receiver<(String, C2Packet)>,
}
impl NodeContainer {
@@ -30,20 +28,38 @@ impl NodeContainer {
clients: Vec<ConnectionConfig>,
listeners: Vec<ConnectionConfig>,
) -> Result<Self, Error> {
let node = Node::run_node(id, clients, listeners)?;
let node = Node::<TransportLayerPacket>::run_node(id, clients, listeners)?;
let streams = Arc::new(Mutex::new(HashMap::new()));
let (spontanious_tx, spontanious_rx) = crossbeam_channel::unbounded();
// let (spontanious_tx, spontanious_rx) = crossbeam_channel::unbounded();
let (on_stream_tx, on_stream_rx) = crossbeam_channel::unbounded();
let s = Self {
streams: Arc::clone(&streams),
state: Arc::clone(&node.state),
spontanious_rx,
node: node.try_clone()?,
on_stream_rx,
};
let close_rx = node.get_disconnect_rx();
let stream_clone = Arc::clone(&streams);
thread::spawn(move || {
loop {
let close_uuid = close_rx.recv().unwrap();
let stream = stream_clone.lock().unwrap();
let keys = stream.keys();
for key in keys {
if key.1 == close_uuid {
warn!("Stream ({}, {}) disconnected!", key.0, key.1);
let handle = (&mut stream_clone.lock().unwrap()).remove(key).unwrap();
handle.close();
}
}
}
});
// Start node listening thread
thread::spawn(move || {
loop {
if let Err(e) = Self::node_listening_thread(&node, &streams, &spontanious_tx) {
if let Err(e) = Self::node_listening_thread(&node, &streams, &on_stream_tx) {
error!("Got error: {}", e);
}
}
@@ -55,97 +71,113 @@ impl NodeContainer {
fn node_listening_thread(
node: &Node<TransportLayerPacket>,
streams: &Streams,
spontanious_tx: &Sender<(String, C2Packet)>,
on_stream_tx: &Sender<Stream>, // spontanious_tx: &Sender<(String, C2Packet)>,
) -> Result<(), Error> {
let (src, packet) = node.rx.recv()?;
// info!("Loop");
let (src, packet) = node.recv()?;
info!("Packet: {:?}", packet);
match packet {
TransportLayerPacket::RequestStreamUnrouted { stream_id } => {
TransportLayerPacket::RequestStreamUnrouted {
stream_id: remote_stream_id,
} => {
// Create stream ID
let local_stream_id = streams.lock().unwrap().keys().len();
streams
.lock()
.unwrap()
.insert((local_stream_id, src.clone()), None);
(&mut node.state.lock().unwrap()).send_unrouted(
src,
&TransportLayerPacket::AckStreamUnrouted {
local_stream_id,
remote_stream_id: stream_id,
},
)?;
// Send response to server including local id and remoe ID
Stream::respond_create(src.clone(), local_stream_id, remote_stream_id, node)?;
Self::create_handle_thread(
on_stream_tx,
streams,
node,
src,
local_stream_id,
remote_stream_id,
)?;
Ok(())
}
TransportLayerPacket::AckStreamUnrouted {
local_stream_id,
remote_stream_id,
ack_stream_id,
stream_id,
} => {
let key = &(remote_stream_id, src);
if let Some(stream_mut) = streams.lock().unwrap().get_mut(&key) {
if stream_mut.is_none() {
let stream = Self::create_stream(local_stream_id, node, src, stream_mut)?;
Ok(())
} else {
Err(format!("Stream {:?} already exists!", key).into())
}
} else {
Err(format!("Could not find stream id by {:?}", key).into())
}
}
TransportLayerPacket::StreamDataUnrouted { stream_id, data } => todo!(),
TransportLayerPacket::SpontaniousDataUnrouted { data } => {
spontanious_tx.send((src, decode_vec::<C2Packet>(&data)?))?;
Self::create_handle_thread(
on_stream_tx,
streams,
node,
src,
ack_stream_id,
stream_id,
)?;
Ok(())
}
TransportLayerPacket::StreamDataUnrouted { stream_id, data } => {
match streams.lock().unwrap().get(&(stream_id, src.clone())) {
Some(handle) => Ok(handle.send(data).unwrap()),
// Some(_) => Err(format!(
// "Stream {}, {} has not been initilized!",
// stream_id, src
// )
// .into()),
None => Err(format!("Stream {}, {} does not exist!", stream_id, src).into()),
}
} // _ => Err(format!("Unsupported packet: {:?}", packet).into()),
}
}
fn create_stream(
remote_stream_id: usize,
dest: String,
fn create_handle_thread(
on_stream_tx: &Sender<Stream>,
streams: &Streams,
node: &Node<TransportLayerPacket>,
stream_mut: &mut Option<(Stream, Sender<Vec<u8>>)>,
src: String,
local_stream_id: usize,
remote_stream_id: usize,
) -> Result<(), Error> {
let (recv_tx, recv_rx) = crossbeam_channel::unbounded();
let (send_tx, send_rx) = crossbeam_channel::unbounded();
info!("Local: {}, Remote: {}", local_stream_id, remote_stream_id);
let stream = Stream::new(send_tx, recv_rx);
// Create stream from local and remote stream handles
let (stream, handle) =
Stream::create_handle(src.clone(), local_stream_id, remote_stream_id)?;
let _ = stream_mut.insert((stream, recv_tx));
on_stream_tx.send(stream)?;
// Add the local stream to map
streams
.lock()
.unwrap()
.insert((local_stream_id, src.clone()), handle.clone()?);
let node_clone = node.try_clone()?;
thread::spawn(move || {
loop {
let packet = send_rx.recv().unwrap();
(&mut node.state.lock().unwrap())
.send_unrouted(
dest,
&TransportLayerPacket::StreamDataUnrouted {
stream_id: remote_stream_id,
data: packet,
},
)
.unwrap();
let data = handle.recv().unwrap();
if let Err(e) = node_clone.state().send_unrouted(
src.clone(),
&TransportLayerPacket::StreamDataUnrouted {
stream_id: remote_stream_id,
data,
},
) {
error!("Got error: {}", e);
break;
}
}
});
Ok(())
}
pub fn create_stream_block(&self, dest: String) -> Result<Stream, Error> {
let local_stream_id = self.streams.lock().unwrap().keys().len();
Stream::ask_create(dest.clone(), local_stream_id, &self.node)?;
Ok(self.on_stream_rx.recv()?)
}
pub fn recv_stream(&self) -> Result<Stream, Error> {
Ok(self.on_stream_rx.recv()?)
}
pub fn get_nodes(&self) -> Vec<String> {
self.state.lock().unwrap().get_all_nodes()
}
pub fn send_unrouted(&self, dest: &String, data: &C2Packet) -> Result<(), Error> {
(&mut self.state.lock().unwrap()).send_unrouted(
dest.clone(),
&TransportLayerPacket::SpontaniousDataUnrouted {
data: encode_vec(data)?,
},
)?;
Ok(())
}
pub fn read_packet(&self) -> Result<(String, C2Packet), Error> {
Ok(self.spontanious_rx.recv()?)
self.node.state().get_all_nodes()
// self.state.lock().unwrap().get_all_nodes()
}
}
+126 -7
View File
@@ -1,10 +1,16 @@
use std::sync::{
Arc,
atomic::{AtomicBool, Ordering},
};
use crossbeam_channel::{Receiver, Sender};
use crate::networkers::Connection;
use crate::{Error, networkers::Connection, nodes::Node, packets::TransportLayerPacket};
pub struct Stream {
tx: Sender<Vec<u8>>,
rx: Receiver<Vec<u8>>,
closed: Arc<AtomicBool>,
}
impl Connection for Stream {
@@ -13,24 +19,137 @@ impl Connection for Stream {
}
fn is_alive(&self) -> bool {
true
self.closed.load(Ordering::Relaxed)
}
fn read(&mut self) -> Result<Vec<u8>, crate::Error> {
Ok(self.rx.recv()?)
if self.closed.load(Ordering::Relaxed) {
Err("Connection closed".into())
} else {
Ok(self.rx.recv()?)
}
}
fn write(&mut self, data: &[u8]) -> Result<(), crate::Error> {
Ok(self.tx.send(data.to_vec())?)
if self.closed.load(Ordering::Relaxed) {
Err("Connection closed".into())
} else {
Ok(self.tx.send(data.to_vec())?)
}
}
fn try_clone(&self) -> Result<Box<dyn Connection + Send + Sync>, crate::Error> {
todo!()
Ok(Box::new(Self {
tx: self.tx.clone(),
rx: self.rx.clone(),
closed: Arc::clone(&self.closed),
}))
}
}
impl Stream {
pub fn new(tx: Sender<Vec<u8>>, rx: Receiver<Vec<u8>>) -> Self {
Self { tx, rx }
pub fn ask_create(
dest: String,
local_stream_id: usize,
node: &Node<TransportLayerPacket>,
) -> Result<(), Error> {
info!("Sent to {}", dest);
node.send_unrouted(
dest.clone(),
&TransportLayerPacket::RequestStreamUnrouted {
stream_id: local_stream_id,
},
)?;
Ok(())
// Self::create_handle(dest, local_stream_id, remote_stream_id, node)
}
pub fn respond_create(
dest: String,
local_stream_id: usize,
remote_stream_id: usize,
node: &Node<TransportLayerPacket>,
) -> Result<(), Error> {
node.send_unrouted(
dest.clone(),
&&TransportLayerPacket::AckStreamUnrouted {
ack_stream_id: remote_stream_id,
stream_id: local_stream_id,
},
)?;
Ok(())
}
pub fn create_handle(
dest: String,
local_stream_id: usize,
remote_stream_id: usize,
) -> Result<(Self, StreamHandle), Error> {
let (recv_tx, recv_rx) = crossbeam_channel::unbounded();
let (send_tx, send_rx) = crossbeam_channel::unbounded();
let closed = Arc::new(AtomicBool::new(false));
let handle = StreamHandle {
dest,
tx: recv_tx,
rx: send_rx,
local_stream_id,
remote_stream_id,
closed: Arc::clone(&closed),
};
Ok((
Self {
tx: send_tx,
rx: recv_rx,
closed,
},
handle,
))
}
// pub fn new(tx: Sender<Vec<u8>>, rx: Receiver<Vec<u8>>) -> Self {
// Self { tx, rx }
// }
}
pub struct StreamHandle {
pub dest: String,
pub local_stream_id: usize,
pub remote_stream_id: usize,
tx: Sender<Vec<u8>>,
rx: Receiver<Vec<u8>>,
closed: Arc<AtomicBool>,
}
impl StreamHandle {
pub fn send(&self, data: Vec<u8>) -> Result<(), Error> {
Ok(self.tx.send(data)?)
}
pub fn has_content(&self) -> bool {
self.rx.len() > 0
}
pub fn recv(&self) -> Result<Vec<u8>, Error> {
Ok(self.rx.recv()?)
}
pub fn clone(&self) -> Result<Self, Error> {
Ok(Self {
dest: self.dest.clone(),
local_stream_id: self.local_stream_id.clone(),
remote_stream_id: self.remote_stream_id.clone(),
tx: self.tx.clone(),
rx: self.rx.clone(),
closed: Arc::clone(&self.closed),
})
}
pub fn close(self) {
drop(self.tx);
drop(self.rx);
self.closed.store(false, Ordering::Relaxed);
}
}
+5 -6
View File
@@ -7,17 +7,16 @@ pub enum TransportLayerPacket {
stream_id: usize,
},
AckStreamUnrouted {
local_stream_id: usize,
remote_stream_id: usize,
ack_stream_id: usize,
stream_id: usize,
},
StreamDataUnrouted {
stream_id: usize,
data: Vec<u8>,
},
SpontaniousDataUnrouted {
data: Vec<u8>,
},
// SpontaniousDataUnrouted {
// data: Vec<u8>,
// },
}
#[derive(Debug, Encode, Decode, Clone)]