Big rewrite.

This commit is contained in:
Michael Mikovsky
2026-05-16 13:10:51 -06:00
parent da9166daf0
commit 56abb5e1e0
63 changed files with 4 additions and 14547 deletions
-692
View File
@@ -1,692 +0,0 @@
//! Crossbeam-channel-backed router leaf for in-process protocol simulations.
//!
//! This leaf owns parent/child transport links backed by `crossbeam_channel`, so
//! tests and examples can exercise full packet routing without opening real
//! sockets.
use std::collections::BTreeMap;
use crossbeam_channel::Sender;
use rkyv::{Archive, Deserialize, Serialize};
use unshell_protocol::FrameBytes;
use unshell_protocol::tree::{
CallLeaf, ChildRoute, Endpoint, Ingress, ProtocolEndpoint, RouterLeaf,
};
use crate::{leaf, procedures};
/// One inbound frame delivered across a simulated channel hop.
///
/// What it is: the transport envelope sent between in-process nodes when this
/// leaf forwards protocol traffic over `crossbeam_channel`.
///
/// Why it exists: routing needs both the encoded frame bytes and the ingress side
/// that the receiver should apply when validating source paths.
///
/// # Example
/// ```rust
/// use unshell_leaves::crossbeam_channel::CrossbeamEnvelope;
/// use unshell_leaves::protocol::{FrameBytes, tree::Ingress};
/// let envelope = CrossbeamEnvelope {
/// ingress: Ingress::Parent,
/// frame: FrameBytes::new(),
/// };
/// assert!(matches!(envelope.ingress, Ingress::Parent));
/// ```
#[derive(Debug, Clone)]
pub struct CrossbeamEnvelope {
/// Which side of the tree the receiving endpoint should treat this frame as coming from.
pub ingress: Ingress,
/// Encoded protocol frame bytes.
pub frame: FrameBytes,
}
/// Request payload for promoting or pruning one simulated connection.
///
/// What it is: the protocol payload shared by the `add_connection` and
/// `remove_connection` procedures.
///
/// Why it exists: the leaf only needs the peer endpoint path to decide whether the
/// connection is a direct parent edge or a direct child edge.
///
/// # Example
/// ```rust
/// use unshell_leaves::crossbeam_channel::ConnectionRequest;
/// let request = ConnectionRequest {
/// peer_path: vec!["agent".into(), "child".into()],
/// };
/// assert_eq!(request.peer_path.len(), 2);
/// ```
#[derive(Archive, Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub struct ConnectionRequest {
/// Absolute endpoint path of the peer connection being managed.
pub peer_path: Vec<String>,
}
/// Machine-readable snapshot of the leaf's active simulated connections.
///
/// What it is: the reply payload returned by `get_connections`, `add_connection`,
/// and `remove_connection`.
///
/// Why it exists: connection-management procedures should return the resulting
/// topology immediately so tests and tooling can confirm what changed.
///
/// # Example
/// ```rust
/// use unshell_leaves::crossbeam_channel::ConnectionSnapshot;
/// let snapshot = ConnectionSnapshot {
/// parent: Some(vec!["agent".into()]),
/// children: vec![vec!["agent".into(), "child".into()]],
/// };
/// assert_eq!(snapshot.children.len(), 1);
/// ```
#[derive(Archive, Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub struct ConnectionSnapshot {
/// The direct parent path, if this endpoint currently has one.
pub parent: Option<Vec<String>>,
/// The currently active direct child paths.
pub children: Vec<Vec<String>>,
}
/// Errors surfaced by the channel-backed router leaf.
///
/// What it is: the small, deterministic error set used by both the management
/// procedures and the transport forwarding hooks.
///
/// Why it exists: tests and examples need structured failures when a staged link is
/// missing, a path is not a direct neighbor, or a channel is already closed.
///
/// # Example
/// ```rust
/// use unshell_leaves::crossbeam_channel::CrossbeamChannelError;
/// let error = CrossbeamChannelError::MissingStagedConnection;
/// assert_eq!(error.to_string(), "missing staged connection");
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CrossbeamChannelError {
/// The requested peer path does not have a staged sender ready to activate.
MissingStagedConnection,
/// The requested peer path is neither the direct parent nor a direct child.
InvalidPeerPath,
/// No active parent link exists for upstream forwarding.
MissingParentConnection,
/// No active child link exists for the requested child path.
MissingChildConnection,
/// The receiving side of the channel is already disconnected.
ChannelClosed,
}
impl core::fmt::Display for CrossbeamChannelError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self {
Self::MissingStagedConnection => f.write_str("missing staged connection"),
Self::InvalidPeerPath => f.write_str("peer path is not a direct parent or child"),
Self::MissingParentConnection => f.write_str("missing parent connection"),
Self::MissingChildConnection => f.write_str("missing child connection"),
Self::ChannelClosed => f.write_str("channel receiver is disconnected"),
}
}
}
impl core::error::Error for CrossbeamChannelError {}
/// Shared compile-time declaration for the crossbeam-channel router leaf.
///
/// What it is: the public leaf declaration that owns the canonical leaf name and
/// exported management procedure ids for [`CrossbeamChannelLeaf`].
///
/// Why it exists: endpoint code, examples, and tests should all derive the same
/// protocol-facing metadata from one source of truth instead of hand-assembling
/// the leaf id and procedure inventory.
///
/// # Example
/// ```rust
/// use unshell_leaves::crossbeam_channel::CrossbeamChannel;
/// assert!(CrossbeamChannel::protocol_leaf_name().contains("crossbeam_channel"));
/// ```
#[leaf(
id = "org.unshell.v1.crossbeam_channel",
endpoint_struct = CrossbeamChannelLeaf,
procedures = ["add_connection", "remove_connection", "get_connections"]
)]
pub struct CrossbeamChannel;
/// In-process router leaf backed by `crossbeam_channel` senders.
///
/// What it is: a leaf host that stores one optional parent sender, any number of
/// child senders, and a staging area for connections that should only become live
/// after an explicit procedure call.
///
/// Why it exists: protocol tests need a realistic forwarding surface with parent
/// and child links, but opening TCP sockets would make those tests slower and more
/// brittle than necessary.
///
/// # Example
/// ```rust
/// use crossbeam_channel::unbounded;
/// use unshell_leaves::crossbeam_channel::CrossbeamChannelLeaf;
/// let (tx, _rx) = unbounded();
/// let mut leaf = CrossbeamChannelLeaf::default();
/// let previous = leaf.stage_connection(vec!["agent".into()], tx);
/// assert!(previous.is_none());
/// ```
#[derive(Default)]
pub struct CrossbeamChannelLeaf {
parent: Option<ChannelConnection>,
children: BTreeMap<Vec<String>, Sender<CrossbeamEnvelope>>,
child_routes: Vec<ChildRoute>,
staged: BTreeMap<Vec<String>, Sender<CrossbeamEnvelope>>,
}
#[derive(Debug, Clone)]
struct ChannelConnection {
path: Vec<String>,
sender: Sender<CrossbeamEnvelope>,
}
impl CrossbeamChannelLeaf {
/// Stages one channel sender so a later protocol procedure can activate it.
///
/// What it is: a bootstrap helper that prepares the transport handle before the
/// leaf promotes it into active routing state.
///
/// Why it exists: the sender itself is not a serializable protocol payload, so
/// tests and examples need a local way to install it before calling
/// `add_connection`.
pub fn stage_connection(
&mut self,
peer_path: Vec<String>,
sender: Sender<CrossbeamEnvelope>,
) -> Option<Sender<CrossbeamEnvelope>> {
self.staged.insert(peer_path, sender)
}
/// Promotes one staged connection into the active topology.
///
/// This is the same operation used by the public `add_connection` procedure,
/// but it is also useful for local bootstrap code that has not yet wired the
/// control plane needed to issue that call remotely.
pub fn connect_staged(
&mut self,
endpoint: &mut ProtocolEndpoint,
peer_path: Vec<String>,
) -> Result<ConnectionSnapshot, CrossbeamChannelError> {
if !is_direct_parent(endpoint.path(), &peer_path)
&& !is_direct_child(endpoint.path(), &peer_path)
{
return Err(CrossbeamChannelError::InvalidPeerPath);
}
let Some(sender) = self.staged.remove(&peer_path) else {
return Err(CrossbeamChannelError::MissingStagedConnection);
};
if is_direct_parent(endpoint.path(), &peer_path) {
self.parent = Some(ChannelConnection {
path: peer_path.clone(),
sender,
});
endpoint
.set_parent_path(Some(peer_path))
.map_err(|_| CrossbeamChannelError::InvalidPeerPath)?;
return Ok(ConnectionSnapshot::from_endpoint(endpoint));
}
if is_direct_child(endpoint.path(), &peer_path) {
self.children.insert(peer_path.clone(), sender);
self.sync_child_routes();
endpoint
.upsert_child_route(ChildRoute::registered(peer_path))
.map_err(|_| CrossbeamChannelError::InvalidPeerPath)?;
return Ok(ConnectionSnapshot::from_endpoint(endpoint));
}
unreachable!("direct-neighbor validation returned early above")
}
/// Removes one active connection and returns it to the staged set.
pub fn disconnect(
&mut self,
endpoint: &mut ProtocolEndpoint,
peer_path: &[String],
) -> Result<ConnectionSnapshot, CrossbeamChannelError> {
if !is_direct_parent(endpoint.path(), peer_path)
&& !is_direct_child(endpoint.path(), peer_path)
{
return Err(CrossbeamChannelError::InvalidPeerPath);
}
if self
.parent
.as_ref()
.is_some_and(|parent| parent.path == peer_path)
{
let Some(parent) = self.parent.take() else {
return Err(CrossbeamChannelError::MissingParentConnection);
};
self.staged.insert(parent.path, parent.sender);
endpoint
.set_parent_path(None)
.map_err(|_| CrossbeamChannelError::InvalidPeerPath)?;
return Ok(ConnectionSnapshot::from_endpoint(endpoint));
}
let Some(sender) = self.children.remove(peer_path) else {
return Err(CrossbeamChannelError::MissingChildConnection);
};
self.staged.insert(peer_path.to_vec(), sender);
self.sync_child_routes();
endpoint.remove_child_route(peer_path);
Ok(ConnectionSnapshot::from_endpoint(endpoint))
}
fn sync_child_routes(&mut self) {
self.child_routes = self
.children
.keys()
.cloned()
.map(ChildRoute::registered)
.collect();
}
}
impl ConnectionSnapshot {
fn from_endpoint(endpoint: &ProtocolEndpoint) -> Self {
Self {
parent: endpoint.parent_path().map(<[String]>::to_vec),
children: endpoint
.child_routes()
.iter()
.map(|child| child.path.clone())
.collect(),
}
}
}
#[procedures(error = CrossbeamChannelError)]
impl CrossbeamChannelLeaf {
#[call]
fn add_connection(
&mut self,
endpoint: &mut ProtocolEndpoint,
request: ConnectionRequest,
) -> Result<ConnectionSnapshot, CrossbeamChannelError> {
self.connect_staged(endpoint, request.peer_path)
}
#[call]
fn remove_connection(
&mut self,
endpoint: &mut ProtocolEndpoint,
request: ConnectionRequest,
) -> Result<ConnectionSnapshot, CrossbeamChannelError> {
self.disconnect(endpoint, &request.peer_path)
}
#[call]
fn get_connections(&mut self, endpoint: &ProtocolEndpoint) -> ConnectionSnapshot {
ConnectionSnapshot::from_endpoint(endpoint)
}
}
impl CallLeaf for CrossbeamChannelLeaf {
type Error = CrossbeamChannelError;
}
impl RouterLeaf for CrossbeamChannelLeaf {
type RouteError = CrossbeamChannelError;
fn parent_path(&self) -> Option<&[String]> {
self.parent.as_ref().map(|parent| parent.path.as_slice())
}
fn child_routes(&self) -> &[ChildRoute] {
&self.child_routes
}
fn route_to_parent(
&mut self,
local_path: &[String],
frame: FrameBytes,
) -> Result<(), Self::RouteError> {
let Some(parent) = &self.parent else {
return Err(CrossbeamChannelError::MissingParentConnection);
};
parent
.sender
.send(CrossbeamEnvelope {
ingress: Ingress::Child(local_path.to_vec()),
frame,
})
.map_err(|_| CrossbeamChannelError::ChannelClosed)
}
fn route_to_child(
&mut self,
child_path: &[String],
frame: FrameBytes,
) -> Result<(), Self::RouteError> {
let Some(sender) = self.children.get(child_path) else {
return Err(CrossbeamChannelError::MissingChildConnection);
};
sender
.send(CrossbeamEnvelope {
ingress: Ingress::Parent,
frame,
})
.map_err(|_| CrossbeamChannelError::ChannelClosed)
}
}
fn is_direct_parent(local_path: &[String], peer_path: &[String]) -> bool {
local_path
.split_last()
.is_some_and(|(_, parent_path)| parent_path == peer_path)
}
fn is_direct_child(local_path: &[String], peer_path: &[String]) -> bool {
peer_path.len() == local_path.len() + 1 && peer_path.starts_with(local_path)
}
#[cfg(test)]
mod tests {
use crossbeam_channel::{Receiver, unbounded};
use unshell_protocol::decode_frame;
use unshell_protocol::tree::{
Endpoint, EndpointOutcome, LeafRuntime, decode_call_input, encode_call_reply,
};
use super::*;
fn path(parts: &[&str]) -> Vec<String> {
parts.iter().map(|part| (*part).to_owned()).collect()
}
struct ChannelNode {
runtime: LeafRuntime<CrossbeamChannelLeaf>,
rx: Receiver<CrossbeamEnvelope>,
}
impl ChannelNode {
fn new(path: Vec<String>) -> (Self, Sender<CrossbeamEnvelope>) {
let (tx, rx) = unbounded();
let endpoint = ProtocolEndpoint::new(
path,
None,
Vec::new(),
vec![CrossbeamChannelLeaf::protocol_leaf_spec()],
);
(
Self {
runtime: LeafRuntime::new(endpoint, CrossbeamChannelLeaf::default()),
rx,
},
tx,
)
}
fn drain(&mut self) -> usize {
let mut processed = 0usize;
while let Ok(envelope) = self.rx.try_recv() {
let outcome = self
.runtime
.receive_routed(&envelope.ingress, envelope.frame)
.expect("node should process routed frame");
self.runtime
.route_forwarded(outcome.forwarded)
.expect("router leaf should forward emitted frames");
processed += 1;
}
processed
}
fn stage_connection(&mut self, peer_path: Vec<String>, sender: Sender<CrossbeamEnvelope>) {
let _ = self.runtime.leaf_mut().stage_connection(peer_path, sender);
}
fn connect_staged(&mut self, peer_path: Vec<String>) {
let snapshot = {
let runtime = &mut self.runtime;
let mut leaf = core::mem::take(runtime.leaf_mut());
let result = leaf.connect_staged(runtime.endpoint_mut(), peer_path);
*runtime.leaf_mut() = leaf;
result
};
snapshot.expect("staged connection should activate");
}
}
#[test]
fn crossbeam_channel_leaf_routes_calls_and_replies_across_parent_and_child_links() {
let (mut agent, root_to_agent) = ChannelNode::new(path(&["agent"]));
let (mut child, agent_to_child) = ChannelNode::new(path(&["agent", "child"]));
let (agent_to_root, root_rx) = unbounded();
let mut root = ProtocolEndpoint::new(
Vec::new(),
None,
vec![ChildRoute::registered(path(&["agent"]))],
Vec::new(),
);
agent.stage_connection(Vec::new(), agent_to_root);
agent.connect_staged(Vec::new());
child.stage_connection(path(&["agent"]), root_to_agent.clone());
child.connect_staged(path(&["agent"]));
agent.stage_connection(path(&["agent", "child"]), agent_to_child);
let hook_id = root.allocate_hook_id();
let add_connection = root
.send_call(
path(&["agent"]),
Some(CrossbeamChannelLeaf::protocol_leaf_name()),
CrossbeamChannelLeaf::protocol_procedure_id("add_connection")
.expect("procedure should exist"),
Some(hook_id),
encode_call_reply(&ConnectionRequest {
peer_path: path(&["agent", "child"]),
})
.expect("request should encode"),
)
.expect("root should build add-connection call");
let EndpointOutcome::Forward { frame, .. } = add_connection else {
panic!("root should forward add-connection call");
};
root_to_agent
.send(CrossbeamEnvelope {
ingress: Ingress::Parent,
frame,
})
.expect("root should deliver frame to agent");
for _ in 0..8 {
let mut progress = 0usize;
progress += agent.drain();
progress += child.drain();
while let Ok(envelope) = root_rx.try_recv() {
let outcome = root
.receive(&envelope.ingress, envelope.frame)
.expect("root should accept reply frame");
if let EndpointOutcome::Local(local) = outcome {
match local {
unshell_protocol::tree::LocalEvent::Data { .. }
| unshell_protocol::tree::LocalEvent::Fault { .. } => {}
unshell_protocol::tree::LocalEvent::Call { .. } => {}
}
}
progress += 1;
}
if progress == 0 {
break;
}
}
assert_eq!(agent.runtime.endpoint().child_routes().len(), 1);
let query_hook = root.allocate_hook_id();
let query = root
.send_call(
path(&["agent", "child"]),
Some(CrossbeamChannelLeaf::protocol_leaf_name()),
CrossbeamChannelLeaf::protocol_procedure_id("get_connections")
.expect("procedure should exist"),
Some(query_hook),
encode_call_reply(&()).expect("unit request should encode"),
)
.expect("root should build query call");
let EndpointOutcome::Forward { frame, .. } = query else {
panic!("root should forward query call");
};
root_to_agent
.send(CrossbeamEnvelope {
ingress: Ingress::Parent,
frame,
})
.expect("root should deliver query to agent");
let mut reply = None;
for _ in 0..12 {
let mut progress = 0usize;
progress += agent.drain();
progress += child.drain();
while let Ok(envelope) = root_rx.try_recv() {
let outcome = root
.receive(&envelope.ingress, envelope.frame)
.expect("root should accept routed reply");
if let EndpointOutcome::Local(unshell_protocol::tree::LocalEvent::Data {
message,
..
}) = outcome
{
reply = Some(
decode_call_input::<ConnectionSnapshot>(message.data.as_slice())
.expect("reply payload should decode"),
);
}
progress += 1;
}
if reply.is_some() || progress == 0 {
break;
}
}
let reply = reply.expect("root should receive child connection snapshot");
assert_eq!(reply.parent, Some(path(&["agent"])));
assert!(reply.children.is_empty());
let remove_hook = root.allocate_hook_id();
let remove = root
.send_call(
path(&["agent"]),
Some(CrossbeamChannelLeaf::protocol_leaf_name()),
CrossbeamChannelLeaf::protocol_procedure_id("remove_connection")
.expect("procedure should exist"),
Some(remove_hook),
encode_call_reply(&ConnectionRequest {
peer_path: path(&["agent", "child"]),
})
.expect("request should encode"),
)
.expect("root should build remove-connection call");
let EndpointOutcome::Forward { frame, .. } = remove else {
panic!("root should forward remove-connection call");
};
root_to_agent
.send(CrossbeamEnvelope {
ingress: Ingress::Parent,
frame,
})
.expect("root should deliver removal call to agent");
for _ in 0..8 {
let mut progress = 0usize;
progress += agent.drain();
progress += child.drain();
while let Ok(envelope) = root_rx.try_recv() {
let _ = root
.receive(&envelope.ingress, envelope.frame)
.expect("root should process removal reply");
progress += 1;
}
if progress == 0 {
break;
}
}
assert!(agent.runtime.endpoint().child_routes().is_empty());
let final_hook = root.allocate_hook_id();
let dropped = root
.send_call(
path(&["agent", "child"]),
Some(CrossbeamChannelLeaf::protocol_leaf_name()),
CrossbeamChannelLeaf::protocol_procedure_id("get_connections")
.expect("procedure should exist"),
Some(final_hook),
encode_call_reply(&()).expect("unit request should encode"),
)
.expect("query call should encode after removal");
assert!(matches!(dropped, EndpointOutcome::Forward { .. }));
if let EndpointOutcome::Forward { frame, .. } = dropped {
root_to_agent
.send(CrossbeamEnvelope {
ingress: Ingress::Parent,
frame,
})
.expect("root should still reach the agent");
}
let mut saw_reply = false;
for _ in 0..8 {
let mut progress = 0usize;
progress += agent.drain();
progress += child.drain();
while let Ok(envelope) = root_rx.try_recv() {
progress += 1;
if let EndpointOutcome::Local(unshell_protocol::tree::LocalEvent::Data {
message,
..
}) = root
.receive(&envelope.ingress, envelope.frame)
.expect("root should process any late reply")
{
let _ = decode_frame(message.data.as_slice());
saw_reply = true;
}
}
if progress == 0 {
break;
}
}
assert!(
!saw_reply,
"removed child route should stop forwarded replies"
);
}
#[test]
fn invalid_add_connection_keeps_staged_sender_available_for_retry() {
let (tx, _rx) = unbounded();
let mut leaf = CrossbeamChannelLeaf::default();
let mut endpoint = ProtocolEndpoint::new(path(&["agent"]), None, Vec::new(), Vec::new());
leaf.stage_connection(path(&["elsewhere"]), tx);
let error = leaf
.connect_staged(&mut endpoint, path(&["elsewhere"]))
.expect_err("non-neighbor path should fail");
assert_eq!(error, CrossbeamChannelError::InvalidPeerPath);
assert!(leaf.staged.contains_key(&path(&["elsewhere"])));
}
#[test]
fn invalid_remove_connection_reports_invalid_peer_path() {
let mut leaf = CrossbeamChannelLeaf::default();
let mut endpoint = ProtocolEndpoint::new(path(&["agent"]), None, Vec::new(), Vec::new());
let error = leaf
.disconnect(&mut endpoint, &path(&["not", "a", "neighbor"]))
.expect_err("non-neighbor removal should fail");
assert_eq!(error, CrossbeamChannelError::InvalidPeerPath);
}
}
-135
View File
@@ -1,136 +1 @@
//! Application-layer leaves and user-facing surfaces built on top of the UnShell
//! protocol runtime.
//!
//! Each leaf module always exports its shared protocol-facing types. Role-specific
//! implementations are selected with the crate-wide `leaf_endpoint` and `leaf_tui`
//! features, and can optionally be re-exported behind one stable alias.
#[allow(unused_extern_crates)]
extern crate self as unshell;
pub extern crate alloc;
use unshell_protocol::DataMessage;
pub use unshell_macros::{Procedure, leaf, procedures};
pub use unshell_protocol as protocol;
/// Re-exports one role-specific type behind a stable public alias.
///
/// What it is: a small macro that binds one public type alias to either an
/// endpoint-facing leaf host or a TUI-facing leaf host based on active features.
///
/// Why it exists: downstream code should be able to import one stable name such as
/// `RemoteShell` without caring which concrete role implementation was compiled for
/// the current binary.
///
/// # Example
/// ```rust
/// use unshell_leaves::role_leaf;
/// mod endpoint { pub struct DemoEndpoint; }
/// mod tui { pub struct DemoTui; }
/// # #[cfg(not(all(feature = "leaf_endpoint", feature = "leaf_tui")))]
/// role_leaf! {
/// pub type DemoLeaf {
/// endpoint => endpoint::DemoEndpoint,
/// tui => tui::DemoTui,
/// }
/// }
/// # #[cfg(all(feature = "leaf_endpoint", not(feature = "leaf_tui")))]
/// # let _ = core::marker::PhantomData::<DemoLeaf>;
/// ```
#[macro_export]
macro_rules! role_leaf {
(
$(#[$meta:meta])*
$vis:vis type $alias:ident {
endpoint => $endpoint:path,
tui => $tui:path $(,)?
}
) => {
#[cfg(all(feature = "leaf_endpoint", feature = "leaf_tui"))]
compile_error!(concat!(
"`",
stringify!($alias),
"` can only alias one concrete role at a time; enable either `leaf_endpoint` or `leaf_tui`, not both"
));
#[cfg(feature = "leaf_endpoint")]
$(#[$meta])*
$vis type $alias = $endpoint;
#[cfg(all(not(feature = "leaf_endpoint"), feature = "leaf_tui"))]
$(#[$meta])*
$vis type $alias = $tui;
};
}
/// Minimal leaf-specific TUI contract.
///
/// What it is: the smallest public trait a leaf-specific user interface needs in
/// order to consume protocol `DataMessage` values and render a textual frame.
///
/// Why it exists: leaf UIs should remain transport-agnostic and renderer-agnostic,
/// so callers can experiment with CLIs and TUIs without coupling the core leaf API
/// to any one terminal framework.
///
/// # Example
/// ```rust
/// use unshell_leaves::{LeafTui, TuiError};
/// use unshell_leaves::protocol::DataMessage;
/// struct DemoTui;
/// impl LeafTui for DemoTui {
/// fn leaf_name(&self) -> String { "org.example.v1.demo".into() }
/// fn handle_data(&mut self, _message: &DataMessage) -> Result<(), TuiError> { Ok(()) }
/// fn render(&self) -> String { String::from("demo") }
/// }
/// assert_eq!(DemoTui.render(), "demo");
/// ```
pub trait LeafTui {
/// Returns the canonical protocol leaf name this UI understands.
fn leaf_name(&self) -> String;
/// Applies one inbound hook payload to the local UI state.
fn handle_data(&mut self, message: &DataMessage) -> Result<(), TuiError>;
/// Produces the current textual frame for the leaf.
fn render(&self) -> String;
}
/// Lightweight error used by the leaf TUI surface.
///
/// What it is: a small owned-string error for UI adapters built on [`LeafTui`].
///
/// Why it exists: the TUI surface should not force downstream UIs into a heavier
/// error dependency just to report leaf-local rendering or decoding failures.
///
/// # Example
/// ```rust
/// use unshell_leaves::TuiError;
/// let error = TuiError::new("invalid frame");
/// assert_eq!(error.to_string(), "invalid frame");
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TuiError {
message: String,
}
impl TuiError {
/// Creates one UI-surface error from owned text.
pub fn new(message: impl Into<String>) -> Self {
Self {
message: message.into(),
}
}
}
impl core::fmt::Display for TuiError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.write_str(&self.message)
}
}
impl core::error::Error for TuiError {}
pub mod crossbeam_channel;
pub mod remote_shell;
@@ -1,65 +0,0 @@
//! PTY-backed endpoint implementation for the remote shell leaf.
mod errors;
mod session;
mod transport;
use std::collections::BTreeMap;
use unshell::protocol::tree::{Call, HookKey, Procedure, ProcedureEffect, ProcedureStore};
pub use errors::ShellLeafError;
pub use session::Open;
pub use transport::{LISTEN_ADDR, send_forward, spawn_frame_reader, write_frames};
use super::OpenRequest;
/// Leaf state for the remote shell endpoint runtime.
///
/// The endpoint keeps each live shell session in an explicit map keyed by the
/// caller-owned hook identity. That makes ownership and cleanup of hook-backed
/// shell processes easy to inspect during debugging.
#[derive(Default)]
pub struct RemoteShell {
sessions: BTreeMap<HookKey, Open>,
}
impl ProcedureStore<Open> for RemoteShell {
fn procedure_sessions(&mut self) -> &mut BTreeMap<HookKey, Open> {
&mut self.sessions
}
}
impl Procedure<RemoteShell> for Open {
type Error = ShellLeafError;
type Input = OpenRequest;
fn open(_leaf: &mut RemoteShell, call: Call<Self::Input>) -> Result<Self, Self::Error> {
let hook_key = call.response_hook.ok_or(ShellLeafError::MissingHook)?;
Open::spawn(hook_key.return_path, hook_key.hook_id, call.procedure_id)
}
fn on_data(
_leaf: &mut RemoteShell,
session: &mut Self,
data: unshell::protocol::tree::IncomingData,
) -> Result<ProcedureEffect, Self::Error> {
session.on_data(data)
}
fn on_fault(
_leaf: &mut RemoteShell,
_session: &mut Self,
_fault: unshell::protocol::tree::IncomingFault,
) -> Result<(), Self::Error> {
Ok(())
}
fn poll(_leaf: &mut RemoteShell, session: &mut Self) -> Result<ProcedureEffect, Self::Error> {
session.poll()
}
fn close(_leaf: &mut RemoteShell, mut session: Self) -> Result<(), Self::Error> {
session.terminate()
}
}
@@ -1,28 +0,0 @@
use std::fmt;
use std::io;
/// Error produced by the remote shell endpoint implementation.
#[derive(Debug)]
pub enum ShellLeafError {
/// Underlying PTY or I/O failure.
Io(io::Error),
/// Shell open requires a response hook so the session can stream bytes back.
MissingHook,
}
impl fmt::Display for ShellLeafError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Io(error) => write!(f, "{error}"),
Self::MissingHook => f.write_str("shell open requires a response hook"),
}
}
}
impl std::error::Error for ShellLeafError {}
impl From<io::Error> for ShellLeafError {
fn from(value: io::Error) -> Self {
Self::Io(value)
}
}
@@ -1,289 +0,0 @@
//! Per-hook remote shell session lifecycle.
//!
//! A session opens one PTY-backed shell process and translates protocol hook
//! traffic into stdin writes and stdout or stderr chunks. Close is intentionally
//! two-sided: the peer signals input completion with `end_hook`, while the local
//! side closes only after the child exits and the PTY reader drains.
use std::io::{self, Read, Write};
use std::process::Command;
use std::sync::mpsc::{self, Receiver, SyncSender, TryRecvError};
use std::thread;
use portable_pty::{CommandBuilder, ExitStatus, PtySize, native_pty_system};
use unshell::Procedure;
use unshell::protocol::tree::{IncomingData, OutgoingData, ProcedureEffect};
use super::RemoteShell;
use super::errors::ShellLeafError;
/// Per-hook shell session created by the `open` procedure.
///
/// The procedure type is also the stored session type so the mapping between
/// one opening procedure and one live hook remains direct and visible.
#[derive(Procedure)]
#[procedure(leaf = RemoteShell, name = "open")]
pub struct Open {
/// Spawned PTY child process.
pub(super) child: Box<dyn portable_pty::Child + Send>,
/// Process-group leader used for Unix hangup and kill signaling.
process_group_leader: Option<u32>,
/// Buffered stdin bridge into the shell process.
stdin_tx: Option<SyncSender<Vec<u8>>>,
/// Buffered output stream read from the PTY.
output_rx: Receiver<OutputEvent>,
/// Hook return path for packets emitted by this session.
return_path: Vec<String>,
/// Hook identifier allocated by the caller.
hook_id: u64,
/// Procedure id bound to this shell hook.
procedure_id: String,
/// Whether the PTY reader has closed and drained.
output_closed: bool,
/// Observed child exit status, once known.
pub(super) exit_status: Option<ExitStatus>,
/// Whether this session already emitted its terminal local packet.
pub(super) local_end_sent: bool,
}
/// One event forwarded from the PTY reader thread.
enum OutputEvent {
Chunk(Vec<u8>),
ReaderClosed,
}
impl Open {
pub(super) fn spawn(
return_path: Vec<String>,
hook_id: u64,
procedure_id: String,
) -> Result<Self, ShellLeafError> {
let command = build_shell_command();
let pty_system = native_pty_system();
let pair = pty_system
.openpty(PtySize {
rows: 24,
cols: 80,
pixel_width: 0,
pixel_height: 0,
})
.map_err(|error| io::Error::other(error.to_string()))?;
let child = pair
.slave
.spawn_command(command)
.map_err(|error| io::Error::other(error.to_string()))?;
let process_group_leader = child.process_id();
let stdin = pair
.master
.take_writer()
.map_err(|error| io::Error::other(error.to_string()))?;
let stdout = pair
.master
.try_clone_reader()
.map_err(|error| io::Error::other(error.to_string()))?;
let (stdin_tx, rx) = spawn_io_threads(stdin, stdout);
Ok(Self {
child,
process_group_leader,
stdin_tx: Some(stdin_tx),
output_rx: rx,
return_path,
hook_id,
procedure_id,
output_closed: false,
exit_status: None,
local_end_sent: false,
})
}
/// Builds one outgoing hook packet owned by this session.
pub(super) fn packet(&self, data: Vec<u8>, end_hook: bool) -> OutgoingData {
OutgoingData {
dst_path: self.return_path.clone(),
hook_id: self.hook_id,
procedure_id: self.procedure_id.clone(),
data,
end_hook,
}
}
/// Forces the underlying shell process to stop and records its exit status.
pub(super) fn terminate(&mut self) -> Result<(), ShellLeafError> {
self.stdin_tx.take();
match self.child.try_wait()? {
Some(status) => {
self.exit_status = Some(status);
Ok(())
}
None => {
self.signal_process_group("-KILL");
self.child
.kill()
.map_err(|error| io::Error::other(error.to_string()))?;
self.exit_status = Some(
self.child
.wait()
.map_err(|error| io::Error::other(error.to_string()))?,
);
Ok(())
}
}
}
/// Drains any currently buffered PTY output into protocol packets.
pub(super) fn drain_output(&mut self, outgoing: &mut Vec<OutgoingData>) {
loop {
match self.output_rx.try_recv() {
Ok(OutputEvent::Chunk(bytes)) => outgoing.push(self.packet(bytes, false)),
Ok(OutputEvent::ReaderClosed) => self.output_closed = true,
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => {
self.output_closed = true;
break;
}
}
}
}
/// Applies one inbound hook payload to the shell process.
pub(super) fn on_data(
&mut self,
data: IncomingData,
) -> Result<ProcedureEffect, ShellLeafError> {
if !data.message.data.is_empty() {
let Some(stdin_tx) = self.stdin_tx.as_ref() else {
return Ok(ProcedureEffect::default());
};
stdin_tx.try_send(data.message.data).map_err(|_| {
io::Error::new(io::ErrorKind::WouldBlock, "shell stdin channel full")
})?;
}
if !data.message.end_hook {
return Ok(ProcedureEffect::default());
}
// Peer end means no more stdin from the caller. Keep the process alive so
// buffered PTY output can drain through the normal poll path.
self.stdin_tx.take();
self.signal_process_group("-HUP");
Ok(ProcedureEffect::default())
}
/// Polls the shell for locally-generated output.
pub(super) fn poll(&mut self) -> Result<ProcedureEffect, ShellLeafError> {
let mut outgoing = Vec::new();
self.drain_output(&mut outgoing);
if self.local_end_sent {
return Ok(ProcedureEffect::outgoing(outgoing));
}
if self.exit_status.is_none() {
self.exit_status = self
.child
.try_wait()
.map_err(|error| io::Error::other(error.to_string()))?;
}
if self.exit_status.is_some() && !self.output_closed {
self.signal_process_group("-KILL");
}
if self.exit_status.is_some() && self.output_closed {
outgoing.push(self.packet(Vec::new(), true));
self.local_end_sent = true;
return Ok(ProcedureEffect::close(outgoing));
}
Ok(ProcedureEffect::outgoing(outgoing))
}
fn signal_process_group(&self, signal: &str) {
#[cfg(unix)]
if let Some(process_group_leader) = self.process_group_leader {
let _ = Command::new("kill")
.arg(signal)
.arg(format!("-{}", process_group_leader))
.status();
}
}
}
impl Drop for Open {
fn drop(&mut self) {
let _ = self.terminate();
}
}
fn spawn_pipe_writer(mut stdin: Box<dyn Write + Send>, rx: Receiver<Vec<u8>>) {
thread::spawn(move || {
for bytes in rx {
if stdin.write_all(&bytes).is_err() {
break;
}
if stdin.flush().is_err() {
break;
}
}
});
}
fn build_shell_command() -> CommandBuilder {
if cfg!(windows) {
let mut command = CommandBuilder::new("cmd.exe");
command.arg("/Q");
command
} else {
let mut command = CommandBuilder::new("/bin/sh");
command.arg("-i");
command
}
}
fn spawn_io_threads(
stdin: Box<dyn Write + Send>,
stdout: Box<dyn Read + Send>,
) -> (SyncSender<Vec<u8>>, Receiver<OutputEvent>) {
let (stdin_tx, stdin_rx) = mpsc::sync_channel(64);
let (tx, rx) = mpsc::sync_channel(64);
spawn_pipe_writer(stdin, stdin_rx);
spawn_pipe_reader(stdout, tx);
(stdin_tx, rx)
}
fn spawn_pipe_reader<R>(mut reader: R, tx: mpsc::SyncSender<OutputEvent>)
where
R: Read + Send + 'static,
{
thread::spawn(move || {
loop {
let mut buffer = [0u8; 1024];
match reader.read(&mut buffer) {
Ok(0) => {
let _ = tx.send(OutputEvent::ReaderClosed);
break;
}
Ok(read_len) => {
if tx
.send(OutputEvent::Chunk(buffer[..read_len].to_vec()))
.is_err()
{
break;
}
}
Err(error) if error.kind() == io::ErrorKind::Interrupted => {}
Err(error) => {
let _ = tx.send(OutputEvent::Chunk(
format!("shell pipe read error: {error}\n").into_bytes(),
));
let _ = tx.send(OutputEvent::ReaderClosed);
break;
}
}
}
});
}
@@ -1,93 +0,0 @@
use std::io::{self, ErrorKind, Read, Write};
use std::net::TcpStream;
use std::sync::mpsc::{self, Receiver};
use std::thread;
use unshell::protocol::FrameBytes;
use unshell::protocol::tree::EndpointOutcome;
/// TCP listen address used by the remote shell examples.
pub const LISTEN_ADDR: &str = "127.0.0.1:4444";
const MAX_FRAME_BYTES: usize = 1024 * 1024;
/// Writes the forwarded frame produced by one endpoint outcome.
pub fn send_forward(stream: &mut TcpStream, outcome: EndpointOutcome) -> io::Result<()> {
match outcome {
EndpointOutcome::Forward { frame, .. } => write_frames(stream, &[frame]),
EndpointOutcome::Local(_) | EndpointOutcome::Dropped => write_frames(stream, &[]),
}
}
/// Writes one or more framed packets onto the example TCP stream.
pub fn write_frames(stream: &mut TcpStream, frames: &[FrameBytes]) -> io::Result<()> {
for frame in frames {
let frame_len = u32::try_from(frame.len()).map_err(|_| {
io::Error::new(ErrorKind::InvalidData, "frame exceeds u32 transport size")
})?;
stream.write_all(&frame_len.to_be_bytes())?;
stream.write_all(frame)?;
}
stream.flush()?;
Ok(())
}
/// Spawns the example frame reader that lifts prefixed frames off the TCP stream.
pub fn spawn_frame_reader(mut stream: TcpStream) -> Receiver<io::Result<FrameBytes>> {
let (tx, rx) = mpsc::sync_channel(64);
thread::spawn(move || {
loop {
match read_frame(&mut stream) {
Ok(Some(frame)) => {
if tx.send(Ok(frame)).is_err() {
break;
}
}
Ok(None) => break,
Err(error) => {
let _ = tx.send(Err(error));
break;
}
}
}
});
rx
}
fn read_frame(stream: &mut TcpStream) -> io::Result<Option<FrameBytes>> {
let Some(len_bytes) = read_prefix(stream)? else {
return Ok(None);
};
let frame_len = u32::from_be_bytes(len_bytes) as usize;
if frame_len > MAX_FRAME_BYTES {
return Err(io::Error::new(
ErrorKind::InvalidData,
"frame exceeds remote shell example transport limit",
));
}
let mut bytes = vec![0u8; frame_len];
stream.read_exact(&mut bytes)?;
let mut frame = FrameBytes::with_capacity(bytes.len());
frame.extend_from_slice(&bytes);
Ok(Some(frame))
}
fn read_prefix(stream: &mut TcpStream) -> io::Result<Option<[u8; 4]>> {
let mut len_bytes = [0u8; 4];
let mut filled = 0usize;
while filled < len_bytes.len() {
match stream.read(&mut len_bytes[filled..]) {
Ok(0) if filled == 0 => return Ok(None),
Ok(0) => return Err(io::Error::from(ErrorKind::UnexpectedEof)),
Ok(read_len) => filled += read_len,
Err(error) if error.kind() == ErrorKind::Interrupted => {}
Err(error) => return Err(error),
}
}
Ok(Some(len_bytes))
}
-26
View File
@@ -1,26 +0,0 @@
//! Remote shell leaf and its user-facing surfaces.
//!
//! The module always exports the protocol contract for the leaf together with the
//! endpoint and TUI host implementations.
use rkyv::{Archive, Deserialize, Serialize};
use unshell_macros::leaf;
pub mod endpoint;
pub mod tui;
/// Open-request payload for the remote shell leaf.
///
/// The shell currently needs no structured arguments, but a named payload type is
/// easier for downstream code to discover than a bare `()`.
#[derive(Archive, Serialize, Deserialize, Debug, Clone, Default, PartialEq, Eq)]
pub struct OpenRequest;
#[leaf(
name = "remote_shell",
procedures = [Open],
endpoint = endpoint,
tui = tui,
)]
/// Shared compile-time declaration for the `remote_shell` leaf surface.
pub struct RemoteShell;
-48
View File
@@ -1,48 +0,0 @@
//! Placeholder client-side TUI surface for the remote shell leaf.
//!
//! The first application-layer consumer will be a CLI and later a full GUI. This
//! stub keeps the leaf-specific interpretation point in place without forcing a
//! rendering-library decision yet.
use std::string::String;
use std::vec::Vec;
use unshell::protocol::DataMessage;
use unshell_macros::Procedure;
use crate::{LeafTui, TuiError};
/// Stub TUI surface for the remote shell leaf.
#[derive(Default)]
pub struct RemoteShell {
transcript: Vec<u8>,
}
impl RemoteShell {
/// Returns a short explanation of the current stub status.
pub fn status_line(&self) -> &'static str {
"remote shell TUI stub: rendering is placeholder-only for now"
}
}
impl LeafTui for RemoteShell {
fn leaf_name(&self) -> String {
Self::protocol_leaf_name()
}
fn handle_data(&mut self, message: &DataMessage) -> Result<(), TuiError> {
self.transcript.extend_from_slice(&message.data);
Ok(())
}
fn render(&self) -> String {
let body = String::from_utf8_lossy(&self.transcript);
format!("{}\n\n{}", self.status_line(), body)
}
}
/// TUI-side placeholder procedure symbol for the shared `remote_shell` leaf
/// declaration.
#[derive(Procedure)]
#[procedure(leaf = RemoteShell, name = "open")]
pub struct Open {}