//! # REPL Core //! //! The main interactive loop for the operator CLI. //! //! ## Flow //! //! ```text //! run() //! ↓ //! connect to router → handshake → register as operator node //! ↓ //! start recv thread (router → operator messages) //! ↓ //! main thread: readline loop //! parse command //! execute (may send TreeRequest over transport) //! print response //! ``` //! //! ## Threading model //! //! The transport is shared between: //! - The main thread (sends requests, prints responses). //! - A background recv thread (receives unsolicited messages from the router, //! e.g., node-connected notifications — future feature). //! //! In v1, the main thread does both send and receive synchronously (blocking //! recv after each send). The recv thread is reserved for future async notifications. use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use rustyline::error::ReadlineError; use rustyline::DefaultEditor; use unshell::protocol::{ content, HandshakeAck, HandshakeMessage, NodeType, PacketHeader, PacketType, RequestType, TreeRequest, }; use unshell::transport::tcp::TcpTransport; use unshell::transport::Transport; use crate::commands::{self, Command}; use crate::session::Session; // --------------------------------------------------------------------------- // Request ID counter // --------------------------------------------------------------------------- /// Monotonically increasing request ID generator. /// /// Generates unique IDs so the operator can correlate responses to requests /// in the future when multiple requests are in-flight concurrently. static REQUEST_COUNTER: AtomicU64 = AtomicU64::new(1); fn next_request_id() -> u64 { REQUEST_COUNTER.fetch_add(1, Ordering::SeqCst) } // --------------------------------------------------------------------------- // Entry point // --------------------------------------------------------------------------- /// Start the operator REPL, connecting to `router_addr`. /// /// Blocks until the user types `exit` or the connection is lost. /// /// # Errors /// /// Returns an error if the connection or handshake fails. pub fn run(router_addr: &str) -> Result<(), Box> { println!("UnShell operator console"); println!("Connecting to {}...", router_addr); let mut transport = TcpTransport::connect(router_addr)?; let session_id = format!("sess{}", std::process::id()); let base_path = format!("/operator/{session_id}"); // Handshake let handshake = HandshakeMessage { node_id: session_id.clone(), node_type: NodeType::Operator, registered_paths: vec![base_path.clone()], platform: "operator".to_owned(), }; let handshake_payload = rkyv::to_bytes::(&handshake) .map_err(|e| format!("failed to serialise handshake: {e}"))?; let handshake_header = PacketHeader { dst_path: "/router".to_owned(), src_path: base_path.clone(), packet_type: PacketType::Handshake, }; transport.send(&handshake_header, &handshake_payload)?; let (_, ack_payload) = transport.recv()?; let ack: HandshakeAck = rkyv::from_bytes::(&ack_payload) .map_err(|e| format!("failed to deserialise ack: {e}"))?; if !ack.accepted { return Err(format!( "router rejected: {}", ack.rejection_reason.unwrap_or_default() ) .into()); } println!("Connected. Type 'help' for commands."); // Wrap transport in a Mutex for shared access let transport = Arc::new(Mutex::new(transport)); // REPL state let mut current_session = Session::new("default", "/"); let mut background_sessions: Vec = Vec::new(); // Readline editor with history let mut rl = DefaultEditor::new()?; loop { let prompt = if current_session.current_path == "/" { "unshell> ".to_owned() } else { let short = current_session .current_path .trim_start_matches("/agents/") .trim_start_matches("/operator/"); format!("unshell [{short}]> ") }; let readline = rl.readline(&prompt); match readline { Ok(line) => { rl.add_history_entry(line.as_str()) .unwrap_or_default(); match commands::parse(&line) { Ok(None) => {} // empty / comment Ok(Some(cmd)) => { if !handle_command( cmd, &mut current_session, &mut background_sessions, &base_path, &transport, ) { break; // exit command } } Err(e) => println!("error: {e}"), } } Err(ReadlineError::Interrupted | ReadlineError::Eof) => { println!("Disconnecting..."); break; } Err(e) => { eprintln!("readline error: {e}"); break; } } } println!("Bye."); Ok(()) } // --------------------------------------------------------------------------- // Command handlers // --------------------------------------------------------------------------- /// Handle one parsed command. /// /// Returns `false` if the REPL should exit, `true` to continue. fn handle_command( cmd: Command, current_session: &mut Session, background_sessions: &mut Vec, base_path: &str, transport: &Arc>, ) -> bool { match cmd { Command::Exit => return false, Command::Help => commands::print_help(), Command::Use(path) => { // Normalise: if no leading slash, prepend /agents/ let resolved = if path.starts_with('/') { path } else { format!("/agents/{path}") }; current_session.current_path = resolved; println!("current path: {}", current_session.current_path); } Command::List => { // Send GetProcedures to /router/nodes send_request_and_print( "/router/nodes", RequestType::GetProcedures, content::NONE, None, base_path, transport, ); } Command::Ls(sub_path) => { let path = sub_path .as_deref() .map(|p| current_session.resolve(p)) .unwrap_or_else(|| current_session.current_path.clone()); send_request_and_print( &path, RequestType::GetProcedures, content::NONE, None, base_path, transport, ); } Command::Read(sub_path) => { let path = current_session.resolve(&sub_path); send_request_and_print( &path, RequestType::Read, content::NONE, None, base_path, transport, ); } Command::Call { path, data } => { let full_path = current_session.resolve(&path); send_request_and_print( &full_path, RequestType::CallProcedure, content::UTF8_STRING, data.as_deref(), base_path, transport, ); } Command::Write { path, data } => { let full_path = current_session.resolve(&path); send_request_and_print( &full_path, RequestType::Write, content::UTF8_STRING, Some(&data), base_path, transport, ); } Command::Background => { let mut session = current_session.clone(); session.active = false; background_sessions.push(session); current_session.current_path = "/".to_owned(); println!("session backgrounded. Type 'sessions' to list."); } Command::Sessions => { if background_sessions.is_empty() { println!("no background sessions"); } else { for (i, sess) in background_sessions.iter().enumerate() { println!(" [{i}] {} ({})", sess.name, sess.current_path); } } } } true } /// Send a `TreeRequest` and print the response. fn send_request_and_print( dst_path: &str, request_type: RequestType, content_type: &str, data: Option<&str>, src_path: &str, transport: &Arc>, ) { let request = TreeRequest { request_id: next_request_id(), request_type, content_type: content_type.to_owned(), data: data.map(|s| s.as_bytes().to_vec()).unwrap_or_default(), }; let Ok(payload) = rkyv::to_bytes::(&request) else { eprintln!("error: failed to serialise request"); return; }; let header = PacketHeader { dst_path: dst_path.to_owned(), src_path: src_path.to_owned(), packet_type: PacketType::Request, }; let mut t = transport.lock().expect("transport lock poisoned"); if let Err(e) = t.send(&header, &payload) { eprintln!("send error: {e}"); return; } match t.recv() { Ok((_, resp_payload)) => { match rkyv::from_bytes::( &resp_payload, ) { Ok(resp) => { if resp.data.is_empty() { println!("[{:?}]", resp.status); } else if let Ok(text) = std::str::from_utf8(&resp.data) { println!("{text}"); } else { println!("[{} bytes, content-type: {}]", resp.data.len(), resp.content_type); } } Err(e) => eprintln!("error: failed to deserialise response: {e}"), } } Err(e) => eprintln!("recv error: {e}"), } }