mirror of
https://github.com/Astatin3/unshell-nodes-rs.git
synced 2026-06-09 00:28:00 -06:00
Start rewrite, get layers working
This commit is contained in:
@@ -1,9 +0,0 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::config::listeners::ListenerConfig;
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
pub struct CampaignConfig {
|
||||
pub name: String,
|
||||
pub listeners: Vec<ListenerConfig>,
|
||||
}
|
||||
@@ -1,4 +0,0 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub enum LayerConfig {}
|
||||
@@ -1,47 +0,0 @@
|
||||
use std::{
|
||||
error::Error,
|
||||
net::SocketAddr,
|
||||
sync::{Arc, Mutex},
|
||||
};
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::{
|
||||
config::layers::LayerConfig,
|
||||
networkers::{ServerTrait, TCPConnection, TCPServer},
|
||||
};
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
pub enum ListenerConfig {
|
||||
Tcp {
|
||||
enabled: bool,
|
||||
name: String,
|
||||
addr: SocketAddr,
|
||||
layers: Vec<LayerConfig>,
|
||||
|
||||
#[serde(skip)]
|
||||
connections: Option<Arc<Mutex<Vec<TCPConnection>>>>,
|
||||
},
|
||||
}
|
||||
|
||||
impl ListenerConfig {
|
||||
pub fn start(self) -> Result<(), Box<dyn Error>> {
|
||||
match self {
|
||||
ListenerConfig::Tcp {
|
||||
mut enabled,
|
||||
addr,
|
||||
layers,
|
||||
mut connections,
|
||||
..
|
||||
} => {
|
||||
let server = TCPServer::bind(&addr)?;
|
||||
|
||||
enabled = true;
|
||||
|
||||
// connections = Some(run_listener(server));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -1,3 +0,0 @@
|
||||
pub mod campaign;
|
||||
pub mod layers;
|
||||
pub mod listeners;
|
||||
@@ -1,6 +1,2 @@
|
||||
mod packets;
|
||||
|
||||
pub use packets::C2Packet;
|
||||
pub use packets::ErrorPacket;
|
||||
pub use packets::Parameter;
|
||||
pub use packets::Parameters;
|
||||
mod node;
|
||||
pub use node::Node;
|
||||
|
||||
@@ -0,0 +1,37 @@
|
||||
use std::{net::SocketAddr, thread};
|
||||
|
||||
use crate::{
|
||||
Error,
|
||||
layers::LayerConfig,
|
||||
networkers::{Connection, ServerTrait, TCPConnection, TCPServer, run_listener},
|
||||
};
|
||||
|
||||
pub struct Node;
|
||||
|
||||
impl Node {
|
||||
pub fn run(addr: SocketAddr) -> Result<(), Error> {
|
||||
let layers = vec![LayerConfig::Handshake, LayerConfig::Base64];
|
||||
|
||||
run_listener(
|
||||
TCPServer::bind(&addr)?,
|
||||
layers,
|
||||
|connection: Box<dyn Connection + Send + 'static>| {
|
||||
thread::spawn(move || {
|
||||
let mut connection = connection;
|
||||
|
||||
loop {
|
||||
if let Ok(data) = connection.read() {
|
||||
if !connection.is_alive() {
|
||||
warn!("{} Disconnected!", connection.get_info());
|
||||
break;
|
||||
}
|
||||
println!("Data: {}", data);
|
||||
}
|
||||
}
|
||||
});
|
||||
},
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -1,60 +0,0 @@
|
||||
use std::{collections::HashMap, fmt};
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Result;
|
||||
|
||||
use crate::config::campaign::CampaignConfig;
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub enum C2Packet {
|
||||
GetClients,
|
||||
AckGetClients,
|
||||
|
||||
RequestCampaign,
|
||||
AckRequestCampaign(CampaignConfig),
|
||||
|
||||
SetCampaign(CampaignConfig),
|
||||
AckSetCampaign,
|
||||
|
||||
GetParameter(String),
|
||||
AckGetParameter(String, Option<Parameter>),
|
||||
ParameterUpate(String, Parameter),
|
||||
|
||||
SetParameter(String, Parameter),
|
||||
AckSetParameter(bool),
|
||||
|
||||
SetAllParameters(Parameters),
|
||||
|
||||
Error(ErrorPacket),
|
||||
|
||||
Sysinfo { hostname: String },
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub enum ErrorPacket {
|
||||
UnsupportedRequestError,
|
||||
}
|
||||
|
||||
impl fmt::Debug for CampaignConfig {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "CampaignConfig")
|
||||
}
|
||||
}
|
||||
|
||||
pub type Parameters = HashMap<String, Parameter>;
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub enum Parameter {
|
||||
Test1,
|
||||
CurrentTab(i32),
|
||||
}
|
||||
|
||||
impl C2Packet {
|
||||
pub fn encode(&self) -> Result<String> {
|
||||
serde_json::to_string(self)
|
||||
}
|
||||
|
||||
pub fn decode(string: &str) -> Result<Self> {
|
||||
serde_json::from_str::<Self>(string)
|
||||
}
|
||||
}
|
||||
@@ -1,18 +1,44 @@
|
||||
use crate::layers::Layer;
|
||||
use base64;
|
||||
use crate::{
|
||||
Error,
|
||||
networkers::{Connection, ProtocolLayer},
|
||||
};
|
||||
use base64::{Engine as _, engine::general_purpose};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Default, Serialize, Deserialize)]
|
||||
pub struct Base64;
|
||||
pub struct Base64Layer<C: Connection> {
|
||||
inner: C,
|
||||
}
|
||||
|
||||
impl Layer for Base64 {
|
||||
fn encode(&mut self, data: &[u8]) -> Vec<u8> {
|
||||
#[allow(deprecated)]
|
||||
base64::encode(str::from_utf8(data).unwrap()).into_bytes()
|
||||
impl<C: Connection> Connection for Base64Layer<C> {
|
||||
fn get_info(&self) -> String {
|
||||
format!("b64->{}", self.inner.get_info())
|
||||
}
|
||||
|
||||
fn decode(&mut self, data: &[u8]) -> Vec<u8> {
|
||||
#[allow(deprecated)]
|
||||
base64::decode(str::from_utf8(data).unwrap()).unwrap()
|
||||
fn is_alive(&self) -> bool {
|
||||
self.inner.is_alive()
|
||||
}
|
||||
|
||||
fn read(&mut self) -> Result<String, Error> {
|
||||
Ok(str::from_utf8(
|
||||
&general_purpose::STANDARD
|
||||
.decode(&self.inner.read()?)
|
||||
.unwrap(),
|
||||
)
|
||||
.unwrap()
|
||||
.to_string())
|
||||
}
|
||||
|
||||
fn write(&mut self, data: &str) -> Result<(), Error> {
|
||||
info!("Bsae");
|
||||
|
||||
self.inner.write(&general_purpose::STANDARD.encode(data))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl<C: Connection> ProtocolLayer<C> for Base64Layer<C> {
|
||||
fn new(inner: C) -> Result<Self, Error> {
|
||||
Ok(Base64Layer { inner })
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,69 @@
|
||||
use crate::{
|
||||
Error,
|
||||
layers::{Base64Layer, HandshakeLayer, LayerConfig},
|
||||
networkers::{Connection, ProtocolLayer},
|
||||
};
|
||||
|
||||
impl Connection for Box<dyn Connection + Send> {
|
||||
fn get_info(&self) -> String {
|
||||
(**self).get_info()
|
||||
}
|
||||
|
||||
fn is_alive(&self) -> bool {
|
||||
(**self).is_alive()
|
||||
}
|
||||
|
||||
fn read(&mut self) -> Result<String, Error> {
|
||||
(**self).read()
|
||||
}
|
||||
|
||||
fn write(&mut self, data: &str) -> Result<(), Error> {
|
||||
(**self).write(data)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn build_client<C>(base_conn: C, layers: Vec<LayerConfig>) -> Result<Box<dyn Connection>, Error>
|
||||
where
|
||||
C: Connection + 'static,
|
||||
{
|
||||
let mut current_conn: Box<dyn Connection + Send> = Box::new(base_conn);
|
||||
|
||||
for layer_config in &layers {
|
||||
current_conn = match layer_config {
|
||||
LayerConfig::Base64 => Box::new(Base64Layer::new(current_conn)?),
|
||||
LayerConfig::Handshake => {
|
||||
let mut handshake_layer = HandshakeLayer::new(current_conn)?;
|
||||
handshake_layer.initialize_client()?;
|
||||
Box::new(handshake_layer)
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
Ok(current_conn)
|
||||
}
|
||||
|
||||
pub fn create_server_builder<C>(
|
||||
layers: Vec<LayerConfig>,
|
||||
) -> Result<Box<dyn Fn(C) -> Result<Box<dyn Connection + Send>, Error>>, Error>
|
||||
where
|
||||
C: Connection + 'static,
|
||||
{
|
||||
Ok(Box::new(
|
||||
move |base_conn: C| -> Result<Box<dyn Connection + Send>, Error> {
|
||||
let mut current_conn: Box<dyn Connection + Send> = Box::new(base_conn);
|
||||
|
||||
for layer_config in &layers {
|
||||
current_conn = match layer_config {
|
||||
LayerConfig::Base64 => Box::new(Base64Layer::new(current_conn)?),
|
||||
LayerConfig::Handshake => {
|
||||
let mut handshake_layer = HandshakeLayer::new(current_conn)?;
|
||||
handshake_layer.initialize_server()?;
|
||||
Box::new(handshake_layer)
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
Ok(current_conn)
|
||||
},
|
||||
))
|
||||
}
|
||||
@@ -0,0 +1,103 @@
|
||||
use crate::{
|
||||
layers::Base64Layer,
|
||||
networkers::{Connection, ProtocolLayer},
|
||||
};
|
||||
|
||||
type Error = Box<dyn std::error::Error>;
|
||||
|
||||
// 4-Way Handshake Layer
|
||||
pub struct HandshakeLayer<C: Connection> {
|
||||
inner: C,
|
||||
finished_handshake: bool,
|
||||
}
|
||||
|
||||
impl<C: Connection> Connection for HandshakeLayer<C> {
|
||||
fn get_info(&self) -> String {
|
||||
format!("handshake->{}", self.inner.get_info())
|
||||
}
|
||||
|
||||
fn is_alive(&self) -> bool {
|
||||
self.inner.is_alive()
|
||||
}
|
||||
|
||||
fn read(&mut self) -> Result<String, Error> {
|
||||
if !self.finished_handshake {
|
||||
return Err("NotComplete".into());
|
||||
}
|
||||
self.inner.read()
|
||||
}
|
||||
|
||||
fn write(&mut self, data: &str) -> Result<(), Error> {
|
||||
if !self.finished_handshake {
|
||||
return Err("NotComplete".into());
|
||||
}
|
||||
self.inner.write(data)
|
||||
}
|
||||
}
|
||||
|
||||
impl<C: Connection + 'static> ProtocolLayer<C> for HandshakeLayer<C> {
|
||||
fn new(inner: C) -> Result<Self, Error> {
|
||||
Ok(HandshakeLayer {
|
||||
inner,
|
||||
finished_handshake: false,
|
||||
})
|
||||
}
|
||||
|
||||
fn initialize_client(&mut self) -> Result<(), Error> {
|
||||
println!("Starting client handshake...");
|
||||
|
||||
// Step 1: Client sends SYN
|
||||
self.inner.write("SYN")?;
|
||||
println!("Client: Sent SYN");
|
||||
|
||||
// Step 2: Client receives SYN-ACK
|
||||
let response = self.inner.read()?;
|
||||
if response != "SYN-ACK" {
|
||||
return Err(format!("Expected SYN-ACK, got: {}", response).into());
|
||||
}
|
||||
println!("Client: Received SYN-ACK");
|
||||
|
||||
// Step 3: Client sends ACK
|
||||
self.inner.write("ACK")?;
|
||||
println!("Client: Sent ACK");
|
||||
|
||||
// Step 4: Client receives FIN (final confirmation)
|
||||
let response = self.inner.read()?;
|
||||
if response != "FIN" {
|
||||
return Err(format!("Expected FIN, got: {}", response).into());
|
||||
}
|
||||
println!("Client: Received FIN - Handshake complete!");
|
||||
|
||||
self.finished_handshake = true;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn initialize_server(&mut self) -> Result<(), Error> {
|
||||
println!("Starting server handshake...");
|
||||
|
||||
// Step 1: Server receives SYN
|
||||
let request = self.inner.read()?;
|
||||
if request != "SYN" {
|
||||
return Err(format!("Expected SYN, got: {}", request).into());
|
||||
}
|
||||
println!("Server: Received SYN");
|
||||
|
||||
// Step 2: Server sends SYN-ACK
|
||||
self.inner.write("SYN-ACK")?;
|
||||
println!("Server: Sent SYN-ACK");
|
||||
|
||||
// Step 3: Server receives ACK
|
||||
let response = self.inner.read()?;
|
||||
if response != "ACK" {
|
||||
return Err(format!("Expected ACK, got: {}", response).into());
|
||||
}
|
||||
println!("Server: Received ACK");
|
||||
|
||||
// Step 4: Server sends FIN (final confirmation)
|
||||
self.inner.write("FIN")?;
|
||||
println!("Server: Sent FIN - Handshake complete!");
|
||||
|
||||
self.finished_handshake = true;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -1,9 +1,14 @@
|
||||
pub trait Layer: Serialize + Deserialize<'static> + Sized {
|
||||
fn encode(&mut self, data: &[u8]) -> Vec<u8>;
|
||||
fn decode(&mut self, data: &[u8]) -> Vec<u8>;
|
||||
pub enum LayerConfig {
|
||||
Base64,
|
||||
Handshake,
|
||||
}
|
||||
|
||||
pub mod base64;
|
||||
mod builder;
|
||||
pub mod handshake;
|
||||
|
||||
pub use base64::Base64;
|
||||
use serde::{Deserialize, Serialize};
|
||||
pub use base64::Base64Layer;
|
||||
pub use handshake::HandshakeLayer;
|
||||
|
||||
pub use builder::build_client;
|
||||
pub use builder::create_server_builder;
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
#[macro_use]
|
||||
extern crate log;
|
||||
|
||||
pub mod config;
|
||||
pub type Error = Box<dyn std::error::Error>;
|
||||
|
||||
// pub mod config;
|
||||
pub mod connection;
|
||||
pub mod layers;
|
||||
pub mod networkers;
|
||||
|
||||
@@ -1,115 +1,15 @@
|
||||
/// This is the lowset-level data transmission type
|
||||
|
||||
pub trait Connection: Send + Sync {
|
||||
type Error: std::fmt::Debug;
|
||||
|
||||
fn get_info(&self) -> String;
|
||||
fn is_alive(&self) -> bool;
|
||||
|
||||
fn read(&mut self) -> Result<String, Self::Error>;
|
||||
fn write(&mut self, data: &str) -> Result<(), Self::Error>;
|
||||
}
|
||||
|
||||
pub trait AsyncConnection<C>
|
||||
where
|
||||
C: Connection,
|
||||
{
|
||||
type Error: std::fmt::Debug;
|
||||
|
||||
fn as_async<T: Serialize + DeserializeOwned + Send + 'static>(
|
||||
connection: C,
|
||||
) -> (Sender<T>, Receiver<T>);
|
||||
}
|
||||
|
||||
pub trait ServerTrait<C: Connection> {
|
||||
type Error: std::fmt::Debug;
|
||||
|
||||
fn get_info(&self) -> String;
|
||||
fn accept(&self) -> Result<C, Self::Error>;
|
||||
fn bind(address: &SocketAddr) -> Result<Self, Self::Error>
|
||||
where
|
||||
Self: Sized;
|
||||
}
|
||||
|
||||
pub trait ClientTrait<C: Connection + Sized> {
|
||||
type Error: std::fmt::Debug;
|
||||
|
||||
fn connect(address: &SocketAddr) -> Result<C, Self::Error>;
|
||||
}
|
||||
|
||||
pub fn run_listener_state<S, C, R, A>(server: S, on_connect_callback: R, state: Arc<A>)
|
||||
/*-> Arc<Mutex<Vec<C>>>*/
|
||||
where
|
||||
S: ServerTrait<C> + Sync + Send + 'static,
|
||||
C: Connection + 'static,
|
||||
R: Fn(C, Arc<A>) + Sync + Send + 'static,
|
||||
A: Sync + Send + 'static,
|
||||
{
|
||||
info!("Started listener {}", server.get_info());
|
||||
// let clients: Arc<Mutex<Vec<C>>> = Arc::new(Mutex::new(Vec::new()));
|
||||
// let clients_clone = Arc::clone(&clients);
|
||||
|
||||
thread::spawn(move || {
|
||||
loop {
|
||||
match server.accept() {
|
||||
Ok(conn) => {
|
||||
info!("New connection ({})", conn.get_info());
|
||||
|
||||
on_connect_callback(conn, Arc::clone(&state));
|
||||
|
||||
// OnConnectCallback::on_connect(&mut on_connect_callback, conn);
|
||||
// let mut clients_lock = clients_clone.lock().unwrap();
|
||||
// clients_lock.push(conn);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to accept connection: {:?}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
pub fn run_listener<S, C, R>(server: S, on_connect_callback: R)
|
||||
/*-> Arc<Mutex<Vec<C>>>*/
|
||||
where
|
||||
S: ServerTrait<C> + Sync + Send + 'static,
|
||||
C: Connection + 'static,
|
||||
R: Fn(C) + Sync + Send + 'static,
|
||||
{
|
||||
info!("Started listener {}", server.get_info());
|
||||
// let clients: Arc<Mutex<Vec<C>>> = Arc::new(Mutex::new(Vec::new()));
|
||||
// let clients_clone = Arc::clone(&clients);
|
||||
|
||||
thread::spawn(move || {
|
||||
loop {
|
||||
match server.accept() {
|
||||
Ok(conn) => {
|
||||
info!("New connection ({})", conn.get_info());
|
||||
|
||||
on_connect_callback(conn);
|
||||
|
||||
// OnConnectCallback::on_connect(&mut on_connect_callback, conn);
|
||||
// let mut clients_lock = clients_clone.lock().unwrap();
|
||||
// clients_lock.push(conn);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to accept connection: {:?}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
mod server;
|
||||
mod tcp;
|
||||
mod traits;
|
||||
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use std::thread;
|
||||
|
||||
use crossbeam_channel::Receiver;
|
||||
use crossbeam_channel::Sender;
|
||||
use serde::Serialize;
|
||||
use serde::de::DeserializeOwned;
|
||||
pub use tcp::TCPClient;
|
||||
pub use tcp::TCPConnection;
|
||||
pub use tcp::TCPServer;
|
||||
|
||||
// pub use traits::AsyncConnection;
|
||||
pub use traits::ClientTrait;
|
||||
pub use traits::Connection;
|
||||
pub use traits::ProtocolLayer;
|
||||
pub use traits::ServerTrait;
|
||||
|
||||
pub use server::run_listener;
|
||||
|
||||
@@ -0,0 +1,81 @@
|
||||
use std::sync::Arc;
|
||||
use std::thread;
|
||||
|
||||
use crate::{
|
||||
layers::{LayerConfig, create_server_builder},
|
||||
networkers::{Connection, ServerTrait},
|
||||
};
|
||||
|
||||
// Helper macros for building layered connections
|
||||
macro_rules! build_layered_connection {
|
||||
($base:expr) => {
|
||||
$base
|
||||
};
|
||||
($base:expr, $layer:ty) => {
|
||||
<$layer>::new($base)?
|
||||
};
|
||||
($base:expr, $layer:ty, $($layers:ty),+) => {
|
||||
build_layered_connection!(<$layer>::new($base)?, $($layers),+)
|
||||
};
|
||||
}
|
||||
|
||||
pub fn run_listener_state<S, C, R, A>(server: S, on_connect_callback: R, state: Arc<A>)
|
||||
/*-> Arc<Mutex<Vec<C>>>*/
|
||||
where
|
||||
S: ServerTrait<C> + Sync + Send + 'static,
|
||||
C: Connection + 'static,
|
||||
R: Fn(C, Arc<A>) + Sync + Send + 'static,
|
||||
A: Sync + Send + 'static,
|
||||
{
|
||||
info!("Started listener {}", server.get_info());
|
||||
// let clients: Arc<Mutex<Vec<C>>> = Arc::new(Mutex::new(Vec::new()));
|
||||
// let clients_clone = Arc::clone(&clients);
|
||||
|
||||
loop {
|
||||
match server.accept() {
|
||||
Ok(conn) => {
|
||||
info!("New connection ({})", conn.get_info());
|
||||
|
||||
on_connect_callback(conn, Arc::clone(&state));
|
||||
|
||||
// OnConnectCallback::on_connect(&mut on_connect_callback, conn);
|
||||
// let mut clients_lock = clients_clone.lock().unwrap();
|
||||
// clients_lock.push(conn);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to accept connection: {:?}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn run_listener<S, C, R>(server: S, layers: Vec<LayerConfig>, on_connect_callback: R)
|
||||
/*-> Arc<Mutex<Vec<C>>>*/
|
||||
where
|
||||
S: ServerTrait<C> + Sync + Send + 'static,
|
||||
C: Connection + 'static,
|
||||
R: Fn(Box<dyn Connection + Send + 'static>) + Sync + Send + 'static,
|
||||
{
|
||||
let layer_builder = create_server_builder::<C>(layers).unwrap();
|
||||
|
||||
info!("Started listener {}", server.get_info());
|
||||
// let clients: Arc<Mutex<Vec<C>>> = Arc::new(Mutex::new(Vec::new()));
|
||||
// let clients_clone = Arc::clone(&clients);
|
||||
|
||||
loop {
|
||||
match server.accept() {
|
||||
Ok(conn) => match layer_builder(conn) {
|
||||
Ok(conn) => {
|
||||
info!("New connection ({})", conn.get_info());
|
||||
on_connect_callback(conn);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to create layers: {:?}", e);
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
error!("Failed to accept connection: {:?}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,13 +1,12 @@
|
||||
use std::{
|
||||
io::{self, BufRead, BufReader, Write},
|
||||
net::{SocketAddr, TcpListener, TcpStream},
|
||||
thread,
|
||||
};
|
||||
|
||||
use crossbeam_channel::{Receiver, Sender};
|
||||
use serde::{Serialize, de::DeserializeOwned};
|
||||
|
||||
use crate::networkers::{AsyncConnection, ClientTrait, Connection, ServerTrait};
|
||||
use crate::{
|
||||
Error,
|
||||
networkers::{ClientTrait, Connection, ServerTrait},
|
||||
};
|
||||
|
||||
pub struct TCPConnection {
|
||||
stream: TcpStream,
|
||||
@@ -16,8 +15,6 @@ pub struct TCPConnection {
|
||||
}
|
||||
|
||||
impl Connection for TCPConnection {
|
||||
type Error = io::Error;
|
||||
|
||||
fn get_info(&self) -> String {
|
||||
format!(
|
||||
"tcp://{}",
|
||||
@@ -33,7 +30,7 @@ impl Connection for TCPConnection {
|
||||
self.is_alive
|
||||
}
|
||||
|
||||
fn read(&mut self) -> Result<String, Self::Error> {
|
||||
fn read(&mut self) -> Result<String, Error> {
|
||||
let mut line = String::new();
|
||||
let n = self.reader.read_line(&mut line)?;
|
||||
|
||||
@@ -45,79 +42,78 @@ impl Connection for TCPConnection {
|
||||
Ok(line.trim_end().to_string())
|
||||
}
|
||||
|
||||
fn write(&mut self, data: &str) -> Result<(), Self::Error> {
|
||||
fn write(&mut self, data: &str) -> Result<(), Error> {
|
||||
info!("Sent: {}", data);
|
||||
writeln!(self.stream, "{}", data)?;
|
||||
self.stream.flush()?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncConnection<TCPConnection> for TCPConnection {
|
||||
type Error = io::Error;
|
||||
// impl AsyncConnection<TCPConnection> for TCPConnection {
|
||||
// type Error = io::Error;
|
||||
|
||||
fn as_async<T: Serialize + DeserializeOwned + Send + 'static>(
|
||||
connection: TCPConnection,
|
||||
) -> (Sender<T>, Receiver<T>) {
|
||||
let (send_tx, send_rx) = crossbeam_channel::unbounded::<T>();
|
||||
let (recv_tx, recv_rx) = crossbeam_channel::unbounded::<T>();
|
||||
// fn as_async<T: Serialize + DeserializeOwned + Send + 'static>(
|
||||
// connection: TCPConnection,
|
||||
// ) -> (Sender<T>, Receiver<T>) {
|
||||
// let (send_tx, send_rx) = crossbeam_channel::unbounded::<T>();
|
||||
// let (recv_tx, recv_rx) = crossbeam_channel::unbounded::<T>();
|
||||
|
||||
thread::spawn(move || {
|
||||
let mut reader = connection.reader;
|
||||
// thread::spawn(move || {
|
||||
// let mut reader = connection.reader;
|
||||
|
||||
let mut read = || -> Result<String, Self::Error> {
|
||||
let mut line = String::new();
|
||||
let _ = reader.read_line(&mut line)?;
|
||||
// let mut read = || -> Result<String, Self::Error> {
|
||||
// let mut line = String::new();
|
||||
// let _ = reader.read_line(&mut line)?;
|
||||
|
||||
Ok(line.trim_end().to_string())
|
||||
};
|
||||
// Ok(line.trim_end().to_string())
|
||||
// };
|
||||
|
||||
loop {
|
||||
if let Ok(data) = read() {
|
||||
if data.is_empty() {
|
||||
break;
|
||||
}
|
||||
info!("Got {}", data);
|
||||
if let Ok(decoded) = serde_json::from_str::<T>(&data) {
|
||||
if let Err(e) = send_tx.send(decoded) {
|
||||
error!("Got error: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
// loop {
|
||||
// if let Ok(data) = read() {
|
||||
// if data.is_empty() {
|
||||
// break;
|
||||
// }
|
||||
// info!("Got {}", data);
|
||||
// if let Ok(decoded) = serde_json::from_str::<T>(&data) {
|
||||
// if let Err(e) = send_tx.send(decoded) {
|
||||
// error!("Got error: {}", e);
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// });
|
||||
|
||||
thread::spawn(move || {
|
||||
let mut stream = connection.stream;
|
||||
// thread::spawn(move || {
|
||||
// let mut stream = connection.stream;
|
||||
|
||||
let mut write = |data: String| -> Result<(), Self::Error> {
|
||||
writeln!(stream, "{}", data)?;
|
||||
stream.flush()?;
|
||||
Ok(())
|
||||
};
|
||||
// let mut write = |data: String| -> Result<(), Self::Error> {
|
||||
// writeln!(stream, "{}", data)?;
|
||||
// stream.flush()?;
|
||||
// Ok(())
|
||||
// };
|
||||
|
||||
loop {
|
||||
if let Ok(data) = recv_rx.recv() {
|
||||
if let Ok(encoded) = serde_json::to_string(&data) {
|
||||
info!("Write {}", encoded);
|
||||
if let Err(e) = write(encoded) {
|
||||
error!("Got error: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
// loop {
|
||||
// if let Ok(data) = recv_rx.recv() {
|
||||
// if let Ok(encoded) = serde_json::to_string(&data) {
|
||||
// info!("Write {}", encoded);
|
||||
// if let Err(e) = write(encoded) {
|
||||
// error!("Got error: {}", e);
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// });
|
||||
|
||||
(recv_tx, send_rx)
|
||||
}
|
||||
}
|
||||
// (recv_tx, send_rx)
|
||||
// }
|
||||
// }
|
||||
|
||||
pub struct TCPServer {
|
||||
listener: TcpListener,
|
||||
}
|
||||
|
||||
impl ServerTrait<TCPConnection> for TCPServer {
|
||||
type Error = io::Error;
|
||||
|
||||
fn get_info(&self) -> String {
|
||||
format!(
|
||||
"tcp://{}",
|
||||
@@ -129,7 +125,7 @@ impl ServerTrait<TCPConnection> for TCPServer {
|
||||
)
|
||||
}
|
||||
|
||||
fn accept(&self) -> Result<TCPConnection, Self::Error> {
|
||||
fn accept(&self) -> Result<TCPConnection, Error> {
|
||||
let (stream, _) = self.listener.accept()?;
|
||||
let reader = BufReader::new(stream.try_clone()?);
|
||||
Ok(TCPConnection {
|
||||
@@ -139,7 +135,7 @@ impl ServerTrait<TCPConnection> for TCPServer {
|
||||
})
|
||||
}
|
||||
|
||||
fn bind(address: &SocketAddr) -> Result<Self, Self::Error> {
|
||||
fn bind(address: &SocketAddr) -> Result<Self, Error> {
|
||||
let listener = TcpListener::bind(address)?;
|
||||
Ok(Self { listener })
|
||||
}
|
||||
@@ -148,9 +144,7 @@ impl ServerTrait<TCPConnection> for TCPServer {
|
||||
pub struct TCPClient;
|
||||
|
||||
impl ClientTrait<TCPConnection> for TCPClient {
|
||||
type Error = io::Error;
|
||||
|
||||
fn connect(address: &SocketAddr) -> Result<TCPConnection, Self::Error> {
|
||||
fn connect(address: &SocketAddr) -> Result<TCPConnection, Error> {
|
||||
let stream = TcpStream::connect(address)?;
|
||||
let reader = BufReader::new(stream.try_clone()?);
|
||||
let conn = TCPConnection {
|
||||
@@ -158,7 +152,6 @@ impl ClientTrait<TCPConnection> for TCPClient {
|
||||
reader,
|
||||
is_alive: true,
|
||||
};
|
||||
info!("Connected to {}", conn.get_info());
|
||||
Ok(conn)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,50 @@
|
||||
use std::net::SocketAddr;
|
||||
use std::ops::Deref;
|
||||
use std::ops::DerefMut;
|
||||
|
||||
use crate::Error;
|
||||
|
||||
// This is the lowset-level data transmission type
|
||||
pub trait Connection: Send {
|
||||
fn get_info(&self) -> String;
|
||||
fn is_alive(&self) -> bool;
|
||||
|
||||
fn read(&mut self) -> Result<String, Error>;
|
||||
fn write(&mut self, data: &str) -> Result<(), Error>;
|
||||
}
|
||||
|
||||
// Trait for protocol layers that can be initialized
|
||||
pub trait ProtocolLayer<C: Connection>: Connection {
|
||||
fn new(inner: C) -> Result<Self, Error>
|
||||
where
|
||||
Self: Sized;
|
||||
fn initialize_client(&mut self) -> Result<(), Error> {
|
||||
Ok(())
|
||||
}
|
||||
fn initialize_server(&mut self) -> Result<(), Error> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
// impl Sized for dyn Connection {}
|
||||
|
||||
// pub trait AsyncConnection<C>
|
||||
// where
|
||||
// C: Connection,
|
||||
// {
|
||||
// fn as_async<T: Serialize + DeserializeOwned + Send + 'static>(
|
||||
// connection: C,
|
||||
// ) -> (Sender<T>, Receiver<T>);
|
||||
// }
|
||||
|
||||
pub trait ServerTrait<C: Connection> {
|
||||
fn get_info(&self) -> String;
|
||||
fn accept(&self) -> Result<C, Error>;
|
||||
fn bind(address: &SocketAddr) -> Result<Self, Error>
|
||||
where
|
||||
Self: Sized;
|
||||
}
|
||||
|
||||
pub trait ClientTrait<C: Connection + Sized> {
|
||||
fn connect(address: &SocketAddr) -> Result<C, Error>;
|
||||
}
|
||||
Reference in New Issue
Block a user