Simplify procedure session advancement

This commit is contained in:
Michael Mikovsky
2026-04-30 10:23:19 -06:00
parent 7c0ee31d38
commit 366771356c
2 changed files with 122 additions and 130 deletions
+89 -81
View File
@@ -18,11 +18,44 @@ use unshell::protocol::tree::{
};
fn main() -> Result<(), Box<dyn Error>> {
let mut network = ChannelNetwork::new()?;
network.call_root(
path(&["agent"]),
CrossbeamChannelLeaf::protocol_procedure_id("add_connection").expect("procedure exists"),
encode_call_reply(&ConnectionRequest {
peer_path: path(&["agent", "child"]),
})?,
)?;
let reply = network.call_root(
path(&["agent", "child"]),
CrossbeamChannelLeaf::protocol_procedure_id("get_connections").expect("procedure exists"),
encode_call_reply(&())?,
)?;
let snapshot = decode_call_input::<ConnectionSnapshot>(reply.as_slice())?;
println!("child parent: {:?}", snapshot.parent);
println!("child children: {:?}", snapshot.children);
Ok(())
}
struct ChannelNetwork {
root: ProtocolEndpoint,
root_to_agent: Sender<CrossbeamEnvelope>,
root_rx: Receiver<CrossbeamEnvelope>,
agent: ChannelNode,
child: ChannelNode,
}
impl ChannelNetwork {
fn new() -> Result<Self, Box<dyn Error>> {
let (mut agent, root_to_agent) = ChannelNode::new(path(&["agent"]));
let (mut child, agent_to_child) = ChannelNode::new(path(&["agent", "child"]));
let (agent_to_root, root_rx) = unbounded();
let mut root = ProtocolEndpoint::new(
let root = ProtocolEndpoint::new(
Vec::new(),
None,
vec![ChildRoute::registered(path(&["agent"]))],
@@ -37,35 +70,65 @@ fn main() -> Result<(), Box<dyn Error>> {
agent.stage_connection(path(&["agent", "child"]), agent_to_child);
call_root(
&mut root,
&root_to_agent,
&mut agent,
&mut child,
&root_rx,
path(&["agent"]),
CrossbeamChannelLeaf::protocol_procedure_id("add_connection").expect("procedure exists"),
encode_call_reply(&ConnectionRequest {
peer_path: path(&["agent", "child"]),
})?,
Ok(Self {
root,
root_to_agent,
root_rx,
agent,
child,
})
}
fn call_root(
&mut self,
dst_path: Vec<String>,
procedure_id: String,
data: Vec<u8>,
) -> Result<Vec<u8>, Box<dyn Error>> {
let hook_id = self.root.allocate_hook_id();
let outcome = self.root.send_call(
dst_path,
Some(CrossbeamChannelLeaf::protocol_leaf_name()),
procedure_id,
Some(hook_id),
data,
)?;
let EndpointOutcome::Forward { frame, .. } = outcome else {
return Err("root call did not forward".into());
};
self.root_to_agent.send(CrossbeamEnvelope {
ingress: Ingress::Parent,
frame,
})?;
let reply = call_root(
&mut root,
&root_to_agent,
&mut agent,
&mut child,
&root_rx,
path(&["agent", "child"]),
CrossbeamChannelLeaf::protocol_procedure_id("get_connections").expect("procedure exists"),
encode_call_reply(&())?,
)?;
let snapshot = decode_call_input::<ConnectionSnapshot>(reply.as_slice())?;
for _ in 0..16 {
let mut progress = 0usize;
progress += self.agent.drain()?;
progress += self.child.drain()?;
println!("child parent: {:?}", snapshot.parent);
println!("child children: {:?}", snapshot.children);
while let Ok(envelope) = self.root_rx.try_recv() {
progress += 1;
let outcome = self.root.receive(&envelope.ingress, envelope.frame)?;
if let EndpointOutcome::Local(event) = outcome {
match event {
unshell::protocol::tree::LocalEvent::Data { message, .. } => {
return Ok(message.data);
}
unshell::protocol::tree::LocalEvent::Fault { message, .. } => {
return Err(format!("routed call faulted: {:?}", message.fault).into());
}
unshell::protocol::tree::LocalEvent::Call { .. } => {}
}
}
}
Ok(())
if progress == 0 {
break;
}
}
Err("timed out waiting for routed reply".into())
}
}
struct ChannelNode {
@@ -117,61 +180,6 @@ impl ChannelNode {
}
}
fn call_root(
root: &mut ProtocolEndpoint,
root_to_agent: &Sender<CrossbeamEnvelope>,
agent: &mut ChannelNode,
child: &mut ChannelNode,
root_rx: &Receiver<CrossbeamEnvelope>,
dst_path: Vec<String>,
procedure_id: String,
data: Vec<u8>,
) -> Result<Vec<u8>, Box<dyn Error>> {
let hook_id = root.allocate_hook_id();
let outcome = root.send_call(
dst_path,
Some(CrossbeamChannelLeaf::protocol_leaf_name()),
procedure_id,
Some(hook_id),
data,
)?;
let EndpointOutcome::Forward { frame, .. } = outcome else {
return Err("root call did not forward".into());
};
root_to_agent.send(CrossbeamEnvelope {
ingress: Ingress::Parent,
frame,
})?;
for _ in 0..16 {
let mut progress = 0usize;
progress += agent.drain()?;
progress += child.drain()?;
while let Ok(envelope) = root_rx.try_recv() {
progress += 1;
let outcome = root.receive(&envelope.ingress, envelope.frame)?;
if let EndpointOutcome::Local(event) = outcome {
match event {
unshell::protocol::tree::LocalEvent::Data { message, .. } => {
return Ok(message.data);
}
unshell::protocol::tree::LocalEvent::Fault { message, .. } => {
return Err(format!("routed call faulted: {:?}", message.fault).into());
}
unshell::protocol::tree::LocalEvent::Call { .. } => {}
}
}
}
if progress == 0 {
break;
}
}
Err("timed out waiting for routed reply".into())
}
fn path(parts: &[&str]) -> Vec<String> {
parts.iter().map(|part| (*part).to_owned()).collect()
}
+25 -41
View File
@@ -500,10 +500,7 @@ where
};
// Collect keys first and temporarily remove each session so procedure callbacks can
// mutate the leaf without fighting the session-table borrow.
match self.poll_session(key, session)? {
Some(session_frames) => frames.extend(session_frames),
None => continue,
}
frames.extend(self.poll_session(key, session)?);
}
Ok(ProcedureRuntimeOutcome {
@@ -530,17 +527,29 @@ where
}
fn poll_session(
&mut self,
key: HookKey,
session: P,
) -> Result<Vec<FrameBytes>, ProcedureRuntimeError<P::Error>> {
self.advance_session(key, session, P::poll)
}
fn advance_session<F>(
&mut self,
key: HookKey,
mut session: P,
) -> Result<Option<Vec<FrameBytes>>, ProcedureRuntimeError<P::Error>> {
let effect = match P::poll(&mut self.leaf, &mut session) {
step: F,
) -> Result<Vec<FrameBytes>, ProcedureRuntimeError<P::Error>>
where
F: FnOnce(&mut L, &mut P) -> Result<ProcedureEffect, P::Error>,
{
let effect = match step(&mut self.leaf, &mut session) {
Ok(effect) => self.ensure_terminal_packet(&key, effect),
Err(error) => {
let _ = P::close(&mut self.leaf, session);
let frames = self.emit_internal_fault(Some(key.clone()))?;
let _ = error;
return Ok(Some(frames));
return Ok(frames);
}
};
@@ -564,7 +573,7 @@ where
let _ = P::close(&mut self.leaf, session);
}
Ok(Some(outgoing))
Ok(outgoing)
}
fn process_local_event(
@@ -628,45 +637,20 @@ where
message: crate::protocol::DataMessage,
hook_key: HookKey,
) -> Result<ProcedureRuntimeOutcome, ProcedureRuntimeError<P::Error>> {
let Some(mut session) = self.leaf.procedure_sessions().remove(&hook_key) else {
let Some(session) = self.leaf.procedure_sessions().remove(&hook_key) else {
return Ok(ProcedureRuntimeOutcome::default());
};
let effect = match P::on_data(
&mut self.leaf,
&mut session,
let outgoing = self.advance_session(hook_key.clone(), session, |leaf, session| {
P::on_data(
leaf,
session,
IncomingData {
header,
message,
hook_key: hook_key.clone(),
hook_key,
},
) {
Ok(effect) => self.ensure_terminal_packet(&hook_key, effect),
Err(error) => {
let _ = P::close(&mut self.leaf, session);
let frames = self.emit_internal_fault(Some(hook_key.clone()))?;
let _ = error;
return Ok(ProcedureRuntimeOutcome {
frames,
dropped: false,
});
}
};
let outgoing = match self.emit_outgoing(effect.outgoing) {
Ok(outgoing) => outgoing.frames,
Err(error) => {
if !effect.close_session {
self.leaf.procedure_sessions().insert(hook_key, session);
} else {
let _ = P::close(&mut self.leaf, session);
}
return Err(error);
}
};
if !effect.close_session {
self.leaf.procedure_sessions().insert(hook_key, session);
} else {
let _ = P::close(&mut self.leaf, session);
}
)
})?;
Ok(ProcedureRuntimeOutcome {
frames: outgoing,
dropped: false,