2026-02-16 10:03:23 -07:00
|
|
|
//! Queue - A `TreeElement` wrapper around crossbeam channels for message queuing.
//!
//! Provides a thread-safe queue that can be accessed via tree messages.
//! Useful for inter-thread communication and log buffering.
//!
//! # Tree Interface
//!
//! - `Get`: Receive one message, blocking until one is available
//! - `Poll`: Try to receive one message without blocking (`null` if none is ready)
//! - `GetLength`: Get the number of messages currently queued
//!
//! # Usage
//!
//! ```rust
//! use unshell::tree::queue::Queue;
//! use unshell::tree::{Branch, TreeElement};
//! use serde_json::json;
//!
//! // Create the queue with the channel factory
//! let (sender, queue) = Queue::<String>::channel();
//!
//! // Add it to a branch
//! let mut branch = Branch::new("test");
//! branch.add_child("my_queue", Box::new(queue));
//!
//! // Send via the sender (typically from a different thread)
//! sender.send("message".to_string()).unwrap();
//!
//! // Receive via a tree message
//! let result = branch.send_message(json!("my_queue"), json!("Get"));
//! ```
|
2026-02-16 10:03:23 -07:00
|
|
|
|
|
|
|
|
use crossbeam_channel::{Receiver, Sender};
|
|
|
|
|
use serde_json::{json, Value};
|
|
|
|
|
|
|
|
|
|
use crate::tree::symbols::{self, TYPE_QUEUE};
|
|
|
|
|
use crate::tree::TreeElement;
|
|
|
|
|
|
|
|
|
|
/// Generic queue wrapping crossbeam channels.
|
2026-02-20 14:05:43 -07:00
|
|
|
///
|
2026-02-16 10:03:23 -07:00
|
|
|
/// Provides Get, Poll, and GetLength commands via the tree interface.
|
2026-02-20 14:05:43 -07:00
|
|
|
/// Thread-safe for multi-producer multi-consumer scenarios.
|
2026-02-16 10:03:23 -07:00
|
|
|
pub struct Queue<T> {
|
|
|
|
|
sender: Sender<T>,
|
|
|
|
|
receiver: Receiver<T>,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl<T> Queue<T> {
|
2026-02-20 14:05:43 -07:00
|
|
|
/// Create a new queue with given sender and receiver.
|
2026-02-16 10:03:23 -07:00
|
|
|
pub fn new(sender: Sender<T>, receiver: Receiver<T>) -> Self {
|
|
|
|
|
Self { sender, receiver }
|
|
|
|
|
}
|
|
|
|
|
|
2026-02-20 14:05:43 -07:00
|
|
|
/// Create a channel pair, returning sender and queue.
|
|
|
|
|
///
|
|
|
|
|
/// Useful for setting up the queue where one end sends
|
|
|
|
|
/// and the other end is exposed via tree.
|
2026-02-16 10:03:23 -07:00
|
|
|
pub fn channel() -> (Sender<T>, Self) {
|
|
|
|
|
let (tx, rx) = crossbeam_channel::unbounded();
|
|
|
|
|
let queue = Self::new(tx.clone(), rx);
|
|
|
|
|
(tx, queue)
|
|
|
|
|
}
|
|
|
|
|
|
2026-02-20 14:05:43 -07:00
|
|
|
/// Get reference to the sender for producing messages.
|
2026-02-16 10:03:23 -07:00
|
|
|
pub fn sender(&self) -> &Sender<T> {
|
|
|
|
|
&self.sender
|
|
|
|
|
}
|
|
|
|
|
|
2026-02-20 14:05:43 -07:00
|
|
|
/// Get current queue length.
|
2026-02-16 10:03:23 -07:00
|
|
|
pub fn len(&self) -> usize {
|
|
|
|
|
self.receiver.len()
|
|
|
|
|
}
|
|
|
|
|
|
2026-02-20 14:05:43 -07:00
|
|
|
/// Check if queue is empty.
|
2026-02-16 10:03:23 -07:00
|
|
|
pub fn is_empty(&self) -> bool {
|
|
|
|
|
self.receiver.is_empty()
|
|
|
|
|
}
|
|
|
|
|
|
2026-02-20 14:05:43 -07:00
|
|
|
/// Try to receive without blocking.
|
2026-02-16 10:03:23 -07:00
|
|
|
pub fn try_recv(&self) -> Option<T> {
|
|
|
|
|
self.receiver.try_recv().ok()
|
|
|
|
|
}
|
|
|
|
|
|
2026-02-20 14:05:43 -07:00
|
|
|
/// Receive a message (blocking).
|
2026-02-16 10:03:23 -07:00
|
|
|
pub fn recv(&self) -> Option<T> {
|
|
|
|
|
self.receiver.recv().ok()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl<T: serde::Serialize + Send> TreeElement for Queue<T> {
|
|
|
|
|
fn get_type(&self) -> Value {
|
|
|
|
|
json!(TYPE_QUEUE)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn send_message(&mut self, target: Value, message: Value) -> Value {
|
|
|
|
|
match (target, message) {
|
|
|
|
|
(Value::Null, Value::String(cmd)) => match cmd.as_ref() {
|
|
|
|
|
symbols::CMD_GET => self.recv().map(|v| json!(v)).unwrap_or(json!(Value::Null)),
|
|
|
|
|
symbols::CMD_POLL => self
|
|
|
|
|
.try_recv()
|
|
|
|
|
.map(|v| json!(v))
|
|
|
|
|
.unwrap_or(json!(Value::Null)),
|
|
|
|
|
symbols::CMD_GET_LENGTH => json!(self.receiver.len()),
|
|
|
|
|
_ => json!(symbols::ERR_INVALID_COMMAND),
|
|
|
|
|
},
|
|
|
|
|
_ => json!(symbols::ERR_INVALID_TARGET),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|