1use std::collections::VecDeque;
2use std::sync::{Condvar, Mutex};
3use std::time::Duration;
45/// A simple, threadsafe, queue of items of type `T`
6///
7/// This is a sort of channel where any thread can push to a queue and any
8/// thread can pop from a queue.
9///
10/// This supports both bounded and unbounded operations. [`push`] will never block,
11/// and allows the queue to grow without bounds. [`push_bounded`] will block if
12/// the queue is over capacity, and will resume once there is enough capacity.
13///
14/// [`push`]: Self::push
15/// [`push_bounded`]: Self::push_bounded
16pub struct Queue<T> {
17 state: Mutex<State<T>>,
18 popper_cv: Condvar,
19 bounded_cv: Condvar,
20 bound: usize,
21}
/// The data a [`Queue`] keeps behind its mutex.
struct State<T> {
    /// Pending items: pushed at the back, popped from the front.
    items: VecDeque<T>,
}
2627impl<T> Queue<T> {
28/// Creates a queue with a given bound.
29pub fn new(bound: usize) -> Queue<T> {
30 Queue {
31 state: Mutex::new(State {
32 items: VecDeque::new(),
33 }),
34 popper_cv: Condvar::new(),
35 bounded_cv: Condvar::new(),
36 bound,
37 }
38 }
3940/// Pushes an item onto the queue, regardless of the capacity of the queue.
41pub fn push(&self, item: T) {
42self.state.lock().unwrap().items.push_back(item);
43self.popper_cv.notify_one();
44 }
4546/// Pushes an item onto the queue, blocking if the queue is full.
47pub fn push_bounded(&self, item: T) {
48let locked_state = self.state.lock().unwrap();
49let mut state = self
50.bounded_cv
51 .wait_while(locked_state, |s| s.items.len() >= self.bound)
52 .unwrap();
53 state.items.push_back(item);
54self.popper_cv.notify_one();
55 }
5657/// Pops an item from the queue, blocking if the queue is empty.
58pub fn pop(&self, timeout: Duration) -> Option<T> {
59let (mut state, result) = self
60.popper_cv
61 .wait_timeout_while(self.state.lock().unwrap(), timeout, |s| s.items.is_empty())
62 .unwrap();
63if result.timed_out() {
64None
65} else {
66let value = state.items.pop_front()?;
67if state.items.len() < self.bound {
68// Assumes threads cannot be canceled.
69self.bounded_cv.notify_one();
70 }
71Some(value)
72 }
73 }
7475/// Pops all items from the queue without blocking.
76pub fn try_pop_all(&self) -> Vec<T> {
77let mut state = self.state.lock().unwrap();
78let result = state.items.drain(..).collect();
79self.bounded_cv.notify_all();
80 result
81 }
82}