Simplify endpoint outcome state handling

This commit is contained in:
Michael Mikovsky
2026-04-25 20:47:37 -06:00
parent f2c6a54060
commit 31a0bd39b0
19 changed files with 234 additions and 192 deletions
+6 -6
View File
@@ -4,7 +4,7 @@ use std::process::Command;
use std::time::Instant;
use unshell::protocol::tree::{
ChildRoute, Endpoint, Ingress, LeafSpec, LocalEvent, ProtocolEndpoint,
ChildRoute, Endpoint, EndpointOutcome, Ingress, LeafSpec, LocalEvent, ProtocolEndpoint,
};
use unshell::protocol::{CallMessage, PacketHeader, PacketType, decode_frame, encode_packet};
@@ -105,7 +105,7 @@ fn bench_forward_call_receive() -> BenchResult {
let outcome = root
.receive(&Ingress::Local, frame)
.expect("forward receive should work");
black_box(outcome.forward.is_some());
black_box(matches!(outcome, EndpointOutcome::Forward { .. }));
},
)
}
@@ -118,8 +118,8 @@ fn bench_local_call_receive() -> BenchResult {
let outcome = endpoint
.receive(&Ingress::Parent, frame)
.expect("local call should work");
match black_box(outcome.event) {
Some(LocalEvent::Call { .. }) => {}
match black_box(outcome) {
EndpointOutcome::Local(LocalEvent::Call { .. }) => {}
other => panic!("expected local call event, got {other:?}"),
}
},
@@ -134,8 +134,8 @@ fn bench_hook_data_receive() -> BenchResult {
let outcome = host
.receive(&Ingress::Child(path(&["worker"])), frame)
.expect("hook data should work");
match black_box(outcome.event) {
Some(LocalEvent::Data { .. }) => {}
match black_box(outcome) {
EndpointOutcome::Local(LocalEvent::Data { .. }) => {}
other => panic!("expected local data event, got {other:?}"),
}
},
+11 -13
View File
@@ -3,7 +3,7 @@
use std::hint::black_box;
use unshell::protocol::tree::{
ChildRoute, Endpoint, Ingress, LeafSpec, LocalEvent, ProtocolEndpoint,
ChildRoute, Endpoint, EndpointOutcome, Ingress, LeafSpec, LocalEvent, ProtocolEndpoint,
};
use unshell::protocol::{CallMessage, PacketHeader, PacketType, decode_frame, encode_packet};
@@ -94,14 +94,12 @@ pub fn run_forward_call_receive(iterations: usize) -> usize {
let outcome = root
.receive(&Ingress::Local, frame)
.expect("forward receive should work");
let forwarded = outcome
.forward
.as_ref()
.map(|(route, frame)| route_value(*route).wrapping_add(frame.len()))
.unwrap_or_default();
checksum = checksum
.wrapping_add(forwarded)
.wrapping_add(outcome.dropped as usize);
let forwarded = match outcome {
EndpointOutcome::Forward { route, frame } => route_value(route).wrapping_add(frame.len()),
EndpointOutcome::Local(_) => 0,
EndpointOutcome::Dropped => usize::from(true),
};
checksum = checksum.wrapping_add(forwarded);
}
black_box(checksum)
}
@@ -141,8 +139,8 @@ pub fn run_local_call_receive(iterations: usize) -> usize {
let outcome = endpoint
.receive(&Ingress::Parent, frame)
.expect("local call should work");
match outcome.event {
Some(LocalEvent::Call { header, message }) => {
match outcome {
EndpointOutcome::Local(LocalEvent::Call { header, message }) => {
checksum = checksum
.wrapping_add(header.dst_path.len())
.wrapping_add(header.src_path.len())
@@ -194,8 +192,8 @@ pub fn run_hook_data_receive(iterations: usize) -> usize {
let outcome = host
.receive(&Ingress::Child(path(&["worker"])), frame)
.expect("hook data should work");
match outcome.event {
Some(LocalEvent::Data {
match outcome {
EndpointOutcome::Local(LocalEvent::Data {
header, message, ..
}) => {
checksum = checksum
+5 -4
View File
@@ -2,8 +2,9 @@ use std::error::Error;
use std::{convert::Infallible, string::String};
use rkyv::{Archive, Deserialize, Serialize};
use unshell::protocol::tree::{Call, CallLeaf, Ingress, LeafRuntime, ProtocolEndpoint};
use unshell::protocol::tree::{ChildRoute, ConnectionState};
use unshell::protocol::tree::{
Call, CallLeaf, ChildRoute, EndpointOutcome, Ingress, LeafRuntime, ProtocolEndpoint,
};
use unshell::protocol::{PacketType, decode_frame};
use unshell::{Leaf, procedures};
@@ -60,7 +61,7 @@ fn main() -> Result<(), Box<dyn Error>> {
None,
vec![ChildRoute {
path: path(&["agent"]),
state: ConnectionState::Registered,
registered: true,
}],
Vec::new(),
);
@@ -74,7 +75,7 @@ fn main() -> Result<(), Box<dyn Error>> {
text: String::from("hello leaf"),
})?,
)?;
let Some((_, frame)) = controller_outcome.forward else {
let EndpointOutcome::Forward { frame, .. } = controller_outcome else {
return Err("expected controller to forward call".into());
};
+2 -2
View File
@@ -4,7 +4,7 @@ mod remote_shell;
use std::error::Error;
use std::net::TcpListener;
use unshell::protocol::tree::{Endpoint, Ingress, LocalEvent};
use unshell::protocol::tree::{Endpoint, EndpointOutcome, Ingress, LocalEvent};
fn main() -> Result<(), Box<dyn Error>> {
let listener = TcpListener::bind(remote_shell::LISTEN_ADDR)?;
@@ -46,7 +46,7 @@ fn main() -> Result<(), Box<dyn Error>> {
for result in frame_rx {
let frame = result?;
let outcome = endpoint.receive(&Ingress::Child(remote_shell::agent_path()), frame)?;
let Some(event) = outcome.event else {
let EndpointOutcome::Local(event) = outcome else {
continue;
};
+4 -8
View File
@@ -11,14 +11,10 @@ const MAX_FRAME_BYTES: usize = 1024 * 1024;
#[allow(dead_code)]
pub fn send_forward(stream: &mut TcpStream, outcome: EndpointOutcome) -> io::Result<()> {
write_frames(
stream,
&outcome
.forward
.into_iter()
.map(|(_, frame)| frame)
.collect::<Vec<_>>(),
)
match outcome {
EndpointOutcome::Forward { frame, .. } => write_frames(stream, &[frame]),
EndpointOutcome::Local(_) | EndpointOutcome::Dropped => write_frames(stream, &[]),
}
}
pub fn write_frames(stream: &mut TcpStream, frames: &[FrameBytes]) -> io::Result<()> {
+3 -3
View File
@@ -4,7 +4,7 @@ use core::convert::Infallible;
use rkyv::{Archive, Deserialize, Serialize};
use crate::protocol::tree::{
Call, CallLeaf, ChildRoute, ConnectionState, Ingress, LeafRuntime, ProtocolEndpoint,
Call, CallLeaf, ChildRoute, EndpointOutcome, Ingress, LeafRuntime, ProtocolEndpoint,
decode_call_input, encode_call_reply,
};
use crate::protocol::{PacketType, decode_frame};
@@ -64,7 +64,7 @@ fn leaf_runtime_dispatches_generated_call_procedure() {
None,
vec![ChildRoute {
path: path(&["agent"]),
state: ConnectionState::Registered,
registered: true,
}],
Vec::new(),
);
@@ -81,7 +81,7 @@ fn leaf_runtime_dispatches_generated_call_procedure() {
.expect("request should encode"),
)
.expect("call should encode");
let Some((_, frame)) = controller_outcome.forward else {
let EndpointOutcome::Forward { frame, .. } = controller_outcome else {
panic!("controller should forward call to child");
};
+20 -8
View File
@@ -2,7 +2,7 @@ use alloc::{borrow::ToOwned, collections::BTreeMap, format, string::String, vec,
use core::convert::Infallible;
use crate::protocol::tree::{
Call, ChildRoute, ConnectionState, Endpoint, HookKey, Ingress, OutgoingData, Procedure,
Call, ChildRoute, Endpoint, EndpointOutcome, HookKey, Ingress, OutgoingData, Procedure,
ProcedureEffect, ProcedureRuntime, ProcedureStore, ProtocolEndpoint, encode_call_reply,
};
use crate::protocol::{PacketType, decode_frame};
@@ -80,7 +80,7 @@ fn procedure_runtime_routes_data_to_stored_session() {
None,
vec![ChildRoute {
path: path(&["agent"]),
state: ConnectionState::Registered,
registered: true,
}],
Vec::new(),
);
@@ -94,7 +94,10 @@ fn procedure_runtime_routes_data_to_stored_session() {
encode_call_reply(&String::from("prefix:")).expect("procedure input should encode"),
)
.expect("open call should encode");
let Some((_, open_frame)) = open.forward else {
let EndpointOutcome::Forward {
frame: open_frame, ..
} = open
else {
panic!("controller should forward opening call");
};
runtime
@@ -110,7 +113,10 @@ fn procedure_runtime_routes_data_to_stored_session() {
true,
)
.expect("data should encode");
let Some((_, data_frame)) = data.forward else {
let EndpointOutcome::Forward {
frame: data_frame, ..
} = data
else {
panic!("controller should forward data frame");
};
let outcome = runtime
@@ -129,7 +135,7 @@ fn procedure_runtime_routes_data_to_stored_session() {
let forwarded = controller
.receive(&Ingress::Child(path(&["agent"])), response_frame.clone())
.expect("controller should receive session response");
assert!(forwarded.event.is_some());
assert!(matches!(forwarded, EndpointOutcome::Local(_)));
assert!(runtime.leaf_mut().procedure_sessions().is_empty());
}
@@ -204,7 +210,7 @@ fn procedure_runtime_keeps_session_after_local_end_until_explicit_close() {
None,
vec![ChildRoute {
path: path(&["agent"]),
state: ConnectionState::Registered,
registered: true,
}],
Vec::new(),
);
@@ -218,7 +224,10 @@ fn procedure_runtime_keeps_session_after_local_end_until_explicit_close() {
encode_call_reply(&()).expect("unit call should encode"),
)
.expect("open call should encode");
let Some((_, open_frame)) = open.forward else {
let EndpointOutcome::Forward {
frame: open_frame, ..
} = open
else {
panic!("controller should forward opening call");
};
runtime
@@ -234,7 +243,10 @@ fn procedure_runtime_keeps_session_after_local_end_until_explicit_close() {
false,
)
.expect("local end trigger should encode");
let Some((_, local_end_frame)) = local_end.forward else {
let EndpointOutcome::Forward {
frame: local_end_frame, ..
} = local_end
else {
panic!("controller should forward local end trigger");
};
let outcome = runtime
+9 -11
View File
@@ -1,8 +1,8 @@
use alloc::{borrow::ToOwned, string::String, vec, vec::Vec};
use crate::protocol::tree::{
ChildRoute, DefaultRouteProvider, Endpoint, Ingress, LeafNode, LeafSpec, LocalEvent,
ProtocolEndpoint, RouteDecision, RouteProvider, TreeNode,
ChildRoute, DefaultRouteProvider, Endpoint, EndpointOutcome, Ingress, LeafNode, LeafSpec,
LocalEvent, ProtocolEndpoint, RouteDecision, RouteProvider, TreeNode,
};
use crate::protocol::{
DataMessage, EndpointIntrospection, FaultMessage, PacketHeader, PacketType, ProtocolFault,
@@ -76,13 +76,11 @@ fn protocol_endpoint_introspection_returns_leaf_summary() {
.receive(&Ingress::Local, frame)
.expect("endpoint should handle introspection");
assert!(outcome.forward.is_none());
let LocalEvent::Data {
let EndpointOutcome::Local(LocalEvent::Data {
header,
message: response,
..
} = outcome.event.as_ref().expect("expected local data event")
}) = &outcome
else {
panic!("expected local data event");
};
@@ -167,10 +165,8 @@ fn invalid_hook_peer_emits_local_fault_event() {
.receive(&Ingress::Child(path(&["intruder"])), frame)
.expect("invalid peer should be handled");
assert!(outcome.forward.is_none());
assert!(!outcome.dropped);
match outcome.event.as_ref().expect("expected local fault event") {
match &outcome {
EndpointOutcome::Local(event) => match event {
LocalEvent::Fault {
header, message, ..
} => {
@@ -184,6 +180,8 @@ fn invalid_hook_peer_emits_local_fault_event() {
);
}
other => panic!("expected fault event, got {other:?}"),
},
other => panic!("expected local fault event, got {other:?}"),
}
}
@@ -302,7 +300,7 @@ fn pending_hook_fault_is_delivered_before_activation() {
)
.expect("introspection should handle pending hook");
assert!(outcome.forward.is_some() || outcome.event.is_some());
assert!(!matches!(outcome, EndpointOutcome::Dropped));
}
#[test]
+52 -11
View File
@@ -16,65 +16,90 @@ use super::{
/// One typed incoming `Call` passed to a leaf procedure.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Call<T> {
/// Decoded application input payload.
pub input: T,
/// Endpoint path of the caller that opened this call.
pub caller_path: Vec<String>,
/// Canonical procedure identifier chosen by the caller.
pub procedure_id: String,
/// Optional destination leaf targeted by the call.
pub dst_leaf: Option<String>,
/// Hook key declared by the caller when it expects a response.
pub response_hook: Option<HookKey>,
}
/// One incoming local call event that already passed protocol validation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IncomingCall {
/// Validated protocol header for the call.
pub header: PacketHeader,
/// Application payload for the call.
pub message: CallMessage,
}
/// One incoming local data event tied to an active hook.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IncomingData {
/// Validated protocol header for the data packet.
pub header: PacketHeader,
/// Hook-associated data payload.
pub message: DataMessage,
/// Resolved hook key for the active session.
pub hook_key: HookKey,
}
/// One incoming local fault event tied to a pending or active hook.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IncomingFault {
/// Validated protocol header for the fault packet.
pub header: PacketHeader,
/// Fault payload emitted by the peer.
pub fault: crate::protocol::FaultMessage,
/// Hook key for the pending or active session that faulted.
pub hook_key: HookKey,
}
/// Outcome of one generated initial call procedure.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CallResult<T> {
/// Return one reply payload to the caller.
Reply(T),
/// Complete the call without any response data.
NoReply,
}
/// One hook-associated `Data` packet emitted by leaf code.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OutgoingData {
/// Destination endpoint path for the hook packet.
pub dst_path: Vec<String>,
/// Hook identifier scoped to the receiving endpoint.
pub hook_id: u64,
/// Procedure identifier that owns this hook stream.
pub procedure_id: String,
/// Serialized application data to send.
pub data: Vec<u8>,
/// Whether this packet closes the local side of the hook.
pub end_hook: bool,
}
/// One runtime-normalized reply produced by generated call dispatch.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CallReply {
/// Serialized reply bytes that should be returned upstream.
Reply(Vec<u8>),
/// Complete without emitting any reply packet.
NoReply,
}
/// Error surfaced while decoding one incoming call or encoding one generated reply.
#[derive(Debug)]
pub enum DispatchError<E> {
/// Failed to decode the typed call input.
Decode(FrameError),
/// Failed to encode the typed call output.
Encode(FrameError),
/// The leaf-specific call handler returned an error.
Handler(E),
}
@@ -96,8 +121,11 @@ impl<E> core::error::Error for DispatchError<E> where E: core::error::Error + 's
/// Error surfaced by the stateful leaf runtime.
#[derive(Debug)]
pub enum LeafRuntimeError<E> {
/// Protocol endpoint routing or framing failed.
Endpoint(EndpointError),
/// Typed call dispatch failed.
Dispatch(DispatchError<E>),
/// Leaf-local data or fault handling failed.
Leaf(E),
}
@@ -124,6 +152,7 @@ impl<E> From<EndpointError> for LeafRuntimeError<E> {
/// High-level leaf behavior layered on top of validated protocol events.
pub trait CallLeaf: ProtocolLeaf {
/// Leaf-specific error surfaced by call, data, or fault handling.
type Error;
/// Handles hook-associated inbound `Data` after protocol validation.
@@ -152,30 +181,37 @@ pub struct LeafRuntime<L> {
/// Frames emitted by the runtime after one receive or poll step.
#[derive(Debug, Default)]
pub struct RuntimeOutcome {
/// Frames emitted while processing the step.
pub frames: Vec<FrameBytes>,
/// Whether the endpoint dropped the incoming packet.
pub dropped: bool,
}
impl<L> LeafRuntime<L> {
/// Builds a runtime from one endpoint and one leaf instance.
#[must_use]
pub fn new(endpoint: ProtocolEndpoint, leaf: L) -> Self {
Self { endpoint, leaf }
}
/// Returns the underlying protocol endpoint.
#[must_use]
pub fn endpoint(&self) -> &ProtocolEndpoint {
&self.endpoint
}
/// Returns a mutable reference to the underlying endpoint.
pub fn endpoint_mut(&mut self) -> &mut ProtocolEndpoint {
&mut self.endpoint
}
/// Returns the hosted leaf instance.
#[must_use]
pub fn leaf(&self) -> &L {
&self.leaf
}
/// Returns a mutable reference to the hosted leaf instance.
pub fn leaf_mut(&mut self) -> &mut L {
&mut self.leaf
}
@@ -203,18 +239,21 @@ where
&mut self,
outcome: crate::protocol::tree::EndpointOutcome,
) -> Result<RuntimeOutcome, LeafRuntimeError<<L as CallLeaf>::Error>> {
let mut runtime = RuntimeOutcome {
frames: Vec::new(),
dropped: outcome.dropped,
};
if let Some((_route, frame)) = outcome.forward {
runtime.frames.push(frame);
match outcome {
crate::protocol::tree::EndpointOutcome::Forward { frame, .. } => {
let mut frames = Vec::with_capacity(1);
frames.push(frame);
Ok(RuntimeOutcome {
frames,
dropped: false,
})
}
let Some(event) = outcome.event else {
return Ok(runtime);
};
crate::protocol::tree::EndpointOutcome::Dropped => Ok(RuntimeOutcome {
frames: Vec::new(),
dropped: true,
}),
crate::protocol::tree::EndpointOutcome::Local(event) => {
let mut runtime = RuntimeOutcome::default();
match event {
LocalEvent::Call { header, message } => {
@@ -274,6 +313,8 @@ where
Ok(runtime)
}
}
}
fn emit_outgoing(
&mut self,
+9 -9
View File
@@ -111,7 +111,7 @@ impl ProtocolEndpoint {
) -> Self {
let registered_child_paths = children
.iter()
.filter(|child| child.state == super::core::ConnectionState::Registered)
.filter(|child| child.registered)
.map(|child| child.path.clone())
.collect::<Vec<_>>();
@@ -180,12 +180,12 @@ impl ProtocolEndpoint {
self.hooks
.remove_pending(&HookKey::new(hook.return_path.clone(), hook.hook_id));
}
Ok(EndpointOutcome::dropped())
Ok(EndpointOutcome::Dropped)
}
route => Ok(EndpointOutcome::forward(
route => Ok(EndpointOutcome::Forward {
route,
encode_packet(&header, &call)?,
)),
frame: encode_packet(&header, &call)?,
}),
}
}
@@ -246,11 +246,11 @@ impl ProtocolEndpoint {
match self.decide_route(&header.dst_path) {
RouteDecision::Local => self.handle_local_data(header, message),
RouteDecision::Drop => Ok(EndpointOutcome::dropped()),
route => Ok(EndpointOutcome::forward(
RouteDecision::Drop => Ok(EndpointOutcome::Dropped),
route => Ok(EndpointOutcome::Forward {
route,
encode_packet(&header, &message)?,
)),
frame: encode_packet(&header, &message)?,
}),
}
}
}
+8 -44
View File
@@ -13,20 +13,13 @@ use crate::protocol::{
use super::super::{CompiledRoutes, HookKey, HookTable, RouteDecision};
/// Registration state for a direct child endpoint.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionState {
Unregistered,
Registered,
}
/// Routing metadata for one direct child endpoint.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChildRoute {
/// Absolute path for the child endpoint inside the protocol tree.
pub path: Vec<String>,
/// Whether this child currently participates in routing decisions.
pub state: ConnectionState,
pub registered: bool,
}
impl ChildRoute {
@@ -34,7 +27,7 @@ impl ChildRoute {
pub fn registered(path: Vec<String>) -> Self {
Self {
path,
state: ConnectionState::Registered,
registered: true,
}
}
}
@@ -76,43 +69,14 @@ pub enum LocalEvent {
}
/// Result of processing a frame or building a locally-sent packet.
#[derive(Debug, Default)]
pub struct EndpointOutcome {
#[derive(Debug)]
pub enum EndpointOutcome {
/// Frame to forward, together with the next routing decision.
pub forward: Option<(RouteDecision, FrameBytes)>,
Forward { route: RouteDecision, frame: FrameBytes },
/// Locally-delivered protocol event.
pub event: Option<LocalEvent>,
/// Whether the packet was intentionally discarded.
pub dropped: bool,
}
impl EndpointOutcome {
#[must_use]
pub fn forward(route: RouteDecision, frame: FrameBytes) -> Self {
Self {
forward: Some((route, frame)),
event: None,
dropped: false,
}
}
#[must_use]
pub fn event(event: LocalEvent) -> Self {
Self {
forward: None,
event: Some(event),
dropped: false,
}
}
#[must_use]
pub fn dropped() -> Self {
Self {
forward: None,
event: None,
dropped: true,
}
}
Local(LocalEvent),
/// Packet intentionally discarded.
Dropped,
}
/// Error surfaced while validating or encoding protocol frames.
+12 -12
View File
@@ -16,7 +16,7 @@ impl ProtocolEndpoint {
fault: ProtocolFault,
) -> Result<EndpointOutcome, EndpointError> {
let Some(key) = key else {
return Ok(EndpointOutcome::dropped());
return Ok(EndpointOutcome::Dropped);
};
self.hooks.remove_pending(&key);
@@ -32,15 +32,15 @@ impl ProtocolEndpoint {
let message = FaultMessage { fault };
match self.decide_route(&key.return_path) {
RouteDecision::Local => Ok(EndpointOutcome::event(LocalEvent::Fault {
RouteDecision::Local => Ok(EndpointOutcome::Local(LocalEvent::Fault {
header,
message,
hook_key: key,
})),
route => Ok(EndpointOutcome::forward(
route => Ok(EndpointOutcome::Forward {
route,
encode_packet(&header, &message)?,
)),
frame: encode_packet(&header, &message)?,
}),
}
}
@@ -64,12 +64,12 @@ impl ProtocolEndpoint {
self.hooks.activate_pending(&pending_key);
pending_key
} else {
return Ok(EndpointOutcome::dropped());
return Ok(EndpointOutcome::Dropped);
}
};
let Some(active) = self.hooks.active(&key) else {
return Ok(EndpointOutcome::dropped());
return Ok(EndpointOutcome::Dropped);
};
if active.peer_path != header.src_path {
@@ -81,14 +81,14 @@ impl ProtocolEndpoint {
if active.procedure_id != message.procedure_id {
// Data frames stay bound to the procedure chosen by the original call.
return Ok(EndpointOutcome::dropped());
return Ok(EndpointOutcome::Dropped);
}
if message.end_hook && self.hooks.mark_peer_end(&key) {
self.hooks.remove_active(&key);
}
Ok(EndpointOutcome::event(LocalEvent::Data {
Ok(EndpointOutcome::Local(LocalEvent::Data {
header,
message,
hook_key: key,
@@ -106,7 +106,7 @@ impl ProtocolEndpoint {
.resolve_active_key(&self.path, hook_id, &header.src_path)
{
self.hooks.remove_active(&key);
return Ok(EndpointOutcome::event(LocalEvent::Fault {
return Ok(EndpointOutcome::Local(LocalEvent::Fault {
header,
message,
hook_key: key,
@@ -120,14 +120,14 @@ impl ProtocolEndpoint {
.is_some_and(|pending| pending.caller_src_path == header.src_path)
{
self.hooks.remove_pending(&pending_key);
return Ok(EndpointOutcome::event(LocalEvent::Fault {
return Ok(EndpointOutcome::Local(LocalEvent::Fault {
header,
message,
hook_key: pending_key,
}));
}
Ok(EndpointOutcome::dropped())
Ok(EndpointOutcome::Dropped)
}
pub(crate) fn decide_route(&self, dst_path: &[String]) -> RouteDecision {
+6 -6
View File
@@ -18,7 +18,7 @@ impl ProtocolEndpoint {
key: Option<HookKey>,
) -> Result<EndpointOutcome, EndpointError> {
let Some(key) = key else {
return Ok(EndpointOutcome::dropped());
return Ok(EndpointOutcome::Dropped);
};
let response_payload = if let Some(leaf_name) = &header.dst_leaf {
@@ -36,7 +36,7 @@ impl ProtocolEndpoint {
sub_endpoints: self
.children
.iter()
.filter(|child| child.state == super::core::ConnectionState::Registered)
.filter(|child| child.registered)
.filter_map(|child| child.path.get(self.path.len()).cloned())
.collect(),
leaves: self
@@ -72,16 +72,16 @@ impl ProtocolEndpoint {
match self.decide_route(&key.return_path) {
super::super::RouteDecision::Local => {
Ok(EndpointOutcome::event(super::core::LocalEvent::Data {
Ok(EndpointOutcome::Local(super::core::LocalEvent::Data {
header: response_header,
message: response,
hook_key: key,
}))
}
route => Ok(EndpointOutcome::forward(
route => Ok(EndpointOutcome::Forward {
route,
encode_packet(&response_header, &response)?,
)),
frame: encode_packet(&response_header, &response)?,
}),
}
}
}
+2 -2
View File
@@ -11,6 +11,6 @@ mod introspection;
mod receive;
pub use core::{
ChildRoute, ConnectionState, Endpoint, EndpointError, EndpointOutcome, Ingress, LeafSpec,
LocalEvent, ProtocolEndpoint,
ChildRoute, Endpoint, EndpointError, EndpointOutcome, Ingress, LeafSpec, LocalEvent,
ProtocolEndpoint,
};
+30 -20
View File
@@ -75,7 +75,7 @@ impl ProtocolEndpoint {
return self.emit_fault_if_possible(key, ProtocolFault::INTERNAL_ERROR);
}
Ok(EndpointOutcome::event(LocalEvent::Call { header, message }))
Ok(EndpointOutcome::Local(LocalEvent::Call { header, message }))
}
}
@@ -94,7 +94,7 @@ impl Endpoint for ProtocolEndpoint {
validate_header(header)?;
if !self.valid_source_for_ingress(ingress, &header.src_path) {
return Ok(EndpointOutcome::dropped());
return Ok(EndpointOutcome::Dropped);
}
match header.packet_type {
@@ -103,17 +103,19 @@ impl Endpoint for ProtocolEndpoint {
// itself. Children can return data/faults, but they do not initiate new
// calls through this node.
if !matches!(ingress, Ingress::Parent | Ingress::Local) {
return Ok(EndpointOutcome::dropped());
return Ok(EndpointOutcome::Dropped);
}
match self.decide_route(&header.dst_path) {
RouteDecision::Child(index) => {
Ok(EndpointOutcome::forward(RouteDecision::Child(index), frame))
}
RouteDecision::Parent => {
Ok(EndpointOutcome::forward(RouteDecision::Parent, frame))
}
RouteDecision::Drop => Ok(EndpointOutcome::dropped()),
RouteDecision::Child(index) => Ok(EndpointOutcome::Forward {
route: RouteDecision::Child(index),
frame,
}),
RouteDecision::Parent => Ok(EndpointOutcome::Forward {
route: RouteDecision::Parent,
frame,
}),
RouteDecision::Drop => Ok(EndpointOutcome::Dropped),
RouteDecision::Local => {
let (header, payload) = parsed.into_parts();
let message = deserialize_archived_bytes::<ArchivedCallMessage, CallMessage>(
@@ -133,11 +135,15 @@ impl Endpoint for ProtocolEndpoint {
>(payload)?;
self.handle_local_data(header, message)
}
RouteDecision::Child(index) => {
Ok(EndpointOutcome::forward(RouteDecision::Child(index), frame))
}
RouteDecision::Parent => Ok(EndpointOutcome::forward(RouteDecision::Parent, frame)),
RouteDecision::Drop => Ok(EndpointOutcome::dropped()),
RouteDecision::Child(index) => Ok(EndpointOutcome::Forward {
route: RouteDecision::Child(index),
frame,
}),
RouteDecision::Parent => Ok(EndpointOutcome::Forward {
route: RouteDecision::Parent,
frame,
}),
RouteDecision::Drop => Ok(EndpointOutcome::Dropped),
},
PacketType::Fault => match self.decide_route(&header.dst_path) {
RouteDecision::Local => {
@@ -148,11 +154,15 @@ impl Endpoint for ProtocolEndpoint {
>(payload)?;
self.handle_local_fault(header, message)
}
RouteDecision::Child(index) => {
Ok(EndpointOutcome::forward(RouteDecision::Child(index), frame))
}
RouteDecision::Parent => Ok(EndpointOutcome::forward(RouteDecision::Parent, frame)),
RouteDecision::Drop => Ok(EndpointOutcome::dropped()),
RouteDecision::Child(index) => Ok(EndpointOutcome::Forward {
route: RouteDecision::Child(index),
frame,
}),
RouteDecision::Parent => Ok(EndpointOutcome::Forward {
route: RouteDecision::Parent,
frame,
}),
RouteDecision::Drop => Ok(EndpointOutcome::Dropped),
},
}
}
+2 -2
View File
@@ -18,8 +18,8 @@ pub use call::{
encode_call_reply,
};
pub use endpoint::{
ChildRoute, ConnectionState, Endpoint, EndpointError, EndpointOutcome, Ingress, LeafSpec,
LocalEvent, ProtocolEndpoint,
ChildRoute, Endpoint, EndpointError, EndpointOutcome, Ingress, LeafSpec, LocalEvent,
ProtocolEndpoint,
};
pub use hook::{ActiveHook, HookConflict, HookKey, HookTable, PendingHook};
pub use leaf::{CallProcedures, ProtocolLeaf, derive_leaf_name};
+37 -13
View File
@@ -64,8 +64,8 @@ pub trait ProcedureStore<P> {
///
/// # Example
/// ```rust
/// use alloc::collections::BTreeMap;
/// use alloc::string::String;
/// use std::collections::BTreeMap;
/// use std::string::String;
/// use unshell::{Leaf, Procedure};
/// use unshell::protocol::tree::{Call, HookKey, Procedure, ProcedureEffect, ProcedureStore};
///
@@ -110,7 +110,9 @@ pub trait Procedure<L>: StatefulProcedureMetadata<L> + Sized
where
L: ProtocolLeaf,
{
/// Leaf-specific error surfaced while opening or advancing the session.
type Error;
/// Typed input payload decoded from the opening call.
type Input;
/// Creates one session from the opening `Call`.
@@ -159,6 +161,7 @@ pub struct ProcedureEffect {
}
impl ProcedureEffect {
/// Builds an effect that keeps the session alive after emitting `outgoing`.
#[must_use]
pub fn outgoing(outgoing: Vec<OutgoingData>) -> Self {
Self {
@@ -167,6 +170,7 @@ impl ProcedureEffect {
}
}
/// Builds an effect that closes the session after emitting `outgoing`.
#[must_use]
pub fn close(outgoing: Vec<OutgoingData>) -> Self {
Self {
@@ -179,7 +183,9 @@ impl ProcedureEffect {
/// Error surfaced by the procedure runtime.
#[derive(Debug)]
pub enum ProcedureRuntimeError<E> {
/// Protocol endpoint routing or framing failed.
Endpoint(EndpointError),
/// The opening call failed to decode or open cleanly.
Decode(super::DispatchError<E>),
}
@@ -206,7 +212,9 @@ impl<E> From<EndpointError> for ProcedureRuntimeError<E> {
/// Frames emitted while advancing one stateful procedure runtime.
#[derive(Debug, Default)]
pub struct ProcedureRuntimeOutcome {
/// Frames emitted while processing the current step.
pub frames: Vec<FrameBytes>,
/// Whether the endpoint dropped the incoming packet.
pub dropped: bool,
}
@@ -223,6 +231,7 @@ pub struct ProcedureRuntime<L, P> {
}
impl<L, P> ProcedureRuntime<L, P> {
/// Builds a procedure runtime from one endpoint and one leaf instance.
#[must_use]
pub fn new(endpoint: ProtocolEndpoint, leaf: L) -> Self {
Self {
@@ -232,20 +241,24 @@ impl<L, P> ProcedureRuntime<L, P> {
}
}
/// Returns the underlying protocol endpoint.
#[must_use]
pub fn endpoint(&self) -> &ProtocolEndpoint {
&self.endpoint
}
/// Returns a mutable reference to the protocol endpoint.
pub fn endpoint_mut(&mut self) -> &mut ProtocolEndpoint {
&mut self.endpoint
}
/// Returns the hosted leaf instance.
#[must_use]
pub fn leaf(&self) -> &L {
&self.leaf
}
/// Returns a mutable reference to the hosted leaf instance.
pub fn leaf_mut(&mut self) -> &mut L {
&mut self.leaf
}
@@ -327,18 +340,21 @@ where
&mut self,
outcome: super::EndpointOutcome,
) -> Result<ProcedureRuntimeOutcome, ProcedureRuntimeError<P::Error>> {
let mut runtime = ProcedureRuntimeOutcome {
frames: Vec::new(),
dropped: outcome.dropped,
};
if let Some((_route, frame)) = outcome.forward {
runtime.frames.push(frame);
match outcome {
super::EndpointOutcome::Forward { frame, .. } => {
let mut frames = Vec::with_capacity(1);
frames.push(frame);
Ok(ProcedureRuntimeOutcome {
frames,
dropped: false,
})
}
let Some(event) = outcome.event else {
return Ok(runtime);
};
super::EndpointOutcome::Dropped => Ok(ProcedureRuntimeOutcome {
frames: Vec::new(),
dropped: true,
}),
super::EndpointOutcome::Local(event) => {
let mut runtime = ProcedureRuntimeOutcome::default();
match event {
LocalEvent::Call { header, message } => {
@@ -448,6 +464,8 @@ where
Ok(runtime)
}
}
}
fn open_session(&mut self, call: IncomingCall) -> Result<P, DispatchError<P::Error>> {
let input = decode_call_input::<P::Input>(call.message.data.as_slice())
@@ -523,6 +541,8 @@ where
hook_key: &HookKey,
mut effect: ProcedureEffect,
) -> ProcedureEffect {
// Once a session emits `end_hook`, later packets would violate the protocol,
// so the runtime keeps only the prefix through that terminal packet.
if let Some(index) = effect.outgoing.iter().position(|packet| packet.end_hook) {
effect.outgoing.truncate(index + 1);
}
@@ -535,6 +555,9 @@ where
&& !effect.outgoing.iter().any(|packet| packet.end_hook)
&& !local_end_already_sent
{
// Closing a session without an explicit terminal packet would leave the
// protocol hook half-open, so emit an empty terminal frame on behalf of
// the procedure unless the local side already ended earlier.
effect.outgoing.push(OutgoingData {
dst_path: hook_key.return_path.clone(),
hook_id: hook_key.hook_id,
@@ -545,4 +568,5 @@ where
}
effect
}
}
+2 -2
View File
@@ -6,7 +6,7 @@
use std::collections::{BTreeMap, VecDeque};
use crossbeam_channel::unbounded;
use unshell::protocol::tree::{ChildRoute, ConnectionState, ProtocolEndpoint};
use unshell::protocol::tree::{ChildRoute, ProtocolEndpoint};
use crate::model::{DemoTree, NodeId, ScenarioDefinition, Selection};
@@ -42,7 +42,7 @@ impl Simulation {
.iter()
.map(|child_id| ChildRoute {
path: tree.node(*child_id).path.clone(),
state: ConnectionState::Registered,
registered: true,
})
.collect::<Vec<_>>();
+6 -8
View File
@@ -87,12 +87,11 @@ impl Simulation {
node_id: NodeId,
outcome: unshell::protocol::tree::EndpointOutcome,
) -> Result<(), SimError> {
if outcome.dropped {
match outcome {
unshell::protocol::tree::EndpointOutcome::Dropped => {
self.record_trace(node_id, "packet dropped".to_owned());
}
if let Some((route, frame)) = outcome.forward {
match route {
unshell::protocol::tree::EndpointOutcome::Forward { route, frame } => match route {
RouteDecision::Child(index) => {
let child_id = self.nodes[node_id.0]
.children
@@ -147,12 +146,11 @@ impl Simulation {
RouteDecision::Drop => {
self.record_trace(node_id, "route decision dropped frame".to_owned());
}
}
}
if let Some(event) = outcome.event {
},
unshell::protocol::tree::EndpointOutcome::Local(event) => {
self.handle_local_event(node_id, event)?;
}
}
Ok(())
}