From 366771356c83d0edb02b178cfcb2651f3b8200e2 Mon Sep 17 00:00:00 2001 From: Michael Mikovsky <77305074+Astatin3@users.noreply.github.com> Date: Thu, 30 Apr 2026 10:23:19 -0600 Subject: [PATCH] Simplify procedure session advancement --- examples/protocol/crossbeam_channel_leaf.rs | 178 +++++++++--------- .../src/protocol/tree/procedure.rs | 74 +++----- 2 files changed, 122 insertions(+), 130 deletions(-) diff --git a/examples/protocol/crossbeam_channel_leaf.rs b/examples/protocol/crossbeam_channel_leaf.rs index dffad2e..5cf8766 100644 --- a/examples/protocol/crossbeam_channel_leaf.rs +++ b/examples/protocol/crossbeam_channel_leaf.rs @@ -18,31 +18,9 @@ use unshell::protocol::tree::{ }; fn main() -> Result<(), Box> { - let (mut agent, root_to_agent) = ChannelNode::new(path(&["agent"])); - let (mut child, agent_to_child) = ChannelNode::new(path(&["agent", "child"])); - let (agent_to_root, root_rx) = unbounded(); + let mut network = ChannelNetwork::new()?; - let mut root = ProtocolEndpoint::new( - Vec::new(), - None, - vec![ChildRoute::registered(path(&["agent"]))], - Vec::new(), - ); - - agent.stage_connection(Vec::new(), agent_to_root); - agent.connect_staged(Vec::new())?; - - child.stage_connection(path(&["agent"]), root_to_agent.clone()); - child.connect_staged(path(&["agent"]))?; - - agent.stage_connection(path(&["agent", "child"]), agent_to_child); - - call_root( - &mut root, - &root_to_agent, - &mut agent, - &mut child, - &root_rx, + network.call_root( path(&["agent"]), CrossbeamChannelLeaf::protocol_procedure_id("add_connection").expect("procedure exists"), encode_call_reply(&ConnectionRequest { @@ -50,12 +28,7 @@ fn main() -> Result<(), Box> { })?, )?; - let reply = call_root( - &mut root, - &root_to_agent, - &mut agent, - &mut child, - &root_rx, + let reply = network.call_root( path(&["agent", "child"]), CrossbeamChannelLeaf::protocol_procedure_id("get_connections").expect("procedure exists"), encode_call_reply(&())?, @@ -68,6 +41,96 @@ fn main() -> Result<(), Box> { Ok(()) } +struct ChannelNetwork { + root: ProtocolEndpoint, + root_to_agent: Sender, + root_rx: Receiver, + agent: ChannelNode, + child: ChannelNode, +} + +impl ChannelNetwork { + fn new() -> Result> { + let (mut agent, root_to_agent) = ChannelNode::new(path(&["agent"])); + let (mut child, agent_to_child) = ChannelNode::new(path(&["agent", "child"])); + let (agent_to_root, root_rx) = unbounded(); + + let root = ProtocolEndpoint::new( + Vec::new(), + None, + vec![ChildRoute::registered(path(&["agent"]))], + Vec::new(), + ); + + agent.stage_connection(Vec::new(), agent_to_root); + agent.connect_staged(Vec::new())?; + + child.stage_connection(path(&["agent"]), root_to_agent.clone()); + child.connect_staged(path(&["agent"]))?; + + agent.stage_connection(path(&["agent", "child"]), agent_to_child); + + Ok(Self { + root, + root_to_agent, + root_rx, + agent, + child, + }) + } + + fn call_root( + &mut self, + dst_path: Vec, + procedure_id: String, + data: Vec, + ) -> Result, Box> { + let hook_id = self.root.allocate_hook_id(); + let outcome = self.root.send_call( + dst_path, + Some(CrossbeamChannelLeaf::protocol_leaf_name()), + procedure_id, + Some(hook_id), + data, + )?; + let EndpointOutcome::Forward { frame, .. } = outcome else { + return Err("root call did not forward".into()); + }; + self.root_to_agent.send(CrossbeamEnvelope { + ingress: Ingress::Parent, + frame, + })?; + + for _ in 0..16 { + let mut progress = 0usize; + progress += self.agent.drain()?; + progress += self.child.drain()?; + + while let Ok(envelope) = self.root_rx.try_recv() { + progress += 1; + let outcome = self.root.receive(&envelope.ingress, envelope.frame)?; + if let EndpointOutcome::Local(event) = outcome { + match event { + unshell::protocol::tree::LocalEvent::Data { message, .. } => { + return Ok(message.data); + } + unshell::protocol::tree::LocalEvent::Fault { message, .. } => { + return Err(format!("routed call faulted: {:?}", message.fault).into()); + } + unshell::protocol::tree::LocalEvent::Call { .. } => {} + } + } + } + + if progress == 0 { + break; + } + } + + Err("timed out waiting for routed reply".into()) + } +} + struct ChannelNode { runtime: LeafRuntime, rx: Receiver, @@ -117,61 +180,6 @@ impl ChannelNode { } } -fn call_root( - root: &mut ProtocolEndpoint, - root_to_agent: &Sender, - agent: &mut ChannelNode, - child: &mut ChannelNode, - root_rx: &Receiver, - dst_path: Vec, - procedure_id: String, - data: Vec, -) -> Result, Box> { - let hook_id = root.allocate_hook_id(); - let outcome = root.send_call( - dst_path, - Some(CrossbeamChannelLeaf::protocol_leaf_name()), - procedure_id, - Some(hook_id), - data, - )?; - let EndpointOutcome::Forward { frame, .. } = outcome else { - return Err("root call did not forward".into()); - }; - root_to_agent.send(CrossbeamEnvelope { - ingress: Ingress::Parent, - frame, - })?; - - for _ in 0..16 { - let mut progress = 0usize; - progress += agent.drain()?; - progress += child.drain()?; - - while let Ok(envelope) = root_rx.try_recv() { - progress += 1; - let outcome = root.receive(&envelope.ingress, envelope.frame)?; - if let EndpointOutcome::Local(event) = outcome { - match event { - unshell::protocol::tree::LocalEvent::Data { message, .. } => { - return Ok(message.data); - } - unshell::protocol::tree::LocalEvent::Fault { message, .. } => { - return Err(format!("routed call faulted: {:?}", message.fault).into()); - } - unshell::protocol::tree::LocalEvent::Call { .. } => {} - } - } - } - - if progress == 0 { - break; - } - } - - Err("timed out waiting for routed reply".into()) -} - fn path(parts: &[&str]) -> Vec { parts.iter().map(|part| (*part).to_owned()).collect() } diff --git a/unshell-protocol/src/protocol/tree/procedure.rs b/unshell-protocol/src/protocol/tree/procedure.rs index 47f9385..c67a266 100644 --- a/unshell-protocol/src/protocol/tree/procedure.rs +++ b/unshell-protocol/src/protocol/tree/procedure.rs @@ -500,10 +500,7 @@ where }; // Collect keys first and temporarily remove each session so procedure callbacks can // mutate the leaf without fighting the session-table borrow. - match self.poll_session(key, session)? { - Some(session_frames) => frames.extend(session_frames), - None => continue, - } + frames.extend(self.poll_session(key, session)?); } Ok(ProcedureRuntimeOutcome { @@ -530,17 +527,29 @@ where } fn poll_session( + &mut self, + key: HookKey, + session: P, + ) -> Result, ProcedureRuntimeError> { + self.advance_session(key, session, P::poll) + } + + fn advance_session( &mut self, key: HookKey, mut session: P, - ) -> Result>, ProcedureRuntimeError> { - let effect = match P::poll(&mut self.leaf, &mut session) { + step: F, + ) -> Result, ProcedureRuntimeError> + where + F: FnOnce(&mut L, &mut P) -> Result, + { + let effect = match step(&mut self.leaf, &mut session) { Ok(effect) => self.ensure_terminal_packet(&key, effect), Err(error) => { let _ = P::close(&mut self.leaf, session); let frames = self.emit_internal_fault(Some(key.clone()))?; let _ = error; - return Ok(Some(frames)); + return Ok(frames); } }; @@ -564,7 +573,7 @@ where let _ = P::close(&mut self.leaf, session); } - Ok(Some(outgoing)) + Ok(outgoing) } fn process_local_event( @@ -628,45 +637,20 @@ where message: crate::protocol::DataMessage, hook_key: HookKey, ) -> Result> { - let Some(mut session) = self.leaf.procedure_sessions().remove(&hook_key) else { + let Some(session) = self.leaf.procedure_sessions().remove(&hook_key) else { return Ok(ProcedureRuntimeOutcome::default()); }; - let effect = match P::on_data( - &mut self.leaf, - &mut session, - IncomingData { - header, - message, - hook_key: hook_key.clone(), - }, - ) { - Ok(effect) => self.ensure_terminal_packet(&hook_key, effect), - Err(error) => { - let _ = P::close(&mut self.leaf, session); - let frames = self.emit_internal_fault(Some(hook_key.clone()))?; - let _ = error; - return Ok(ProcedureRuntimeOutcome { - frames, - dropped: false, - }); - } - }; - let outgoing = match self.emit_outgoing(effect.outgoing) { - Ok(outgoing) => outgoing.frames, - Err(error) => { - if !effect.close_session { - self.leaf.procedure_sessions().insert(hook_key, session); - } else { - let _ = P::close(&mut self.leaf, session); - } - return Err(error); - } - }; - if !effect.close_session { - self.leaf.procedure_sessions().insert(hook_key, session); - } else { - let _ = P::close(&mut self.leaf, session); - } + let outgoing = self.advance_session(hook_key.clone(), session, |leaf, session| { + P::on_data( + leaf, + session, + IncomingData { + header, + message, + hook_key, + }, + ) + })?; Ok(ProcedureRuntimeOutcome { frames: outgoing, dropped: false,