diff --git a/unshell-cli/src/main.rs b/unshell-cli/src/main.rs index 73e74db..df95876 100644 --- a/unshell-cli/src/main.rs +++ b/unshell-cli/src/main.rs @@ -1,4 +1,7 @@ -use std::collections::HashMap; +use std::{ + collections::HashMap, + io::{Write, stdin, stdout}, +}; use static_init::dynamic; use unshell_lib::{ @@ -41,39 +44,39 @@ fn main() -> Result<(), Box> { // Manager::st - Manager::join(manager); + // Manager::join(manager); - // loop { - // print!("> "); - // stdout().flush().expect("Failed to flush stdout"); - // let mut input = String::new(); - // stdin().read_line(&mut input).expect("Failed to read line"); + loop { + print!("> "); + stdout().flush().expect("Failed to flush stdout"); + let mut input = String::new(); + stdin().read_line(&mut input).expect("Failed to read line"); - // let args = input.trim().split(" ").collect::>(); + let args = input.trim().split(" ").collect::>(); - // match args[0] { - // "" => {} - // "test" => { - // if let Some(arg) = args.get(1) { - // println!("Test with argument: {}", arg); - // serverruntime - // .send(&Announcement::TestAnnouncement(arg.to_string())) - // .unwrap(); - // } else { - // println!("Test without argument"); - // } - // } - // _ => { - // println!("Invalid Command: '{}'", args[0]); - // } - // } + match args[0] { + "" => {} + "test" => { + if let Some(arg) = args.get(1) { + println!("Test with argument: {}", arg); + // serverruntime + // .send(&Announcement::TestAnnouncement(arg.to_string())) + // .unwrap(); + } else { + println!("Test without argument"); + } + } + _ => { + println!("Invalid Command: '{}'", args[0]); + } + } - // // println!("{:?}", args); - // } + // println!("{:?}", args); + } // serverruntime.send(&Announcement::GetRuntimes)?; // let response = serverruntime. - Ok(()) + // Ok(()) } diff --git a/unshell-lib/src/client/client_runtime.rs b/unshell-lib/src/client/client_runtime.rs index 035bc2a..057ae23 100644 --- a/unshell-lib/src/client/client_runtime.rs +++ b/unshell-lib/src/client/client_runtime.rs @@ -6,6 +6,7 @@ use std::{ atomic::{AtomicBool, Ordering}, }, thread::{self, JoinHandle}, + time::Duration, }; use crate::{config::RuntimeConfig, *}; @@ -32,35 +33,57 @@ impl ClientRuntime { } }; + let retry = match config.config.get("retry") { + Some(host) => Duration::from_millis(host.parse::().unwrap()), + None => { + return Err(ModuleError::Error( + "Could not find RETRY in Client Runtime".into(), + )); + } + }; + Ok(Self { thread_handle: thread::spawn(move || { debug!("Connecting to server..."); - let mut stream = match TcpStream::connect(host) { - Ok(stream) => stream, - Err(e) => { - error!("Failed to connect to server: {}", e); - return; - } - }; - info!("Connected"); - while !join_clone.load(Ordering::Relaxed) { - let mut size_buf = [0u8; 4]; - stream.read_exact(&mut size_buf).unwrap(); - let size = u32::from_be_bytes(size_buf); - - let mut buf = vec![0u8; size as usize]; - - stream.read_exact(&mut buf).unwrap(); - - let a = Announcement::decode(&buf).unwrap(); - - match a { - Announcement::TestAnnouncement(s) => { - println!("Received test announcement: {}", s) + loop { + let mut stream = match TcpStream::connect(host) { + Ok(stream) => stream, + Err(e) => { + error!("Failed to connect to server: {}", e); + thread::sleep(retry); + continue; + } + }; + info!("Connected"); + + while !join_clone.load(Ordering::Relaxed) { + let mut size_buf = [0u8; 4]; + match stream.read_exact(&mut size_buf) { + Ok(()) => {} + Err(_) => { + break; + } + }; + let size = u32::from_be_bytes(size_buf); + + let mut buf = vec![0u8; size as usize]; + + stream.read_exact(&mut buf).unwrap(); + + let a = Announcement::decode(&buf).unwrap(); + + match a { + Announcement::TestAnnouncement(s) => { + println!("Received test announcement: {}", s) + } + _ => {} } - _ => {} } + + debug!("Disconnected from {}", host); + + thread::sleep(retry); } }), join_signal, diff --git a/unshell-lib/src/module/manager.rs b/unshell-lib/src/module/manager.rs index d9388db..24425e2 100644 --- a/unshell-lib/src/module/manager.rs +++ b/unshell-lib/src/module/manager.rs @@ -1,12 +1,13 @@ use std::{ collections::HashMap, sync::{Arc, Mutex}, - thread, + thread::{self, JoinHandle}, time::Duration, }; use crate::{ config::{NamedComponent, PayloadConfig, RuntimeConfig}, + network::Connection, *, }; use module::Module; @@ -16,10 +17,14 @@ use unshell_obfuscate::symbol; pub struct Manager { id: &'static str, + handle: Option>, + pub modules: Vec, - active_runtimes: Vec>, components: HashMap, + active_runtimes: Vec>, + + pub connections: Vec, } // static mut MANAGER_RUNTIME: Option>> = None; @@ -28,16 +33,20 @@ impl Manager { fn new(id: &'static str, components: Vec, modules: Vec) -> Self { Self { id, + handle: None, + modules, components: components .into_iter() .map(|c| (c.name.to_string(), c)) .collect(), active_runtimes: Vec::new(), + + connections: Vec::new(), } } - /// Create Manager, and run initilization for each Module + /// Create Manager, and run initialization for each Module #[allow(static_mut_refs)] pub fn start(config: &'static PayloadConfig, modules: Vec) -> Arc> { // Construct self @@ -56,6 +65,8 @@ impl Manager { Self::start_runtime(this.clone(), runtime); } + this.lock().unwrap().handle = Some(Self::start_thread(this.clone())); + this } @@ -85,28 +96,47 @@ impl Manager { } } - /// Iterateratively loop through all runtimes, until all are finished executing + /// The manager thread. receives announcements, and kills runtimes. + fn start_thread(this: Arc>) -> JoinHandle<()> { + thread::spawn(move || { + loop { + thread::sleep(Duration::from_millis(10)); + + let mut this_lock = this.lock().unwrap(); + + if this_lock.active_runtimes.len() <= 0 { + debug!("There are no more runtimes! Exiting..."); + break; + } + + this_lock.active_runtimes.retain(|runtime| { + if runtime.is_running() { + true + } else { + debug!("Runtime exited!"); //TODO: Make this better + false + } + }); + + // Read announcements + this_lock.recv_connection_announcements(); + + // Prune dead connections + this_lock.prune_connections(); + + drop(this_lock) + } + }) + } + + /// Wait for manager thread to finish. pub fn join(this: Arc>) { loop { - let mut this_lock = this.lock().unwrap(); - - if this_lock.active_runtimes.len() <= 0 { - debug!("There are no more runtimes! Exiting..."); + if this.lock().unwrap().handle.as_ref().unwrap().is_finished() { break; } - this_lock.active_runtimes.retain(|runtime| { - if runtime.is_running() { - true - } else { - debug!("Runtime exited!"); //TODO: Make this better - false - } - }); - - drop(this_lock); - - thread::sleep(Duration::from_millis(500)); + thread::sleep(Duration::from_millis(100)); } } diff --git a/unshell-lib/src/module/manager_announcement.rs b/unshell-lib/src/module/manager_announcement.rs new file mode 100644 index 0000000..f6a3820 --- /dev/null +++ b/unshell-lib/src/module/manager_announcement.rs @@ -0,0 +1,16 @@ +use crate::{Announcement, module::Manager}; + +impl Manager { + pub fn recv_announcement(&mut self, announcement: &Announcement) { + match announcement { + Announcement::TestAnnouncement(str) => { + println!("Got test announcement: {}", str) + } + // Announcement::GetRuntimes => todo!(), + // Announcement::GetRuntimesAck(_) => todo!(), + // Announcement::StartRuntime(runtime_config) => todo!(), + // Announcement::StartRuntimeAck(_) => todo!(), + _ => {} + } + } +} diff --git a/unshell-lib/src/module/manager_connection.rs b/unshell-lib/src/module/manager_connection.rs new file mode 100644 index 0000000..70cfc49 --- /dev/null +++ b/unshell-lib/src/module/manager_connection.rs @@ -0,0 +1,29 @@ +use crate::{ + Announcement, + module::Manager, + network::{Connection, Stream}, +}; + +impl Manager { + pub fn add_connection(&mut self, connection: Connection) { + self.connections.push(connection); + } + + pub fn prune_connections(&mut self) { + self.connections.retain(|c| c.is_alive()); + } + + pub fn recv_connection_announcements(&mut self) { + // Collect all incoming announcements + let announcements = self + .connections + .iter() + .map(|c| c.read()) + .flat_map(|array| array) + .collect::>(); + + for announcement in announcements { + self.recv_announcement(&announcement) + } + } +} diff --git a/unshell-lib/src/module/mod.rs b/unshell-lib/src/module/mod.rs index 82e0a03..7581aed 100644 --- a/unshell-lib/src/module/mod.rs +++ b/unshell-lib/src/module/mod.rs @@ -1,4 +1,7 @@ mod manager; +mod manager_announcement; +mod manager_connection; + mod module; mod proc_load; diff --git a/unshell-lib/src/network/connection.rs b/unshell-lib/src/network/connection.rs index e275bc6..35a9367 100644 --- a/unshell-lib/src/network/connection.rs +++ b/unshell-lib/src/network/connection.rs @@ -43,11 +43,8 @@ impl Stream for Connection { self.rx.len() } - fn read(&mut self) -> Option { - match self.rx.is_empty() { - true => None, - false => self.rx.recv().ok(), - } + fn read(&self) -> Vec { + self.rx.try_iter().collect() } fn write(&mut self, data: Announcement) -> Result<(), crate::ModuleError> { diff --git a/unshell-lib/src/network/mod.rs b/unshell-lib/src/network/mod.rs index 90c4c90..b185c28 100644 --- a/unshell-lib/src/network/mod.rs +++ b/unshell-lib/src/network/mod.rs @@ -10,7 +10,7 @@ pub trait Stream: Send + Sync { fn is_alive(&self) -> bool; fn len(&self) -> usize; - fn read(&mut self) -> Option; + fn read(&self) -> Vec; fn write(&mut self, data: T) -> Result<(), ModuleError>; diff --git a/unshell-payload/src/main.rs b/unshell-payload/src/main.rs index 800de25..40bcf07 100644 --- a/unshell-payload/src/main.rs +++ b/unshell-payload/src/main.rs @@ -20,7 +20,10 @@ static PAYLOAD_CONFIG: PayloadConfig = PayloadConfig { runtime_config: vec![RuntimeConfig { parent_component: symbol!("client").to_string(), name: symbol!("client runtime").to_string(), - config: HashMap::from([(symbol!("host").to_string(), obs!("localhost:1234"))]), + config: HashMap::from([ + (symbol!("host").to_string(), obs!("localhost:1234")), + (symbol!("retry").to_string(), obs!("1000")), + ]), }], };