Better static runtime config, work on connections.

This commit is contained in:
Michael Mikovsky
2025-11-25 14:27:06 -07:00
parent dc153774e5
commit 1efa3206ae
16 changed files with 634 additions and 122 deletions
+16
View File
@@ -153,6 +153,21 @@ dependencies = [
"libc",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28"
[[package]]
name = "crypto-common"
version = "0.1.6"
@@ -521,6 +536,7 @@ version = "0.0.0"
dependencies = [
"bincode",
"chrono",
"crossbeam-channel",
"libc",
"libloading",
"rand",
+1
View File
@@ -25,3 +25,4 @@ serde = {version = "1.0.228", features=["derive"]}
serde_json = "1.0.145"
libc = "0.2.177"
rand = "0.9.2"
crossbeam-channel = "0.5.15"
+2
View File
@@ -8,5 +8,7 @@ pub fn get_components() -> Vec<NamedComponent> {
return vec![
#[cfg(feature = "client")]
crate::client::get_named_component(),
#[cfg(feature = "server")]
crate::server::get_named_component(),
];
}
+1
View File
@@ -4,6 +4,7 @@ pub mod client;
pub mod config;
pub mod logger;
pub mod module;
pub mod network;
pub mod server;
mod components;
+1
View File
@@ -40,6 +40,7 @@ impl Logger for DefaultLogger {
fn log(&self, _: Record) {}
}
#[allow(unused_variables)]
pub fn set_logger_box(logger: Box<dyn Logger>) {
#[cfg(feature = "log")]
unsafe {
+34 -49
View File
@@ -39,7 +39,7 @@ impl Manager {
/// Create Manager, and run initilization for each Module
#[allow(static_mut_refs)]
pub fn run(config: &'static PayloadConfig, modules: Vec<Module>) {
pub fn start(config: &'static PayloadConfig, modules: Vec<Module>) -> Arc<Mutex<Self>> {
// Construct self
let mut this = Self::new(&config.id, config.components.clone(), modules);
@@ -51,11 +51,12 @@ impl Manager {
let this = Arc::new(Mutex::new(this));
Self::start_runtimes(this.clone(), &config.runtime_config);
debug!("Starting runtimes...");
for runtime in &config.runtime_config {
Self::start_runtime(this.clone(), runtime);
}
// drop(config);
Self::join(this);
this
}
fn load_components(&mut self) {
@@ -84,39 +85,8 @@ impl Manager {
}
}
/// Start each runtime
fn start_runtimes(this: Arc<Mutex<Self>>, runtimes: &'static Vec<RuntimeConfig>) {
debug!("Starting runtimes...");
for runtime in runtimes {
let mut this_lock = this.lock().unwrap();
let component = match this_lock.components.get(&runtime.parent_component) {
Some(component) => component,
None => {
warn!(
"Could not find component '{}' which is referenced by runtime: {}",
runtime.parent_component, runtime.name
);
continue;
}
};
debug!("Starting runtime: {}", runtime.name);
let runtime = match (*component.start_runtime)(runtime) {
Ok(runtime) => runtime,
Err(e) => {
warn!("Failed to start runtime: {:?}", e);
continue;
}
};
this_lock.active_runtimes.push(runtime);
}
}
/// Iterateratively loop through all runtimes, until all are finished executing
fn join(this: Arc<Mutex<Self>>) {
pub fn join(this: Arc<Mutex<Self>>) {
loop {
let mut this_lock = this.lock().unwrap();
@@ -140,20 +110,35 @@ impl Manager {
}
}
// pub fn get_component(&self) -> HashMap<&'static str, Box<dyn Component>> {
// self.components.clone()
// }
/// Start a runtime
pub fn start_runtime<'a>(this: Arc<Mutex<Self>>, runtime: &'static RuntimeConfig) {
let mut this_lock = this.lock().unwrap();
let component = match this_lock.components.get(&runtime.parent_component) {
Some(component) => component,
None => {
warn!(
"Could not find component '{}' which is referenced by runtime: {}",
runtime.parent_component, runtime.name
);
return;
}
};
debug!("Starting runtime: {}", runtime.name);
let runtime = match (*component.start_runtime)(runtime) {
Ok(runtime) => runtime,
Err(e) => {
warn!("Failed to start runtime: {:?}", e);
return;
}
};
this_lock.active_runtimes.push(runtime);
}
pub fn get_name(&self) -> &str {
self.id
}
// pub extern "C" fn test1234(&self, float: f32) {
// info!("Manager Test Sucsessfull! {}", float.powf(2.));
// }
// #[allow(static_mut_refs, improper_ctypes_definitions)]
// pub extern "C" fn get_manager() -> Arc<Mutex<Manager>> {
// unsafe { MANAGER_RUNTIME.clone().unwrap() }
// }
}
+68
View File
@@ -0,0 +1,68 @@
use std::sync::{
Arc,
atomic::{AtomicBool, Ordering},
};
use crate::{Announcement, ModuleError, network::Stream};
use crossbeam_channel::{Receiver, Sender};
pub struct Connection {
tx: Sender<Announcement>,
rx: Receiver<Announcement>,
is_alive: Arc<AtomicBool>,
}
impl Connection {
pub fn new() -> (Connection, Connection) {
let (tx_mgr, rx) = crossbeam_channel::unbounded();
let (tx, rx_mgr) = crossbeam_channel::unbounded();
let alive = Arc::new(AtomicBool::new(false));
(
Self {
tx: tx_mgr,
rx: rx_mgr,
is_alive: alive.clone(),
},
Self {
tx,
rx,
is_alive: alive,
},
)
}
}
impl Stream<Announcement> for Connection {
fn is_alive(&self) -> bool {
self.is_alive.load(Ordering::Relaxed)
}
fn len(&self) -> usize {
self.rx.len()
}
fn read(&mut self) -> Option<Announcement> {
match self.rx.is_empty() {
true => None,
false => self.rx.recv().ok(),
}
}
fn write(&mut self, data: Announcement) -> Result<(), crate::ModuleError> {
self.tx
.send(data)
.map_err(|_| ModuleError::Error("Failed to send".into()))?;
Ok(())
}
fn try_clone(&self) -> Result<Box<dyn Stream<Announcement> + Send + Sync>, crate::ModuleError> {
Ok(Box::new(Self {
tx: self.tx.clone(),
rx: self.rx.clone(),
is_alive: self.is_alive.clone(),
}))
}
}
+18
View File
@@ -0,0 +1,18 @@
mod connection;
pub use connection::Connection;
use crate::ModuleError;
/// This is the data transmission type
pub trait Stream<T>: Send + Sync {
// fn get_info(&self) -> String;
fn is_alive(&self) -> bool;
fn len(&self) -> usize;
fn read(&mut self) -> Option<T>;
fn write(&mut self, data: T) -> Result<(), ModuleError>;
fn try_clone(&self) -> Result<Box<dyn Stream<T> + Send + Sync>, ModuleError>;
}
+23
View File
@@ -1,3 +1,26 @@
mod server_runtime;
pub use server_runtime::ListenerRuntime;
use crate::{
ModuleError, ModuleRuntime,
config::{InterfaceWrapper, NamedComponent, RuntimeConfig},
};
pub const COMPONENT_NAME: &'static str = "server";
fn get_interface() -> Option<&'static (dyn InterfaceWrapper + Sync)> {
None
}
fn start_runtime(config: &'static RuntimeConfig) -> Result<Box<dyn ModuleRuntime>, ModuleError> {
Ok(Box::new(ListenerRuntime::new(config)?))
}
pub const fn get_named_component() -> NamedComponent {
NamedComponent {
name: COMPONENT_NAME,
get_interface: &get_interface,
start_runtime: &start_runtime,
}
}
+19 -7
View File
@@ -5,10 +5,11 @@ use std::{
thread::{self, JoinHandle},
};
use crate::*;
use crate::{config::RuntimeConfig, *};
pub struct ListenerRuntime {
thread_handle: JoinHandle<()>,
// join_signal: Arc<AtomicBool>,
// listener: TcpListener,
streams: Arc<Mutex<Vec<TcpStream>>>,
// reader: BufReader<TcpListener>,
@@ -16,9 +17,19 @@ pub struct ListenerRuntime {
}
impl ListenerRuntime {
pub fn new() -> ListenerRuntime {
pub fn new(config: &'static RuntimeConfig) -> Result<Self, ModuleError> {
// info!("Starting listener runtime on {}",);
let listener = TcpListener::bind("127.0.0.1:1234").unwrap();
let host = match config.config.get("host") {
Some(host) => host,
None => {
return Err(ModuleError::Error(
"Could not find HOST in Server Runtime".into(),
));
}
};
let listener = TcpListener::bind(host).unwrap();
let streams = Arc::new(Mutex::new(Vec::new()));
let streams_clone = streams.clone();
@@ -27,14 +38,14 @@ impl ListenerRuntime {
let streams = streams_clone.clone();
for stream in listener.incoming() {
let stream = stream.unwrap();
println!("New connection from {}", stream.peer_addr().unwrap());
debug!("New connection from {}", stream.peer_addr().unwrap());
streams.lock().unwrap().push(stream);
}
});
Self {
Ok(Self {
thread_handle,
streams,
}
})
}
pub fn send(&mut self, announcement: &Announcement) -> Result<(), std::io::Error> {
@@ -48,7 +59,7 @@ impl ListenerRuntime {
stream.flush()?;
}
println!("Announcement {:?} sent", announcement);
debug!("Announcement {:?} sent", announcement);
Ok(())
}
@@ -81,6 +92,7 @@ impl ModuleRuntime for ListenerRuntime {
fn kill(self: Box<Self>) {
if !self.thread_handle.is_finished() {
// self.join_signal.store(true, Ordering::Relaxed);
let _ = self.thread_handle.join();
}
// drop(self);