mirror of
https://github.com/Astatin3/unshell-nodes-rs.git
synced 2026-06-08 16:18:08 -06:00
Finally solve deadlock problem
This commit is contained in:
@@ -5,8 +5,10 @@ edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
clap = { version = "4.5.39", features = ["derive"] }
|
||||
crossbeam-channel = "0.5.15"
|
||||
lazy_static = "1.5.0"
|
||||
log = "0.4.27"
|
||||
mio = { version = "1.0.4", features = ["os-poll"] }
|
||||
native-tls = "0.2.14"
|
||||
pretty_env_logger = "0.5.0"
|
||||
serde = { version = "1.0.219", features = ["derive"] }
|
||||
|
||||
+95
-79
@@ -7,107 +7,123 @@ use std::{
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use unshell_rs_lib::networkers::{ClientTrait, Connection, TCPClient, TCPConnection};
|
||||
|
||||
use crate::packets::{GuiPacket, Parameter, Parameters};
|
||||
use crossbeam_channel::{Receiver, Sender};
|
||||
use unshell_rs_lib::{
|
||||
connection::{C2Packet, Parameter, Parameters},
|
||||
networkers::{AsyncConnection, ClientTrait, TCPClient, TCPConnection},
|
||||
};
|
||||
|
||||
pub struct UnshellClient {
|
||||
addr: SocketAddr,
|
||||
client: Arc<Mutex<TCPConnection>>,
|
||||
|
||||
tx: Sender<C2Packet>,
|
||||
rx: Receiver<C2Packet>,
|
||||
parameters: Arc<Mutex<Parameters>>,
|
||||
outgoing_packets: Arc<Mutex<Vec<GuiPacket>>>,
|
||||
}
|
||||
|
||||
impl UnshellClient {
|
||||
pub fn new(addr: SocketAddr) -> Result<Self, Box<dyn Error>> {
|
||||
let client = Arc::new(Mutex::new(TCPClient::connect(&addr)?));
|
||||
let outgoing_packets = Arc::new(Mutex::new(Vec::<GuiPacket>::new()));
|
||||
let client = TCPClient::connect(&addr)?;
|
||||
|
||||
let (tx, rx) = TCPConnection::as_async(client);
|
||||
|
||||
// mpsc
|
||||
|
||||
// let poll = Poll::new()?;
|
||||
|
||||
// let events = Events::with_capacity(128);
|
||||
|
||||
// const SERVER: Token = Token(0);
|
||||
// poll.registry()
|
||||
// .register(&mut listener, SERVER, Interest::READABLE)?;
|
||||
|
||||
// let client = Arc::new(Mutex::new());
|
||||
// let outgoing_packets = Arc::new(Mutex::new(Vec::<GuiPacket>::new()));
|
||||
let parameters = Arc::new(Mutex::new(Parameters::new()));
|
||||
|
||||
let tx_client = Arc::clone(&client);
|
||||
let tx_packets = Arc::clone(&outgoing_packets);
|
||||
// let tx_client = Arc::clone(&client);
|
||||
// let tx_packets = Arc::clone(&outgoing_packets);
|
||||
|
||||
// Recieve thread
|
||||
thread::spawn(move || {
|
||||
loop {
|
||||
info!("Lock 2");
|
||||
let mut packets_lock = tx_packets.lock().unwrap();
|
||||
info!("Lock 2");
|
||||
if !packets_lock.is_empty() {
|
||||
info!("Lock 3");
|
||||
if let Ok(packet) = packets_lock.pop().unwrap().encode() {
|
||||
info!("Lock 3");
|
||||
let mut client_lock = tx_client.lock().unwrap();
|
||||
info!("Wrote {}", packet.as_str());
|
||||
match client_lock.write(packet.as_str()) {
|
||||
Err(e) => {
|
||||
error!("Failed to send packet: {:?}", e);
|
||||
}
|
||||
_ => {}
|
||||
};
|
||||
}
|
||||
}
|
||||
std::mem::drop(packets_lock);
|
||||
// // Recieve thread
|
||||
// thread::spawn(move || {
|
||||
// loop {
|
||||
// info!("Lock 2");
|
||||
// let mut packets_lock = tx_packets.lock().unwrap();
|
||||
// info!("Lock 2");
|
||||
// if !packets_lock.is_empty() {
|
||||
// info!("Lock 3");
|
||||
// if let Ok(packet) = packets_lock.pop().unwrap().encode() {
|
||||
// info!("Lock 3");
|
||||
// info!("Lock 4");
|
||||
// let mut client_lock = tx_client.lock().unwrap();
|
||||
// info!("Lock 4");
|
||||
// info!("Wrote {}", packet.as_str());
|
||||
// match client_lock.write(packet.as_str()) {
|
||||
// Err(e) => {
|
||||
// error!("Failed to send packet: {:?}", e);
|
||||
// }
|
||||
// _ => {}
|
||||
// };
|
||||
// }
|
||||
// }
|
||||
// std::mem::drop(packets_lock);
|
||||
|
||||
thread::sleep(Duration::from_millis(10));
|
||||
}
|
||||
});
|
||||
// thread::sleep(Duration::from_millis(10));
|
||||
// }
|
||||
// });
|
||||
|
||||
let rx_client = Arc::clone(&client);
|
||||
let rx_params = Arc::clone(¶meters);
|
||||
thread::spawn(move || {
|
||||
loop {
|
||||
info!("Lock 4");
|
||||
let mut client = rx_client.lock().unwrap();
|
||||
info!("Lock 4");
|
||||
if !client.is_alive() {
|
||||
error!("Disconnected from {}!", client.get_info());
|
||||
}
|
||||
if let Ok(data) = client.read() {
|
||||
info!("Got {}", data);
|
||||
if let Ok(packet) = GuiPacket::decode(data.as_str()) {
|
||||
match packet {
|
||||
GuiPacket::ParameterUpate(name, parameter) => {
|
||||
rx_params.lock().unwrap().insert(name, parameter);
|
||||
}
|
||||
GuiPacket::Error(error_packet) => {
|
||||
error!("Got error: {}", print_type_of(&error_packet))
|
||||
}
|
||||
GuiPacket::SetAllParameters(parameters) => {
|
||||
let mut params_lock = rx_params.lock().unwrap();
|
||||
params_lock.clear();
|
||||
params_lock.extend(parameters);
|
||||
}
|
||||
_ => {
|
||||
error!("Unsupported packet: {}", data)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// std::mem::drop(client);
|
||||
// let rx_client = Arc::clone(&client);
|
||||
// let rx_params = Arc::clone(¶meters);
|
||||
// thread::spawn(move || {
|
||||
// loop {
|
||||
// info!("Lock 5");
|
||||
// let mut client = rx_client.lock().unwrap();
|
||||
// info!("Lock 5");
|
||||
// if !client.is_alive() {
|
||||
// error!("Disconnected from {}!", client.get_info());
|
||||
// }
|
||||
// if let Ok(data) = client.read() {
|
||||
// info!("Got {}", data);
|
||||
// if let Ok(packet) = GuiPacket::decode(data.as_str()) {
|
||||
// match packet {
|
||||
// GuiPacket::ParameterUpate(name, parameter) => {
|
||||
// rx_params.lock().unwrap().insert(name, parameter);
|
||||
// }
|
||||
// GuiPacket::Error(error_packet) => {
|
||||
// error!("Got error: {}", print_type_of(&error_packet))
|
||||
// }
|
||||
// GuiPacket::SetAllParameters(parameters) => {
|
||||
// let mut params_lock = rx_params.lock().unwrap();
|
||||
// params_lock.clear();
|
||||
// params_lock.extend(parameters);
|
||||
// }
|
||||
// _ => {
|
||||
// error!("Unsupported packet: {}", data)
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// // std::mem::drop(client);
|
||||
|
||||
thread::sleep(Duration::from_millis(10));
|
||||
}
|
||||
});
|
||||
// thread::sleep(Duration::from_millis(10));
|
||||
// }
|
||||
// });
|
||||
|
||||
Ok(Self {
|
||||
addr,
|
||||
client,
|
||||
|
||||
tx,
|
||||
rx,
|
||||
|
||||
// client,
|
||||
parameters,
|
||||
outgoing_packets,
|
||||
// outgoing_packets,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn set_parameter(&mut self, key: String, param: Parameter) {
|
||||
self.parameters
|
||||
.lock()
|
||||
.unwrap()
|
||||
.insert(key.clone(), param.clone());
|
||||
self.outgoing_packets
|
||||
.lock()
|
||||
.unwrap()
|
||||
.push(GuiPacket::SetParameter(key, param));
|
||||
let mut params_lock = self.parameters.lock().unwrap();
|
||||
params_lock.insert(key.clone(), param.clone());
|
||||
self.tx.send(C2Packet::SetParameter(key, param)).unwrap();
|
||||
}
|
||||
|
||||
pub fn get_parameter(&self, key: &str) -> Option<Parameter> {
|
||||
|
||||
+2
-2
@@ -3,9 +3,9 @@ use std::{
|
||||
error::Error,
|
||||
sync::{Arc, Mutex},
|
||||
};
|
||||
use unshell_rs_lib::config::campaign::CampaignConfig;
|
||||
use unshell_rs_lib::{config::campaign::CampaignConfig, connection::Parameter};
|
||||
|
||||
use crate::{client::UnshellClient, packets::Parameter};
|
||||
use crate::client::UnshellClient;
|
||||
|
||||
pub struct UnshellGui {
|
||||
client: UnshellClient,
|
||||
|
||||
@@ -2,7 +2,6 @@
|
||||
extern crate log;
|
||||
|
||||
mod client;
|
||||
mod packets;
|
||||
mod server;
|
||||
|
||||
pub use client::UnshellClient;
|
||||
|
||||
@@ -1,37 +0,0 @@
|
||||
use serde_json::Result;
|
||||
use std::collections::HashMap;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use unshell_rs_lib::connection::ErrorPacket;
|
||||
|
||||
pub type Parameters = HashMap<String, Parameter>;
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub enum GuiPacket {
|
||||
GetParameter(String),
|
||||
AckGetParameter(String, Option<Parameter>),
|
||||
ParameterUpate(String, Parameter),
|
||||
|
||||
SetParameter(String, Parameter),
|
||||
AckSetParameter(bool),
|
||||
|
||||
SetAllParameters(Parameters),
|
||||
|
||||
Error(ErrorPacket),
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub enum Parameter {
|
||||
Test1,
|
||||
CurrentTab(i32),
|
||||
}
|
||||
|
||||
impl GuiPacket {
|
||||
pub fn encode(&self) -> Result<String> {
|
||||
serde_json::to_string(self)
|
||||
}
|
||||
|
||||
pub fn decode(string: &str) -> Result<Self> {
|
||||
serde_json::from_str::<Self>(string)
|
||||
}
|
||||
}
|
||||
@@ -4,7 +4,7 @@ use unshell_rs_lib::config::campaign::CampaignConfig;
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use crate::packets::Parameters;
|
||||
use unshell_rs_lib::connection::Parameters;
|
||||
|
||||
lazy_static! {
|
||||
pub static ref DEFAULT_CAMPAIGN: CampaignConfig = CampaignConfig {
|
||||
|
||||
+101
-89
@@ -9,18 +9,18 @@ use std::{
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use crossbeam_channel::{Receiver, Sender};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use unshell_rs_lib::{
|
||||
config::campaign::CampaignConfig,
|
||||
connection::ErrorPacket,
|
||||
networkers::{Connection, ServerTrait, TCPConnection, TCPServer, run_listener_state},
|
||||
connection::{C2Packet, ErrorPacket, Parameters},
|
||||
networkers::{
|
||||
AsyncConnection, Connection, ServerTrait, TCPConnection, TCPServer, run_listener_state,
|
||||
},
|
||||
};
|
||||
|
||||
use crate::{
|
||||
packets::{GuiPacket, Parameters},
|
||||
server::{DEFAULT_CAMPAIGN, DEFAULT_USERS, User, config::DEFAULT_PARAMETERS},
|
||||
};
|
||||
use crate::server::{DEFAULT_CAMPAIGN, DEFAULT_USERS, User, config::DEFAULT_PARAMETERS};
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct UnshellServerConfig {
|
||||
@@ -29,9 +29,15 @@ pub struct UnshellServerConfig {
|
||||
users: Vec<User>,
|
||||
|
||||
#[serde(skip)]
|
||||
client_count: usize,
|
||||
#[serde(skip)]
|
||||
broadcast_flag: HashMap<usize, Option<String>>,
|
||||
clients: Vec<Client>,
|
||||
}
|
||||
|
||||
impl UnshellServerConfig {
|
||||
pub fn broadcast_update_param(&self, name: String) {
|
||||
for i in 0..self.clients.len() {
|
||||
let _ = self.clients.get(i).unwrap().broadcast_tx.send(name.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct UnshellServer {
|
||||
@@ -40,7 +46,7 @@ pub struct UnshellServer {
|
||||
|
||||
impl UnshellServer {
|
||||
pub fn from_filepath(filepath: &str) -> Self {
|
||||
(|| -> Result<Self, Box<dyn Error>> {
|
||||
let s = (|| -> Result<Self, Box<dyn Error>> {
|
||||
let mut file = File::open(filepath.to_string())?;
|
||||
|
||||
let mut contents = String::new();
|
||||
@@ -62,92 +68,98 @@ impl UnshellServer {
|
||||
users: DEFAULT_USERS.clone(),
|
||||
parameters: DEFAULT_PARAMETERS.clone(),
|
||||
|
||||
client_count: 0,
|
||||
broadcast_flag: HashMap::new(),
|
||||
clients: Vec::new(),
|
||||
})),
|
||||
}
|
||||
})
|
||||
});
|
||||
|
||||
// let (broadcast_tx, broadcast_rx) = crossbeam_channel::unbounded::<String>();
|
||||
// let mut config_lock = s.config.lock().unwrap();
|
||||
// config_lock.broadcast_tx = Some(broadcast_tx);
|
||||
// config_lock.broadcast_rx = Some(broadcast_rx);
|
||||
// std::mem::drop(config_lock);
|
||||
|
||||
s
|
||||
}
|
||||
|
||||
pub fn run(&mut self, addr: SocketAddr) -> Result<(), Box<dyn Error>> {
|
||||
let on_connect = |connection: TCPConnection,
|
||||
config_clone: Arc<Mutex<UnshellServerConfig>>| {
|
||||
// Recieve loop
|
||||
thread::spawn(move || {
|
||||
let config = Arc::clone(&config_clone);
|
||||
|
||||
let mut connection = connection;
|
||||
|
||||
let send = |c: &mut TCPConnection, packet: GuiPacket| {
|
||||
if let Ok(packet) = packet.encode() {
|
||||
info!("Send {}", packet);
|
||||
c.write(packet.as_str()).unwrap();
|
||||
}
|
||||
};
|
||||
|
||||
let mut config_lock = config.lock().unwrap();
|
||||
let client_id = config_lock.client_count.clone();
|
||||
config_lock.client_count += 1;
|
||||
send(
|
||||
&mut connection,
|
||||
GuiPacket::SetAllParameters(config_lock.parameters.clone()),
|
||||
);
|
||||
std::mem::drop(config_lock);
|
||||
|
||||
loop {
|
||||
if !connection.is_alive() {
|
||||
warn!("Client {} disconnected!", connection.get_info());
|
||||
break;
|
||||
}
|
||||
if let Ok(data) = connection.read() {
|
||||
if let Ok(packet) = GuiPacket::decode(data.as_str()) {
|
||||
match packet {
|
||||
GuiPacket::GetParameter(param) => send(
|
||||
&mut connection,
|
||||
GuiPacket::AckGetParameter(param.clone(), {
|
||||
let config_lock = config.lock().unwrap();
|
||||
let result = config_lock.parameters.get(¶m);
|
||||
result.cloned()
|
||||
}),
|
||||
),
|
||||
GuiPacket::SetParameter(name, param) => send(
|
||||
&mut connection,
|
||||
GuiPacket::AckSetParameter({
|
||||
let mut config_lock = config.lock().unwrap();
|
||||
config_lock.parameters.insert(name.clone(), param);
|
||||
config_lock.broadcast_flag.insert(client_id, Some(name));
|
||||
|
||||
true
|
||||
}),
|
||||
),
|
||||
_ => send(
|
||||
&mut connection,
|
||||
GuiPacket::Error(ErrorPacket::UnsupportedRequestError),
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut config_lock = config.lock().unwrap();
|
||||
if let Some(Some(key)) = config_lock.broadcast_flag.get(&client_id) {
|
||||
send(
|
||||
&mut connection,
|
||||
GuiPacket::ParameterUpate(
|
||||
key.clone(),
|
||||
config_lock.parameters.get(key).unwrap().clone(),
|
||||
),
|
||||
);
|
||||
config_lock.broadcast_flag.insert(client_id, None);
|
||||
}
|
||||
|
||||
thread::sleep(Duration::from_millis(10));
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
let config_clone = Arc::clone(&self.config);
|
||||
run_listener_state(TCPServer::bind(&addr)?, on_connect, config_clone);
|
||||
run_listener_state(TCPServer::bind(&addr)?, Client::run, config_clone);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Remote client type for unshell parameters
|
||||
struct Client {
|
||||
pub broadcast_tx: Sender<String>,
|
||||
}
|
||||
|
||||
impl Client {
|
||||
pub fn run(connection: TCPConnection, config: Arc<Mutex<UnshellServerConfig>>) {
|
||||
let (tx, rx) = TCPConnection::as_async::<C2Packet>(connection);
|
||||
|
||||
let (broadcast_tx, broadcast_rx) = crossbeam_channel::unbounded::<String>();
|
||||
|
||||
let s = Self { broadcast_tx };
|
||||
|
||||
let mut config_lock = config.lock().unwrap();
|
||||
config_lock.clients.push(s);
|
||||
let config_clone = Arc::clone(&config);
|
||||
let tx_clone = tx.clone();
|
||||
thread::spawn(move || {
|
||||
loop {
|
||||
if let Ok(key) = broadcast_rx.recv() {
|
||||
let config_lock = config_clone.lock().unwrap();
|
||||
if let Err(e) = tx_clone.send(C2Packet::ParameterUpate(
|
||||
key.clone(),
|
||||
config_lock.parameters.get(&key).unwrap().clone(),
|
||||
)) {
|
||||
error!("Failed to send packet: {}", e);
|
||||
};
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
if let Err(e) = tx.send(C2Packet::SetAllParameters(config_lock.parameters.clone())) {
|
||||
error!("Failed to send packet: {}", e);
|
||||
};
|
||||
std::mem::drop(config_lock);
|
||||
|
||||
thread::spawn(move || {
|
||||
loop {
|
||||
// if !connection.is_alive() {
|
||||
// warn!("Client {} disconnected!", connection.get_info());
|
||||
// break;
|
||||
// }
|
||||
if let Ok(packet) = rx.recv() {
|
||||
if let Err(e) = match packet {
|
||||
C2Packet::GetParameter(param) => {
|
||||
tx.send(C2Packet::AckGetParameter(param.clone(), {
|
||||
let config_lock = config.lock().unwrap();
|
||||
let result = config_lock.parameters.get(¶m);
|
||||
result.cloned()
|
||||
}))
|
||||
}
|
||||
C2Packet::SetParameter(name, param) => {
|
||||
tx.send(C2Packet::AckSetParameter({
|
||||
let mut config_lock = config.lock().unwrap();
|
||||
config_lock.parameters.insert(name.clone(), param);
|
||||
config_lock.broadcast_update_param(name);
|
||||
true
|
||||
}))
|
||||
}
|
||||
|
||||
C2Packet::Error(error) => {
|
||||
warn!("Got error: {:?}", error);
|
||||
Ok(())
|
||||
}
|
||||
_ => tx.send(C2Packet::Error(ErrorPacket::UnsupportedRequestError)),
|
||||
} {
|
||||
error!("Failed to send packet: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,6 +4,8 @@ edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
base64 = "0.22.1"
|
||||
crossbeam-channel = "0.5.15"
|
||||
log = "0.4.27"
|
||||
mio = { version = "1.0.4", features = ["os-poll"] }
|
||||
serde = { version = "1.0.219", features = ["derive"] }
|
||||
serde_json = "1.0.140"
|
||||
|
||||
@@ -2,7 +2,6 @@ use std::{
|
||||
error::Error,
|
||||
net::SocketAddr,
|
||||
sync::{Arc, Mutex},
|
||||
thread,
|
||||
};
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
@@ -2,3 +2,5 @@ mod packets;
|
||||
|
||||
pub use packets::C2Packet;
|
||||
pub use packets::ErrorPacket;
|
||||
pub use packets::Parameter;
|
||||
pub use packets::Parameters;
|
||||
|
||||
@@ -1,4 +1,7 @@
|
||||
use std::fmt;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
fmt::{self, Display},
|
||||
};
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Result;
|
||||
@@ -16,6 +19,15 @@ pub enum C2Packet {
|
||||
SetCampaign(CampaignConfig),
|
||||
AckSetCampaign,
|
||||
|
||||
GetParameter(String),
|
||||
AckGetParameter(String, Option<Parameter>),
|
||||
ParameterUpate(String, Parameter),
|
||||
|
||||
SetParameter(String, Parameter),
|
||||
AckSetParameter(bool),
|
||||
|
||||
SetAllParameters(Parameters),
|
||||
|
||||
Error(ErrorPacket),
|
||||
|
||||
Sysinfo { hostname: String },
|
||||
@@ -32,6 +44,14 @@ impl fmt::Debug for CampaignConfig {
|
||||
}
|
||||
}
|
||||
|
||||
pub type Parameters = HashMap<String, Parameter>;
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub enum Parameter {
|
||||
Test1,
|
||||
CurrentTab(i32),
|
||||
}
|
||||
|
||||
impl C2Packet {
|
||||
pub fn encode(&self) -> Result<String> {
|
||||
serde_json::to_string(self)
|
||||
|
||||
@@ -10,6 +10,17 @@ pub trait Connection: Send + Sync {
|
||||
fn write(&mut self, data: &str) -> Result<(), Self::Error>;
|
||||
}
|
||||
|
||||
pub trait AsyncConnection<C>
|
||||
where
|
||||
C: Connection,
|
||||
{
|
||||
type Error: std::fmt::Debug;
|
||||
|
||||
fn as_async<T: Serialize + DeserializeOwned + Send + 'static>(
|
||||
connection: C,
|
||||
) -> (Sender<T>, Receiver<T>);
|
||||
}
|
||||
|
||||
pub trait ServerTrait<C: Connection> {
|
||||
type Error: std::fmt::Debug;
|
||||
|
||||
@@ -95,6 +106,12 @@ use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use std::thread;
|
||||
|
||||
use crossbeam_channel::Receiver;
|
||||
use crossbeam_channel::Sender;
|
||||
use serde::Serialize;
|
||||
use serde::de::DeserializeOwned;
|
||||
pub use tcp::TCPClient;
|
||||
pub use tcp::TCPConnection;
|
||||
pub use tcp::TCPServer;
|
||||
|
||||
use crate::connection;
|
||||
|
||||
@@ -1,9 +1,14 @@
|
||||
use std::{
|
||||
error::Error,
|
||||
io::{self, BufRead, BufReader, Write},
|
||||
net::{SocketAddr, TcpListener, TcpStream},
|
||||
thread,
|
||||
};
|
||||
|
||||
use crate::networkers::{ClientTrait, Connection, ServerTrait};
|
||||
use crossbeam_channel::{Receiver, Sender};
|
||||
use serde::{Serialize, de::DeserializeOwned};
|
||||
|
||||
use crate::networkers::{AsyncConnection, ClientTrait, Connection, ServerTrait};
|
||||
|
||||
pub struct TCPConnection {
|
||||
stream: TcpStream,
|
||||
@@ -48,6 +53,67 @@ impl Connection for TCPConnection {
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncConnection<TCPConnection> for TCPConnection {
|
||||
type Error = io::Error;
|
||||
|
||||
fn as_async<T: Serialize + DeserializeOwned + Send + 'static>(
|
||||
connection: TCPConnection,
|
||||
) -> (Sender<T>, Receiver<T>) {
|
||||
let (send_tx, send_rx) = crossbeam_channel::unbounded::<T>();
|
||||
let (recv_tx, recv_rx) = crossbeam_channel::unbounded::<T>();
|
||||
|
||||
let tx_clone = send_tx.clone();
|
||||
thread::spawn(move || {
|
||||
let mut reader = connection.reader;
|
||||
|
||||
let mut read = || -> Result<String, Self::Error> {
|
||||
let mut line = String::new();
|
||||
let n = reader.read_line(&mut line)?;
|
||||
|
||||
Ok(line.trim_end().to_string())
|
||||
};
|
||||
|
||||
loop {
|
||||
if let Ok(data) = read() {
|
||||
if data.is_empty() {
|
||||
break;
|
||||
}
|
||||
info!("Got {}", data);
|
||||
if let Ok(decoded) = serde_json::from_str::<T>(&data) {
|
||||
if let Err(e) = tx_clone.send(decoded) {
|
||||
error!("Got error: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let rx_clone = recv_rx.clone();
|
||||
thread::spawn(move || {
|
||||
let mut stream = connection.stream;
|
||||
|
||||
let mut write = |data: String| -> Result<(), Self::Error> {
|
||||
writeln!(stream, "{}", data)?;
|
||||
stream.flush()?;
|
||||
Ok(())
|
||||
};
|
||||
|
||||
loop {
|
||||
if let Ok(data) = rx_clone.recv() {
|
||||
if let Ok(encoded) = serde_json::to_string(&data) {
|
||||
info!("Write {}", encoded);
|
||||
if let Err(e) = write(encoded) {
|
||||
error!("Got error: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
(recv_tx, send_rx)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct TCPServer {
|
||||
listener: TcpListener,
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user