Add derive-based protocol leaf declarations

This commit is contained in:
Michael Mikovsky
2026-04-25 14:41:00 -06:00
parent 3e764610eb
commit b1ebe34ec1
11 changed files with 1056 additions and 1 deletions
+46
View File
@@ -0,0 +1,46 @@
use std::error::Error;
use unshell::Leaf;
use unshell::protocol::tree::{Endpoint, Ingress, LocalEvent, ProtocolEndpoint};
#[derive(Leaf)]
#[leaf(org = "org", product = "example", version = "v1", leaf_name = "echo")]
#[leaf(procedures(call, stream))]
struct EchoLeaf;
fn path(parts: &[&str]) -> Vec<String> {
parts.iter().map(|part| (*part).to_owned()).collect()
}
fn main() -> Result<(), Box<dyn Error>> {
let mut endpoint = ProtocolEndpoint::new(
path(&["agent"]),
Some(Vec::new()),
Vec::new(),
vec![EchoLeaf::protocol_leaf_spec()],
);
let hook_id = endpoint.allocate_hook_id();
let frame = endpoint.make_call(
path(&["agent"]),
Some(EchoLeaf::protocol_leaf_name()),
EchoLeaf::protocol_procedure_id("call").expect("known procedure suffix"),
Some(hook_id),
b"hello leaf".to_vec(),
)?;
let outcome = endpoint.receive(&Ingress::Parent, frame)?;
let Some(LocalEvent::Call { header, message }) = outcome.event else {
return Err("expected local leaf call".into());
};
assert_eq!(header.dst_leaf.as_deref(), Some("org.example.v1.echo"));
assert_eq!(message.procedure_id, "org.example.v1.echo.call");
println!(
"leaf={} procedure={}",
EchoLeaf::protocol_leaf_name(),
message.procedure_id
);
Ok(())
}
+279
View File
@@ -0,0 +1,279 @@
#[path = "support/protocol_remote_shell_common.rs"]
mod common;
use std::error::Error;
use std::io::{self, Read, Write};
use std::net::TcpStream;
use std::process::{Child, ChildStdin, Command, ExitStatus, Stdio};
use std::sync::mpsc::{self, Receiver, RecvTimeoutError, Sender};
use std::thread;
use std::time::Duration;
use unshell::protocol::tree::{Endpoint, Ingress, LocalEvent};
struct ShellSession {
child: Child,
stdin: Option<ChildStdin>,
return_path: Vec<String>,
hook_id: u64,
procedure_id: String,
readers_closed: usize,
exit_status: Option<ExitStatus>,
}
enum OutputEvent {
Chunk(Vec<u8>),
ReaderClosed,
}
fn main() -> Result<(), Box<dyn Error>> {
let mut stream = TcpStream::connect(common::LISTEN_ADDR)?;
let frame_rx = common::spawn_frame_reader(stream.try_clone()?);
let mut endpoint = common::build_agent_endpoint();
let mut session: Option<ShellSession> = None;
let mut output_rx: Option<Receiver<OutputEvent>> = None;
println!("connected to controller at {}", common::LISTEN_ADDR);
loop {
match frame_rx.recv_timeout(Duration::from_millis(25)) {
Ok(result) => {
let frame = result?;
let outcome = endpoint.receive(&Ingress::Parent, frame)?;
if let Some(event) = common::pump_outcome(&mut stream, outcome)? {
handle_local_event(
&mut endpoint,
&mut stream,
&mut session,
&mut output_rx,
event,
)?;
}
}
Err(RecvTimeoutError::Timeout) => {}
Err(RecvTimeoutError::Disconnected) => break,
}
if let Some(rx) = output_rx.as_ref() {
while let Ok(event) = rx.try_recv() {
handle_shell_output(&mut endpoint, &mut stream, &mut session, event)?;
}
}
if finalize_exited_shell(&mut endpoint, &mut stream, &mut session)? {
output_rx = None;
}
}
Ok(())
}
fn handle_local_event(
endpoint: &mut unshell::protocol::tree::ProtocolEndpoint,
stream: &mut TcpStream,
session: &mut Option<ShellSession>,
output_rx: &mut Option<Receiver<OutputEvent>>,
event: LocalEvent,
) -> Result<(), Box<dyn Error>> {
match event {
LocalEvent::Call { header, message } => {
let shell_leaf_name = common::shell_leaf_name();
let start_procedure = common::shell_start_procedure();
if header.dst_leaf.as_deref() != Some(shell_leaf_name.as_str())
|| message.procedure_id != start_procedure
{
return Ok(());
}
let Some(hook) = message.response_hook else {
return Ok(());
};
let (new_session, rx) =
start_shell(&hook.return_path, hook.hook_id, &message.procedure_id)?;
*session = Some(new_session);
*output_rx = Some(rx);
let outcome = endpoint.send_data(
hook.return_path,
hook.hook_id,
message.procedure_id,
b"shell ready\n".to_vec(),
false,
)?;
let _ = common::pump_outcome(stream, outcome)?;
}
LocalEvent::Data { message, .. } => {
let Some(active_session) = session.as_mut() else {
return Ok(());
};
if !message.data.is_empty() {
let Some(stdin) = active_session.stdin.as_mut() else {
return Ok(());
};
stdin.write_all(&message.data)?;
stdin.flush()?;
}
if message.end_hook {
active_session.stdin.take();
}
}
LocalEvent::Fault { message, .. } => {
eprintln!(
"controller reported protocol fault: 0x{:02X}",
message.fault.0
);
}
}
Ok(())
}
fn handle_shell_output(
endpoint: &mut unshell::protocol::tree::ProtocolEndpoint,
stream: &mut TcpStream,
session: &mut Option<ShellSession>,
event: OutputEvent,
) -> Result<(), Box<dyn Error>> {
let Some(active_session) = session.as_mut() else {
return Ok(());
};
match event {
OutputEvent::Chunk(bytes) => {
let outcome = endpoint.send_data(
active_session.return_path.clone(),
active_session.hook_id,
active_session.procedure_id.clone(),
bytes,
false,
)?;
let _ = common::pump_outcome(stream, outcome)?;
}
OutputEvent::ReaderClosed => {
active_session.readers_closed += 1;
}
}
Ok(())
}
fn finalize_exited_shell(
endpoint: &mut unshell::protocol::tree::ProtocolEndpoint,
stream: &mut TcpStream,
session: &mut Option<ShellSession>,
) -> Result<bool, Box<dyn Error>> {
let Some(active_session) = session.as_mut() else {
return Ok(false);
};
if active_session.exit_status.is_none() {
active_session.exit_status = active_session.child.try_wait()?;
}
let Some(exit_status) = active_session.exit_status else {
return Ok(false);
};
if active_session.readers_closed < 2 {
return Ok(false);
}
let summary = format!("shell exited with {exit_status}\n");
let outcome = endpoint.send_data(
active_session.return_path.clone(),
active_session.hook_id,
active_session.procedure_id.clone(),
summary.into_bytes(),
true,
)?;
let _ = common::pump_outcome(stream, outcome)?;
*session = None;
Ok(true)
}
fn start_shell(
return_path: &[String],
hook_id: u64,
procedure_id: &str,
) -> io::Result<(ShellSession, Receiver<OutputEvent>)> {
let mut command = if cfg!(windows) {
let mut command = Command::new("cmd.exe");
command.arg("/Q");
command
} else {
let mut command = Command::new("/bin/sh");
command.arg("-i");
command
};
let mut child = command
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()?;
let stdin = child
.stdin
.take()
.ok_or_else(|| io::Error::other("failed to capture shell stdin"))?;
let stdout = child
.stdout
.take()
.ok_or_else(|| io::Error::other("failed to capture shell stdout"))?;
let stderr = child
.stderr
.take()
.ok_or_else(|| io::Error::other("failed to capture shell stderr"))?;
let (tx, rx) = mpsc::channel();
spawn_pipe_reader(stdout, tx.clone());
spawn_pipe_reader(stderr, tx);
Ok((
ShellSession {
child,
stdin: Some(stdin),
return_path: return_path.to_vec(),
hook_id,
procedure_id: procedure_id.to_owned(),
readers_closed: 0,
exit_status: None,
},
rx,
))
}
fn spawn_pipe_reader<R>(mut reader: R, tx: Sender<OutputEvent>)
where
R: Read + Send + 'static,
{
thread::spawn(move || {
let mut buffer = [0u8; 1024];
loop {
match reader.read(&mut buffer) {
Ok(0) => {
let _ = tx.send(OutputEvent::ReaderClosed);
break;
}
Ok(read_len) => {
if tx
.send(OutputEvent::Chunk(buffer[..read_len].to_vec()))
.is_err()
{
break;
}
}
Err(error) if error.kind() == io::ErrorKind::Interrupted => {}
Err(error) => {
let _ = tx.send(OutputEvent::Chunk(
format!("shell pipe read error: {error}\n").into_bytes(),
));
let _ = tx.send(OutputEvent::ReaderClosed);
break;
}
}
}
});
}
+73
View File
@@ -0,0 +1,73 @@
#[path = "support/protocol_remote_shell_common.rs"]
mod common;
use std::error::Error;
use std::net::TcpListener;
use unshell::protocol::tree::{Endpoint, Ingress, LocalEvent};
fn main() -> Result<(), Box<dyn Error>> {
let listener = TcpListener::bind(common::LISTEN_ADDR)?;
println!("listening on {}", common::LISTEN_ADDR);
let (mut stream, peer_addr) = listener.accept()?;
println!("accepted endpoint connection from {peer_addr}");
let frame_rx = common::spawn_frame_reader(stream.try_clone()?);
let mut endpoint = common::build_controller_endpoint();
let hook_id = endpoint.allocate_hook_id();
let shell_leaf_name = common::shell_leaf_name();
let start_procedure = common::shell_start_procedure();
let outcome = endpoint.send_call(
common::agent_path(),
Some(shell_leaf_name),
start_procedure.clone(),
Some(hook_id),
Vec::new(),
)?;
let _ = common::pump_outcome(&mut stream, outcome)?;
let mut commands_sent = false;
for result in frame_rx {
let frame = result?;
let outcome = endpoint.receive(&Ingress::Child(common::agent_path()), frame)?;
let event = common::pump_outcome(&mut stream, outcome)?;
let Some(event) = event else {
continue;
};
match event {
LocalEvent::Data { message, .. } => {
print!("{}", String::from_utf8_lossy(&message.data));
if !commands_sent {
commands_sent = true;
for (index, command) in ["pwd\n", "whoami\n", "exit\n"].iter().enumerate() {
let outcome = endpoint.send_data(
common::agent_path(),
hook_id,
start_procedure.clone(),
command.as_bytes().to_vec(),
index == 2,
)?;
let _ = common::pump_outcome(&mut stream, outcome)?;
}
}
if message.end_hook {
break;
}
}
LocalEvent::Fault { message, .. } => {
eprintln!("received protocol fault: 0x{:02X}", message.fault.0);
break;
}
LocalEvent::Call { .. } => {}
}
}
Ok(())
}
@@ -0,0 +1,118 @@
use std::io::{self, ErrorKind, Read, Write};
use std::net::TcpStream;
use std::sync::mpsc::{self, Receiver};
use std::thread;
use unshell::Leaf;
use unshell::protocol::FrameBytes;
use unshell::protocol::tree::{ChildRoute, EndpointOutcome, LocalEvent, ProtocolEndpoint};
pub const LISTEN_ADDR: &str = "127.0.0.1:4444";
#[derive(Leaf)]
#[leaf(org = "org", product = "example", version = "v1", leaf_name = "shell")]
#[leaf(procedures(start))]
pub struct RemoteShellLeaf;
pub fn agent_path() -> Vec<String> {
path(&["agent"])
}
pub fn path(parts: &[&str]) -> Vec<String> {
parts.iter().map(|part| (*part).to_owned()).collect()
}
#[allow(dead_code)]
pub fn build_controller_endpoint() -> ProtocolEndpoint {
ProtocolEndpoint::new(
Vec::new(),
None,
vec![ChildRoute::registered(agent_path())],
Vec::new(),
)
}
#[allow(dead_code)]
pub fn build_agent_endpoint() -> ProtocolEndpoint {
ProtocolEndpoint::new(
agent_path(),
Some(Vec::new()),
Vec::new(),
vec![RemoteShellLeaf::protocol_leaf_spec()],
)
}
pub fn shell_leaf_name() -> String {
RemoteShellLeaf::protocol_leaf_name()
}
pub fn shell_start_procedure() -> String {
RemoteShellLeaf::protocol_procedure_id("start")
.expect("remote shell leaf declares a start procedure")
}
pub fn write_frame(stream: &mut TcpStream, frame: &[u8]) -> io::Result<()> {
let frame_len = u32::try_from(frame.len())
.map_err(|_| io::Error::new(ErrorKind::InvalidData, "frame exceeds u32 transport size"))?;
stream.write_all(&frame_len.to_be_bytes())?;
stream.write_all(frame)?;
stream.flush()?;
Ok(())
}
pub fn pump_outcome(
stream: &mut TcpStream,
outcome: EndpointOutcome,
) -> io::Result<Option<LocalEvent>> {
if let Some((_route, frame)) = outcome.forward {
// These examples model one direct parent-child link over one TCP stream, so
// any forwarded protocol frame is emitted on the same socket.
write_frame(stream, &frame)?;
}
Ok(outcome.event)
}
pub fn spawn_frame_reader(mut stream: TcpStream) -> Receiver<io::Result<FrameBytes>> {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
loop {
match read_frame(&mut stream) {
Ok(Some(frame)) => {
if tx.send(Ok(frame)).is_err() {
break;
}
}
Ok(None) => break,
Err(error) => {
let _ = tx.send(Err(error));
break;
}
}
}
});
rx
}
fn read_frame(stream: &mut TcpStream) -> io::Result<Option<FrameBytes>> {
let mut len_bytes = [0u8; 4];
match stream.read_exact(&mut len_bytes) {
Ok(()) => {}
Err(error) if error.kind() == ErrorKind::UnexpectedEof => return Ok(None),
Err(error) => return Err(error),
}
let frame_len = u32::from_be_bytes(len_bytes) as usize;
let mut bytes = vec![0u8; frame_len];
match stream.read_exact(&mut bytes) {
Ok(()) => {}
Err(error) if error.kind() == ErrorKind::UnexpectedEof => return Ok(None),
Err(error) => return Err(error),
}
let mut frame = FrameBytes::with_capacity(bytes.len());
frame.extend_from_slice(&bytes);
Ok(Some(frame))
}