mirror of
https://github.com/Astatin3/unshell.git
synced 2026-06-08 22:38:01 -06:00
Dispatch local runtime effects to leaves
This commit is contained in:
@@ -118,11 +118,13 @@ Rules:
|
|||||||
effects.
|
effects.
|
||||||
|
|
||||||
```rust
|
```rust
|
||||||
pub struct NodeRuntime<T> {
|
pub struct NodeRuntime<T, LeafError = core::convert::Infallible> {
|
||||||
endpoint: EndpointState,
|
endpoint: EndpointState,
|
||||||
connections: Connections,
|
connections: Connections,
|
||||||
transport: T,
|
transport: T,
|
||||||
effects: EffectQueue,
|
effects: EffectQueue,
|
||||||
|
leaves: Vec<RegisteredLeaf<LeafError>>,
|
||||||
|
leaf_actions: Vec<(LeafId, LeafAction)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct TickBudget {
|
pub struct TickBudget {
|
||||||
@@ -144,8 +146,6 @@ Primary operations:
|
|||||||
impl<T: Transport> NodeRuntime<T> {
|
impl<T: Transport> NodeRuntime<T> {
|
||||||
pub fn tick(&mut self, budget: TickBudget) -> Result<TickOutcome, NodeRuntimeError<T::Error>>;
|
pub fn tick(&mut self, budget: TickBudget) -> Result<TickOutcome, NodeRuntimeError<T::Error>>;
|
||||||
|
|
||||||
pub fn drain_local_effects(&mut self) -> impl Iterator<Item = RuntimeEffect>;
|
|
||||||
|
|
||||||
pub fn receive_frame(
|
pub fn receive_frame(
|
||||||
&mut self,
|
&mut self,
|
||||||
connection: ConnectionId,
|
connection: ConnectionId,
|
||||||
@@ -153,6 +153,24 @@ impl<T: Transport> NodeRuntime<T> {
|
|||||||
) -> Result<(), NodeRuntimeError<T::Error>>;
|
) -> Result<(), NodeRuntimeError<T::Error>>;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<T, LeafError> NodeRuntime<T, LeafError> {
|
||||||
|
pub fn new_with_leaf_error(
|
||||||
|
endpoint: EndpointState,
|
||||||
|
connections: Connections,
|
||||||
|
transport: T,
|
||||||
|
) -> Self;
|
||||||
|
|
||||||
|
pub fn drain_local_effects(&mut self) -> impl Iterator<Item = RuntimeEffect>;
|
||||||
|
|
||||||
|
pub fn register_leaf<L>(&mut self, leaf: L) -> LeafId
|
||||||
|
where
|
||||||
|
L: Leaf<Error = LeafError> + 'static;
|
||||||
|
|
||||||
|
pub fn dispatch_local_effects(&mut self) -> Result<usize, LeafDispatchError<LeafError>>;
|
||||||
|
|
||||||
|
pub fn drain_leaf_actions(&mut self) -> impl Iterator<Item = (LeafId, LeafAction)>;
|
||||||
|
}
|
||||||
|
|
||||||
impl<T> NodeRuntime<T> {
|
impl<T> NodeRuntime<T> {
|
||||||
pub fn register_parent_connection(
|
pub fn register_parent_connection(
|
||||||
&mut self,
|
&mut self,
|
||||||
@@ -190,7 +208,12 @@ Rules:
|
|||||||
unroutable or a route without a registered connection.
|
unroutable or a route without a registered connection.
|
||||||
- Runtime counts per-tick progress, not retained backlog.
|
- Runtime counts per-tick progress, not retained backlog.
|
||||||
- Local events should be dispatched to leaves, not retained forever.
|
- Local events should be dispatched to leaves, not retained forever.
|
||||||
- Until leaf dispatch exists, callers may drain local/dropped effects; outbound sends remain runtime-owned.
|
- `dispatch_local_effects` attempts queued `RuntimeEffect::Local` values in FIFO
|
||||||
|
order, calls the matching leaf callback, records queued `LeafAction` values for
|
||||||
|
later reducer work, and leaves unmatched locals queued for a future attempt.
|
||||||
|
- Dispatch does not consume `SendFrame` or `Dropped` effects. Outbound sends remain
|
||||||
|
runtime-owned, and drop notifications remain available to callers that drain
|
||||||
|
local/drop effects.
|
||||||
- Send failures must not drop unrelated queued effects.
|
- Send failures must not drop unrelated queued effects.
|
||||||
|
|
||||||
## Leaf API
|
## Leaf API
|
||||||
@@ -275,7 +298,7 @@ parent frame for local endpoint
|
|||||||
-> EndpointState validates and returns Local(Call)
|
-> EndpointState validates and returns Local(Call)
|
||||||
-> NodeRuntime dispatches to matching Leaf::on_call
|
-> NodeRuntime dispatches to matching Leaf::on_call
|
||||||
-> leaf queues LeafAction values
|
-> leaf queues LeafAction values
|
||||||
-> runtime validates and applies actions
|
-> runtime retains actions for a later reducer pass
|
||||||
```
|
```
|
||||||
|
|
||||||
### Outbound Leaf Call
|
### Outbound Leaf Call
|
||||||
@@ -302,7 +325,6 @@ connection closes or unregisters
|
|||||||
|
|
||||||
## Known Gaps In The Current Branch
|
## Known Gaps In The Current Branch
|
||||||
|
|
||||||
- `Leaf` is defined but not yet registered or dispatched by `NodeRuntime`.
|
|
||||||
- `LeafAction` values are queued by `LeafContext` but not yet applied by
|
- `LeafAction` values are queued by `LeafContext` but not yet applied by
|
||||||
`NodeRuntime`.
|
`NodeRuntime`.
|
||||||
- Local outbound calls through the runtime are not implemented.
|
- Local outbound calls through the runtime are not implemented.
|
||||||
@@ -314,11 +336,9 @@ connection closes or unregisters
|
|||||||
|
|
||||||
Implement one narrow end-to-end path:
|
Implement one narrow end-to-end path:
|
||||||
|
|
||||||
1. Add a leaf registry to `NodeRuntime`.
|
1. Apply queued `LeafAction::SendHookData` through endpoint packet state.
|
||||||
2. Dispatch `RuntimeEffect::Local(Call)` into `Leaf::on_call`.
|
2. Route the produced frame through `Transport`.
|
||||||
3. Apply `LeafAction::SendHookData` through endpoint packet state.
|
3. Add tests proving a leaf reply is framed and
|
||||||
4. Route the produced frame through `Transport`.
|
|
||||||
5. Add tests proving a local call reaches a leaf and the leaf reply is framed and
|
|
||||||
sent through a registered connection.
|
sent through a registered connection.
|
||||||
|
|
||||||
That slice forces the real architecture to work without overbuilding the rest of
|
That slice forces the real architecture to work without overbuilding the rest of
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
//! Leaf-facing runtime types.
|
//! Leaf-facing runtime types.
|
||||||
|
|
||||||
|
use crate::alloc::boxed::Box;
|
||||||
use crate::alloc::string::String;
|
use crate::alloc::string::String;
|
||||||
use crate::alloc::vec::Vec;
|
use crate::alloc::vec::Vec;
|
||||||
use crate::context::LeafContext;
|
use crate::context::LeafContext;
|
||||||
@@ -108,3 +109,69 @@ pub trait Leaf {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// One leaf handler registered with a runtime-local dispatch key.
|
||||||
|
///
|
||||||
|
/// The id is the packet `dst_leaf` name used by [`unshell_protocol::tree::LocalEvent`]
|
||||||
|
/// call headers. The runtime keeps this intentionally small: it only finds the
|
||||||
|
/// target callback and records requested [`crate::context::LeafAction`] values.
|
||||||
|
pub struct RegisteredLeaf<Error> {
|
||||||
|
id: LeafId,
|
||||||
|
capabilities: LeafCapabilities,
|
||||||
|
handler: Box<dyn Leaf<Error = Error>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<Error> RegisteredLeaf<Error> {
|
||||||
|
/// Creates a registered leaf from an explicit dispatch id and handler.
|
||||||
|
#[must_use]
|
||||||
|
pub fn new<L>(id: LeafId, handler: L) -> Self
|
||||||
|
where
|
||||||
|
L: Leaf<Error = Error> + 'static,
|
||||||
|
{
|
||||||
|
let capabilities = handler.capabilities().clone();
|
||||||
|
Self {
|
||||||
|
id,
|
||||||
|
capabilities,
|
||||||
|
handler: Box::new(handler),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns the dispatch id used for local packet matching.
|
||||||
|
#[must_use]
|
||||||
|
pub const fn id(&self) -> &LeafId {
|
||||||
|
&self.id
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns the capabilities cached at registration time.
|
||||||
|
#[must_use]
|
||||||
|
pub const fn capabilities(&self) -> &LeafCapabilities {
|
||||||
|
&self.capabilities
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns immutable access to the hosted leaf.
|
||||||
|
#[must_use]
|
||||||
|
pub fn handler(&self) -> &dyn Leaf<Error = Error> {
|
||||||
|
self.handler.as_ref()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns mutable access to the hosted leaf.
|
||||||
|
#[must_use]
|
||||||
|
pub fn handler_mut(&mut self) -> &mut dyn Leaf<Error = Error> {
|
||||||
|
self.handler.as_mut()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns all fields needed to invoke a leaf without cloning metadata.
|
||||||
|
pub(crate) fn dispatch_parts_mut(
|
||||||
|
&mut self,
|
||||||
|
) -> (&LeafId, &LeafCapabilities, &mut dyn Leaf<Error = Error>) {
|
||||||
|
(&self.id, &self.capabilities, self.handler.as_mut())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<Error> core::fmt::Debug for RegisteredLeaf<Error> {
|
||||||
|
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
|
||||||
|
f.debug_struct("RegisteredLeaf")
|
||||||
|
.field("id", &self.id)
|
||||||
|
.finish_non_exhaustive()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -24,9 +24,10 @@ pub use context::{
|
|||||||
RuntimeCapability,
|
RuntimeCapability,
|
||||||
};
|
};
|
||||||
pub use effects::{EffectQueue, RuntimeEffect};
|
pub use effects::{EffectQueue, RuntimeEffect};
|
||||||
pub use leaf::{Leaf, LeafCapabilities, LeafId, LeafPermissions};
|
pub use leaf::{Leaf, LeafCapabilities, LeafId, LeafPermissions, RegisteredLeaf};
|
||||||
pub use node::{
|
pub use node::{
|
||||||
EndpointState, Node, NodeId, NodeRuntime, NodeRuntimeError, NodeState, TickBudget, TickOutcome,
|
EndpointState, LeafDispatchError, Node, NodeId, NodeRuntime, NodeRuntimeError, NodeState,
|
||||||
|
TickBudget, TickOutcome,
|
||||||
};
|
};
|
||||||
pub use transport::Transport;
|
pub use transport::Transport;
|
||||||
|
|
||||||
|
|||||||
@@ -8,7 +8,7 @@ pub mod runtime;
|
|||||||
pub mod state;
|
pub mod state;
|
||||||
|
|
||||||
pub use packet::{EndpointState, PacketProcessor};
|
pub use packet::{EndpointState, PacketProcessor};
|
||||||
pub use runtime::{NodeRuntime, NodeRuntimeError, TickBudget, TickOutcome};
|
pub use runtime::{LeafDispatchError, NodeRuntime, NodeRuntimeError, TickBudget, TickOutcome};
|
||||||
pub use state::NodeState;
|
pub use state::NodeState;
|
||||||
|
|
||||||
use crate::alloc::string::String;
|
use crate::alloc::string::String;
|
||||||
|
|||||||
@@ -2,19 +2,24 @@
|
|||||||
//!
|
//!
|
||||||
//! This first slice owns transport and connection metadata, derives ingress from
|
//! This first slice owns transport and connection metadata, derives ingress from
|
||||||
//! registered connections, delegates packet invariants to [`EndpointState`], and
|
//! registered connections, delegates packet invariants to [`EndpointState`], and
|
||||||
//! queues concrete runtime effects. Leaf dispatch and leaf-action application are
|
//! queues concrete runtime effects. Leaf action application is intentionally not
|
||||||
//! intentionally not implemented in this slice.
|
//! implemented in this slice.
|
||||||
|
|
||||||
use crate::alloc::{string::String, vec::Vec};
|
use crate::alloc::{string::String, vec::Vec};
|
||||||
use crate::connections::{
|
use crate::connections::{
|
||||||
Connection, ConnectionDirection, ConnectionGeneration, ConnectionId, ConnectionState,
|
Connection, ConnectionDirection, ConnectionGeneration, ConnectionId, ConnectionState,
|
||||||
Connections, RegisteredConnection,
|
Connections, RegisteredConnection,
|
||||||
};
|
};
|
||||||
|
use crate::context::{LeafAction, LeafContext};
|
||||||
use crate::effects::{EffectQueue, RuntimeEffect};
|
use crate::effects::{EffectQueue, RuntimeEffect};
|
||||||
|
use crate::leaf::{Leaf, LeafId, RegisteredLeaf};
|
||||||
use crate::transport::Transport;
|
use crate::transport::Transport;
|
||||||
use unshell_protocol::FrameBytes;
|
use unshell_protocol::FrameBytes;
|
||||||
use unshell_protocol::tree::ChildRoute;
|
use unshell_protocol::tree::ChildRoute;
|
||||||
use unshell_protocol::tree::{EndpointError, EndpointOutcome, Ingress, RouteDecision};
|
use unshell_protocol::tree::{
|
||||||
|
Endpoint, EndpointError, EndpointOutcome, IncomingCall, IncomingData, IncomingFault, Ingress,
|
||||||
|
LocalEvent, RouteDecision,
|
||||||
|
};
|
||||||
|
|
||||||
use super::{EndpointState, PacketProcessor};
|
use super::{EndpointState, PacketProcessor};
|
||||||
|
|
||||||
@@ -62,6 +67,34 @@ pub enum NodeRuntimeError<TransportError> {
|
|||||||
Transport(TransportError),
|
Transport(TransportError),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Error returned when a leaf callback rejects a local event.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct LeafDispatchError<LeafError> {
|
||||||
|
/// Leaf id that received the event.
|
||||||
|
pub leaf_id: LeafId,
|
||||||
|
/// Callback-specific error returned by the leaf.
|
||||||
|
pub source: LeafError,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<LeafError> core::fmt::Display for LeafDispatchError<LeafError>
|
||||||
|
where
|
||||||
|
LeafError: core::fmt::Display,
|
||||||
|
{
|
||||||
|
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
|
||||||
|
write!(
|
||||||
|
f,
|
||||||
|
"leaf {} failed during dispatch: {}",
|
||||||
|
self.leaf_id.as_str(),
|
||||||
|
self.source
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<LeafError> core::error::Error for LeafDispatchError<LeafError> where
|
||||||
|
LeafError: core::error::Error + 'static
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
impl<TransportError> core::fmt::Display for NodeRuntimeError<TransportError>
|
impl<TransportError> core::fmt::Display for NodeRuntimeError<TransportError>
|
||||||
where
|
where
|
||||||
TransportError: core::fmt::Display,
|
TransportError: core::fmt::Display,
|
||||||
@@ -85,11 +118,13 @@ impl<TransportError> core::error::Error for NodeRuntimeError<TransportError> whe
|
|||||||
|
|
||||||
/// Runtime owner for one endpoint, transport, and connection table.
|
/// Runtime owner for one endpoint, transport, and connection table.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct NodeRuntime<T> {
|
pub struct NodeRuntime<T, LeafError = core::convert::Infallible> {
|
||||||
endpoint: EndpointState,
|
endpoint: EndpointState,
|
||||||
connections: Connections,
|
connections: Connections,
|
||||||
transport: T,
|
transport: T,
|
||||||
effects: EffectQueue,
|
effects: EffectQueue,
|
||||||
|
leaves: Vec<RegisteredLeaf<LeafError>>,
|
||||||
|
leaf_actions: Vec<(LeafId, LeafAction)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> NodeRuntime<T> {
|
impl<T> NodeRuntime<T> {
|
||||||
@@ -102,6 +137,27 @@ impl<T> NodeRuntime<T> {
|
|||||||
connections,
|
connections,
|
||||||
transport,
|
transport,
|
||||||
effects: EffectQueue::new(),
|
effects: EffectQueue::new(),
|
||||||
|
leaves: Vec::new(),
|
||||||
|
leaf_actions: Vec::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T, LeafError> NodeRuntime<T, LeafError> {
|
||||||
|
/// Creates a runtime with an explicit leaf callback error type.
|
||||||
|
#[must_use]
|
||||||
|
pub const fn new_with_leaf_error(
|
||||||
|
endpoint: EndpointState,
|
||||||
|
connections: Connections,
|
||||||
|
transport: T,
|
||||||
|
) -> Self {
|
||||||
|
Self {
|
||||||
|
endpoint,
|
||||||
|
connections,
|
||||||
|
transport,
|
||||||
|
effects: EffectQueue::new(),
|
||||||
|
leaves: Vec::new(),
|
||||||
|
leaf_actions: Vec::new(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -251,9 +307,170 @@ impl<T> NodeRuntime<T> {
|
|||||||
pub fn drain_local_effects(&mut self) -> impl Iterator<Item = RuntimeEffect> {
|
pub fn drain_local_effects(&mut self) -> impl Iterator<Item = RuntimeEffect> {
|
||||||
self.effects.drain_local()
|
self.effects.drain_local()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Registers a leaf under its declared `leaf_name` dispatch id.
|
||||||
|
///
|
||||||
|
/// If the id already exists, the new handler replaces the previous one. This
|
||||||
|
/// keeps local dispatch deterministic without adding a broader registry API.
|
||||||
|
pub fn register_leaf<L>(&mut self, leaf: L) -> LeafId
|
||||||
|
where
|
||||||
|
L: Leaf<Error = LeafError> + 'static,
|
||||||
|
{
|
||||||
|
let id = LeafId::new(leaf.capabilities().leaf_name.clone());
|
||||||
|
self.register_leaf_as(id.clone(), leaf);
|
||||||
|
id
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Registers a leaf under an explicit dispatch id.
|
||||||
|
///
|
||||||
|
/// This is useful when tests or adapters already hold the exact `dst_leaf`
|
||||||
|
/// string from protocol metadata. Duplicate ids are replaced.
|
||||||
|
pub fn register_leaf_as<L>(&mut self, id: LeafId, leaf: L)
|
||||||
|
where
|
||||||
|
L: Leaf<Error = LeafError> + 'static,
|
||||||
|
{
|
||||||
|
if let Some(existing) = self.leaves.iter_mut().find(|entry| entry.id() == &id) {
|
||||||
|
*existing = RegisteredLeaf::new(id, leaf);
|
||||||
|
} else {
|
||||||
|
self.leaves.push(RegisteredLeaf::new(id, leaf));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns registered leaf handlers.
|
||||||
|
#[must_use]
|
||||||
|
pub fn leaves(&self) -> &[RegisteredLeaf<LeafError>] {
|
||||||
|
&self.leaves
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns leaf actions queued by dispatched callbacks.
|
||||||
|
///
|
||||||
|
/// These actions are intentionally only retained here; reducing them into
|
||||||
|
/// endpoint packets or connection changes belongs to a later runtime slice.
|
||||||
|
#[must_use]
|
||||||
|
pub fn leaf_actions(&self) -> &[(LeafId, LeafAction)] {
|
||||||
|
&self.leaf_actions
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Drains leaf actions queued by dispatched callbacks.
|
||||||
|
pub fn drain_leaf_actions(&mut self) -> impl Iterator<Item = (LeafId, LeafAction)> {
|
||||||
|
let actions = core::mem::take(&mut self.leaf_actions);
|
||||||
|
actions.into_iter()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Dispatches currently queued local effects to matching leaf handlers.
|
||||||
|
///
|
||||||
|
/// Local events are attempted in FIFO queue order. A matched event is removed
|
||||||
|
/// only after the leaf callback succeeds. Unmatched local events, outbound
|
||||||
|
/// sends, and drop notifications remain queued for future runtime work.
|
||||||
|
pub fn dispatch_local_effects(&mut self) -> Result<usize, LeafDispatchError<LeafError>> {
|
||||||
|
let mut retained = EffectQueue::new();
|
||||||
|
let mut dispatched = 0usize;
|
||||||
|
let mut pending = core::mem::take(&mut self.effects);
|
||||||
|
let mut drained = pending.drain();
|
||||||
|
|
||||||
|
while let Some(effect) = drained.next() {
|
||||||
|
match effect {
|
||||||
|
RuntimeEffect::Local(event) => {
|
||||||
|
let Some(leaf_index) = self.leaf_index_for_event(&event) else {
|
||||||
|
retained.push(RuntimeEffect::Local(event));
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Err(error) = self.dispatch_event_to_leaf(leaf_index, &event) {
|
||||||
|
retained.push(RuntimeEffect::Local(event));
|
||||||
|
for remaining in drained {
|
||||||
|
retained.push(remaining);
|
||||||
|
}
|
||||||
|
self.effects = retained;
|
||||||
|
return Err(error);
|
||||||
|
}
|
||||||
|
dispatched += 1;
|
||||||
|
}
|
||||||
|
other => retained.push(other),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
self.effects = retained;
|
||||||
|
Ok(dispatched)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn leaf_index_for_event(&self, event: &LocalEvent) -> Option<usize> {
|
||||||
|
let leaf_name = local_event_leaf_name(event)?;
|
||||||
|
self.leaves
|
||||||
|
.iter()
|
||||||
|
.position(|entry| entry.id().as_str() == leaf_name)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn dispatch_event_to_leaf(
|
||||||
|
&mut self,
|
||||||
|
leaf_index: usize,
|
||||||
|
event: &LocalEvent,
|
||||||
|
) -> Result<(), LeafDispatchError<LeafError>> {
|
||||||
|
let local_path = self.endpoint.endpoint().path();
|
||||||
|
let (leaf_id, actions) = {
|
||||||
|
let leaf = &mut self.leaves[leaf_index];
|
||||||
|
let (leaf_id, capabilities, handler) = leaf.dispatch_parts_mut();
|
||||||
|
let mut ctx = LeafContext::new(local_path, leaf_id, capabilities, &self.connections);
|
||||||
|
|
||||||
|
match event {
|
||||||
|
LocalEvent::Call { header, message } => handler
|
||||||
|
.on_call(
|
||||||
|
&mut ctx,
|
||||||
|
IncomingCall {
|
||||||
|
header: header.clone(),
|
||||||
|
message: message.clone(),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.map_err(|source| LeafDispatchError {
|
||||||
|
leaf_id: leaf_id.clone(),
|
||||||
|
source,
|
||||||
|
})?,
|
||||||
|
LocalEvent::Data {
|
||||||
|
header,
|
||||||
|
message,
|
||||||
|
hook_key,
|
||||||
|
} => handler
|
||||||
|
.on_data(
|
||||||
|
&mut ctx,
|
||||||
|
IncomingData {
|
||||||
|
header: header.clone(),
|
||||||
|
message: message.clone(),
|
||||||
|
hook_key: hook_key.clone(),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.map_err(|source| LeafDispatchError {
|
||||||
|
leaf_id: leaf_id.clone(),
|
||||||
|
source,
|
||||||
|
})?,
|
||||||
|
LocalEvent::Fault {
|
||||||
|
header,
|
||||||
|
message,
|
||||||
|
hook_key,
|
||||||
|
} => handler
|
||||||
|
.on_fault(
|
||||||
|
&mut ctx,
|
||||||
|
IncomingFault {
|
||||||
|
header: header.clone(),
|
||||||
|
fault: message.clone(),
|
||||||
|
hook_key: hook_key.clone(),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.map_err(|source| LeafDispatchError {
|
||||||
|
leaf_id: leaf_id.clone(),
|
||||||
|
source,
|
||||||
|
})?,
|
||||||
|
}
|
||||||
|
|
||||||
|
(leaf_id.clone(), ctx.into_actions())
|
||||||
|
};
|
||||||
|
|
||||||
|
self.leaf_actions
|
||||||
|
.extend(actions.into_iter().map(|action| (leaf_id.clone(), action)));
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> NodeRuntime<T>
|
impl<T, LeafError> NodeRuntime<T, LeafError>
|
||||||
where
|
where
|
||||||
T: Transport,
|
T: Transport,
|
||||||
{
|
{
|
||||||
@@ -424,17 +641,33 @@ fn ingress_for(registered: &RegisteredConnection) -> Ingress {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn local_event_leaf_name(event: &LocalEvent) -> Option<&str> {
|
||||||
|
match event {
|
||||||
|
LocalEvent::Call { header, .. }
|
||||||
|
| LocalEvent::Data { header, .. }
|
||||||
|
| LocalEvent::Fault { header, .. } => header.dst_leaf.as_deref(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
|
use core::cell::RefCell;
|
||||||
|
use core::convert::Infallible;
|
||||||
|
|
||||||
|
use crate::alloc::rc::Rc;
|
||||||
use crate::alloc::string::String;
|
use crate::alloc::string::String;
|
||||||
use crate::alloc::vec;
|
use crate::alloc::vec;
|
||||||
use crate::alloc::vec::Vec;
|
use crate::alloc::vec::Vec;
|
||||||
use crate::connections::{
|
use crate::connections::{
|
||||||
Connection, ConnectionDirection, ConnectionGeneration, ConnectionId, Connections,
|
Connection, ConnectionDirection, ConnectionGeneration, ConnectionId, Connections,
|
||||||
};
|
};
|
||||||
|
use crate::context::{LeafAction, OutboundHookData};
|
||||||
use crate::effects::RuntimeEffect;
|
use crate::effects::RuntimeEffect;
|
||||||
|
use crate::leaf::{Leaf, LeafCapabilities, LeafPermissions};
|
||||||
use crate::transport::Transport;
|
use crate::transport::Transport;
|
||||||
use unshell_protocol::tree::{ChildRoute, EndpointError, ProtocolEndpoint};
|
use unshell_protocol::tree::{
|
||||||
|
ChildRoute, EndpointError, IncomingCall, LeafSpec, LocalEvent, ProtocolEndpoint,
|
||||||
|
};
|
||||||
use unshell_protocol::{CallMessage, FrameBytes, PacketHeader, PacketType, encode_packet};
|
use unshell_protocol::{CallMessage, FrameBytes, PacketHeader, PacketType, encode_packet};
|
||||||
|
|
||||||
use super::{EndpointState, NodeRuntime, NodeRuntimeError, TickBudget};
|
use super::{EndpointState, NodeRuntime, NodeRuntimeError, TickBudget};
|
||||||
@@ -469,6 +702,81 @@ mod tests {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct RecordingLeaf {
|
||||||
|
capabilities: LeafCapabilities,
|
||||||
|
calls: Rc<RefCell<Vec<IncomingCall>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RecordingLeaf {
|
||||||
|
fn new(leaf_name: &str, calls: Rc<RefCell<Vec<IncomingCall>>>) -> Self {
|
||||||
|
Self {
|
||||||
|
capabilities: LeafCapabilities {
|
||||||
|
leaf_name: String::from(leaf_name),
|
||||||
|
procedures: vec![String::from("org.example.v1.echo.invoke")],
|
||||||
|
permissions: LeafPermissions::REPLY_ONLY,
|
||||||
|
},
|
||||||
|
calls,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Leaf for RecordingLeaf {
|
||||||
|
type Error = Infallible;
|
||||||
|
|
||||||
|
fn capabilities(&self) -> &LeafCapabilities {
|
||||||
|
&self.capabilities
|
||||||
|
}
|
||||||
|
|
||||||
|
fn on_call(
|
||||||
|
&mut self,
|
||||||
|
ctx: &mut crate::LeafContext<'_>,
|
||||||
|
call: IncomingCall,
|
||||||
|
) -> Result<(), Self::Error> {
|
||||||
|
self.calls.borrow_mut().push(call.clone());
|
||||||
|
ctx.hook_data(OutboundHookData {
|
||||||
|
dst_path: call.header.src_path,
|
||||||
|
hook_id: 7,
|
||||||
|
procedure_id: call.message.procedure_id,
|
||||||
|
payload: vec![1, 2, 3],
|
||||||
|
end_hook: true,
|
||||||
|
})
|
||||||
|
.expect("reply-only leaf can queue hook data");
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct FailingLeaf {
|
||||||
|
capabilities: LeafCapabilities,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FailingLeaf {
|
||||||
|
fn new(leaf_name: &str) -> Self {
|
||||||
|
Self {
|
||||||
|
capabilities: LeafCapabilities {
|
||||||
|
leaf_name: String::from(leaf_name),
|
||||||
|
procedures: vec![String::from("org.example.v1.fail.invoke")],
|
||||||
|
permissions: LeafPermissions::REPLY_ONLY,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Leaf for FailingLeaf {
|
||||||
|
type Error = &'static str;
|
||||||
|
|
||||||
|
fn capabilities(&self) -> &LeafCapabilities {
|
||||||
|
&self.capabilities
|
||||||
|
}
|
||||||
|
|
||||||
|
fn on_call(
|
||||||
|
&mut self,
|
||||||
|
_ctx: &mut crate::LeafContext<'_>,
|
||||||
|
_call: IncomingCall,
|
||||||
|
) -> Result<(), Self::Error> {
|
||||||
|
Err("leaf failed")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn tick_derives_ingress_and_sends_forwarded_child_frame() {
|
fn tick_derives_ingress_and_sends_forwarded_child_frame() {
|
||||||
let parent = ConnectionId::new(1);
|
let parent = ConnectionId::new(1);
|
||||||
@@ -929,6 +1237,191 @@ mod tests {
|
|||||||
assert!(matches!(runtime.effects()[0], RuntimeEffect::Local(_)));
|
assert!(matches!(runtime.effects()[0], RuntimeEffect::Local(_)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn dispatch_local_call_reaches_registered_leaf() {
|
||||||
|
let parent = ConnectionId::new(1);
|
||||||
|
let mut connections = Connections::new();
|
||||||
|
connections.push(Connection::registered(
|
||||||
|
parent,
|
||||||
|
ConnectionDirection::Parent,
|
||||||
|
vec![],
|
||||||
|
ConnectionGeneration::INITIAL,
|
||||||
|
));
|
||||||
|
|
||||||
|
let leaf_name = "org.example.v1.echo";
|
||||||
|
let endpoint = ProtocolEndpoint::new(
|
||||||
|
vec![String::from("agent")],
|
||||||
|
Some(vec![]),
|
||||||
|
vec![],
|
||||||
|
vec![LeafSpec {
|
||||||
|
name: String::from(leaf_name),
|
||||||
|
procedures: vec![String::from("org.example.v1.echo.invoke")],
|
||||||
|
}],
|
||||||
|
);
|
||||||
|
let frame = encode_packet(
|
||||||
|
&PacketHeader {
|
||||||
|
packet_type: PacketType::Call,
|
||||||
|
src_path: vec![],
|
||||||
|
dst_path: vec![String::from("agent")],
|
||||||
|
dst_leaf: Some(String::from(leaf_name)),
|
||||||
|
hook_id: None,
|
||||||
|
},
|
||||||
|
&CallMessage {
|
||||||
|
procedure_id: String::from("org.example.v1.echo.invoke"),
|
||||||
|
data: vec![9],
|
||||||
|
response_hook: None,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.expect("frame encodes");
|
||||||
|
let calls = Rc::new(RefCell::new(Vec::new()));
|
||||||
|
let mut runtime = NodeRuntime::new(
|
||||||
|
EndpointState::new(endpoint),
|
||||||
|
connections,
|
||||||
|
RecordingTransport::default(),
|
||||||
|
);
|
||||||
|
runtime.register_leaf(RecordingLeaf::new(leaf_name, Rc::clone(&calls)));
|
||||||
|
|
||||||
|
runtime
|
||||||
|
.receive_frame(parent, frame)
|
||||||
|
.expect("frame processes");
|
||||||
|
let dispatched = runtime.dispatch_local_effects().expect("dispatch succeeds");
|
||||||
|
|
||||||
|
assert_eq!(dispatched, 1);
|
||||||
|
assert!(runtime.effects().is_empty());
|
||||||
|
assert_eq!(calls.borrow().len(), 1);
|
||||||
|
assert_eq!(calls.borrow()[0].message.data, [9]);
|
||||||
|
assert_eq!(runtime.leaf_actions().len(), 1);
|
||||||
|
let (action_leaf, action) = &runtime.leaf_actions()[0];
|
||||||
|
assert_eq!(action_leaf.as_str(), leaf_name);
|
||||||
|
let LeafAction::SendHookData(data) = action else {
|
||||||
|
panic!("leaf action should be retained hook data");
|
||||||
|
};
|
||||||
|
assert_eq!(data.dst_path, Vec::<String>::new());
|
||||||
|
assert_eq!(data.hook_id, 7);
|
||||||
|
assert_eq!(data.procedure_id, "org.example.v1.echo.invoke");
|
||||||
|
assert_eq!(data.payload, [1, 2, 3]);
|
||||||
|
assert!(data.end_hook);
|
||||||
|
assert!(runtime.transport().sent.is_empty());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn unmatched_local_event_remains_queued() {
|
||||||
|
let mut runtime = NodeRuntime::new(
|
||||||
|
EndpointState::new(ProtocolEndpoint::new(
|
||||||
|
vec![String::from("agent")],
|
||||||
|
Some(vec![]),
|
||||||
|
vec![],
|
||||||
|
vec![],
|
||||||
|
)),
|
||||||
|
Connections::new(),
|
||||||
|
RecordingTransport::default(),
|
||||||
|
);
|
||||||
|
runtime.effects.push(RuntimeEffect::Local(LocalEvent::Call {
|
||||||
|
header: PacketHeader {
|
||||||
|
packet_type: PacketType::Call,
|
||||||
|
src_path: vec![],
|
||||||
|
dst_path: vec![String::from("agent")],
|
||||||
|
dst_leaf: Some(String::from("org.example.v1.missing")),
|
||||||
|
hook_id: None,
|
||||||
|
},
|
||||||
|
message: CallMessage {
|
||||||
|
procedure_id: String::from("org.example.v1.missing.invoke"),
|
||||||
|
data: vec![],
|
||||||
|
response_hook: None,
|
||||||
|
},
|
||||||
|
}));
|
||||||
|
|
||||||
|
let dispatched = runtime.dispatch_local_effects().expect("dispatch succeeds");
|
||||||
|
|
||||||
|
assert_eq!(dispatched, 0);
|
||||||
|
assert_eq!(runtime.effects().len(), 1);
|
||||||
|
assert!(matches!(runtime.effects()[0], RuntimeEffect::Local(_)));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn local_dispatch_preserves_send_frame_and_dropped_effects() {
|
||||||
|
let parent = ConnectionId::new(1);
|
||||||
|
let frame = FrameBytes::new();
|
||||||
|
let mut runtime = NodeRuntime::new(
|
||||||
|
EndpointState::new(ProtocolEndpoint::new(
|
||||||
|
vec![String::from("agent")],
|
||||||
|
Some(vec![]),
|
||||||
|
vec![],
|
||||||
|
vec![],
|
||||||
|
)),
|
||||||
|
Connections::new(),
|
||||||
|
RecordingTransport::default(),
|
||||||
|
);
|
||||||
|
runtime.effects.push(RuntimeEffect::SendFrame {
|
||||||
|
connection: parent,
|
||||||
|
generation: ConnectionGeneration::INITIAL,
|
||||||
|
frame,
|
||||||
|
});
|
||||||
|
runtime.effects.push(RuntimeEffect::Dropped);
|
||||||
|
|
||||||
|
let dispatched = runtime.dispatch_local_effects().expect("dispatch succeeds");
|
||||||
|
|
||||||
|
assert_eq!(dispatched, 0);
|
||||||
|
assert_eq!(runtime.effects().len(), 2);
|
||||||
|
assert!(matches!(
|
||||||
|
runtime.effects()[0],
|
||||||
|
RuntimeEffect::SendFrame { .. }
|
||||||
|
));
|
||||||
|
assert!(matches!(runtime.effects()[1], RuntimeEffect::Dropped));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn failed_local_dispatch_preserves_failed_and_remaining_effects() {
|
||||||
|
let parent = ConnectionId::new(1);
|
||||||
|
let leaf_name = "org.example.v1.fail";
|
||||||
|
let mut runtime = NodeRuntime::<_, &'static str>::new_with_leaf_error(
|
||||||
|
EndpointState::new(ProtocolEndpoint::new(
|
||||||
|
vec![String::from("agent")],
|
||||||
|
Some(vec![]),
|
||||||
|
vec![],
|
||||||
|
vec![],
|
||||||
|
)),
|
||||||
|
Connections::new(),
|
||||||
|
RecordingTransport::default(),
|
||||||
|
);
|
||||||
|
runtime.register_leaf(FailingLeaf::new(leaf_name));
|
||||||
|
runtime.effects.push(RuntimeEffect::Local(LocalEvent::Call {
|
||||||
|
header: PacketHeader {
|
||||||
|
packet_type: PacketType::Call,
|
||||||
|
src_path: vec![],
|
||||||
|
dst_path: vec![String::from("agent")],
|
||||||
|
dst_leaf: Some(String::from(leaf_name)),
|
||||||
|
hook_id: None,
|
||||||
|
},
|
||||||
|
message: CallMessage {
|
||||||
|
procedure_id: String::from("org.example.v1.fail.invoke"),
|
||||||
|
data: vec![],
|
||||||
|
response_hook: None,
|
||||||
|
},
|
||||||
|
}));
|
||||||
|
runtime.effects.push(RuntimeEffect::Dropped);
|
||||||
|
runtime.effects.push(RuntimeEffect::SendFrame {
|
||||||
|
connection: parent,
|
||||||
|
generation: ConnectionGeneration::INITIAL,
|
||||||
|
frame: FrameBytes::new(),
|
||||||
|
});
|
||||||
|
|
||||||
|
let error = runtime
|
||||||
|
.dispatch_local_effects()
|
||||||
|
.expect_err("leaf callback failure is returned");
|
||||||
|
|
||||||
|
assert_eq!(error.leaf_id.as_str(), leaf_name);
|
||||||
|
assert_eq!(error.source, "leaf failed");
|
||||||
|
assert!(runtime.leaf_actions().is_empty());
|
||||||
|
assert_eq!(runtime.effects().len(), 3);
|
||||||
|
assert!(matches!(runtime.effects()[0], RuntimeEffect::Local(_)));
|
||||||
|
assert!(matches!(runtime.effects()[1], RuntimeEffect::Dropped));
|
||||||
|
assert!(matches!(
|
||||||
|
runtime.effects()[2],
|
||||||
|
RuntimeEffect::SendFrame { .. }
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn failed_send_preserves_failed_and_unprocessed_effects() {
|
fn failed_send_preserves_failed_and_unprocessed_effects() {
|
||||||
let parent = ConnectionId::new(1);
|
let parent = ConnectionId::new(1);
|
||||||
|
|||||||
Reference in New Issue
Block a user