Reduce protocol packet-flow allocations

Replace vector-backed endpoint outcomes with single-action results, skip payload deserialization on forwarded data and faults, and route local call and data emissions without encode/decode roundtrips.
This commit is contained in:
Michael Mikovsky
2026-04-25 11:46:45 -06:00
parent 792eb28457
commit 7b5b148ef3
10 changed files with 266 additions and 228 deletions
+126 -57
View File
@@ -11,10 +11,95 @@ use crate::protocol::{
encode_packet, validate_call, validate_header, validate_procedure_id,
};
use super::core::{ChildRoute, EndpointError, ProtocolEndpoint};
use super::super::RouteDecision;
use super::core::{ChildRoute, EndpointError, EndpointOutcome, ProtocolEndpoint};
use crate::protocol::tree::LeafSpec;
impl ProtocolEndpoint {
fn prepare_call(
&self,
dst_path: Vec<String>,
dst_leaf: Option<String>,
procedure_id: impl Into<String>,
response_hook_id: Option<u64>,
data: Vec<u8>,
) -> Result<(PacketHeader, CallMessage), EndpointError> {
let procedure_id = procedure_id.into();
validate_procedure_id(&procedure_id)?;
let response_hook = response_hook_id.map(|hook_id| HookTarget {
hook_id,
return_path: self.path.clone(),
});
let header = PacketHeader {
packet_type: PacketType::Call,
src_path: self.path.clone(),
dst_path,
dst_leaf,
hook_id: None,
};
let call = CallMessage {
procedure_id,
data,
response_hook,
};
validate_header(&header)?;
validate_call(&header, &call)?;
Ok((header, call))
}
fn register_outbound_call_hook(
&mut self,
header: &PacketHeader,
call: &CallMessage,
) -> Result<(), EndpointError> {
if let Some(hook) = &call.response_hook
&& self
.hooks
.insert_active(ActiveHook {
return_path: hook.return_path.clone(),
hook_id: hook.hook_id,
peer_path: header.dst_path.clone(),
procedure_id: call.procedure_id.clone(),
dst_leaf: header.dst_leaf.clone(),
peer_finished: false,
})
.is_err()
{
return Err(EndpointError::Validation(ValidationError::InvalidHookId));
}
Ok(())
}
fn prepare_data(
&self,
dst_path: Vec<String>,
hook_id: u64,
procedure_id: impl Into<String>,
data: Vec<u8>,
end_hook: bool,
) -> Result<(PacketHeader, DataMessage), EndpointError> {
let procedure_id = procedure_id.into();
validate_procedure_id(&procedure_id)?;
let header = PacketHeader {
packet_type: PacketType::Data,
src_path: self.path.clone(),
dst_path,
dst_leaf: None,
hook_id: Some(hook_id),
};
let message = DataMessage {
procedure_id,
data,
end_hook,
};
validate_header(&header)?;
Ok((header, message))
}
/// Creates a runtime endpoint with static tree topology and leaf metadata.
///
/// ```
@@ -69,48 +154,31 @@ impl ProtocolEndpoint {
response_hook_id: Option<u64>,
data: Vec<u8>,
) -> Result<FrameBytes, EndpointError> {
let procedure_id = procedure_id.into();
validate_procedure_id(&procedure_id)?;
let response_hook = response_hook_id.map(|hook_id| HookTarget {
hook_id,
return_path: self.path.clone(),
});
let header = PacketHeader {
packet_type: PacketType::Call,
src_path: self.path.clone(),
dst_path: dst_path.clone(),
dst_leaf: dst_leaf.clone(),
hook_id: None,
};
let call = CallMessage {
procedure_id: procedure_id.clone(),
data,
response_hook,
};
validate_header(&header)?;
validate_call(&header, &call)?;
if let Some(hook) = &call.response_hook
&& self
.hooks
.insert_active(ActiveHook {
return_path: hook.return_path.clone(),
hook_id: hook.hook_id,
peer_path: dst_path,
procedure_id,
dst_leaf,
peer_finished: false,
})
.is_err()
{
return Err(EndpointError::Validation(ValidationError::InvalidHookId));
}
let (header, call) =
self.prepare_call(dst_path, dst_leaf, procedure_id, response_hook_id, data)?;
self.register_outbound_call_hook(&header, &call)?;
Ok(encode_packet(&header, &call)?)
}
/// Routes one locally originated `Call` without an encode/decode roundtrip.
pub fn send_call(
&mut self,
dst_path: Vec<String>,
dst_leaf: Option<String>,
procedure_id: impl Into<String>,
response_hook_id: Option<u64>,
data: Vec<u8>,
) -> Result<EndpointOutcome, EndpointError> {
let (header, call) =
self.prepare_call(dst_path, dst_leaf, procedure_id, response_hook_id, data)?;
self.register_outbound_call_hook(&header, &call)?;
match self.decide_route(&header.dst_path) {
RouteDecision::Local => self.handle_local_call(header, call),
route => Ok(EndpointOutcome::forward(route, encode_packet(&header, &call)?)),
}
}
/// Builds an outbound `Data` packet for an existing hook.
pub fn make_data(
&self,
@@ -120,23 +188,24 @@ impl ProtocolEndpoint {
data: Vec<u8>,
end_hook: bool,
) -> Result<FrameBytes, EndpointError> {
let procedure_id = procedure_id.into();
validate_procedure_id(&procedure_id)?;
let header = PacketHeader {
packet_type: PacketType::Data,
src_path: self.path.clone(),
dst_path,
dst_leaf: None,
hook_id: Some(hook_id),
};
let message = DataMessage {
procedure_id,
data,
end_hook,
};
validate_header(&header)?;
let (header, message) = self.prepare_data(dst_path, hook_id, procedure_id, data, end_hook)?;
Ok(encode_packet(&header, &message)?)
}
/// Routes one locally originated `Data` packet without an encode/decode roundtrip.
pub fn send_data(
&mut self,
dst_path: Vec<String>,
hook_id: u64,
procedure_id: impl Into<String>,
data: Vec<u8>,
end_hook: bool,
) -> Result<EndpointOutcome, EndpointError> {
let (header, message) = self.prepare_data(dst_path, hook_id, procedure_id, data, end_hook)?;
match self.decide_route(&header.dst_path) {
RouteDecision::Local => self.handle_local_data(header, message),
route => Ok(EndpointOutcome::forward(route, encode_packet(&header, &message)?)),
}
}
}
+36 -4
View File
@@ -93,14 +93,46 @@ pub enum LocalEvent {
/// Result of processing one framed packet.
#[derive(Debug, Default)]
pub struct EndpointOutcome {
/// Forwarding actions to perform after local processing.
pub forwards: Vec<(RouteDecision, FrameBytes)>,
/// Events delivered to local runtime consumers.
pub events: Vec<LocalEvent>,
/// Forwarding action to perform after local processing.
pub forward: Option<(RouteDecision, FrameBytes)>,
/// Event delivered to the local runtime consumer.
pub event: Option<LocalEvent>,
/// Whether the packet was intentionally dropped with no other side effects.
pub dropped: bool,
}
impl EndpointOutcome {
/// Returns an outcome that only forwards one frame.
#[must_use]
pub fn forward(route: RouteDecision, frame: FrameBytes) -> Self {
Self {
forward: Some((route, frame)),
event: None,
dropped: false,
}
}
/// Returns an outcome that only delivers one local event.
#[must_use]
pub fn event(event: LocalEvent) -> Self {
Self {
forward: None,
event: Some(event),
dropped: false,
}
}
/// Returns an outcome that silently drops the packet.
#[must_use]
pub fn dropped() -> Self {
Self {
forward: None,
event: None,
dropped: true,
}
}
}
/// Errors returned while decoding or validating a packet.
#[derive(Debug)]
pub enum EndpointError {
+21 -48
View File
@@ -3,7 +3,7 @@
//! These methods implement the hook lifecycle described in `PROTOCOL.md`:
//! pending contexts, active contexts, peer validation, and fault emission.
use alloc::{string::String, vec};
use alloc::string::String;
use crate::protocol::{
DataMessage, FaultMessage, PacketHeader, PacketType, ProtocolFault, encode_packet,
@@ -20,10 +20,7 @@ impl ProtocolEndpoint {
fault: ProtocolFault,
) -> Result<EndpointOutcome, EndpointError> {
let Some(key) = key else {
return Ok(EndpointOutcome {
dropped: true,
..EndpointOutcome::default()
});
return Ok(EndpointOutcome::dropped());
};
self.hooks.remove_pending(&key);
@@ -40,16 +37,10 @@ impl ProtocolEndpoint {
let route = self.decide_route(&key.return_path);
match route {
RouteDecision::Local => Ok(EndpointOutcome {
events: vec![LocalEvent::Fault { header, message }],
..EndpointOutcome::default()
}),
RouteDecision::Local => Ok(EndpointOutcome::event(LocalEvent::Fault { header, message })),
_ => {
let frame = encode_packet(&header, &message)?;
Ok(EndpointOutcome {
forwards: vec![(route, frame)],
..EndpointOutcome::default()
})
Ok(EndpointOutcome::forward(route, frame))
}
}
}
@@ -85,47 +76,35 @@ impl ProtocolEndpoint {
}
let Some(active) = self.hooks.active(&key) else {
return Ok(EndpointOutcome {
dropped: true,
..EndpointOutcome::default()
});
return Ok(EndpointOutcome::dropped());
};
if active.peer_path != header.src_path {
self.hooks.remove_active(&key);
self.hooks.remove_pending(&key);
return Ok(EndpointOutcome {
events: vec![LocalEvent::Fault {
header: PacketHeader {
packet_type: PacketType::Fault,
src_path: header.src_path,
dst_path: self.path.clone(),
dst_leaf: None,
hook_id: Some(key.hook_id),
},
message: FaultMessage {
fault: ProtocolFault::INVALID_HOOK_PEER,
},
}],
..EndpointOutcome::default()
});
return Ok(EndpointOutcome::event(LocalEvent::Fault {
header: PacketHeader {
packet_type: PacketType::Fault,
src_path: header.src_path,
dst_path: self.path.clone(),
dst_leaf: None,
hook_id: Some(key.hook_id),
},
message: FaultMessage {
fault: ProtocolFault::INVALID_HOOK_PEER,
},
}));
}
if active.procedure_id != message.procedure_id {
return Ok(EndpointOutcome {
dropped: true,
..EndpointOutcome::default()
});
return Ok(EndpointOutcome::dropped());
}
if message.end_hook {
self.hooks.remove_active(&key);
}
Ok(EndpointOutcome {
events: vec![LocalEvent::Data { header, message }],
..EndpointOutcome::default()
})
Ok(EndpointOutcome::event(LocalEvent::Data { header, message }))
}
/// Handles locally delivered hook `Fault` packets.
@@ -145,19 +124,13 @@ impl ProtocolEndpoint {
.is_some_and(|pending| pending.caller_src_path == header.src_path);
if !matches {
return Ok(EndpointOutcome {
dropped: true,
..EndpointOutcome::default()
});
return Ok(EndpointOutcome::dropped());
}
self.hooks.remove_active(&key);
self.hooks.remove_pending(&key);
Ok(EndpointOutcome {
events: vec![LocalEvent::Fault { header, message }],
..EndpointOutcome::default()
})
Ok(EndpointOutcome::event(LocalEvent::Fault { header, message }))
}
/// Chooses the next hop using the protocol's longest-prefix routing rule.
+7 -16
View File
@@ -3,7 +3,7 @@
//! This code implements the reserved empty-procedure behavior from the
//! introspection sections of `PROTOCOL.md`.
use alloc::{string::String, vec};
use alloc::string::String;
use rkyv::{rancor::Error as RkyvError, to_bytes};
use crate::protocol::{
@@ -22,14 +22,9 @@ impl ProtocolEndpoint {
key: Option<HookKey>,
) -> Result<EndpointOutcome, EndpointError> {
let Some(key) = key else {
return Ok(EndpointOutcome {
dropped: true,
..EndpointOutcome::default()
});
return Ok(EndpointOutcome::dropped());
};
self.hooks.activate_pending(&key, header.src_path.clone());
let payload = if let Some(leaf_name) = &header.dst_leaf {
let Some(leaf) = self.leaves.get(leaf_name) else {
return self.emit_fault_if_possible(Some(key), ProtocolFault::UNKNOWN_LEAF);
@@ -77,19 +72,15 @@ impl ProtocolEndpoint {
let route = self.decide_route(&key.return_path);
match route {
super::super::RouteDecision::Local => Ok(EndpointOutcome {
events: vec![super::core::LocalEvent::Data {
super::super::RouteDecision::Local => Ok(EndpointOutcome::event(
super::core::LocalEvent::Data {
header: response_header,
message: response,
}],
..EndpointOutcome::default()
}),
},
)),
_ => {
let frame = encode_packet(&response_header, &response)?;
Ok(EndpointOutcome {
forwards: vec![(route, frame)],
..EndpointOutcome::default()
})
Ok(EndpointOutcome::forward(route, frame))
}
}
}
+49 -74
View File
@@ -3,14 +3,12 @@
//! This file implements the transport-facing packet entry point and maps it to
//! the `Call`, `Data`, and `Fault` sections of `PROTOCOL.md`.
use alloc::vec;
use crate::protocol::{
CallMessage, PacketType, ProtocolFault, decode_frame, introspection::INTROSPECTION_PROCEDURE_ID,
validate_call, validate_header,
};
use super::super::{HookKey, PendingHook, RouteDecision};
use super::super::{ActiveHook, HookKey, RouteDecision};
use super::core::{
Endpoint, EndpointError, EndpointOutcome, Ingress, LocalEvent, ProtocolEndpoint,
};
@@ -27,22 +25,6 @@ impl ProtocolEndpoint {
.as_ref()
.map(|hook| HookKey::new(hook.return_path.clone(), hook.hook_id));
if let Some(hook) = &message.response_hook
&& hook.return_path != self.path
&& self
.hooks
.insert_pending(PendingHook {
caller_src_path: header.src_path.clone(),
return_path: hook.return_path.clone(),
hook_id: hook.hook_id,
procedure_id: message.procedure_id.clone(),
dst_leaf: header.dst_leaf.clone(),
})
.is_err()
{
return self.emit_fault_if_possible(key, ProtocolFault::INTERNAL_ERROR);
}
if message.procedure_id == INTROSPECTION_PROCEDURE_ID {
return self.handle_introspection(&header, key);
}
@@ -73,14 +55,24 @@ impl ProtocolEndpoint {
return self.emit_fault_if_possible(key, fault);
}
if let Some(key) = &key {
self.hooks.activate_pending(key, header.src_path.clone());
if let Some(hook) = &message.response_hook
&& hook.return_path != self.path
&& self
.hooks
.insert_active(ActiveHook {
return_path: hook.return_path.clone(),
hook_id: hook.hook_id,
peer_path: header.src_path.clone(),
procedure_id: message.procedure_id.clone(),
dst_leaf: header.dst_leaf.clone(),
peer_finished: false,
})
.is_err()
{
return self.emit_fault_if_possible(key, ProtocolFault::INTERNAL_ERROR);
}
Ok(EndpointOutcome {
events: vec![LocalEvent::Call { header, message }],
..EndpointOutcome::default()
})
Ok(EndpointOutcome::event(LocalEvent::Call { header, message }))
}
}
@@ -99,73 +91,56 @@ impl Endpoint for ProtocolEndpoint {
validate_header(header)?;
if !self.valid_source_for_ingress(ingress, &header.src_path) {
return Ok(EndpointOutcome {
dropped: true,
..EndpointOutcome::default()
});
return Ok(EndpointOutcome::dropped());
}
match header.packet_type {
PacketType::Call => {
let message = parsed.deserialize_call()?;
if !matches!(ingress, Ingress::Parent | Ingress::Local) {
return Ok(EndpointOutcome {
dropped: true,
..EndpointOutcome::default()
});
return Ok(EndpointOutcome::dropped());
}
validate_call(header, &message)?;
match self.decide_route(&header.dst_path) {
RouteDecision::Child(index) => Ok(EndpointOutcome {
forwards: vec![(RouteDecision::Child(index), frame)],
..EndpointOutcome::default()
}),
RouteDecision::Parent => Ok(EndpointOutcome {
forwards: vec![(RouteDecision::Parent, frame)],
..EndpointOutcome::default()
}),
RouteDecision::Drop => Ok(EndpointOutcome {
dropped: true,
..EndpointOutcome::default()
}),
RouteDecision::Local => self.handle_local_call(header.clone(), message),
RouteDecision::Child(index) => {
Ok(EndpointOutcome::forward(RouteDecision::Child(index), frame))
}
RouteDecision::Parent => {
Ok(EndpointOutcome::forward(RouteDecision::Parent, frame))
}
RouteDecision::Drop => Ok(EndpointOutcome::dropped()),
RouteDecision::Local => self.handle_local_call(parsed.deserialize_header(), message),
}
}
PacketType::Data => {
let message = parsed.deserialize_data()?;
match self.decide_route(&header.dst_path) {
RouteDecision::Local => self.handle_local_data(header.clone(), message),
RouteDecision::Child(index) => Ok(EndpointOutcome {
forwards: vec![(RouteDecision::Child(index), frame)],
..EndpointOutcome::default()
}),
RouteDecision::Parent => Ok(EndpointOutcome {
forwards: vec![(RouteDecision::Parent, frame)],
..EndpointOutcome::default()
}),
RouteDecision::Drop => Ok(EndpointOutcome {
dropped: true,
..EndpointOutcome::default()
}),
RouteDecision::Local => {
let message = parsed.deserialize_data()?;
self.handle_local_data(parsed.deserialize_header(), message)
}
RouteDecision::Child(index) => {
Ok(EndpointOutcome::forward(RouteDecision::Child(index), frame))
}
RouteDecision::Parent => {
Ok(EndpointOutcome::forward(RouteDecision::Parent, frame))
}
RouteDecision::Drop => Ok(EndpointOutcome::dropped()),
}
}
PacketType::Fault => {
let message = parsed.deserialize_fault()?;
match self.decide_route(&header.dst_path) {
RouteDecision::Local => self.handle_local_fault(header.clone(), message),
RouteDecision::Child(index) => Ok(EndpointOutcome {
forwards: vec![(RouteDecision::Child(index), frame)],
..EndpointOutcome::default()
}),
RouteDecision::Parent => Ok(EndpointOutcome {
forwards: vec![(RouteDecision::Parent, frame)],
..EndpointOutcome::default()
}),
RouteDecision::Drop => Ok(EndpointOutcome {
dropped: true,
..EndpointOutcome::default()
}),
RouteDecision::Local => {
let message = parsed.deserialize_fault()?;
self.handle_local_fault(parsed.deserialize_header(), message)
}
RouteDecision::Child(index) => {
Ok(EndpointOutcome::forward(RouteDecision::Child(index), frame))
}
RouteDecision::Parent => {
Ok(EndpointOutcome::forward(RouteDecision::Parent, frame))
}
RouteDecision::Drop => Ok(EndpointOutcome::dropped()),
}
}
}