Add stateful call leaf runtime

This commit is contained in:
Michael Mikovsky
2026-04-25 15:35:08 -06:00
parent 56bc7ee4f8
commit 7e266e2a38
18 changed files with 1349 additions and 388 deletions
+106
View File
@@ -0,0 +1,106 @@
use alloc::{borrow::ToOwned, format, string::String, vec, vec::Vec};
use core::convert::Infallible;
use rkyv::{Archive, Deserialize, Serialize};
use crate::protocol::tree::{
Call, CallLeaf, ChildRoute, ConnectionState, Ingress, LeafRuntime, ProtocolEndpoint,
decode_call_input, encode_call_reply,
};
use crate::protocol::{PacketType, decode_frame};
use crate::{Leaf, procedures};
fn path(parts: &[&str]) -> Vec<String> {
parts.iter().map(|part| (*part).to_owned()).collect()
}
#[derive(Leaf)]
#[leaf(id = "org.example.v1.echo")]
struct EchoLeaf {
prefix: String,
}
#[derive(Archive, Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
struct EchoRequest {
text: String,
}
#[derive(Archive, Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
struct EchoResponse {
text: String,
}
#[procedures(error = Infallible)]
impl EchoLeaf {
#[call]
fn echo(&mut self, request: Call<EchoRequest>) -> EchoResponse {
EchoResponse {
text: format!("{}{}", self.prefix, request.input.text),
}
}
}
impl CallLeaf for EchoLeaf {
type Error = Infallible;
}
#[test]
fn leaf_runtime_dispatches_generated_call_procedure() {
let endpoint = ProtocolEndpoint::new(
path(&["agent"]),
Some(Vec::new()),
Vec::new(),
vec![EchoLeaf::protocol_leaf_spec()],
);
let mut runtime = LeafRuntime::new(
endpoint,
EchoLeaf {
prefix: String::from("echo: "),
},
);
let mut controller = ProtocolEndpoint::new(
Vec::new(),
None,
vec![ChildRoute {
path: path(&["agent"]),
state: ConnectionState::Registered,
}],
Vec::new(),
);
let hook_id = controller.allocate_hook_id();
let controller_outcome = controller
.send_call(
path(&["agent"]),
Some(EchoLeaf::protocol_leaf_name()),
EchoLeaf::protocol_procedure_id("echo").expect("generated suffix should resolve"),
Some(hook_id),
encode_call_reply(&EchoRequest {
text: String::from("hello"),
})
.expect("request should encode"),
)
.expect("call should encode");
let Some((_, frame)) = controller_outcome.forward else {
panic!("controller should forward call to child");
};
let outcome = runtime
.receive(&Ingress::Parent, frame)
.expect("runtime should handle call");
let [response_frame] = outcome.frames.as_slice() else {
panic!("expected one response frame");
};
let parsed = decode_frame(response_frame.as_slice()).expect("response frame should decode");
assert_eq!(parsed.packet_type(), PacketType::Data);
let response = decode_call_input::<EchoResponse>(
parsed
.deserialize_data()
.expect("data payload should deserialize")
.data
.as_slice(),
)
.expect("typed response should decode");
assert_eq!(response.text, "echo: hello");
}
+1
View File
@@ -1,2 +1,3 @@
mod call;
mod protocol;
mod tree;
+69 -1
View File
@@ -81,6 +81,7 @@ fn protocol_endpoint_introspection_returns_leaf_summary() {
let LocalEvent::Data {
header,
message: response,
..
} = outcome.event.as_ref().expect("expected local data event")
else {
panic!("expected local data event");
@@ -142,7 +143,9 @@ fn invalid_hook_peer_emits_local_fault_event() {
assert!(!outcome.dropped);
match outcome.event.as_ref().expect("expected event") {
LocalEvent::Fault { header, message } => {
LocalEvent::Fault {
header, message, ..
} => {
assert_eq!(header.packet_type, PacketType::Fault);
assert_eq!(header.hook_id, Some(hook_id));
assert_eq!(
@@ -251,3 +254,68 @@ fn pending_hook_fault_is_delivered_before_activation() {
assert!(outcome.forward.is_some() || outcome.event.is_some());
}
#[test]
fn callee_side_end_hook_marks_local_end_before_peer_close() {
let mut endpoint = ProtocolEndpoint::new(path(&["server"]), None, Vec::new(), Vec::new());
endpoint
.add_endpoint_procedure("example.service.v1.invoke")
.expect("procedure registration should succeed");
let frame = encode_packet(
&PacketHeader {
packet_type: PacketType::Call,
src_path: Vec::new(),
dst_path: path(&["server"]),
dst_leaf: None,
hook_id: None,
},
&crate::protocol::CallMessage {
procedure_id: "example.service.v1.invoke".to_owned(),
data: vec![1],
response_hook: Some(crate::protocol::HookTarget {
hook_id: 21,
return_path: Vec::new(),
}),
},
)
.expect("call should encode");
endpoint
.receive(&Ingress::Parent, frame)
.expect("callee should accept call");
let key = crate::protocol::tree::HookKey::new(Vec::new(), 21);
assert!(endpoint.hooks.active(&key).is_some());
endpoint
.send_data(
Vec::new(),
21,
"example.service.v1.invoke",
Vec::new(),
true,
)
.expect("callee local end should succeed");
assert!(endpoint.hooks.active(&key).is_some());
let peer_final = encode_packet(
&PacketHeader {
packet_type: PacketType::Data,
src_path: Vec::new(),
dst_path: path(&["server"]),
dst_leaf: None,
hook_id: Some(21),
},
&DataMessage {
procedure_id: "example.service.v1.invoke".to_owned(),
data: Vec::new(),
end_hook: true,
},
)
.expect("peer final data should encode");
endpoint
.receive(&Ingress::Parent, peer_final)
.expect("callee should accept peer close");
assert!(endpoint.hooks.active(&key).is_none());
}
+350
View File
@@ -0,0 +1,350 @@
//! Stateful application-layer call runtime built on top of `ProtocolEndpoint`.
use alloc::{string::String, vec::Vec};
use core::fmt;
use rkyv::{Archive, Serialize, rancor::Error, to_bytes, util::AlignedVec};
use crate::protocol::{
CallMessage, DataMessage, FrameBytes, FrameError, HookTarget, PacketHeader, ProtocolFault,
};
use super::{
Endpoint, EndpointError, HookKey, Ingress, LocalEvent, ProtocolEndpoint, ProtocolLeaf,
};
/// One typed incoming `Call` passed to a leaf procedure.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Call<T> {
pub input: T,
pub caller_path: Vec<String>,
pub procedure_id: String,
pub dst_leaf: Option<String>,
pub response_hook: Option<HookKey>,
}
/// One incoming local call event that already passed protocol validation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IncomingCall {
pub header: PacketHeader,
pub message: CallMessage,
}
/// One incoming local data event tied to an active hook.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IncomingData {
pub header: PacketHeader,
pub message: DataMessage,
pub hook_key: HookKey,
}
/// One incoming local fault event tied to a pending or active hook.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IncomingFault {
pub header: PacketHeader,
pub fault: crate::protocol::FaultMessage,
pub hook_key: HookKey,
}
/// Outcome of one generated initial call procedure.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CallResult<T> {
Reply(T),
NoReply,
}
/// One hook-associated `Data` packet emitted by leaf code.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OutgoingData {
pub dst_path: Vec<String>,
pub hook_id: u64,
pub procedure_id: String,
pub data: Vec<u8>,
pub end_hook: bool,
}
/// One runtime-normalized reply produced by generated call dispatch.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CallReply {
Reply(Vec<u8>),
NoReply,
}
/// Error surfaced while decoding one incoming call or encoding one generated reply.
#[derive(Debug)]
pub enum DispatchError<E> {
Decode(FrameError),
Encode(FrameError),
Handler(E),
}
impl<E> fmt::Display for DispatchError<E>
where
E: fmt::Display,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Decode(error) => write!(f, "call decode failed: {error}"),
Self::Encode(error) => write!(f, "call reply encode failed: {error}"),
Self::Handler(error) => write!(f, "call handler failed: {error}"),
}
}
}
impl<E> core::error::Error for DispatchError<E> where E: core::error::Error + 'static {}
/// Error surfaced by the stateful leaf runtime.
#[derive(Debug)]
pub enum LeafRuntimeError<E> {
Endpoint(EndpointError),
Dispatch(DispatchError<E>),
Leaf(E),
}
impl<E> fmt::Display for LeafRuntimeError<E>
where
E: fmt::Display,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Endpoint(error) => write!(f, "{error}"),
Self::Dispatch(error) => write!(f, "{error}"),
Self::Leaf(error) => write!(f, "{error}"),
}
}
}
impl<E> core::error::Error for LeafRuntimeError<E> where E: core::error::Error + 'static {}
impl<E> From<EndpointError> for LeafRuntimeError<E> {
fn from(value: EndpointError) -> Self {
Self::Endpoint(value)
}
}
/// High-level leaf behavior layered on top of validated protocol events.
pub trait CallLeaf: ProtocolLeaf {
type Error;
/// Handles hook-associated inbound `Data` after protocol validation.
fn on_data(&mut self, _data: IncomingData) -> Result<Vec<OutgoingData>, Self::Error> {
Ok(Vec::new())
}
/// Observes one inbound `Fault` after protocol validation.
fn on_fault(&mut self, _fault: IncomingFault) -> Result<(), Self::Error> {
Ok(())
}
/// Polls the leaf for locally-generated hook traffic.
fn poll(&mut self) -> Result<Vec<OutgoingData>, Self::Error> {
Ok(Vec::new())
}
}
/// Stateful runtime that combines a protocol endpoint with one leaf instance.
#[derive(Debug)]
pub struct LeafRuntime<L> {
endpoint: ProtocolEndpoint,
leaf: L,
}
/// Frames emitted by the runtime after one receive or poll step.
#[derive(Debug, Default)]
pub struct RuntimeOutcome {
pub frames: Vec<FrameBytes>,
pub dropped: bool,
}
impl<L> LeafRuntime<L> {
#[must_use]
pub fn new(endpoint: ProtocolEndpoint, leaf: L) -> Self {
Self { endpoint, leaf }
}
#[must_use]
pub fn endpoint(&self) -> &ProtocolEndpoint {
&self.endpoint
}
pub fn endpoint_mut(&mut self) -> &mut ProtocolEndpoint {
&mut self.endpoint
}
#[must_use]
pub fn leaf(&self) -> &L {
&self.leaf
}
pub fn leaf_mut(&mut self) -> &mut L {
&mut self.leaf
}
}
impl<L> LeafRuntime<L>
where
L: CallLeaf + super::CallProcedures<Error = <L as CallLeaf>::Error>,
{
pub fn receive(
&mut self,
ingress: &Ingress,
frame: FrameBytes,
) -> Result<RuntimeOutcome, LeafRuntimeError<<L as CallLeaf>::Error>> {
let outcome = self.endpoint.receive(ingress, frame)?;
self.process_endpoint_outcome(outcome)
}
pub fn poll(&mut self) -> Result<RuntimeOutcome, LeafRuntimeError<<L as CallLeaf>::Error>> {
let outgoing = self.leaf.poll().map_err(LeafRuntimeError::Leaf)?;
self.emit_outgoing(outgoing)
}
fn process_endpoint_outcome(
&mut self,
outcome: crate::protocol::tree::EndpointOutcome,
) -> Result<RuntimeOutcome, LeafRuntimeError<<L as CallLeaf>::Error>> {
let mut runtime = RuntimeOutcome {
frames: Vec::new(),
dropped: outcome.dropped,
};
if let Some((_route, frame)) = outcome.forward {
runtime.frames.push(frame);
}
let Some(event) = outcome.event else {
return Ok(runtime);
};
match event {
LocalEvent::Call { header, message } => {
let incoming = IncomingCall {
header,
message: message.clone(),
};
match self.leaf.dispatch_call(incoming) {
Ok(CallReply::Reply(bytes)) => {
if let Some(hook) = message.response_hook {
runtime.frames.extend(self.send_reply_data(
hook,
message.procedure_id,
bytes,
true,
)?);
}
}
Ok(CallReply::NoReply) => {}
Err(error) => {
runtime
.frames
.extend(self.emit_internal_fault_if_possible(&message)?);
return Err(LeafRuntimeError::Dispatch(error));
}
}
}
LocalEvent::Data {
header,
message,
hook_key,
} => {
let outgoing = self
.leaf
.on_data(IncomingData {
header,
message,
hook_key,
})
.map_err(LeafRuntimeError::Leaf)?;
runtime.frames.extend(self.emit_outgoing(outgoing)?.frames);
}
LocalEvent::Fault {
header,
message,
hook_key,
} => {
self.leaf
.on_fault(IncomingFault {
header,
fault: message,
hook_key,
})
.map_err(LeafRuntimeError::Leaf)?;
}
}
Ok(runtime)
}
fn emit_outgoing(
&mut self,
outgoing: Vec<OutgoingData>,
) -> Result<RuntimeOutcome, LeafRuntimeError<<L as CallLeaf>::Error>> {
let mut runtime = RuntimeOutcome::default();
for packet in outgoing {
let endpoint_outcome = self.endpoint.send_data(
packet.dst_path,
packet.hook_id,
packet.procedure_id,
packet.data,
packet.end_hook,
)?;
runtime
.frames
.extend(self.process_endpoint_outcome(endpoint_outcome)?.frames);
}
Ok(runtime)
}
fn send_reply_data(
&mut self,
hook: HookTarget,
procedure_id: String,
bytes: Vec<u8>,
end_hook: bool,
) -> Result<Vec<FrameBytes>, LeafRuntimeError<<L as CallLeaf>::Error>> {
let endpoint_outcome = self.endpoint.send_data(
hook.return_path,
hook.hook_id,
procedure_id,
bytes,
end_hook,
)?;
Ok(self.process_endpoint_outcome(endpoint_outcome)?.frames)
}
fn emit_internal_fault_if_possible(
&mut self,
message: &CallMessage,
) -> Result<Vec<FrameBytes>, LeafRuntimeError<<L as CallLeaf>::Error>> {
let Some(hook) = message.response_hook.as_ref() else {
return Ok(Vec::new());
};
let key = HookKey::new(hook.return_path.clone(), hook.hook_id);
let outcome = self
.endpoint
.emit_fault_if_possible(Some(key), ProtocolFault::INTERNAL_ERROR)?;
Ok(self.process_endpoint_outcome(outcome)?.frames)
}
}
/// Decodes one archived call payload into a typed application request.
pub fn decode_call_input<T>(bytes: &[u8]) -> Result<T, FrameError>
where
T: Archive,
<T as Archive>::Archived: rkyv::Portable
+ for<'b> rkyv::bytecheck::CheckBytes<rkyv::api::high::HighValidator<'b, Error>>
+ rkyv::Deserialize<T, rkyv::api::high::HighDeserializer<Error>>,
{
crate::protocol::deserialize_archived_bytes::<<T as Archive>::Archived, T>(bytes)
}
/// Encodes one typed application reply into hook `Data` bytes.
pub fn encode_call_reply<T>(value: &T) -> Result<Vec<u8>, FrameError>
where
T: for<'a> Serialize<
rkyv::api::high::HighSerializer<AlignedVec, rkyv::ser::allocator::ArenaHandle<'a>, Error>,
>,
{
let bytes = to_bytes::<Error>(value).map_err(FrameError::Serialize)?;
Ok(bytes.as_slice().to_vec())
}
+2 -1
View File
@@ -205,6 +205,7 @@ impl ProtocolEndpoint {
data: Vec<u8>,
end_hook: bool,
) -> Result<EndpointOutcome, EndpointError> {
let local_end_dst_path = dst_path.clone();
let (header, message) =
self.prepare_data(dst_path, hook_id, procedure_id, data, end_hook)?;
@@ -213,7 +214,7 @@ impl ProtocolEndpoint {
// so fall back to the endpoint's own hook key shape when closing them.
let local_hook_key = self
.hooks
.resolve_active_key(&self.path, hook_id, &self.path)
.resolve_active_key(&local_end_dst_path, hook_id, &self.path)
.unwrap_or_else(|| HookKey::new(self.path.clone(), hook_id));
if self.hooks.mark_local_end(&local_hook_key) {
self.hooks.remove_active(&local_hook_key);
+3 -1
View File
@@ -11,7 +11,7 @@ use crate::protocol::{
CallMessage, DataMessage, FaultMessage, FrameBytes, FrameError, PacketHeader, ValidationError,
};
use super::super::{CompiledRoutes, HookTable, RouteDecision};
use super::super::{CompiledRoutes, HookKey, HookTable, RouteDecision};
/// Registration state for a direct child endpoint.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -66,10 +66,12 @@ pub enum LocalEvent {
Data {
header: PacketHeader,
message: DataMessage,
hook_key: HookKey,
},
Fault {
header: PacketHeader,
message: FaultMessage,
hook_key: HookKey,
},
}
+9 -1
View File
@@ -35,6 +35,7 @@ impl ProtocolEndpoint {
RouteDecision::Local => Ok(EndpointOutcome::event(LocalEvent::Fault {
header,
message,
hook_key: key,
})),
route => Ok(EndpointOutcome::forward(
route,
@@ -75,6 +76,7 @@ impl ProtocolEndpoint {
message: FaultMessage {
fault: ProtocolFault::INVALID_HOOK_PEER,
},
hook_key: key,
}));
}
@@ -87,7 +89,11 @@ impl ProtocolEndpoint {
self.hooks.remove_active(&key);
}
Ok(EndpointOutcome::event(LocalEvent::Data { header, message }))
Ok(EndpointOutcome::event(LocalEvent::Data {
header,
message,
hook_key: key,
}))
}
pub(crate) fn handle_local_fault(
@@ -104,6 +110,7 @@ impl ProtocolEndpoint {
return Ok(EndpointOutcome::event(LocalEvent::Fault {
header,
message,
hook_key: key,
}));
}
@@ -117,6 +124,7 @@ impl ProtocolEndpoint {
return Ok(EndpointOutcome::event(LocalEvent::Fault {
header,
message,
hook_key: pending_key,
}));
}
@@ -75,6 +75,7 @@ impl ProtocolEndpoint {
Ok(EndpointOutcome::event(super::core::LocalEvent::Data {
header: response_header,
message: response,
hook_key: key,
}))
}
route => Ok(EndpointOutcome::forward(
+13 -3
View File
@@ -1,9 +1,8 @@
//! Application-facing leaf metadata helpers.
//!
//! The protocol runtime itself only knows about `LeafSpec` metadata and validated
//! `LocalEvent::Call` delivery. This trait sits one layer above that runtime so
//! application code can declare canonical leaf names and procedure ids once and
//! then reuse the generated metadata when building endpoints and dispatching calls.
//! `LocalEvent` delivery. `ProtocolLeaf` owns the canonical dotted leaf id, while
//! `CallProcedures` owns generated procedure ids and initial call dispatch.
use alloc::{string::String, vec::Vec};
@@ -13,6 +12,11 @@ use super::LeafSpec;
pub trait ProtocolLeaf {
/// Returns the canonical dotted leaf name hosted by this type.
fn leaf_name() -> String;
}
/// Generated call metadata and initial `Call` dispatch for one leaf.
pub trait CallProcedures: ProtocolLeaf {
type Error;
/// Returns the local procedure suffixes supported by this leaf.
fn procedure_suffixes() -> &'static [&'static str];
@@ -44,6 +48,12 @@ pub trait ProtocolLeaf {
procedures: Self::procedure_ids(),
}
}
/// Dispatches one initial `Call` that targeted this leaf.
fn dispatch_call(
&mut self,
call: crate::protocol::tree::IncomingCall,
) -> Result<crate::protocol::tree::CallReply, crate::protocol::tree::DispatchError<Self::Error>>;
}
/// Builds one canonical dotted leaf id from crate-local metadata plus optional
+7 -1
View File
@@ -5,17 +5,23 @@
//! - `hook` contains the pending/active hook lifecycle tables used by endpoint runtime code.
//! - `endpoint` ties those pieces together into the runtime-facing protocol endpoint API.
mod call;
mod endpoint;
mod hook;
mod leaf;
mod routing;
pub use call::{
Call, CallLeaf, CallReply, CallResult, DispatchError, IncomingCall, IncomingData,
IncomingFault, LeafRuntime, LeafRuntimeError, OutgoingData, RuntimeOutcome, decode_call_input,
encode_call_reply,
};
pub use endpoint::{
ChildRoute, ConnectionState, Endpoint, EndpointError, EndpointOutcome, Ingress, LeafSpec,
LocalEvent, ProtocolEndpoint,
};
pub use hook::{ActiveHook, HookConflict, HookKey, HookTable, PendingHook};
pub use leaf::{ProtocolLeaf, derive_leaf_name};
pub use leaf::{CallProcedures, ProtocolLeaf, derive_leaf_name};
pub use routing::{
CompiledRoutes, DefaultRouteProvider, LeafNode, RouteDecision, RouteProvider, TreeNode,
is_prefix, route_destination,