diff --git a/Cargo.lock b/Cargo.lock index 6f5a62e..86042f1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1440,6 +1440,7 @@ name = "unshell" version = "0.1.0" dependencies = [ "chrono", + "crossbeam-channel", "rkyv", "static_init", "thiserror 2.0.18", @@ -1452,6 +1453,7 @@ dependencies = [ name = "unshell-leaves" version = "0.1.0" dependencies = [ + "crossbeam-channel", "portable-pty", "rkyv", "unshell-macros", diff --git a/Cargo.toml b/Cargo.toml index e1361b9..7313486 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ syn = "2.0.117" quote = "1.0.45" proc-macro2 = "1.0.106" portable-pty = "0.9.0" +crossbeam-channel = "0.5.15" unshell = { path = "." } unshell-protocol = { path = "./unshell-protocol" } unshell-leaves = { path = "./unshell-leaves" } @@ -64,10 +65,17 @@ unshell-macros = { workspace = true } unshell-protocol = { workspace = true } unshell-leaves = { workspace = true } +[dev-dependencies] +crossbeam-channel = { workspace = true } + [[example]] name = "leaf_derive" path = "examples/protocol/leaf_derive.rs" +[[example]] +name = "crossbeam_channel_leaf" +path = "examples/protocol/crossbeam_channel_leaf.rs" + [[example]] name = "remote_shell_endpoint" path = "examples/protocol/remote_shell_endpoint.rs" diff --git a/examples/protocol/crossbeam_channel_leaf.rs b/examples/protocol/crossbeam_channel_leaf.rs new file mode 100644 index 0000000..dffad2e --- /dev/null +++ b/examples/protocol/crossbeam_channel_leaf.rs @@ -0,0 +1,177 @@ +//! Crossbeam-channel router leaf example. +//! +//! This example wires a root controller to an `agent` node, promotes a staged +//! child connection on that agent via the `add_connection` procedure, and then +//! queries the grandchild's connection snapshot through a fully routed call/reply +//! exchange. + +use std::error::Error; + +use crossbeam_channel::{Receiver, Sender, unbounded}; +use unshell::leaves::crossbeam_channel::{ + ConnectionRequest, ConnectionSnapshot, CrossbeamChannelLeaf, CrossbeamEnvelope, +}; +use unshell::protocol::tree::ProtocolEndpoint; +use unshell::protocol::tree::{ + ChildRoute, Endpoint, EndpointOutcome, Ingress, LeafRuntime, decode_call_input, + encode_call_reply, +}; + +fn main() -> Result<(), Box> { + let (mut agent, root_to_agent) = ChannelNode::new(path(&["agent"])); + let (mut child, agent_to_child) = ChannelNode::new(path(&["agent", "child"])); + let (agent_to_root, root_rx) = unbounded(); + + let mut root = ProtocolEndpoint::new( + Vec::new(), + None, + vec![ChildRoute::registered(path(&["agent"]))], + Vec::new(), + ); + + agent.stage_connection(Vec::new(), agent_to_root); + agent.connect_staged(Vec::new())?; + + child.stage_connection(path(&["agent"]), root_to_agent.clone()); + child.connect_staged(path(&["agent"]))?; + + agent.stage_connection(path(&["agent", "child"]), agent_to_child); + + call_root( + &mut root, + &root_to_agent, + &mut agent, + &mut child, + &root_rx, + path(&["agent"]), + CrossbeamChannelLeaf::protocol_procedure_id("add_connection").expect("procedure exists"), + encode_call_reply(&ConnectionRequest { + peer_path: path(&["agent", "child"]), + })?, + )?; + + let reply = call_root( + &mut root, + &root_to_agent, + &mut agent, + &mut child, + &root_rx, + path(&["agent", "child"]), + CrossbeamChannelLeaf::protocol_procedure_id("get_connections").expect("procedure exists"), + encode_call_reply(&())?, + )?; + let snapshot = decode_call_input::(reply.as_slice())?; + + println!("child parent: {:?}", snapshot.parent); + println!("child children: {:?}", snapshot.children); + + Ok(()) +} + +struct ChannelNode { + runtime: LeafRuntime, + rx: Receiver, +} + +impl ChannelNode { + fn new(path: Vec) -> (Self, Sender) { + let (tx, rx) = unbounded(); + let endpoint = ProtocolEndpoint::new( + path, + None, + Vec::new(), + vec![CrossbeamChannelLeaf::protocol_leaf_spec()], + ); + ( + Self { + runtime: LeafRuntime::new(endpoint, CrossbeamChannelLeaf::default()), + rx, + }, + tx, + ) + } + + fn stage_connection(&mut self, peer_path: Vec, sender: Sender) { + let _ = self.runtime.leaf_mut().stage_connection(peer_path, sender); + } + + fn connect_staged(&mut self, peer_path: Vec) -> Result<(), Box> { + let runtime = &mut self.runtime; + let mut leaf = core::mem::take(runtime.leaf_mut()); + let result = leaf.connect_staged(runtime.endpoint_mut(), peer_path); + *runtime.leaf_mut() = leaf; + result?; + Ok(()) + } + + fn drain(&mut self) -> Result> { + let mut processed = 0usize; + while let Ok(envelope) = self.rx.try_recv() { + let outcome = self + .runtime + .receive_routed(&envelope.ingress, envelope.frame)?; + self.runtime.route_forwarded(outcome.forwarded)?; + processed += 1; + } + Ok(processed) + } +} + +fn call_root( + root: &mut ProtocolEndpoint, + root_to_agent: &Sender, + agent: &mut ChannelNode, + child: &mut ChannelNode, + root_rx: &Receiver, + dst_path: Vec, + procedure_id: String, + data: Vec, +) -> Result, Box> { + let hook_id = root.allocate_hook_id(); + let outcome = root.send_call( + dst_path, + Some(CrossbeamChannelLeaf::protocol_leaf_name()), + procedure_id, + Some(hook_id), + data, + )?; + let EndpointOutcome::Forward { frame, .. } = outcome else { + return Err("root call did not forward".into()); + }; + root_to_agent.send(CrossbeamEnvelope { + ingress: Ingress::Parent, + frame, + })?; + + for _ in 0..16 { + let mut progress = 0usize; + progress += agent.drain()?; + progress += child.drain()?; + + while let Ok(envelope) = root_rx.try_recv() { + progress += 1; + let outcome = root.receive(&envelope.ingress, envelope.frame)?; + if let EndpointOutcome::Local(event) = outcome { + match event { + unshell::protocol::tree::LocalEvent::Data { message, .. } => { + return Ok(message.data); + } + unshell::protocol::tree::LocalEvent::Fault { message, .. } => { + return Err(format!("routed call faulted: {:?}", message.fault).into()); + } + unshell::protocol::tree::LocalEvent::Call { .. } => {} + } + } + } + + if progress == 0 { + break; + } + } + + Err("timed out waiting for routed reply".into()) +} + +fn path(parts: &[&str]) -> Vec { + parts.iter().map(|part| (*part).to_owned()).collect() +} diff --git a/unshell-leaves/Cargo.toml b/unshell-leaves/Cargo.toml index cf397e4..f27c2e1 100644 --- a/unshell-leaves/Cargo.toml +++ b/unshell-leaves/Cargo.toml @@ -12,6 +12,7 @@ leaf_tui = [] [dependencies] rkyv = { workspace = true } portable-pty = { workspace = true } +crossbeam-channel = { workspace = true } unshell-macros = { workspace = true } unshell-protocol = { workspace = true } diff --git a/unshell-leaves/src/crossbeam_channel.rs b/unshell-leaves/src/crossbeam_channel.rs new file mode 100644 index 0000000..2ad7cfd --- /dev/null +++ b/unshell-leaves/src/crossbeam_channel.rs @@ -0,0 +1,692 @@ +//! Crossbeam-channel-backed router leaf for in-process protocol simulations. +//! +//! This leaf owns parent/child transport links backed by `crossbeam_channel`, so +//! tests and examples can exercise full packet routing without opening real +//! sockets. + +use std::collections::BTreeMap; + +use crossbeam_channel::Sender; +use rkyv::{Archive, Deserialize, Serialize}; +use unshell_protocol::FrameBytes; +use unshell_protocol::tree::{ + CallLeaf, ChildRoute, Endpoint, Ingress, ProtocolEndpoint, RouterLeaf, +}; + +use crate::{leaf, procedures}; + +/// One inbound frame delivered across a simulated channel hop. +/// +/// What it is: the transport envelope sent between in-process nodes when this +/// leaf forwards protocol traffic over `crossbeam_channel`. +/// +/// Why it exists: routing needs both the encoded frame bytes and the ingress side +/// that the receiver should apply when validating source paths. +/// +/// # Example +/// ```rust +/// use unshell_leaves::crossbeam_channel::CrossbeamEnvelope; +/// use unshell_leaves::protocol::{FrameBytes, tree::Ingress}; +/// let envelope = CrossbeamEnvelope { +/// ingress: Ingress::Parent, +/// frame: FrameBytes::new(), +/// }; +/// assert!(matches!(envelope.ingress, Ingress::Parent)); +/// ``` +#[derive(Debug, Clone)] +pub struct CrossbeamEnvelope { + /// Which side of the tree the receiving endpoint should treat this frame as coming from. + pub ingress: Ingress, + /// Encoded protocol frame bytes. + pub frame: FrameBytes, +} + +/// Request payload for promoting or pruning one simulated connection. +/// +/// What it is: the protocol payload shared by the `add_connection` and +/// `remove_connection` procedures. +/// +/// Why it exists: the leaf only needs the peer endpoint path to decide whether the +/// connection is a direct parent edge or a direct child edge. +/// +/// # Example +/// ```rust +/// use unshell_leaves::crossbeam_channel::ConnectionRequest; +/// let request = ConnectionRequest { +/// peer_path: vec!["agent".into(), "child".into()], +/// }; +/// assert_eq!(request.peer_path.len(), 2); +/// ``` +#[derive(Archive, Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] +pub struct ConnectionRequest { + /// Absolute endpoint path of the peer connection being managed. + pub peer_path: Vec, +} + +/// Machine-readable snapshot of the leaf's active simulated connections. +/// +/// What it is: the reply payload returned by `get_connections`, `add_connection`, +/// and `remove_connection`. +/// +/// Why it exists: connection-management procedures should return the resulting +/// topology immediately so tests and tooling can confirm what changed. +/// +/// # Example +/// ```rust +/// use unshell_leaves::crossbeam_channel::ConnectionSnapshot; +/// let snapshot = ConnectionSnapshot { +/// parent: Some(vec!["agent".into()]), +/// children: vec![vec!["agent".into(), "child".into()]], +/// }; +/// assert_eq!(snapshot.children.len(), 1); +/// ``` +#[derive(Archive, Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] +pub struct ConnectionSnapshot { + /// The direct parent path, if this endpoint currently has one. + pub parent: Option>, + /// The currently active direct child paths. + pub children: Vec>, +} + +/// Errors surfaced by the channel-backed router leaf. +/// +/// What it is: the small, deterministic error set used by both the management +/// procedures and the transport forwarding hooks. +/// +/// Why it exists: tests and examples need structured failures when a staged link is +/// missing, a path is not a direct neighbor, or a channel is already closed. +/// +/// # Example +/// ```rust +/// use unshell_leaves::crossbeam_channel::CrossbeamChannelError; +/// let error = CrossbeamChannelError::MissingStagedConnection; +/// assert_eq!(error.to_string(), "missing staged connection"); +/// ``` +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum CrossbeamChannelError { + /// The requested peer path does not have a staged sender ready to activate. + MissingStagedConnection, + /// The requested peer path is neither the direct parent nor a direct child. + InvalidPeerPath, + /// No active parent link exists for upstream forwarding. + MissingParentConnection, + /// No active child link exists for the requested child path. + MissingChildConnection, + /// The receiving side of the channel is already disconnected. + ChannelClosed, +} + +impl core::fmt::Display for CrossbeamChannelError { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + match self { + Self::MissingStagedConnection => f.write_str("missing staged connection"), + Self::InvalidPeerPath => f.write_str("peer path is not a direct parent or child"), + Self::MissingParentConnection => f.write_str("missing parent connection"), + Self::MissingChildConnection => f.write_str("missing child connection"), + Self::ChannelClosed => f.write_str("channel receiver is disconnected"), + } + } +} + +impl core::error::Error for CrossbeamChannelError {} + +/// Shared compile-time declaration for the crossbeam-channel router leaf. +/// +/// What it is: the public leaf declaration that owns the canonical leaf name and +/// exported management procedure ids for [`CrossbeamChannelLeaf`]. +/// +/// Why it exists: endpoint code, examples, and tests should all derive the same +/// protocol-facing metadata from one source of truth instead of hand-assembling +/// the leaf id and procedure inventory. +/// +/// # Example +/// ```rust +/// use unshell_leaves::crossbeam_channel::CrossbeamChannel; +/// assert!(CrossbeamChannel::protocol_leaf_name().contains("crossbeam_channel")); +/// ``` +#[leaf( + id = "org.unshell.v1.crossbeam_channel", + endpoint_struct = CrossbeamChannelLeaf, + procedures = ["add_connection", "remove_connection", "get_connections"] +)] +pub struct CrossbeamChannel; + +/// In-process router leaf backed by `crossbeam_channel` senders. +/// +/// What it is: a leaf host that stores one optional parent sender, any number of +/// child senders, and a staging area for connections that should only become live +/// after an explicit procedure call. +/// +/// Why it exists: protocol tests need a realistic forwarding surface with parent +/// and child links, but opening TCP sockets would make those tests slower and more +/// brittle than necessary. +/// +/// # Example +/// ```rust +/// use crossbeam_channel::unbounded; +/// use unshell_leaves::crossbeam_channel::CrossbeamChannelLeaf; +/// let (tx, _rx) = unbounded(); +/// let mut leaf = CrossbeamChannelLeaf::default(); +/// let previous = leaf.stage_connection(vec!["agent".into()], tx); +/// assert!(previous.is_none()); +/// ``` +#[derive(Default)] +pub struct CrossbeamChannelLeaf { + parent: Option, + children: BTreeMap, Sender>, + child_routes: Vec, + staged: BTreeMap, Sender>, +} + +#[derive(Debug, Clone)] +struct ChannelConnection { + path: Vec, + sender: Sender, +} + +impl CrossbeamChannelLeaf { + /// Stages one channel sender so a later protocol procedure can activate it. + /// + /// What it is: a bootstrap helper that prepares the transport handle before the + /// leaf promotes it into active routing state. + /// + /// Why it exists: the sender itself is not a serializable protocol payload, so + /// tests and examples need a local way to install it before calling + /// `add_connection`. + pub fn stage_connection( + &mut self, + peer_path: Vec, + sender: Sender, + ) -> Option> { + self.staged.insert(peer_path, sender) + } + + /// Promotes one staged connection into the active topology. + /// + /// This is the same operation used by the public `add_connection` procedure, + /// but it is also useful for local bootstrap code that has not yet wired the + /// control plane needed to issue that call remotely. + pub fn connect_staged( + &mut self, + endpoint: &mut ProtocolEndpoint, + peer_path: Vec, + ) -> Result { + if !is_direct_parent(endpoint.path(), &peer_path) + && !is_direct_child(endpoint.path(), &peer_path) + { + return Err(CrossbeamChannelError::InvalidPeerPath); + } + + let Some(sender) = self.staged.remove(&peer_path) else { + return Err(CrossbeamChannelError::MissingStagedConnection); + }; + + if is_direct_parent(endpoint.path(), &peer_path) { + self.parent = Some(ChannelConnection { + path: peer_path.clone(), + sender, + }); + endpoint + .set_parent_path(Some(peer_path)) + .map_err(|_| CrossbeamChannelError::InvalidPeerPath)?; + return Ok(ConnectionSnapshot::from_endpoint(endpoint)); + } + + if is_direct_child(endpoint.path(), &peer_path) { + self.children.insert(peer_path.clone(), sender); + self.sync_child_routes(); + endpoint + .upsert_child_route(ChildRoute::registered(peer_path)) + .map_err(|_| CrossbeamChannelError::InvalidPeerPath)?; + return Ok(ConnectionSnapshot::from_endpoint(endpoint)); + } + + unreachable!("direct-neighbor validation returned early above") + } + + /// Removes one active connection and returns it to the staged set. + pub fn disconnect( + &mut self, + endpoint: &mut ProtocolEndpoint, + peer_path: &[String], + ) -> Result { + if !is_direct_parent(endpoint.path(), peer_path) + && !is_direct_child(endpoint.path(), peer_path) + { + return Err(CrossbeamChannelError::InvalidPeerPath); + } + + if self + .parent + .as_ref() + .is_some_and(|parent| parent.path == peer_path) + { + let Some(parent) = self.parent.take() else { + return Err(CrossbeamChannelError::MissingParentConnection); + }; + self.staged.insert(parent.path, parent.sender); + endpoint + .set_parent_path(None) + .map_err(|_| CrossbeamChannelError::InvalidPeerPath)?; + return Ok(ConnectionSnapshot::from_endpoint(endpoint)); + } + + let Some(sender) = self.children.remove(peer_path) else { + return Err(CrossbeamChannelError::MissingChildConnection); + }; + self.staged.insert(peer_path.to_vec(), sender); + self.sync_child_routes(); + endpoint.remove_child_route(peer_path); + Ok(ConnectionSnapshot::from_endpoint(endpoint)) + } + + fn sync_child_routes(&mut self) { + self.child_routes = self + .children + .keys() + .cloned() + .map(ChildRoute::registered) + .collect(); + } +} + +impl ConnectionSnapshot { + fn from_endpoint(endpoint: &ProtocolEndpoint) -> Self { + Self { + parent: endpoint.parent_path().map(<[String]>::to_vec), + children: endpoint + .child_routes() + .iter() + .map(|child| child.path.clone()) + .collect(), + } + } +} + +#[procedures(error = CrossbeamChannelError)] +impl CrossbeamChannelLeaf { + #[call] + fn add_connection( + &mut self, + endpoint: &mut ProtocolEndpoint, + request: ConnectionRequest, + ) -> Result { + self.connect_staged(endpoint, request.peer_path) + } + + #[call] + fn remove_connection( + &mut self, + endpoint: &mut ProtocolEndpoint, + request: ConnectionRequest, + ) -> Result { + self.disconnect(endpoint, &request.peer_path) + } + + #[call] + fn get_connections(&mut self, endpoint: &ProtocolEndpoint) -> ConnectionSnapshot { + ConnectionSnapshot::from_endpoint(endpoint) + } +} + +impl CallLeaf for CrossbeamChannelLeaf { + type Error = CrossbeamChannelError; +} + +impl RouterLeaf for CrossbeamChannelLeaf { + type RouteError = CrossbeamChannelError; + + fn parent_path(&self) -> Option<&[String]> { + self.parent.as_ref().map(|parent| parent.path.as_slice()) + } + + fn child_routes(&self) -> &[ChildRoute] { + &self.child_routes + } + + fn route_to_parent( + &mut self, + local_path: &[String], + frame: FrameBytes, + ) -> Result<(), Self::RouteError> { + let Some(parent) = &self.parent else { + return Err(CrossbeamChannelError::MissingParentConnection); + }; + parent + .sender + .send(CrossbeamEnvelope { + ingress: Ingress::Child(local_path.to_vec()), + frame, + }) + .map_err(|_| CrossbeamChannelError::ChannelClosed) + } + + fn route_to_child( + &mut self, + child_path: &[String], + frame: FrameBytes, + ) -> Result<(), Self::RouteError> { + let Some(sender) = self.children.get(child_path) else { + return Err(CrossbeamChannelError::MissingChildConnection); + }; + sender + .send(CrossbeamEnvelope { + ingress: Ingress::Parent, + frame, + }) + .map_err(|_| CrossbeamChannelError::ChannelClosed) + } +} + +fn is_direct_parent(local_path: &[String], peer_path: &[String]) -> bool { + local_path + .split_last() + .is_some_and(|(_, parent_path)| parent_path == peer_path) +} + +fn is_direct_child(local_path: &[String], peer_path: &[String]) -> bool { + peer_path.len() == local_path.len() + 1 && peer_path.starts_with(local_path) +} + +#[cfg(test)] +mod tests { + use crossbeam_channel::{Receiver, unbounded}; + use unshell_protocol::decode_frame; + use unshell_protocol::tree::{ + Endpoint, EndpointOutcome, LeafRuntime, decode_call_input, encode_call_reply, + }; + + use super::*; + + fn path(parts: &[&str]) -> Vec { + parts.iter().map(|part| (*part).to_owned()).collect() + } + + struct ChannelNode { + runtime: LeafRuntime, + rx: Receiver, + } + + impl ChannelNode { + fn new(path: Vec) -> (Self, Sender) { + let (tx, rx) = unbounded(); + let endpoint = ProtocolEndpoint::new( + path, + None, + Vec::new(), + vec![CrossbeamChannelLeaf::protocol_leaf_spec()], + ); + ( + Self { + runtime: LeafRuntime::new(endpoint, CrossbeamChannelLeaf::default()), + rx, + }, + tx, + ) + } + + fn drain(&mut self) -> usize { + let mut processed = 0usize; + while let Ok(envelope) = self.rx.try_recv() { + let outcome = self + .runtime + .receive_routed(&envelope.ingress, envelope.frame) + .expect("node should process routed frame"); + self.runtime + .route_forwarded(outcome.forwarded) + .expect("router leaf should forward emitted frames"); + processed += 1; + } + processed + } + + fn stage_connection(&mut self, peer_path: Vec, sender: Sender) { + let _ = self.runtime.leaf_mut().stage_connection(peer_path, sender); + } + + fn connect_staged(&mut self, peer_path: Vec) { + let snapshot = { + let runtime = &mut self.runtime; + let mut leaf = core::mem::take(runtime.leaf_mut()); + let result = leaf.connect_staged(runtime.endpoint_mut(), peer_path); + *runtime.leaf_mut() = leaf; + result + }; + snapshot.expect("staged connection should activate"); + } + } + + #[test] + fn crossbeam_channel_leaf_routes_calls_and_replies_across_parent_and_child_links() { + let (mut agent, root_to_agent) = ChannelNode::new(path(&["agent"])); + let (mut child, agent_to_child) = ChannelNode::new(path(&["agent", "child"])); + let (agent_to_root, root_rx) = unbounded(); + + let mut root = ProtocolEndpoint::new( + Vec::new(), + None, + vec![ChildRoute::registered(path(&["agent"]))], + Vec::new(), + ); + + agent.stage_connection(Vec::new(), agent_to_root); + agent.connect_staged(Vec::new()); + + child.stage_connection(path(&["agent"]), root_to_agent.clone()); + child.connect_staged(path(&["agent"])); + + agent.stage_connection(path(&["agent", "child"]), agent_to_child); + + let hook_id = root.allocate_hook_id(); + let add_connection = root + .send_call( + path(&["agent"]), + Some(CrossbeamChannelLeaf::protocol_leaf_name()), + CrossbeamChannelLeaf::protocol_procedure_id("add_connection") + .expect("procedure should exist"), + Some(hook_id), + encode_call_reply(&ConnectionRequest { + peer_path: path(&["agent", "child"]), + }) + .expect("request should encode"), + ) + .expect("root should build add-connection call"); + let EndpointOutcome::Forward { frame, .. } = add_connection else { + panic!("root should forward add-connection call"); + }; + root_to_agent + .send(CrossbeamEnvelope { + ingress: Ingress::Parent, + frame, + }) + .expect("root should deliver frame to agent"); + + for _ in 0..8 { + let mut progress = 0usize; + progress += agent.drain(); + progress += child.drain(); + while let Ok(envelope) = root_rx.try_recv() { + let outcome = root + .receive(&envelope.ingress, envelope.frame) + .expect("root should accept reply frame"); + if let EndpointOutcome::Local(local) = outcome { + match local { + unshell_protocol::tree::LocalEvent::Data { .. } + | unshell_protocol::tree::LocalEvent::Fault { .. } => {} + unshell_protocol::tree::LocalEvent::Call { .. } => {} + } + } + progress += 1; + } + if progress == 0 { + break; + } + } + + assert_eq!(agent.runtime.endpoint().child_routes().len(), 1); + + let query_hook = root.allocate_hook_id(); + let query = root + .send_call( + path(&["agent", "child"]), + Some(CrossbeamChannelLeaf::protocol_leaf_name()), + CrossbeamChannelLeaf::protocol_procedure_id("get_connections") + .expect("procedure should exist"), + Some(query_hook), + encode_call_reply(&()).expect("unit request should encode"), + ) + .expect("root should build query call"); + let EndpointOutcome::Forward { frame, .. } = query else { + panic!("root should forward query call"); + }; + root_to_agent + .send(CrossbeamEnvelope { + ingress: Ingress::Parent, + frame, + }) + .expect("root should deliver query to agent"); + + let mut reply = None; + for _ in 0..12 { + let mut progress = 0usize; + progress += agent.drain(); + progress += child.drain(); + while let Ok(envelope) = root_rx.try_recv() { + let outcome = root + .receive(&envelope.ingress, envelope.frame) + .expect("root should accept routed reply"); + if let EndpointOutcome::Local(unshell_protocol::tree::LocalEvent::Data { + message, + .. + }) = outcome + { + reply = Some( + decode_call_input::(message.data.as_slice()) + .expect("reply payload should decode"), + ); + } + progress += 1; + } + if reply.is_some() || progress == 0 { + break; + } + } + + let reply = reply.expect("root should receive child connection snapshot"); + assert_eq!(reply.parent, Some(path(&["agent"]))); + assert!(reply.children.is_empty()); + + let remove_hook = root.allocate_hook_id(); + let remove = root + .send_call( + path(&["agent"]), + Some(CrossbeamChannelLeaf::protocol_leaf_name()), + CrossbeamChannelLeaf::protocol_procedure_id("remove_connection") + .expect("procedure should exist"), + Some(remove_hook), + encode_call_reply(&ConnectionRequest { + peer_path: path(&["agent", "child"]), + }) + .expect("request should encode"), + ) + .expect("root should build remove-connection call"); + let EndpointOutcome::Forward { frame, .. } = remove else { + panic!("root should forward remove-connection call"); + }; + root_to_agent + .send(CrossbeamEnvelope { + ingress: Ingress::Parent, + frame, + }) + .expect("root should deliver removal call to agent"); + + for _ in 0..8 { + let mut progress = 0usize; + progress += agent.drain(); + progress += child.drain(); + while let Ok(envelope) = root_rx.try_recv() { + let _ = root + .receive(&envelope.ingress, envelope.frame) + .expect("root should process removal reply"); + progress += 1; + } + if progress == 0 { + break; + } + } + + assert!(agent.runtime.endpoint().child_routes().is_empty()); + let final_hook = root.allocate_hook_id(); + let dropped = root + .send_call( + path(&["agent", "child"]), + Some(CrossbeamChannelLeaf::protocol_leaf_name()), + CrossbeamChannelLeaf::protocol_procedure_id("get_connections") + .expect("procedure should exist"), + Some(final_hook), + encode_call_reply(&()).expect("unit request should encode"), + ) + .expect("query call should encode after removal"); + assert!(matches!(dropped, EndpointOutcome::Forward { .. })); + + if let EndpointOutcome::Forward { frame, .. } = dropped { + root_to_agent + .send(CrossbeamEnvelope { + ingress: Ingress::Parent, + frame, + }) + .expect("root should still reach the agent"); + } + let mut saw_reply = false; + for _ in 0..8 { + let mut progress = 0usize; + progress += agent.drain(); + progress += child.drain(); + while let Ok(envelope) = root_rx.try_recv() { + progress += 1; + if let EndpointOutcome::Local(unshell_protocol::tree::LocalEvent::Data { + message, + .. + }) = root + .receive(&envelope.ingress, envelope.frame) + .expect("root should process any late reply") + { + let _ = decode_frame(message.data.as_slice()); + saw_reply = true; + } + } + if progress == 0 { + break; + } + } + assert!( + !saw_reply, + "removed child route should stop forwarded replies" + ); + } + + #[test] + fn invalid_add_connection_keeps_staged_sender_available_for_retry() { + let (tx, _rx) = unbounded(); + let mut leaf = CrossbeamChannelLeaf::default(); + let mut endpoint = ProtocolEndpoint::new(path(&["agent"]), None, Vec::new(), Vec::new()); + leaf.stage_connection(path(&["elsewhere"]), tx); + + let error = leaf + .connect_staged(&mut endpoint, path(&["elsewhere"])) + .expect_err("non-neighbor path should fail"); + assert_eq!(error, CrossbeamChannelError::InvalidPeerPath); + assert!(leaf.staged.contains_key(&path(&["elsewhere"]))); + } + + #[test] + fn invalid_remove_connection_reports_invalid_peer_path() { + let mut leaf = CrossbeamChannelLeaf::default(); + let mut endpoint = ProtocolEndpoint::new(path(&["agent"]), None, Vec::new(), Vec::new()); + + let error = leaf + .disconnect(&mut endpoint, &path(&["not", "a", "neighbor"])) + .expect_err("non-neighbor removal should fail"); + assert_eq!(error, CrossbeamChannelError::InvalidPeerPath); + } +} diff --git a/unshell-leaves/src/lib.rs b/unshell-leaves/src/lib.rs index 6a390d7..7545f0a 100644 --- a/unshell-leaves/src/lib.rs +++ b/unshell-leaves/src/lib.rs @@ -17,8 +17,27 @@ pub use unshell_protocol as protocol; /// Re-exports one role-specific type behind a stable public alias. /// -/// This keeps consumers on a single name such as `RemoteShell` while still -/// compiling only the role implementation needed by the current binary. +/// What it is: a small macro that binds one public type alias to either an +/// endpoint-facing leaf host or a TUI-facing leaf host based on active features. +/// +/// Why it exists: downstream code should be able to import one stable name such as +/// `RemoteShell` without caring which concrete role implementation was compiled for +/// the current binary. +/// +/// # Example +/// ```rust +/// use unshell_leaves::role_leaf; +/// mod endpoint { pub struct DemoEndpoint; } +/// mod tui { pub struct DemoTui; } +/// role_leaf! { +/// pub type DemoLeaf { +/// endpoint => endpoint::DemoEndpoint, +/// tui => tui::DemoTui, +/// } +/// } +/// # #[cfg(feature = "leaf_endpoint")] +/// # let _ = core::marker::PhantomData::; +/// ``` #[macro_export] macro_rules! role_leaf { ( @@ -47,9 +66,25 @@ macro_rules! role_leaf { /// Minimal leaf-specific TUI contract. /// -/// The initial implementation intentionally stays transport-agnostic. A CLI can -/// feed validated protocol `DataMessage` values into a leaf TUI and ask it for a -/// textual frame without depending on a specific rendering crate yet. +/// What it is: the smallest public trait a leaf-specific user interface needs in +/// order to consume protocol `DataMessage` values and render a textual frame. +/// +/// Why it exists: leaf UIs should remain transport-agnostic and renderer-agnostic, +/// so callers can experiment with CLIs and TUIs without coupling the core leaf API +/// to any one terminal framework. +/// +/// # Example +/// ```rust +/// use unshell_leaves::{LeafTui, TuiError}; +/// use unshell_leaves::protocol::DataMessage; +/// struct DemoTui; +/// impl LeafTui for DemoTui { +/// fn leaf_name(&self) -> String { "org.example.v1.demo".into() } +/// fn handle_data(&mut self, _message: &DataMessage) -> Result<(), TuiError> { Ok(()) } +/// fn render(&self) -> String { String::from("demo") } +/// } +/// assert_eq!(DemoTui.render(), "demo"); +/// ``` pub trait LeafTui { /// Returns the canonical protocol leaf name this UI understands. fn leaf_name(&self) -> String; @@ -62,6 +97,18 @@ pub trait LeafTui { } /// Lightweight error used by the leaf TUI surface. +/// +/// What it is: a small owned-string error for UI adapters built on [`LeafTui`]. +/// +/// Why it exists: the TUI surface should not force downstream UIs into a heavier +/// error dependency just to report leaf-local rendering or decoding failures. +/// +/// # Example +/// ```rust +/// use unshell_leaves::TuiError; +/// let error = TuiError::new("invalid frame"); +/// assert_eq!(error.to_string(), "invalid frame"); +/// ``` #[derive(Debug, Clone, PartialEq, Eq)] pub struct TuiError { message: String, @@ -84,4 +131,5 @@ impl core::fmt::Display for TuiError { impl core::error::Error for TuiError {} +pub mod crossbeam_channel; pub mod remote_shell; diff --git a/unshell-macros/src/procedures.rs b/unshell-macros/src/procedures.rs index 4d23fc8..f40f64a 100644 --- a/unshell-macros/src/procedures.rs +++ b/unshell-macros/src/procedures.rs @@ -253,7 +253,9 @@ fn expand_invocation( }}) } -fn split_endpoint_arg<'a>(inputs: &[&'a FnArg]) -> Result<(Option, Vec<&'a FnArg>)> { +fn split_endpoint_arg<'a>( + inputs: &[&'a FnArg], +) -> Result<(Option, Vec<&'a FnArg>)> { let Some(first) = inputs.first() else { return Ok((None, Vec::new())); }; diff --git a/unshell-protocol/src/protocol/tests/call.rs b/unshell-protocol/src/protocol/tests/call.rs index de25565..368a079 100644 --- a/unshell-protocol/src/protocol/tests/call.rs +++ b/unshell-protocol/src/protocol/tests/call.rs @@ -276,8 +276,7 @@ fn generated_call_procedure_can_query_and_mutate_endpoint_topology() { .send_call( path(&["agent"]), Some(TopologyLeaf::protocol_leaf_name()), - TopologyLeaf::protocol_procedure_id("remove_child") - .expect("suffix should resolve"), + TopologyLeaf::protocol_procedure_id("remove_child").expect("suffix should resolve"), Some(remove_hook), encode_call_reply(&ChildRequest { child_path: path(&["agent", "child"]), diff --git a/unshell-protocol/src/protocol/tree/call.rs b/unshell-protocol/src/protocol/tree/call.rs index 56587a5..ca93e5b 100644 --- a/unshell-protocol/src/protocol/tree/call.rs +++ b/unshell-protocol/src/protocol/tree/call.rs @@ -9,11 +9,11 @@ use crate::protocol::{ CallMessage, DataMessage, FrameBytes, FrameError, HookTarget, PacketHeader, ProtocolFault, }; +use super::endpoint::ForwardedFrame; use super::{ Endpoint, EndpointError, HookKey, Ingress, LocalEvent, ProtocolEndpoint, ProtocolLeaf, RouteDecision, RouterLeaf, }; -use super::endpoint::ForwardedFrame; /// One typed incoming `Call` passed to a leaf procedure. /// @@ -665,9 +665,10 @@ where packet.data, packet.end_hook, )?; - runtime - .forwarded - .extend(self.process_endpoint_outcome_routed(endpoint_outcome)?.forwarded); + runtime.forwarded.extend( + self.process_endpoint_outcome_routed(endpoint_outcome)? + .forwarded, + ); } Ok(runtime) } diff --git a/unshell-protocol/src/protocol/tree/endpoint/builders.rs b/unshell-protocol/src/protocol/tree/endpoint/builders.rs index 1c721cc..7fdcae8 100644 --- a/unshell-protocol/src/protocol/tree/endpoint/builders.rs +++ b/unshell-protocol/src/protocol/tree/endpoint/builders.rs @@ -281,7 +281,11 @@ impl ProtocolEndpoint { /// ``` pub fn upsert_child_route(&mut self, route: ChildRoute) -> Result<(), EndpointError> { self.validate_direct_child_path(&route.path)?; - if let Some(existing) = self.children.iter_mut().find(|child| child.path == route.path) { + if let Some(existing) = self + .children + .iter_mut() + .find(|child| child.path == route.path) + { *existing = route; } else { self.children.push(route); @@ -371,23 +375,27 @@ impl ProtocolEndpoint { fn validate_direct_parent_path(&self, parent_path: &[String]) -> Result<(), EndpointError> { let Some((_, expected_parent)) = self.path.split_last() else { - return Err(EndpointError::Validation(ValidationError::TopologyInvariant( - "root endpoints cannot declare a parent path", - ))); + return Err(EndpointError::Validation( + ValidationError::TopologyInvariant("root endpoints cannot declare a parent path"), + )); }; if parent_path != expected_parent { - return Err(EndpointError::Validation(ValidationError::TopologyInvariant( - "parent path must equal the direct path prefix of this endpoint", - ))); + return Err(EndpointError::Validation( + ValidationError::TopologyInvariant( + "parent path must equal the direct path prefix of this endpoint", + ), + )); } Ok(()) } fn validate_direct_child_path(&self, child_path: &[String]) -> Result<(), EndpointError> { if child_path.len() != self.path.len() + 1 || !child_path.starts_with(&self.path) { - return Err(EndpointError::Validation(ValidationError::TopologyInvariant( - "child path must be one direct descendant of this endpoint", - ))); + return Err(EndpointError::Validation( + ValidationError::TopologyInvariant( + "child path must be one direct descendant of this endpoint", + ), + )); } Ok(()) } diff --git a/unshell-protocol/src/protocol/tree/endpoint/mod.rs b/unshell-protocol/src/protocol/tree/endpoint/mod.rs index 0f402ba..b8692c9 100644 --- a/unshell-protocol/src/protocol/tree/endpoint/mod.rs +++ b/unshell-protocol/src/protocol/tree/endpoint/mod.rs @@ -11,6 +11,6 @@ mod introspection; mod receive; pub use core::{ - ChildRoute, Endpoint, EndpointError, EndpointOutcome, ForwardedFrame, Ingress, LeafSpec, LocalEvent, - ProtocolEndpoint, + ChildRoute, Endpoint, EndpointError, EndpointOutcome, ForwardedFrame, Ingress, LeafSpec, + LocalEvent, ProtocolEndpoint, }; diff --git a/unshell-protocol/src/protocol/tree/leaf.rs b/unshell-protocol/src/protocol/tree/leaf.rs index ca87621..90e6324 100644 --- a/unshell-protocol/src/protocol/tree/leaf.rs +++ b/unshell-protocol/src/protocol/tree/leaf.rs @@ -190,13 +190,13 @@ pub trait CallProcedures: LeafDeclaration { /// use unshell::protocol::tree::{CallProcedures, DispatchError, IncomingCall, ProtocolLeaf}; /// struct ExampleLeaf; /// impl ProtocolLeaf for ExampleLeaf { fn leaf_name() -> String { "org.example.v1.echo".into() } } -/// impl CallProcedures for ExampleLeaf { -/// type Error = core::convert::Infallible; -/// fn procedure_suffixes() -> &'static [&'static str] { &["invoke"] } -/// fn dispatch_call(&mut self, _endpoint: &mut unshell::protocol::tree::ProtocolEndpoint, _call: IncomingCall) -> Result> { -/// Ok(unshell::protocol::tree::CallReply::NoReply) -/// } -/// } + /// impl CallProcedures for ExampleLeaf { + /// type Error = core::convert::Infallible; + /// fn procedure_suffixes() -> &'static [&'static str] { &["invoke"] } + /// fn dispatch_call(&mut self, _endpoint: &mut unshell::protocol::tree::ProtocolEndpoint, _call: IncomingCall) -> Result> { + /// Ok(unshell::protocol::tree::CallReply::NoReply) + /// } + /// } /// # let _ = ExampleLeaf; /// ``` fn dispatch_call(