Add runtime connection registration helpers

This commit is contained in:
Michael Mikovsky
2026-05-09 13:06:43 -06:00
parent fcde5d66d2
commit 99b54b0bdf
3 changed files with 459 additions and 3 deletions
+20 -1
View File
@@ -152,6 +152,22 @@ impl<T: Transport> NodeRuntime<T> {
frame: FrameBytes,
) -> Result<(), NodeRuntimeError<T::Error>>;
}
impl<T> NodeRuntime<T> {
pub fn register_parent_connection(
&mut self,
connection: ConnectionId,
parent_path: Vec<String>,
generation: ConnectionGeneration,
) -> Result<(), EndpointError>;
pub fn register_child_connection(
&mut self,
connection: ConnectionId,
child_path: Vec<String>,
generation: ConnectionGeneration,
) -> Result<(), EndpointError>;
}
```
Runtime flow:
@@ -168,6 +184,10 @@ transport poll -> (ConnectionId, FrameBytes)
Rules:
- Callers never pass `Ingress` into `NodeRuntime`.
- Callers should register parent and child connections through `NodeRuntime` so
route topology and connection metadata are mutated together. Directly changing
only `Connections` or only `EndpointState` can leave a connected peer
unroutable or a route without a registered connection.
- Runtime counts per-tick progress, not retained backlog.
- Local events should be dispatched to leaves, not retained forever.
- Until leaf dispatch exists, callers may drain local/dropped effects; outbound sends remain runtime-owned.
@@ -286,7 +306,6 @@ connection closes or unregisters
- `LeafAction` values are queued by `LeafContext` but not yet applied by
`NodeRuntime`.
- Local outbound calls through the runtime are not implemented.
- Connection registration does not yet atomically update endpoint routes.
- Disconnect does not yet clean hooks, sessions, route state, and queued effects.
- Child ingress still allocates because the existing `Ingress::Child` owns a
`Vec<String>`.
+44
View File
@@ -276,6 +276,50 @@ impl Connections {
})
})
}
/// Makes every matching registered connection except `except` unroutable.
pub(crate) fn demote_registered_direction_except(
&mut self,
direction: ConnectionDirection,
except: ConnectionId,
) {
for entry in &mut self.entries {
let Some(registered) = entry.state().registered() else {
continue;
};
if entry.id() == except || registered.direction() != direction {
continue;
}
entry.set_state(ConnectionState::Connected {
generation: registered.generation(),
});
}
}
/// Makes every matching registered peer path except `except` unroutable.
pub(crate) fn demote_registered_path_except(
&mut self,
direction: ConnectionDirection,
peer_path: &[String],
except: ConnectionId,
) {
for entry in &mut self.entries {
let Some(registered) = entry.state().registered() else {
continue;
};
if entry.id() == except
|| registered.direction() != direction
|| registered.peer_path() != peer_path
{
continue;
}
entry.set_state(ConnectionState::Connected {
generation: registered.generation(),
});
}
}
}
/// Read-only connection table view exposed to leaf contexts.
+395 -2
View File
@@ -5,10 +5,15 @@
//! queues concrete runtime effects. Leaf dispatch and leaf-action application are
//! intentionally not implemented in this slice.
use crate::connections::{ConnectionDirection, ConnectionId, Connections, RegisteredConnection};
use crate::alloc::{string::String, vec::Vec};
use crate::connections::{
Connection, ConnectionDirection, ConnectionGeneration, ConnectionId, ConnectionState,
Connections, RegisteredConnection,
};
use crate::effects::{EffectQueue, RuntimeEffect};
use crate::transport::Transport;
use unshell_protocol::FrameBytes;
use unshell_protocol::tree::ChildRoute;
use unshell_protocol::tree::{EndpointError, EndpointOutcome, Ingress, RouteDecision};
use super::{EndpointState, PacketProcessor};
@@ -136,6 +141,104 @@ impl<T> NodeRuntime<T> {
&mut self.transport
}
/// Registers or updates the parent connection and endpoint parent route together.
///
/// Call this instead of mutating [`Connections`] and [`EndpointState`] separately.
/// The endpoint validates that `parent_path` is the direct parent before the
/// connection table is made routable.
pub fn register_parent_connection(
&mut self,
connection: ConnectionId,
parent_path: Vec<String>,
generation: ConnectionGeneration,
) -> Result<(), EndpointError> {
let previous = self.connections.registered(connection).cloned();
self.endpoint
.endpoint_mut()
.set_parent_path(Some(parent_path.clone()))?;
if let Some(previous) = previous
&& previous.direction() == ConnectionDirection::Child
{
self.endpoint
.endpoint_mut()
.remove_child_route(previous.peer_path());
}
self.upsert_registered_connection(
connection,
ConnectionDirection::Parent,
parent_path.clone(),
generation,
);
self.connections
.demote_registered_direction_except(ConnectionDirection::Parent, connection);
Ok(())
}
/// Registers or updates a child connection and endpoint child route together.
///
/// Call this instead of mutating [`Connections`] and [`EndpointState`] separately.
/// The endpoint validates that `child_path` is a direct child before the
/// connection table is made routable.
pub fn register_child_connection(
&mut self,
connection: ConnectionId,
child_path: Vec<String>,
generation: ConnectionGeneration,
) -> Result<(), EndpointError> {
let previous = self.connections.registered(connection).cloned();
self.endpoint
.endpoint_mut()
.upsert_child_route(ChildRoute::registered(child_path.clone()))?;
if let Some(previous) = previous {
match previous.direction() {
ConnectionDirection::Parent => {
self.endpoint.endpoint_mut().set_parent_path(None)?;
}
ConnectionDirection::Child if previous.peer_path() != child_path.as_slice() => {
self.endpoint
.endpoint_mut()
.remove_child_route(previous.peer_path());
}
ConnectionDirection::Child => {}
}
}
self.upsert_registered_connection(
connection,
ConnectionDirection::Child,
child_path.clone(),
generation,
);
self.connections.demote_registered_path_except(
ConnectionDirection::Child,
&child_path,
connection,
);
Ok(())
}
fn upsert_registered_connection(
&mut self,
connection: ConnectionId,
direction: ConnectionDirection,
peer_path: Vec<String>,
generation: ConnectionGeneration,
) {
if let Some(existing) = self.connections.get_mut(connection) {
let state = ConnectionState::Registered(RegisteredConnection::new(
direction, peer_path, generation,
));
existing.set_state(state);
} else {
self.connections.push(Connection::registered(
connection, direction, peer_path, generation,
));
}
}
/// Returns currently queued effects.
#[must_use]
pub fn effects(&self) -> &[RuntimeEffect] {
@@ -331,7 +434,7 @@ mod tests {
};
use crate::effects::RuntimeEffect;
use crate::transport::Transport;
use unshell_protocol::tree::{ChildRoute, ProtocolEndpoint};
use unshell_protocol::tree::{ChildRoute, EndpointError, ProtocolEndpoint};
use unshell_protocol::{CallMessage, FrameBytes, PacketHeader, PacketType, encode_packet};
use super::{EndpointState, NodeRuntime, NodeRuntimeError, TickBudget};
@@ -425,6 +528,296 @@ mod tests {
assert_eq!(runtime.transport().sent[0].0, child);
}
#[test]
fn runtime_child_registration_updates_connection_and_route_topology() {
let parent = ConnectionId::new(1);
let child = ConnectionId::new(2);
let mut connections = Connections::new();
connections.push(Connection::connected(parent, ConnectionGeneration::INITIAL));
connections.push(Connection::connected(child, ConnectionGeneration::INITIAL));
let endpoint =
ProtocolEndpoint::new(vec![String::from("agent")], None, Vec::new(), Vec::new());
let frame = encode_packet(
&PacketHeader {
packet_type: PacketType::Call,
src_path: vec![],
dst_path: vec![String::from("agent"), String::from("grand")],
dst_leaf: None,
hook_id: None,
},
&CallMessage {
procedure_id: String::from("org.example.v1.echo.invoke"),
data: vec![],
response_hook: None,
},
)
.expect("frame encodes");
let transport = RecordingTransport {
inbound: Some((parent, frame)),
sent: Vec::new(),
fail_send: false,
};
let mut runtime = NodeRuntime::new(EndpointState::new(endpoint), connections, transport);
runtime
.register_parent_connection(parent, vec![], ConnectionGeneration::INITIAL)
.expect("parent registers");
runtime
.register_child_connection(
child,
vec![String::from("agent"), String::from("grand")],
ConnectionGeneration::INITIAL,
)
.expect("child registers");
let outcome = runtime.tick(TickBudget::default()).expect("tick succeeds");
assert_eq!(outcome.outbound_frames, 1);
assert_eq!(runtime.transport().sent[0].0, child);
assert_eq!(
runtime.endpoint().endpoint().child_routes(),
[ChildRoute::registered(vec![
String::from("agent"),
String::from("grand")
])]
);
}
#[test]
fn connected_child_without_runtime_registration_is_unroutable() {
let parent = ConnectionId::new(1);
let child = ConnectionId::new(2);
let mut connections = Connections::new();
connections.push(Connection::connected(parent, ConnectionGeneration::INITIAL));
connections.push(Connection::connected(child, ConnectionGeneration::INITIAL));
let endpoint = ProtocolEndpoint::new(
vec![String::from("agent")],
None,
vec![ChildRoute::registered(vec![
String::from("agent"),
String::from("grand"),
])],
Vec::new(),
);
let frame = encode_packet(
&PacketHeader {
packet_type: PacketType::Call,
src_path: vec![],
dst_path: vec![String::from("agent"), String::from("grand")],
dst_leaf: None,
hook_id: None,
},
&CallMessage {
procedure_id: String::from("org.example.v1.echo.invoke"),
data: vec![],
response_hook: None,
},
)
.expect("frame encodes");
let transport = RecordingTransport {
inbound: Some((parent, frame)),
sent: Vec::new(),
fail_send: false,
};
let mut runtime = NodeRuntime::new(EndpointState::new(endpoint), connections, transport);
runtime
.register_parent_connection(parent, vec![], ConnectionGeneration::INITIAL)
.expect("parent registers");
let error = runtime
.tick(TickBudget::default())
.expect_err("child is not routable");
assert!(matches!(error, NodeRuntimeError::MissingRouteConnection));
assert!(runtime.transport().sent.is_empty());
assert!(runtime.connections().registered(child).is_none());
}
#[test]
fn child_reregistration_removes_old_route() {
let child = ConnectionId::new(1);
let mut connections = Connections::new();
connections.push(Connection::connected(child, ConnectionGeneration::INITIAL));
let endpoint =
ProtocolEndpoint::new(vec![String::from("agent")], None, Vec::new(), Vec::new());
let transport = RecordingTransport {
inbound: None,
sent: Vec::new(),
fail_send: false,
};
let mut runtime = NodeRuntime::new(EndpointState::new(endpoint), connections, transport);
runtime
.register_child_connection(
child,
vec![String::from("agent"), String::from("old")],
ConnectionGeneration::INITIAL,
)
.expect("old child registers");
runtime
.register_child_connection(
child,
vec![String::from("agent"), String::from("new")],
ConnectionGeneration::INITIAL,
)
.expect("new child registers");
assert_eq!(
runtime.endpoint().endpoint().child_routes(),
[ChildRoute::registered(vec![
String::from("agent"),
String::from("new")
])]
);
assert!(
runtime
.connections()
.registered_by_path(
ConnectionDirection::Child,
&[String::from("agent"), String::from("old")],
)
.is_none()
);
}
#[test]
fn replacement_child_registration_demotes_old_peer() {
let parent = ConnectionId::new(1);
let old_child = ConnectionId::new(2);
let new_child = ConnectionId::new(3);
let mut connections = Connections::new();
connections.push(Connection::connected(parent, ConnectionGeneration::INITIAL));
connections.push(Connection::connected(
old_child,
ConnectionGeneration::INITIAL,
));
connections.push(Connection::connected(
new_child,
ConnectionGeneration::INITIAL,
));
let endpoint =
ProtocolEndpoint::new(vec![String::from("agent")], None, Vec::new(), Vec::new());
let transport = RecordingTransport {
inbound: None,
sent: Vec::new(),
fail_send: false,
};
let mut runtime = NodeRuntime::new(EndpointState::new(endpoint), connections, transport);
runtime
.register_parent_connection(parent, vec![], ConnectionGeneration::INITIAL)
.expect("parent registers");
runtime
.register_child_connection(
old_child,
vec![String::from("agent"), String::from("grand")],
ConnectionGeneration::INITIAL,
)
.expect("old child registers");
runtime
.register_child_connection(
new_child,
vec![String::from("agent"), String::from("grand")],
ConnectionGeneration::INITIAL,
)
.expect("new child replaces old child");
let frame = encode_packet(
&PacketHeader {
packet_type: PacketType::Call,
src_path: vec![],
dst_path: vec![String::from("agent"), String::from("grand")],
dst_leaf: None,
hook_id: None,
},
&CallMessage {
procedure_id: String::from("org.example.v1.echo.invoke"),
data: vec![],
response_hook: None,
},
)
.expect("frame encodes");
runtime.transport_mut().inbound = Some((parent, frame));
let outcome = runtime.tick(TickBudget::default()).expect("tick succeeds");
assert_eq!(outcome.outbound_frames, 1);
assert_eq!(runtime.transport().sent[0].0, new_child);
assert!(runtime.connections().registered(old_child).is_none());
}
#[test]
fn invalid_child_registration_leaves_connection_unregistered() {
let child = ConnectionId::new(1);
let mut connections = Connections::new();
connections.push(Connection::connected(child, ConnectionGeneration::INITIAL));
let endpoint =
ProtocolEndpoint::new(vec![String::from("agent")], None, Vec::new(), Vec::new());
let transport = RecordingTransport {
inbound: None,
sent: Vec::new(),
fail_send: false,
};
let mut runtime = NodeRuntime::new(EndpointState::new(endpoint), connections, transport);
let error = runtime
.register_child_connection(
child,
vec![String::from("other"), String::from("kid")],
ConnectionGeneration::INITIAL,
)
.expect_err("invalid child path is rejected");
assert!(matches!(error, EndpointError::Validation(_)));
assert!(runtime.connections().registered(child).is_none());
assert!(runtime.endpoint().endpoint().child_routes().is_empty());
}
#[test]
fn invalid_child_reregistration_preserves_existing_registration() {
let child = ConnectionId::new(1);
let mut connections = Connections::new();
connections.push(Connection::connected(child, ConnectionGeneration::INITIAL));
let endpoint =
ProtocolEndpoint::new(vec![String::from("agent")], None, Vec::new(), Vec::new());
let transport = RecordingTransport {
inbound: None,
sent: Vec::new(),
fail_send: false,
};
let mut runtime = NodeRuntime::new(EndpointState::new(endpoint), connections, transport);
let valid_path = vec![String::from("agent"), String::from("kid")];
runtime
.register_child_connection(child, valid_path.clone(), ConnectionGeneration::INITIAL)
.expect("initial child registers");
let error = runtime
.register_child_connection(
child,
vec![String::from("other"), String::from("kid")],
ConnectionGeneration::INITIAL.next(),
)
.expect_err("invalid replacement path is rejected");
assert!(matches!(error, EndpointError::Validation(_)));
let registered = runtime
.connections()
.registered(child)
.expect("original child remains registered");
assert_eq!(registered.peer_path(), valid_path);
assert_eq!(
runtime.endpoint().endpoint().child_routes(),
[ChildRoute::registered(valid_path)]
);
}
#[test]
fn child_route_decision_uses_registered_child_order() {
let parent = ConnectionId::new(1);