From fa8cb6269c05def60ff0e2f8975c2fbef078a15a Mon Sep 17 00:00:00 2001 From: Michael Mikovsky <77305074+Astatin3@users.noreply.github.com> Date: Wed, 27 May 2026 11:04:22 -0600 Subject: [PATCH] Work on remaking routing --- Cargo.lock | 16 ++ Cargo.toml | 1 + unshell-protocol/Cargo.toml | 3 + unshell-protocol/src/endpoint/endpoint_ref.rs | 144 ++++++++++++++++++ unshell-protocol/src/endpoint/error.rs | 9 ++ unshell-protocol/src/endpoint/mod.rs | 54 +++++++ unshell-protocol/src/leaf/mod.rs | 6 + unshell-protocol/src/lib.rs | 7 +- unshell-protocol/src/tests/mod.rs | 107 +++++++++++++ unshell-protocol/src/types.rs | 14 ++ unshell-protocol/src/utils.rs | 1 - 11 files changed, 360 insertions(+), 2 deletions(-) create mode 100644 unshell-protocol/src/endpoint/endpoint_ref.rs create mode 100644 unshell-protocol/src/endpoint/error.rs create mode 100644 unshell-protocol/src/endpoint/mod.rs create mode 100644 unshell-protocol/src/leaf/mod.rs create mode 100644 unshell-protocol/src/tests/mod.rs create mode 100644 unshell-protocol/src/types.rs delete mode 100644 unshell-protocol/src/utils.rs diff --git a/Cargo.lock b/Cargo.lock index a4f6013..eef28b9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -216,6 +216,21 @@ dependencies = [ "libc", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" + [[package]] name = "crypto-common" version = "0.2.1" @@ -839,6 +854,7 @@ dependencies = [ name = "unshell-protocol" version = "0.1.0" dependencies = [ + "crossbeam-channel", "rkyv", ] diff --git a/Cargo.toml b/Cargo.toml index a33bee8..1d73213 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -68,6 +68,7 @@ unshell-protocol = { workspace = true } # unshell-runtime = { workspace = true } # unshell-leaves = { workspace = true } + [profile.minimize] inherits = "release" strip = true # Strip symbols from the binary diff --git a/unshell-protocol/Cargo.toml b/unshell-protocol/Cargo.toml index bd4d2fb..5f56508 100644 --- a/unshell-protocol/Cargo.toml +++ b/unshell-protocol/Cargo.toml @@ -11,6 +11,9 @@ doctest = false rkyv = { workspace = true } # unshell-macros = { path = "../unshell-macros" } +[dev-dependencies] +crossbeam-channel.workspace = true + [lints.rust] elided_lifetimes_in_paths = "warn" future_incompatible = { level = "warn", priority = -1 } diff --git a/unshell-protocol/src/endpoint/endpoint_ref.rs b/unshell-protocol/src/endpoint/endpoint_ref.rs new file mode 100644 index 0000000..f31415d --- /dev/null +++ b/unshell-protocol/src/endpoint/endpoint_ref.rs @@ -0,0 +1,144 @@ +use alloc::{format, string::ToString}; + +use crate::{ + endpoint::error::EndpointError, + packet::Packet, + types::{ConnectionSet, HookMap, Path, RouteMap}, +}; + +#[derive(Debug)] +pub struct EndpointRef<'a> { + pub name: &'static str, + pub path: &'a Path, + + pub hooks: &'a mut HookMap, + + pub connections: &'a mut ConnectionSet, + + pub inbound: &'a mut RouteMap, + pub outbound: &'a mut RouteMap, +} + +impl<'a> EndpointRef<'a> { + pub fn add_inbound(&mut self, packet: Packet) -> Result<(), EndpointError> { + // If the packet is routed towards this endpoint + if packet.path.ends_with(self.name) { + if packet.is_upwards_call { + self.hooks.insert(packet.hook_id, packet.path.clone()); + } + + self.outbound + .entry(packet.path.clone()) + .or_default() + .push_back(packet); + + Ok(()) + } else { + // If the absolute path of this endpoint hasn't been set yet + if self.path.is_empty() { + return Err(EndpointError::NoAbsoultePathYet); + } + + if *self.path == packet.path { + return Err(EndpointError::IncorrectAbsolutePath); + } + + // For routing + let connection = if packet.is_upwards_call && self.path.starts_with(&packet.path) { + ( + packet + .path + .rsplit_once('/') + .map_or(packet.path.clone(), |(_, after)| after.to_string()), + true, + ) + } else if packet + .path + .starts_with(&format!("{}/{}", self.path, self.name)) + { + let concat_len = self.path.len() + self.name.len(); + + let after_self = &packet.path[concat_len..]; + + ( + after_self + .split_once('/') + .map_or(after_self.to_string(), |(before, _)| before.to_string()), + false, + ) + } else { + return Err(EndpointError::IncorrectAbsolutePath); + }; + + if !self.connections.contains(&connection) { + return Err(EndpointError::RouteNotExist); + } + + self.add_outbound(packet); + + Ok(()) + } + } + + pub fn add_outbound_upwards(&mut self, packet: Packet) -> Result<(), EndpointError> { + let next_hop = self + .hooks + .get(&packet.hook_id) + .ok_or(EndpointError::RouteNotExist)? + .clone(); + + if packet.end_hook { + let _ = self.hooks.remove(&packet.hook_id); + } + + self.outbound + .entry(next_hop.clone()) + .or_default() + .push_back(packet); + + Ok(()) + } + + pub fn add_outbound_downwards(&mut self, packet: Packet) -> Result<(), EndpointError> { + let next_hop = self + .hooks + .get(&packet.hook_id) + .ok_or(EndpointError::RouteNotExist)? + .clone(); + + if packet.end_hook { + let _ = self.hooks.remove(&packet.hook_id); + } + + self.outbound + .entry(next_hop.clone()) + .or_default() + .push_back(packet); + + Ok(()) + } + + pub fn take_intbound(&mut self, path: &str, f: F) + where + F: FnMut(&Packet), + { + if let Some(queue) = self.inbound.get_mut(path) { + let _ = queue.iter().map(f); + + queue.clear(); + } + } + + pub fn take_outbound(&mut self, path: &str, f: F) + where + F: FnMut(&Packet), + { + if let Some(queue) = self.inbound.get_mut(path) { + let _ = queue.iter().map(f); + + queue.clear(); + } + } +} + +// fn get_last_term_in_path(path: &Path) -> &str {} diff --git a/unshell-protocol/src/endpoint/error.rs b/unshell-protocol/src/endpoint/error.rs new file mode 100644 index 0000000..0a18ee7 --- /dev/null +++ b/unshell-protocol/src/endpoint/error.rs @@ -0,0 +1,9 @@ +#[derive(Debug)] +pub enum EndpointError { + NoAbsoultePathYet, + IncorrectAbsolutePath, + + RouteNotExist, + HookDuplicate, + HookNotExist, +} diff --git a/unshell-protocol/src/endpoint/mod.rs b/unshell-protocol/src/endpoint/mod.rs new file mode 100644 index 0000000..5d532b2 --- /dev/null +++ b/unshell-protocol/src/endpoint/mod.rs @@ -0,0 +1,54 @@ +mod endpoint_ref; +pub mod error; + +use alloc::{boxed::Box, string::String, vec::Vec}; + +use crate::{ + leaf::Leaf, + types::{ConnectionSet, HookMap, Path, RouteMap}, +}; + +pub use endpoint_ref::EndpointRef; + +pub struct Endpoint { + pub name: &'static str, + + // Absolute path for this node. + pub path: Path, + pub leaves: Vec>, + + pub connections: ConnectionSet, + + pub hooks: HookMap, + pub inbound: RouteMap, + pub outbound: RouteMap, +} + +impl Endpoint { + pub fn new(name: &'static str, leaves: Vec>) -> Self { + Self { + name, + path: String::new(), + leaves, + hooks: HookMap::new(), + connections: ConnectionSet::new(), + inbound: RouteMap::new(), + outbound: RouteMap::new(), + } + } + + pub fn update(&mut self) { + let mut self_ref = EndpointRef { + name: self.name, + path: &mut self.path, + hooks: &mut self.hooks, + connections: &mut self.connections, + inbound: &mut self.inbound, + outbound: &mut self.outbound, + }; + + let _ = self.leaves.iter_mut().map(|leaf| { + leaf.update(&mut self_ref); + }); + } +} diff --git a/unshell-protocol/src/leaf/mod.rs b/unshell-protocol/src/leaf/mod.rs new file mode 100644 index 0000000..5e544ce --- /dev/null +++ b/unshell-protocol/src/leaf/mod.rs @@ -0,0 +1,6 @@ +use crate::endpoint::EndpointRef; + +pub trait Leaf { + fn get_name(&self) -> &'static str; + fn update<'a>(&mut self, _: &mut EndpointRef<'a>); +} diff --git a/unshell-protocol/src/lib.rs b/unshell-protocol/src/lib.rs index 0e46874..70751de 100644 --- a/unshell-protocol/src/lib.rs +++ b/unshell-protocol/src/lib.rs @@ -2,5 +2,10 @@ pub extern crate alloc; +pub mod endpoint; +pub mod leaf; pub mod packet; -pub mod utils; +mod types; + +#[cfg(test)] +mod tests; diff --git a/unshell-protocol/src/tests/mod.rs b/unshell-protocol/src/tests/mod.rs new file mode 100644 index 0000000..9274fc6 --- /dev/null +++ b/unshell-protocol/src/tests/mod.rs @@ -0,0 +1,107 @@ +use crate::{endpoint::EndpointRef, leaf::Leaf, packet::Packet}; + +use alloc::{ + collections::vec_deque::VecDeque, + format, + string::{String, ToString}, + vec::Vec, +}; +use crossbeam_channel::{Receiver, Sender}; + +struct ControllerLeaf { + responder_id: String, + has_run: bool, +} +struct CommsLeaf { + tx: Sender>, + rx: Receiver>, + + remote_id: String, + is_authority: bool, + started: bool, +} +struct ResponderLeaf; + +impl Leaf for ControllerLeaf { + fn get_name(&self) -> &'static str { + "ControllerLeaf" + } + + fn update<'a>(&mut self, endpoint: &mut EndpointRef<'a>) { + if !self.has_run { + endpoint.add_outbound( + self.responder_id.clone(), + Packet { + hook_id: 0, + is_upwards_call: false, + end_hook: false, + path: format!("/{}", self.responder_id), + procedure_id: "echo".to_string(), + data: "ABC123".as_bytes().to_vec(), + }, + ); + + self.has_run = true; + } + } +} + +impl Leaf for CommsLeaf { + fn get_name(&self) -> &'static str { + "CommsLeaf" + } + + fn update<'a>(&mut self, endpoint: &mut EndpointRef<'a>) { + if !self.started { + endpoint + .connections + .insert((self.remote_id.clone(), self.is_authority)); + } + + while !self.rx.is_empty() { + let packet = Packet::deserialize(&self.rx.recv().unwrap()).unwrap(); + + endpoint.add_inbound(packet).unwrap(); + } + + endpoint.take_outbound(self.get_name(), |packet| { + let data = packet.serialize().unwrap(); + self.tx.send(data).unwrap(); + }); + } +} + +impl Leaf for ResponderLeaf { + fn get_name(&self) -> &'static str { + "ResponderLeaf" + } + + fn update<'a>(&mut self, endpoint: &mut EndpointRef<'a>) { + let packets = endpoint + .inbound + .get(self.get_name()) + .unwrap_or(&VecDeque::new()) + .iter() + .map(|packet| { + // let data = ; + + Packet { + hook_id: 0, + is_upwards_call: false, + end_hook: false, + path: String::new(), + // path: packet.path.clone(), + procedure_id: "echo".to_string(), + data: packet.data.clone(), + } + }) + .collect::>(); + + for packet in packets { + endpoint.add_outbound(packet); + } + } +} + +#[test] +fn test_comms() {} diff --git a/unshell-protocol/src/types.rs b/unshell-protocol/src/types.rs new file mode 100644 index 0000000..fab1a94 --- /dev/null +++ b/unshell-protocol/src/types.rs @@ -0,0 +1,14 @@ +use alloc::{ + collections::{btree_map::BTreeMap, btree_set::BTreeSet, vec_deque::VecDeque}, + string::String, +}; + +use crate::packet::Packet; + +pub type Path = String; +pub type EndpointName = String; +pub type HookID = u16; +pub type ConnectionSet = BTreeSet<(EndpointName, bool)>; +pub type HookMap = BTreeMap; +pub type PacketQueue = VecDeque; +pub type RouteMap = BTreeMap; diff --git a/unshell-protocol/src/utils.rs b/unshell-protocol/src/utils.rs deleted file mode 100644 index 8b13789..0000000 --- a/unshell-protocol/src/utils.rs +++ /dev/null @@ -1 +0,0 @@ -