Reorganize protocol examples

This commit is contained in:
Michael Mikovsky
2026-04-25 14:46:59 -06:00
parent b1ebe34ec1
commit 396c707662
12 changed files with 95 additions and 13 deletions
+408
View File
@@ -0,0 +1,408 @@
use std::hint::black_box;
use std::path::{Path, PathBuf};
use std::process::Command;
use std::time::Instant;
use unshell::protocol::tree::{
ChildRoute, Endpoint, Ingress, LeafSpec, LocalEvent, ProtocolEndpoint,
};
use unshell::protocol::{CallMessage, PacketHeader, PacketType, decode_frame, encode_packet};
const SAMPLES: usize = 500;
const ITERS: usize = 1_000;
const TOOL_ITERS: usize = 10_000;
fn main() {
if std::env::args().nth(1).as_deref() == Some("tools") {
run_external_tools();
return;
}
println!("protocol benchmark");
println!("samples: {SAMPLES}");
println!("iterations/sample: {ITERS}");
println!();
let benches = [
bench_encode_call(),
bench_decode_call(),
bench_forward_call_receive(),
bench_local_call_receive(),
bench_hook_data_receive(),
];
println!(
"{:32} {:>14} {:>14} {:>14}",
"benchmark", "mean ns/op", "stddev", "samples"
);
for bench in benches {
println!(
"{:32} {:>14.2} {:>14.2} {:>14}",
bench.name, bench.mean_ns, bench.stddev_ns, bench.samples
);
}
println!();
println!("Run `cargo run --example bench -- tools` to build and execute");
println!("the standalone operation binaries under strace, perf, and heaptrack.");
}
struct BenchResult {
name: &'static str,
mean_ns: f64,
stddev_ns: f64,
samples: usize,
}
fn bench_encode_call() -> BenchResult {
let header = PacketHeader {
packet_type: PacketType::Call,
src_path: path(&["root"]),
dst_path: path(&["root", "worker"]),
dst_leaf: Some(String::from("service")),
hook_id: None,
};
let message = CallMessage {
procedure_id: String::from("example.service.v1.invoke"),
data: vec![7; 64],
response_hook: None,
};
run_bench("encode_call", || {
let frame =
encode_packet(black_box(&header), black_box(&message)).expect("encode should work");
black_box(frame.len());
})
}
fn bench_decode_call() -> BenchResult {
let header = PacketHeader {
packet_type: PacketType::Call,
src_path: path(&["root"]),
dst_path: path(&["root", "worker"]),
dst_leaf: Some(String::from("service")),
hook_id: None,
};
let message = CallMessage {
procedure_id: String::from("example.service.v1.invoke"),
data: vec![9; 64],
response_hook: None,
};
let frame = encode_packet(&header, &message).expect("seed frame should encode");
run_bench("decode_call", || {
let parsed = decode_frame(black_box(frame.as_slice())).expect("decode should work");
let call = parsed.deserialize_call().expect("call should deserialize");
black_box(call.data.len());
})
}
fn bench_forward_call_receive() -> BenchResult {
run_prebuilt_bench(
"forward_call_receive",
build_forward_call_cases,
|(mut root, frame)| {
let outcome = root
.receive(&Ingress::Local, frame)
.expect("forward receive should work");
black_box(outcome.forward.is_some());
},
)
}
fn bench_local_call_receive() -> BenchResult {
run_prebuilt_bench(
"local_call_receive",
build_local_call_cases,
|(mut endpoint, frame)| {
let outcome = endpoint
.receive(&Ingress::Parent, frame)
.expect("local call should work");
match black_box(outcome.event) {
Some(LocalEvent::Call { .. }) => {}
other => panic!("expected local call event, got {other:?}"),
}
},
)
}
fn bench_hook_data_receive() -> BenchResult {
run_prebuilt_bench(
"hook_data_receive",
build_hook_data_cases,
|(mut host, frame)| {
let outcome = host
.receive(&Ingress::Child(path(&["worker"])), frame)
.expect("hook data should work");
match black_box(outcome.event) {
Some(LocalEvent::Data { .. }) => {}
other => panic!("expected local data event, got {other:?}"),
}
},
)
}
fn run_bench(name: &'static str, mut op: impl FnMut()) -> BenchResult {
let mut samples = Vec::with_capacity(SAMPLES);
for _ in 0..SAMPLES {
let start = Instant::now();
for _ in 0..ITERS {
op();
}
let elapsed = start.elapsed().as_nanos() as f64 / ITERS as f64;
samples.push(elapsed);
}
summarize(name, &samples)
}
fn run_prebuilt_bench<T, F>(
name: &'static str,
mut build_cases: F,
mut op: impl FnMut(T),
) -> BenchResult
where
F: FnMut() -> Vec<T>,
{
let mut repeated = Vec::with_capacity(SAMPLES);
for _ in 0..SAMPLES {
let mut cases = build_cases();
assert_eq!(cases.len(), ITERS);
let start = Instant::now();
for case in cases.drain(..) {
op(case);
}
let elapsed = start.elapsed().as_nanos() as f64 / ITERS as f64;
repeated.push(elapsed);
}
summarize(name, &repeated)
}
fn build_forward_call_cases() -> Vec<(ProtocolEndpoint, unshell::protocol::FrameBytes)> {
(0..ITERS)
.map(|_| {
let mut root = ProtocolEndpoint::new(
Vec::new(),
None,
vec![ChildRoute::registered(path(&["edge"]))],
Vec::new(),
);
let hook_id = root.allocate_hook_id();
let frame = root
.make_call(
path(&["edge", "worker"]),
Some(String::from("service")),
String::from("example.service.v1.invoke"),
Some(hook_id),
vec![1; 32],
)
.expect("seed call should encode");
(root, frame)
})
.collect()
}
fn build_local_call_cases() -> Vec<(ProtocolEndpoint, unshell::protocol::FrameBytes)> {
(0..ITERS)
.map(|_| {
let endpoint = ProtocolEndpoint::new(
path(&["worker"]),
Some(Vec::new()),
Vec::new(),
vec![LeafSpec {
name: String::from("service"),
procedures: vec![String::from("example.service.v1.invoke")],
}],
);
let frame = encode_packet(
&PacketHeader {
packet_type: PacketType::Call,
src_path: Vec::new(),
dst_path: path(&["worker"]),
dst_leaf: Some(String::from("service")),
hook_id: None,
},
&CallMessage {
procedure_id: String::from("example.service.v1.invoke"),
data: vec![2; 32],
response_hook: Some(unshell::protocol::HookTarget {
hook_id: 42,
return_path: Vec::new(),
}),
},
)
.expect("seed local call should encode");
(endpoint, frame)
})
.collect()
}
fn build_hook_data_cases() -> Vec<(ProtocolEndpoint, unshell::protocol::FrameBytes)> {
(0..ITERS)
.map(|_| {
let mut host = ProtocolEndpoint::new(
Vec::new(),
None,
vec![ChildRoute::registered(path(&["worker"]))],
Vec::new(),
);
let hook_id = host.allocate_hook_id();
host.make_call(
path(&["worker"]),
None,
String::from("example.service.v1.invoke"),
Some(hook_id),
vec![3; 8],
)
.expect("seed active hook should encode");
let frame = encode_packet(
&PacketHeader {
packet_type: PacketType::Data,
src_path: path(&["worker"]),
dst_path: Vec::new(),
dst_leaf: None,
hook_id: Some(hook_id),
},
&unshell::protocol::DataMessage {
procedure_id: String::from("example.service.v1.invoke"),
data: vec![4; 16],
end_hook: false,
},
)
.expect("seed data should encode");
(host, frame)
})
.collect()
}
fn summarize(name: &'static str, samples: &[f64]) -> BenchResult {
let mean = samples.iter().sum::<f64>() / samples.len() as f64;
let variance = samples
.iter()
.map(|sample| {
let delta = sample - mean;
delta * delta
})
.sum::<f64>()
/ samples.len() as f64;
BenchResult {
name,
mean_ns: mean,
stddev_ns: variance.sqrt(),
samples: samples.len(),
}
}
fn path(parts: &[&str]) -> Vec<String> {
parts.iter().map(|part| String::from(*part)).collect()
}
fn run_external_tools() {
let root = Path::new(env!("CARGO_MANIFEST_DIR"));
build_examples(root);
let ops = [
("encode_call", "op_encode_call"),
("decode_call", "op_decode_call"),
("forward_call_receive", "op_forward_call_receive"),
("local_call_receive", "op_local_call_receive"),
("hook_data_receive", "op_hook_data_receive"),
];
let heap_dir = root.join("heaptrack-cli");
std::fs::create_dir_all(&heap_dir).expect("heaptrack-cli directory should be creatable");
for (name, binary) in ops {
let binary_path = root.join("target/debug/examples").join(binary);
println!();
println!("=== {name} ===");
run_binary(&binary_path, TOOL_ITERS, "direct run");
run_strace(&binary_path, TOOL_ITERS);
run_perf(&binary_path, TOOL_ITERS);
run_heaptrack(root, &heap_dir, name, &binary_path, TOOL_ITERS);
}
}
fn build_examples(root: &Path) {
run_command(
"cargo build --examples",
Command::new("cargo")
.arg("build")
.arg("--examples")
.current_dir(root),
);
}
fn run_binary(binary: &Path, iterations: usize, label: &str) {
run_command(label, Command::new(binary).arg(iterations.to_string()));
}
fn run_strace(binary: &Path, iterations: usize) {
run_command(
"strace -c memory syscalls",
Command::new("strace")
.arg("-qq")
.arg("-c")
.arg("-e")
.arg("trace=brk,mmap,mremap,munmap,mprotect,madvise")
.arg(binary)
.arg(iterations.to_string()),
);
}
fn run_perf(binary: &Path, iterations: usize) {
run_command(
"perf stat",
Command::new("perf")
.arg("stat")
.arg("-e")
.arg("task-clock,cycles,instructions,branches,branch-misses,cache-references,cache-misses")
.arg(binary)
.arg(iterations.to_string()),
);
}
fn run_heaptrack(root: &Path, heap_dir: &Path, name: &str, binary: &Path, iterations: usize) {
let prefix = heap_dir.join(format!("{name}.zst"));
run_command(
"heaptrack --record-only",
Command::new("heaptrack")
.arg("--record-only")
.arg("-o")
.arg(&prefix)
.arg(binary)
.arg(iterations.to_string())
.current_dir(root),
);
let recorded = PathBuf::from(format!("{}.zst", prefix.display()));
run_command(
"heaptrack_print summary",
Command::new("heaptrack_print")
.arg("-f")
.arg(recorded)
.arg("-n")
.arg("4")
.arg("-s")
.arg("2")
.current_dir(root),
);
}
fn run_command(label: &str, command: &mut Command) {
println!("--- {label} ---");
let output = command
.output()
.unwrap_or_else(|error| panic!("{label} failed to launch: {error}"));
if !output.stdout.is_empty() {
print!("{}", String::from_utf8_lossy(&output.stdout));
}
if !output.stderr.is_empty() {
print!("{}", String::from_utf8_lossy(&output.stderr));
}
assert!(
output.status.success(),
"{label} failed with status {}",
output.status
);
}
@@ -0,0 +1,8 @@
#[path = "support/bench_common.rs"]
mod common;
fn main() {
let iterations = common::iterations_from_args(1_000);
let checksum = common::run_decode_call(iterations);
println!("decode_call iterations={iterations} checksum={checksum}");
}
@@ -0,0 +1,8 @@
#[path = "support/bench_common.rs"]
mod common;
fn main() {
let iterations = common::iterations_from_args(1_000);
let checksum = common::run_encode_call(iterations);
println!("encode_call iterations={iterations} checksum={checksum}");
}
@@ -0,0 +1,8 @@
#[path = "support/bench_common.rs"]
mod common;
fn main() {
let iterations = common::iterations_from_args(1_000);
let checksum = common::run_forward_call_receive(iterations);
println!("forward_call_receive iterations={iterations} checksum={checksum}");
}
@@ -0,0 +1,8 @@
#[path = "support/bench_common.rs"]
mod common;
fn main() {
let iterations = common::iterations_from_args(1_000);
let checksum = common::run_hook_data_receive(iterations);
println!("hook_data_receive iterations={iterations} checksum={checksum}");
}
@@ -0,0 +1,8 @@
#[path = "support/bench_common.rs"]
mod common;
fn main() {
let iterations = common::iterations_from_args(1_000);
let checksum = common::run_local_call_receive(iterations);
println!("local_call_receive iterations={iterations} checksum={checksum}");
}
@@ -0,0 +1,222 @@
#![allow(dead_code)]
use std::hint::black_box;
use unshell::protocol::tree::{
ChildRoute, Endpoint, Ingress, LeafSpec, LocalEvent, ProtocolEndpoint,
};
use unshell::protocol::{CallMessage, PacketHeader, PacketType, decode_frame, encode_packet};
pub fn iterations_from_args(default: usize) -> usize {
std::env::args()
.nth(1)
.map(|value| {
value
.parse::<usize>()
.expect("iterations must be a positive integer")
})
.unwrap_or(default)
}
#[inline(never)]
pub fn run_encode_call(iterations: usize) -> usize {
let header = PacketHeader {
packet_type: PacketType::Call,
src_path: path(&["root"]),
dst_path: path(&["root", "worker"]),
dst_leaf: Some(String::from("service")),
hook_id: None,
};
let message = CallMessage {
procedure_id: String::from("example.service.v1.invoke"),
data: vec![7; 64],
response_hook: None,
};
let mut checksum = 0usize;
for _ in 0..iterations {
let frame =
encode_packet(black_box(&header), black_box(&message)).expect("encode should work");
checksum = checksum.wrapping_add(frame.len());
}
black_box(checksum)
}
#[inline(never)]
pub fn run_decode_call(iterations: usize) -> usize {
let header = PacketHeader {
packet_type: PacketType::Call,
src_path: path(&["root"]),
dst_path: path(&["root", "worker"]),
dst_leaf: Some(String::from("service")),
hook_id: None,
};
let message = CallMessage {
procedure_id: String::from("example.service.v1.invoke"),
data: vec![9; 64],
response_hook: None,
};
let frame = encode_packet(&header, &message).expect("seed frame should encode");
let mut checksum = 0usize;
for _ in 0..iterations {
let parsed = decode_frame(black_box(frame.as_slice())).expect("decode should work");
let call = parsed.deserialize_call().expect("call should deserialize");
checksum = checksum
.wrapping_add(call.data.len())
.wrapping_add(call.procedure_id.len())
.wrapping_add(call.response_hook.is_some() as usize);
}
black_box(checksum)
}
#[inline(never)]
pub fn run_forward_call_receive(iterations: usize) -> usize {
let mut checksum = 0usize;
for _ in 0..iterations {
let mut root = ProtocolEndpoint::new(
Vec::new(),
None,
vec![ChildRoute::registered(path(&["edge"]))],
Vec::new(),
);
let hook_id = root.allocate_hook_id();
let frame = root
.make_call(
path(&["edge", "worker"]),
Some(String::from("service")),
String::from("example.service.v1.invoke"),
Some(hook_id),
vec![1; 32],
)
.expect("seed call should encode");
let outcome = root
.receive(&Ingress::Local, frame)
.expect("forward receive should work");
let forwarded = outcome
.forward
.as_ref()
.map(|(route, frame)| route_value(*route).wrapping_add(frame.len()))
.unwrap_or_default();
checksum = checksum
.wrapping_add(forwarded)
.wrapping_add(outcome.dropped as usize);
}
black_box(checksum)
}
#[inline(never)]
pub fn run_local_call_receive(iterations: usize) -> usize {
let mut checksum = 0usize;
for _ in 0..iterations {
let mut endpoint = ProtocolEndpoint::new(
path(&["worker"]),
Some(Vec::new()),
Vec::new(),
vec![LeafSpec {
name: String::from("service"),
procedures: vec![String::from("example.service.v1.invoke")],
}],
);
let frame = encode_packet(
&PacketHeader {
packet_type: PacketType::Call,
src_path: Vec::new(),
dst_path: path(&["worker"]),
dst_leaf: Some(String::from("service")),
hook_id: None,
},
&CallMessage {
procedure_id: String::from("example.service.v1.invoke"),
data: vec![2; 32],
response_hook: Some(unshell::protocol::HookTarget {
hook_id: 42,
return_path: Vec::new(),
}),
},
)
.expect("seed local call should encode");
let outcome = endpoint
.receive(&Ingress::Parent, frame)
.expect("local call should work");
match outcome.event {
Some(LocalEvent::Call { header, message }) => {
checksum = checksum
.wrapping_add(header.dst_path.len())
.wrapping_add(header.src_path.len())
.wrapping_add(header.dst_leaf.as_ref().map_or(0, String::len))
.wrapping_add(message.data.len())
.wrapping_add(message.procedure_id.len());
}
other => panic!("expected local call event, got {other:?}"),
}
}
black_box(checksum)
}
#[inline(never)]
pub fn run_hook_data_receive(iterations: usize) -> usize {
let mut checksum = 0usize;
for _ in 0..iterations {
let mut host = ProtocolEndpoint::new(
Vec::new(),
None,
vec![ChildRoute::registered(path(&["worker"]))],
Vec::new(),
);
let hook_id = host.allocate_hook_id();
host.make_call(
path(&["worker"]),
None,
String::from("example.service.v1.invoke"),
Some(hook_id),
vec![3; 8],
)
.expect("seed active hook should encode");
let frame = encode_packet(
&PacketHeader {
packet_type: PacketType::Data,
src_path: path(&["worker"]),
dst_path: Vec::new(),
dst_leaf: None,
hook_id: Some(hook_id),
},
&unshell::protocol::DataMessage {
procedure_id: String::from("example.service.v1.invoke"),
data: vec![4; 16],
end_hook: false,
},
)
.expect("seed data should encode");
let outcome = host
.receive(&Ingress::Child(path(&["worker"])), frame)
.expect("hook data should work");
match outcome.event {
Some(LocalEvent::Data { header, message }) => {
checksum = checksum
.wrapping_add(header.hook_id.unwrap_or_default() as usize)
.wrapping_add(message.data.len())
.wrapping_add(message.procedure_id.len())
.wrapping_add(message.end_hook as usize);
}
other => panic!("expected local data event, got {other:?}"),
}
}
black_box(checksum)
}
pub fn path(parts: &[&str]) -> Vec<String> {
parts.iter().map(|part| String::from(*part)).collect()
}
fn route_value(route: unshell::protocol::tree::RouteDecision) -> usize {
match route {
unshell::protocol::tree::RouteDecision::Child(index) => index,
unshell::protocol::tree::RouteDecision::Local => usize::MAX - 2,
unshell::protocol::tree::RouteDecision::Parent => usize::MAX - 1,
unshell::protocol::tree::RouteDecision::Drop => usize::MAX,
}
}
+46
View File
@@ -0,0 +1,46 @@
use std::error::Error;
use unshell::Leaf;
use unshell::protocol::tree::{Endpoint, Ingress, LocalEvent, ProtocolEndpoint};
#[derive(Leaf)]
#[leaf(org = "org", product = "example", version = "v1", leaf_name = "echo")]
#[leaf(procedures(call, stream))]
struct EchoLeaf;
fn path(parts: &[&str]) -> Vec<String> {
parts.iter().map(|part| (*part).to_owned()).collect()
}
fn main() -> Result<(), Box<dyn Error>> {
let mut endpoint = ProtocolEndpoint::new(
path(&["agent"]),
Some(Vec::new()),
Vec::new(),
vec![EchoLeaf::protocol_leaf_spec()],
);
let hook_id = endpoint.allocate_hook_id();
let frame = endpoint.make_call(
path(&["agent"]),
Some(EchoLeaf::protocol_leaf_name()),
EchoLeaf::protocol_procedure_id("call").expect("known procedure suffix"),
Some(hook_id),
b"hello leaf".to_vec(),
)?;
let outcome = endpoint.receive(&Ingress::Parent, frame)?;
let Some(LocalEvent::Call { header, message }) = outcome.event else {
return Err("expected local leaf call".into());
};
assert_eq!(header.dst_leaf.as_deref(), Some("org.example.v1.echo"));
assert_eq!(message.procedure_id, "org.example.v1.echo.call");
println!(
"leaf={} procedure={}",
EchoLeaf::protocol_leaf_name(),
message.procedure_id
);
Ok(())
}
+279
View File
@@ -0,0 +1,279 @@
#[path = "support/remote_shell_common.rs"]
mod common;
use std::error::Error;
use std::io::{self, Read, Write};
use std::net::TcpStream;
use std::process::{Child, ChildStdin, Command, ExitStatus, Stdio};
use std::sync::mpsc::{self, Receiver, RecvTimeoutError, Sender};
use std::thread;
use std::time::Duration;
use unshell::protocol::tree::{Endpoint, Ingress, LocalEvent};
struct ShellSession {
child: Child,
stdin: Option<ChildStdin>,
return_path: Vec<String>,
hook_id: u64,
procedure_id: String,
readers_closed: usize,
exit_status: Option<ExitStatus>,
}
enum OutputEvent {
Chunk(Vec<u8>),
ReaderClosed,
}
fn main() -> Result<(), Box<dyn Error>> {
let mut stream = TcpStream::connect(common::LISTEN_ADDR)?;
let frame_rx = common::spawn_frame_reader(stream.try_clone()?);
let mut endpoint = common::build_agent_endpoint();
let mut session: Option<ShellSession> = None;
let mut output_rx: Option<Receiver<OutputEvent>> = None;
println!("connected to controller at {}", common::LISTEN_ADDR);
loop {
match frame_rx.recv_timeout(Duration::from_millis(25)) {
Ok(result) => {
let frame = result?;
let outcome = endpoint.receive(&Ingress::Parent, frame)?;
if let Some(event) = common::pump_outcome(&mut stream, outcome)? {
handle_local_event(
&mut endpoint,
&mut stream,
&mut session,
&mut output_rx,
event,
)?;
}
}
Err(RecvTimeoutError::Timeout) => {}
Err(RecvTimeoutError::Disconnected) => break,
}
if let Some(rx) = output_rx.as_ref() {
while let Ok(event) = rx.try_recv() {
handle_shell_output(&mut endpoint, &mut stream, &mut session, event)?;
}
}
if finalize_exited_shell(&mut endpoint, &mut stream, &mut session)? {
output_rx = None;
}
}
Ok(())
}
fn handle_local_event(
endpoint: &mut unshell::protocol::tree::ProtocolEndpoint,
stream: &mut TcpStream,
session: &mut Option<ShellSession>,
output_rx: &mut Option<Receiver<OutputEvent>>,
event: LocalEvent,
) -> Result<(), Box<dyn Error>> {
match event {
LocalEvent::Call { header, message } => {
let shell_leaf_name = common::shell_leaf_name();
let start_procedure = common::shell_start_procedure();
if header.dst_leaf.as_deref() != Some(shell_leaf_name.as_str())
|| message.procedure_id != start_procedure
{
return Ok(());
}
let Some(hook) = message.response_hook else {
return Ok(());
};
let (new_session, rx) =
start_shell(&hook.return_path, hook.hook_id, &message.procedure_id)?;
*session = Some(new_session);
*output_rx = Some(rx);
let outcome = endpoint.send_data(
hook.return_path,
hook.hook_id,
message.procedure_id,
b"shell ready\n".to_vec(),
false,
)?;
let _ = common::pump_outcome(stream, outcome)?;
}
LocalEvent::Data { message, .. } => {
let Some(active_session) = session.as_mut() else {
return Ok(());
};
if !message.data.is_empty() {
let Some(stdin) = active_session.stdin.as_mut() else {
return Ok(());
};
stdin.write_all(&message.data)?;
stdin.flush()?;
}
if message.end_hook {
active_session.stdin.take();
}
}
LocalEvent::Fault { message, .. } => {
eprintln!(
"controller reported protocol fault: 0x{:02X}",
message.fault.0
);
}
}
Ok(())
}
fn handle_shell_output(
endpoint: &mut unshell::protocol::tree::ProtocolEndpoint,
stream: &mut TcpStream,
session: &mut Option<ShellSession>,
event: OutputEvent,
) -> Result<(), Box<dyn Error>> {
let Some(active_session) = session.as_mut() else {
return Ok(());
};
match event {
OutputEvent::Chunk(bytes) => {
let outcome = endpoint.send_data(
active_session.return_path.clone(),
active_session.hook_id,
active_session.procedure_id.clone(),
bytes,
false,
)?;
let _ = common::pump_outcome(stream, outcome)?;
}
OutputEvent::ReaderClosed => {
active_session.readers_closed += 1;
}
}
Ok(())
}
fn finalize_exited_shell(
endpoint: &mut unshell::protocol::tree::ProtocolEndpoint,
stream: &mut TcpStream,
session: &mut Option<ShellSession>,
) -> Result<bool, Box<dyn Error>> {
let Some(active_session) = session.as_mut() else {
return Ok(false);
};
if active_session.exit_status.is_none() {
active_session.exit_status = active_session.child.try_wait()?;
}
let Some(exit_status) = active_session.exit_status else {
return Ok(false);
};
if active_session.readers_closed < 2 {
return Ok(false);
}
let summary = format!("shell exited with {exit_status}\n");
let outcome = endpoint.send_data(
active_session.return_path.clone(),
active_session.hook_id,
active_session.procedure_id.clone(),
summary.into_bytes(),
true,
)?;
let _ = common::pump_outcome(stream, outcome)?;
*session = None;
Ok(true)
}
fn start_shell(
return_path: &[String],
hook_id: u64,
procedure_id: &str,
) -> io::Result<(ShellSession, Receiver<OutputEvent>)> {
let mut command = if cfg!(windows) {
let mut command = Command::new("cmd.exe");
command.arg("/Q");
command
} else {
let mut command = Command::new("/bin/sh");
command.arg("-i");
command
};
let mut child = command
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()?;
let stdin = child
.stdin
.take()
.ok_or_else(|| io::Error::other("failed to capture shell stdin"))?;
let stdout = child
.stdout
.take()
.ok_or_else(|| io::Error::other("failed to capture shell stdout"))?;
let stderr = child
.stderr
.take()
.ok_or_else(|| io::Error::other("failed to capture shell stderr"))?;
let (tx, rx) = mpsc::channel();
spawn_pipe_reader(stdout, tx.clone());
spawn_pipe_reader(stderr, tx);
Ok((
ShellSession {
child,
stdin: Some(stdin),
return_path: return_path.to_vec(),
hook_id,
procedure_id: procedure_id.to_owned(),
readers_closed: 0,
exit_status: None,
},
rx,
))
}
fn spawn_pipe_reader<R>(mut reader: R, tx: Sender<OutputEvent>)
where
R: Read + Send + 'static,
{
thread::spawn(move || {
let mut buffer = [0u8; 1024];
loop {
match reader.read(&mut buffer) {
Ok(0) => {
let _ = tx.send(OutputEvent::ReaderClosed);
break;
}
Ok(read_len) => {
if tx
.send(OutputEvent::Chunk(buffer[..read_len].to_vec()))
.is_err()
{
break;
}
}
Err(error) if error.kind() == io::ErrorKind::Interrupted => {}
Err(error) => {
let _ = tx.send(OutputEvent::Chunk(
format!("shell pipe read error: {error}\n").into_bytes(),
));
let _ = tx.send(OutputEvent::ReaderClosed);
break;
}
}
}
});
}
+73
View File
@@ -0,0 +1,73 @@
#[path = "support/remote_shell_common.rs"]
mod common;
use std::error::Error;
use std::net::TcpListener;
use unshell::protocol::tree::{Endpoint, Ingress, LocalEvent};
fn main() -> Result<(), Box<dyn Error>> {
let listener = TcpListener::bind(common::LISTEN_ADDR)?;
println!("listening on {}", common::LISTEN_ADDR);
let (mut stream, peer_addr) = listener.accept()?;
println!("accepted endpoint connection from {peer_addr}");
let frame_rx = common::spawn_frame_reader(stream.try_clone()?);
let mut endpoint = common::build_controller_endpoint();
let hook_id = endpoint.allocate_hook_id();
let shell_leaf_name = common::shell_leaf_name();
let start_procedure = common::shell_start_procedure();
let outcome = endpoint.send_call(
common::agent_path(),
Some(shell_leaf_name),
start_procedure.clone(),
Some(hook_id),
Vec::new(),
)?;
let _ = common::pump_outcome(&mut stream, outcome)?;
let mut commands_sent = false;
for result in frame_rx {
let frame = result?;
let outcome = endpoint.receive(&Ingress::Child(common::agent_path()), frame)?;
let event = common::pump_outcome(&mut stream, outcome)?;
let Some(event) = event else {
continue;
};
match event {
LocalEvent::Data { message, .. } => {
print!("{}", String::from_utf8_lossy(&message.data));
if !commands_sent {
commands_sent = true;
for (index, command) in ["pwd\n", "whoami\n", "exit\n"].iter().enumerate() {
let outcome = endpoint.send_data(
common::agent_path(),
hook_id,
start_procedure.clone(),
command.as_bytes().to_vec(),
index == 2,
)?;
let _ = common::pump_outcome(&mut stream, outcome)?;
}
}
if message.end_hook {
break;
}
}
LocalEvent::Fault { message, .. } => {
eprintln!("received protocol fault: 0x{:02X}", message.fault.0);
break;
}
LocalEvent::Call { .. } => {}
}
}
Ok(())
}
@@ -0,0 +1,118 @@
use std::io::{self, ErrorKind, Read, Write};
use std::net::TcpStream;
use std::sync::mpsc::{self, Receiver};
use std::thread;
use unshell::Leaf;
use unshell::protocol::FrameBytes;
use unshell::protocol::tree::{ChildRoute, EndpointOutcome, LocalEvent, ProtocolEndpoint};
pub const LISTEN_ADDR: &str = "127.0.0.1:4444";
#[derive(Leaf)]
#[leaf(org = "org", product = "example", version = "v1", leaf_name = "shell")]
#[leaf(procedures(start))]
pub struct RemoteShellLeaf;
pub fn agent_path() -> Vec<String> {
path(&["agent"])
}
pub fn path(parts: &[&str]) -> Vec<String> {
parts.iter().map(|part| (*part).to_owned()).collect()
}
#[allow(dead_code)]
pub fn build_controller_endpoint() -> ProtocolEndpoint {
ProtocolEndpoint::new(
Vec::new(),
None,
vec![ChildRoute::registered(agent_path())],
Vec::new(),
)
}
#[allow(dead_code)]
pub fn build_agent_endpoint() -> ProtocolEndpoint {
ProtocolEndpoint::new(
agent_path(),
Some(Vec::new()),
Vec::new(),
vec![RemoteShellLeaf::protocol_leaf_spec()],
)
}
pub fn shell_leaf_name() -> String {
RemoteShellLeaf::protocol_leaf_name()
}
pub fn shell_start_procedure() -> String {
RemoteShellLeaf::protocol_procedure_id("start")
.expect("remote shell leaf declares a start procedure")
}
pub fn write_frame(stream: &mut TcpStream, frame: &[u8]) -> io::Result<()> {
let frame_len = u32::try_from(frame.len())
.map_err(|_| io::Error::new(ErrorKind::InvalidData, "frame exceeds u32 transport size"))?;
stream.write_all(&frame_len.to_be_bytes())?;
stream.write_all(frame)?;
stream.flush()?;
Ok(())
}
pub fn pump_outcome(
stream: &mut TcpStream,
outcome: EndpointOutcome,
) -> io::Result<Option<LocalEvent>> {
if let Some((_route, frame)) = outcome.forward {
// These examples model one direct parent-child link over one TCP stream, so
// any forwarded protocol frame is emitted on the same socket.
write_frame(stream, &frame)?;
}
Ok(outcome.event)
}
pub fn spawn_frame_reader(mut stream: TcpStream) -> Receiver<io::Result<FrameBytes>> {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
loop {
match read_frame(&mut stream) {
Ok(Some(frame)) => {
if tx.send(Ok(frame)).is_err() {
break;
}
}
Ok(None) => break,
Err(error) => {
let _ = tx.send(Err(error));
break;
}
}
}
});
rx
}
fn read_frame(stream: &mut TcpStream) -> io::Result<Option<FrameBytes>> {
let mut len_bytes = [0u8; 4];
match stream.read_exact(&mut len_bytes) {
Ok(()) => {}
Err(error) if error.kind() == ErrorKind::UnexpectedEof => return Ok(None),
Err(error) => return Err(error),
}
let frame_len = u32::from_be_bytes(len_bytes) as usize;
let mut bytes = vec![0u8; frame_len];
match stream.read_exact(&mut bytes) {
Ok(()) => {}
Err(error) if error.kind() == ErrorKind::UnexpectedEof => return Ok(None),
Err(error) => return Err(error),
}
let mut frame = FrameBytes::with_capacity(bytes.len());
frame.extend_from_slice(&bytes);
Ok(Some(frame))
}