//! Packet builders and endpoint construction. use alloc::{collections::BTreeSet, string::String, vec::Vec}; use crate::protocol::tree::{ActiveHook, HookKey}; use crate::protocol::{ CallMessage, DataMessage, FrameBytes, HookTarget, PacketHeader, PacketType, ValidationError, encode_packet, validate_call, validate_header, validate_procedure_id, }; use super::super::{CompiledRoutes, RouteDecision}; use super::core::{ChildRoute, EndpointError, EndpointOutcome, ProtocolEndpoint}; use crate::protocol::tree::LeafSpec; impl ProtocolEndpoint { fn prepare_call( &self, dst_path: Vec, dst_leaf: Option, procedure_id: impl Into, response_hook_id: Option, data: Vec, ) -> Result<(PacketHeader, CallMessage), EndpointError> { let procedure_id = procedure_id.into(); validate_procedure_id(&procedure_id)?; let response_hook = response_hook_id.map(|hook_id| HookTarget { hook_id, return_path: self.path.clone(), }); let header = PacketHeader { packet_type: PacketType::Call, src_path: self.path.clone(), dst_path, dst_leaf, hook_id: None, }; let call = CallMessage { procedure_id, data, response_hook, }; validate_header(&header)?; validate_call(&header, &call)?; Ok((header, call)) } fn prepare_data( &self, dst_path: Vec, hook_id: u64, procedure_id: impl Into, data: Vec, end_hook: bool, ) -> Result<(PacketHeader, DataMessage), EndpointError> { let procedure_id = procedure_id.into(); validate_procedure_id(&procedure_id)?; let header = PacketHeader { packet_type: PacketType::Data, src_path: self.path.clone(), dst_path, dst_leaf: None, hook_id: Some(hook_id), }; let message = DataMessage { procedure_id, data, end_hook, }; validate_header(&header)?; Ok((header, message)) } fn register_outbound_call_hook( &mut self, header: &PacketHeader, call: &CallMessage, ) -> Result<(), EndpointError> { // Outbound calls reserve their response hook before the frame is emitted so // the endpoint can accept a synchronous local response path as well as a // remote one. if let Some(hook) = &call.response_hook && self .hooks .insert_active(ActiveHook { return_path: hook.return_path.clone(), hook_id: hook.hook_id, peer_path: header.dst_path.clone(), procedure_id: call.procedure_id.clone(), dst_leaf: header.dst_leaf.clone(), local_ended: false, peer_ended: false, }) .is_err() { return Err(EndpointError::Validation(ValidationError::InvalidHookId)); } Ok(()) } #[must_use] /// Creates an endpoint with compiled routing tables for its current topology. pub fn new( path: Vec, parent_path: Option>, children: Vec, leaves: Vec, ) -> Self { let registered_child_paths = children .iter() .filter(|child| child.state == super::core::ConnectionState::Registered) .map(|child| child.path.clone()) .collect::>(); Self { routing: CompiledRoutes::new(&path, ®istered_child_paths, parent_path.is_some()), path, children, leaves: leaves .into_iter() .map(|leaf| (leaf.name.clone(), leaf)) .collect(), endpoint_procedures: BTreeSet::new(), hooks: Default::default(), } } /// Registers a procedure that is handled directly by the endpoint. pub fn add_endpoint_procedure( &mut self, procedure_id: impl Into, ) -> Result<(), EndpointError> { let procedure_id = procedure_id.into(); validate_procedure_id(&procedure_id)?; self.endpoint_procedures.insert(procedure_id); Ok(()) } #[must_use] /// Allocates a hook id scoped to this endpoint path. pub fn allocate_hook_id(&mut self) -> u64 { self.hooks.allocate_hook_id(&self.path) } /// Encodes a call frame without routing it through the local endpoint. pub fn make_call( &mut self, dst_path: Vec, dst_leaf: Option, procedure_id: impl Into, response_hook_id: Option, data: Vec, ) -> Result { let (header, call) = self.prepare_call(dst_path, dst_leaf, procedure_id, response_hook_id, data)?; self.register_outbound_call_hook(&header, &call)?; Ok(encode_packet(&header, &call)?) } /// Builds and immediately routes a call, producing either a forward or a local event. pub fn send_call( &mut self, dst_path: Vec, dst_leaf: Option, procedure_id: impl Into, response_hook_id: Option, data: Vec, ) -> Result { let (header, call) = self.prepare_call(dst_path, dst_leaf, procedure_id, response_hook_id, data)?; self.register_outbound_call_hook(&header, &call)?; match self.decide_route(&header.dst_path) { RouteDecision::Local => self.handle_local_call(header, call), route => Ok(EndpointOutcome::forward( route, encode_packet(&header, &call)?, )), } } /// Encodes a data frame without routing it through the local endpoint. pub fn make_data( &self, dst_path: Vec, hook_id: u64, procedure_id: impl Into, data: Vec, end_hook: bool, ) -> Result { let (header, message) = self.prepare_data(dst_path, hook_id, procedure_id, data, end_hook)?; Ok(encode_packet(&header, &message)?) } /// Builds and immediately routes a data packet, updating local hook state for end-of-stream. pub fn send_data( &mut self, dst_path: Vec, hook_id: u64, procedure_id: impl Into, data: Vec, end_hook: bool, ) -> Result { let (header, message) = self.prepare_data(dst_path, hook_id, procedure_id, data, end_hook)?; if end_hook { // Locally-originated streams may not have been resolved against a peer yet, // so fall back to the endpoint's own hook key shape when closing them. let local_hook_key = self .hooks .resolve_active_key(&self.path, hook_id, &self.path) .unwrap_or_else(|| HookKey::new(self.path.clone(), hook_id)); if self.hooks.mark_local_end(&local_hook_key) { self.hooks.remove_active(&local_hook_key); } } match self.decide_route(&header.dst_path) { RouteDecision::Local => self.handle_local_data(header, message), route => Ok(EndpointOutcome::forward( route, encode_packet(&header, &message)?, )), } } }