From 99b54b0bdfcc415e8a826f56e259ca65bf09e8bc Mon Sep 17 00:00:00 2001 From: Michael Mikovsky <77305074+Astatin3@users.noreply.github.com> Date: Sat, 9 May 2026 13:06:43 -0600 Subject: [PATCH] Add runtime connection registration helpers --- API.md | 21 +- unshell-runtime/src/connections.rs | 44 +++ unshell-runtime/src/node/runtime.rs | 397 +++++++++++++++++++++++++++- 3 files changed, 459 insertions(+), 3 deletions(-) diff --git a/API.md b/API.md index a30a8d1..e54ae89 100644 --- a/API.md +++ b/API.md @@ -152,6 +152,22 @@ impl NodeRuntime { frame: FrameBytes, ) -> Result<(), NodeRuntimeError>; } + +impl NodeRuntime { + pub fn register_parent_connection( + &mut self, + connection: ConnectionId, + parent_path: Vec, + generation: ConnectionGeneration, + ) -> Result<(), EndpointError>; + + pub fn register_child_connection( + &mut self, + connection: ConnectionId, + child_path: Vec, + generation: ConnectionGeneration, + ) -> Result<(), EndpointError>; +} ``` Runtime flow: @@ -168,6 +184,10 @@ transport poll -> (ConnectionId, FrameBytes) Rules: - Callers never pass `Ingress` into `NodeRuntime`. +- Callers should register parent and child connections through `NodeRuntime` so + route topology and connection metadata are mutated together. Directly changing + only `Connections` or only `EndpointState` can leave a connected peer + unroutable or a route without a registered connection. - Runtime counts per-tick progress, not retained backlog. - Local events should be dispatched to leaves, not retained forever. - Until leaf dispatch exists, callers may drain local/dropped effects; outbound sends remain runtime-owned. @@ -286,7 +306,6 @@ connection closes or unregisters - `LeafAction` values are queued by `LeafContext` but not yet applied by `NodeRuntime`. - Local outbound calls through the runtime are not implemented. -- Connection registration does not yet atomically update endpoint routes. - Disconnect does not yet clean hooks, sessions, route state, and queued effects. - Child ingress still allocates because the existing `Ingress::Child` owns a `Vec`. diff --git a/unshell-runtime/src/connections.rs b/unshell-runtime/src/connections.rs index b4b6fe2..557ffca 100644 --- a/unshell-runtime/src/connections.rs +++ b/unshell-runtime/src/connections.rs @@ -276,6 +276,50 @@ impl Connections { }) }) } + + /// Makes every matching registered connection except `except` unroutable. + pub(crate) fn demote_registered_direction_except( + &mut self, + direction: ConnectionDirection, + except: ConnectionId, + ) { + for entry in &mut self.entries { + let Some(registered) = entry.state().registered() else { + continue; + }; + if entry.id() == except || registered.direction() != direction { + continue; + } + + entry.set_state(ConnectionState::Connected { + generation: registered.generation(), + }); + } + } + + /// Makes every matching registered peer path except `except` unroutable. + pub(crate) fn demote_registered_path_except( + &mut self, + direction: ConnectionDirection, + peer_path: &[String], + except: ConnectionId, + ) { + for entry in &mut self.entries { + let Some(registered) = entry.state().registered() else { + continue; + }; + if entry.id() == except + || registered.direction() != direction + || registered.peer_path() != peer_path + { + continue; + } + + entry.set_state(ConnectionState::Connected { + generation: registered.generation(), + }); + } + } } /// Read-only connection table view exposed to leaf contexts. diff --git a/unshell-runtime/src/node/runtime.rs b/unshell-runtime/src/node/runtime.rs index 903611f..7c18d2a 100644 --- a/unshell-runtime/src/node/runtime.rs +++ b/unshell-runtime/src/node/runtime.rs @@ -5,10 +5,15 @@ //! queues concrete runtime effects. Leaf dispatch and leaf-action application are //! intentionally not implemented in this slice. -use crate::connections::{ConnectionDirection, ConnectionId, Connections, RegisteredConnection}; +use crate::alloc::{string::String, vec::Vec}; +use crate::connections::{ + Connection, ConnectionDirection, ConnectionGeneration, ConnectionId, ConnectionState, + Connections, RegisteredConnection, +}; use crate::effects::{EffectQueue, RuntimeEffect}; use crate::transport::Transport; use unshell_protocol::FrameBytes; +use unshell_protocol::tree::ChildRoute; use unshell_protocol::tree::{EndpointError, EndpointOutcome, Ingress, RouteDecision}; use super::{EndpointState, PacketProcessor}; @@ -136,6 +141,104 @@ impl NodeRuntime { &mut self.transport } + /// Registers or updates the parent connection and endpoint parent route together. + /// + /// Call this instead of mutating [`Connections`] and [`EndpointState`] separately. + /// The endpoint validates that `parent_path` is the direct parent before the + /// connection table is made routable. + pub fn register_parent_connection( + &mut self, + connection: ConnectionId, + parent_path: Vec, + generation: ConnectionGeneration, + ) -> Result<(), EndpointError> { + let previous = self.connections.registered(connection).cloned(); + self.endpoint + .endpoint_mut() + .set_parent_path(Some(parent_path.clone()))?; + + if let Some(previous) = previous + && previous.direction() == ConnectionDirection::Child + { + self.endpoint + .endpoint_mut() + .remove_child_route(previous.peer_path()); + } + + self.upsert_registered_connection( + connection, + ConnectionDirection::Parent, + parent_path.clone(), + generation, + ); + self.connections + .demote_registered_direction_except(ConnectionDirection::Parent, connection); + Ok(()) + } + + /// Registers or updates a child connection and endpoint child route together. + /// + /// Call this instead of mutating [`Connections`] and [`EndpointState`] separately. + /// The endpoint validates that `child_path` is a direct child before the + /// connection table is made routable. + pub fn register_child_connection( + &mut self, + connection: ConnectionId, + child_path: Vec, + generation: ConnectionGeneration, + ) -> Result<(), EndpointError> { + let previous = self.connections.registered(connection).cloned(); + self.endpoint + .endpoint_mut() + .upsert_child_route(ChildRoute::registered(child_path.clone()))?; + + if let Some(previous) = previous { + match previous.direction() { + ConnectionDirection::Parent => { + self.endpoint.endpoint_mut().set_parent_path(None)?; + } + ConnectionDirection::Child if previous.peer_path() != child_path.as_slice() => { + self.endpoint + .endpoint_mut() + .remove_child_route(previous.peer_path()); + } + ConnectionDirection::Child => {} + } + } + + self.upsert_registered_connection( + connection, + ConnectionDirection::Child, + child_path.clone(), + generation, + ); + self.connections.demote_registered_path_except( + ConnectionDirection::Child, + &child_path, + connection, + ); + Ok(()) + } + + fn upsert_registered_connection( + &mut self, + connection: ConnectionId, + direction: ConnectionDirection, + peer_path: Vec, + generation: ConnectionGeneration, + ) { + if let Some(existing) = self.connections.get_mut(connection) { + let state = ConnectionState::Registered(RegisteredConnection::new( + direction, peer_path, generation, + )); + existing.set_state(state); + } else { + self.connections.push(Connection::registered( + connection, direction, peer_path, generation, + )); + } + } + /// Returns currently queued effects. #[must_use] pub fn effects(&self) -> &[RuntimeEffect] { @@ -331,7 +434,7 @@ mod tests { }; use crate::effects::RuntimeEffect; use crate::transport::Transport; - use unshell_protocol::tree::{ChildRoute, ProtocolEndpoint}; + use unshell_protocol::tree::{ChildRoute, EndpointError, ProtocolEndpoint}; use unshell_protocol::{CallMessage, FrameBytes, PacketHeader, PacketType, encode_packet}; use super::{EndpointState, NodeRuntime, NodeRuntimeError, TickBudget}; @@ -425,6 +528,296 @@ mod tests { assert_eq!(runtime.transport().sent[0].0, child); } + #[test] + fn runtime_child_registration_updates_connection_and_route_topology() { + let parent = ConnectionId::new(1); + let child = ConnectionId::new(2); + let mut connections = Connections::new(); + connections.push(Connection::connected(parent, ConnectionGeneration::INITIAL)); + connections.push(Connection::connected(child, ConnectionGeneration::INITIAL)); + + let endpoint = + ProtocolEndpoint::new(vec![String::from("agent")], None, Vec::new(), Vec::new()); + let frame = encode_packet( + &PacketHeader { + packet_type: PacketType::Call, + src_path: vec![], + dst_path: vec![String::from("agent"), String::from("grand")], + dst_leaf: None, + hook_id: None, + }, + &CallMessage { + procedure_id: String::from("org.example.v1.echo.invoke"), + data: vec![], + response_hook: None, + }, + ) + .expect("frame encodes"); + let transport = RecordingTransport { + inbound: Some((parent, frame)), + sent: Vec::new(), + fail_send: false, + }; + let mut runtime = NodeRuntime::new(EndpointState::new(endpoint), connections, transport); + + runtime + .register_parent_connection(parent, vec![], ConnectionGeneration::INITIAL) + .expect("parent registers"); + runtime + .register_child_connection( + child, + vec![String::from("agent"), String::from("grand")], + ConnectionGeneration::INITIAL, + ) + .expect("child registers"); + + let outcome = runtime.tick(TickBudget::default()).expect("tick succeeds"); + + assert_eq!(outcome.outbound_frames, 1); + assert_eq!(runtime.transport().sent[0].0, child); + assert_eq!( + runtime.endpoint().endpoint().child_routes(), + [ChildRoute::registered(vec![ + String::from("agent"), + String::from("grand") + ])] + ); + } + + #[test] + fn connected_child_without_runtime_registration_is_unroutable() { + let parent = ConnectionId::new(1); + let child = ConnectionId::new(2); + let mut connections = Connections::new(); + connections.push(Connection::connected(parent, ConnectionGeneration::INITIAL)); + connections.push(Connection::connected(child, ConnectionGeneration::INITIAL)); + + let endpoint = ProtocolEndpoint::new( + vec![String::from("agent")], + None, + vec![ChildRoute::registered(vec![ + String::from("agent"), + String::from("grand"), + ])], + Vec::new(), + ); + let frame = encode_packet( + &PacketHeader { + packet_type: PacketType::Call, + src_path: vec![], + dst_path: vec![String::from("agent"), String::from("grand")], + dst_leaf: None, + hook_id: None, + }, + &CallMessage { + procedure_id: String::from("org.example.v1.echo.invoke"), + data: vec![], + response_hook: None, + }, + ) + .expect("frame encodes"); + let transport = RecordingTransport { + inbound: Some((parent, frame)), + sent: Vec::new(), + fail_send: false, + }; + let mut runtime = NodeRuntime::new(EndpointState::new(endpoint), connections, transport); + runtime + .register_parent_connection(parent, vec![], ConnectionGeneration::INITIAL) + .expect("parent registers"); + + let error = runtime + .tick(TickBudget::default()) + .expect_err("child is not routable"); + + assert!(matches!(error, NodeRuntimeError::MissingRouteConnection)); + assert!(runtime.transport().sent.is_empty()); + assert!(runtime.connections().registered(child).is_none()); + } + + #[test] + fn child_reregistration_removes_old_route() { + let child = ConnectionId::new(1); + let mut connections = Connections::new(); + connections.push(Connection::connected(child, ConnectionGeneration::INITIAL)); + + let endpoint = + ProtocolEndpoint::new(vec![String::from("agent")], None, Vec::new(), Vec::new()); + let transport = RecordingTransport { + inbound: None, + sent: Vec::new(), + fail_send: false, + }; + let mut runtime = NodeRuntime::new(EndpointState::new(endpoint), connections, transport); + + runtime + .register_child_connection( + child, + vec![String::from("agent"), String::from("old")], + ConnectionGeneration::INITIAL, + ) + .expect("old child registers"); + runtime + .register_child_connection( + child, + vec![String::from("agent"), String::from("new")], + ConnectionGeneration::INITIAL, + ) + .expect("new child registers"); + + assert_eq!( + runtime.endpoint().endpoint().child_routes(), + [ChildRoute::registered(vec![ + String::from("agent"), + String::from("new") + ])] + ); + assert!( + runtime + .connections() + .registered_by_path( + ConnectionDirection::Child, + &[String::from("agent"), String::from("old")], + ) + .is_none() + ); + } + + #[test] + fn replacement_child_registration_demotes_old_peer() { + let parent = ConnectionId::new(1); + let old_child = ConnectionId::new(2); + let new_child = ConnectionId::new(3); + let mut connections = Connections::new(); + connections.push(Connection::connected(parent, ConnectionGeneration::INITIAL)); + connections.push(Connection::connected( + old_child, + ConnectionGeneration::INITIAL, + )); + connections.push(Connection::connected( + new_child, + ConnectionGeneration::INITIAL, + )); + + let endpoint = + ProtocolEndpoint::new(vec![String::from("agent")], None, Vec::new(), Vec::new()); + let transport = RecordingTransport { + inbound: None, + sent: Vec::new(), + fail_send: false, + }; + let mut runtime = NodeRuntime::new(EndpointState::new(endpoint), connections, transport); + + runtime + .register_parent_connection(parent, vec![], ConnectionGeneration::INITIAL) + .expect("parent registers"); + runtime + .register_child_connection( + old_child, + vec![String::from("agent"), String::from("grand")], + ConnectionGeneration::INITIAL, + ) + .expect("old child registers"); + runtime + .register_child_connection( + new_child, + vec![String::from("agent"), String::from("grand")], + ConnectionGeneration::INITIAL, + ) + .expect("new child replaces old child"); + + let frame = encode_packet( + &PacketHeader { + packet_type: PacketType::Call, + src_path: vec![], + dst_path: vec![String::from("agent"), String::from("grand")], + dst_leaf: None, + hook_id: None, + }, + &CallMessage { + procedure_id: String::from("org.example.v1.echo.invoke"), + data: vec![], + response_hook: None, + }, + ) + .expect("frame encodes"); + runtime.transport_mut().inbound = Some((parent, frame)); + + let outcome = runtime.tick(TickBudget::default()).expect("tick succeeds"); + + assert_eq!(outcome.outbound_frames, 1); + assert_eq!(runtime.transport().sent[0].0, new_child); + assert!(runtime.connections().registered(old_child).is_none()); + } + + #[test] + fn invalid_child_registration_leaves_connection_unregistered() { + let child = ConnectionId::new(1); + let mut connections = Connections::new(); + connections.push(Connection::connected(child, ConnectionGeneration::INITIAL)); + + let endpoint = + ProtocolEndpoint::new(vec![String::from("agent")], None, Vec::new(), Vec::new()); + let transport = RecordingTransport { + inbound: None, + sent: Vec::new(), + fail_send: false, + }; + let mut runtime = NodeRuntime::new(EndpointState::new(endpoint), connections, transport); + + let error = runtime + .register_child_connection( + child, + vec![String::from("other"), String::from("kid")], + ConnectionGeneration::INITIAL, + ) + .expect_err("invalid child path is rejected"); + + assert!(matches!(error, EndpointError::Validation(_))); + assert!(runtime.connections().registered(child).is_none()); + assert!(runtime.endpoint().endpoint().child_routes().is_empty()); + } + + #[test] + fn invalid_child_reregistration_preserves_existing_registration() { + let child = ConnectionId::new(1); + let mut connections = Connections::new(); + connections.push(Connection::connected(child, ConnectionGeneration::INITIAL)); + + let endpoint = + ProtocolEndpoint::new(vec![String::from("agent")], None, Vec::new(), Vec::new()); + let transport = RecordingTransport { + inbound: None, + sent: Vec::new(), + fail_send: false, + }; + let mut runtime = NodeRuntime::new(EndpointState::new(endpoint), connections, transport); + let valid_path = vec![String::from("agent"), String::from("kid")]; + + runtime + .register_child_connection(child, valid_path.clone(), ConnectionGeneration::INITIAL) + .expect("initial child registers"); + + let error = runtime + .register_child_connection( + child, + vec![String::from("other"), String::from("kid")], + ConnectionGeneration::INITIAL.next(), + ) + .expect_err("invalid replacement path is rejected"); + + assert!(matches!(error, EndpointError::Validation(_))); + let registered = runtime + .connections() + .registered(child) + .expect("original child remains registered"); + assert_eq!(registered.peer_path(), valid_path); + assert_eq!( + runtime.endpoint().endpoint().child_routes(), + [ChildRoute::registered(valid_path)] + ); + } + #[test] fn child_route_decision_uses_registered_child_order() { let parent = ConnectionId::new(1);