cargo/util/
queue.rs

1use std::collections::VecDeque;
2use std::sync::{Condvar, Mutex};
3use std::time::Duration;
4
/// A thread-safe FIFO queue holding items of type `T`.
///
/// It works like a small multi-producer, multi-consumer channel: every thread
/// holding a reference may insert items and every thread may remove them.
///
/// Two kinds of insertion are offered. [`push`] never blocks and lets the
/// queue grow past its bound. [`push_bounded`] blocks for as long as the queue
/// holds `bound` or more items and resumes once room becomes available.
///
/// [`push`]: Self::push
/// [`push_bounded`]: Self::push_bounded
pub struct Queue<T> {
    /// Queue contents, guarded by a mutex.
    state: Mutex<State<T>>,
    /// Notified after every insertion; `pop` waits on it.
    popper_cv: Condvar,
    /// Notified when items are removed; `push_bounded` waits on it.
    bounded_cv: Condvar,
    /// Item count at (or above) which `push_bounded` blocks.
    bound: usize,
}

/// The data protected by the mutex inside `Queue`.
struct State<T> {
    items: VecDeque<T>,
}

impl<T> Queue<T> {
    /// Builds an empty queue whose bounded insertions block once it holds
    /// `bound` items.
    ///
    /// With a `bound` of zero, [`push_bounded`](Self::push_bounded) can never
    /// proceed, because the queue length is always at least the bound.
    pub fn new(bound: usize) -> Queue<T> {
        let state = Mutex::new(State {
            items: VecDeque::new(),
        });
        Queue {
            state,
            popper_cv: Condvar::new(),
            bounded_cv: Condvar::new(),
            bound,
        }
    }

    /// Appends `item` to the back of the queue, ignoring the bound entirely.
    ///
    /// Never waits for capacity. Panics if the internal mutex is poisoned.
    pub fn push(&self, item: T) {
        {
            let mut guard = self.state.lock().unwrap();
            guard.items.push_back(item);
        }
        // The lock is released before waking a waiting popper.
        self.popper_cv.notify_one();
    }

    /// Appends `item` to the back of the queue, waiting while the queue
    /// already holds `bound` or more items.
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn push_bounded(&self, item: T) {
        let at_capacity = |s: &mut State<T>| s.items.len() >= self.bound;
        let guard = self.state.lock().unwrap();
        let mut guard = self.bounded_cv.wait_while(guard, at_capacity).unwrap();
        guard.items.push_back(item);
        self.popper_cv.notify_one();
    }

    /// Removes and returns the item at the front of the queue.
    ///
    /// If the queue is empty, waits up to `timeout` for an item to arrive and
    /// returns `None` when none shows up in time. Panics if the internal
    /// mutex is poisoned.
    pub fn pop(&self, timeout: Duration) -> Option<T> {
        let guard = self.state.lock().unwrap();
        let (mut guard, wait) = self
            .popper_cv
            .wait_timeout_while(guard, timeout, |s| s.items.is_empty())
            .unwrap();
        if wait.timed_out() {
            return None;
        }

        let item = guard.items.pop_front()?;
        if guard.items.len() < self.bound {
            // Assumes threads cannot be canceled.
            self.bounded_cv.notify_one();
        }
        Some(item)
    }

    /// Removes every queued item at once, in FIFO order, without waiting.
    ///
    /// Returns an empty vector when nothing is queued. All threads blocked in
    /// [`push_bounded`](Self::push_bounded) are woken afterwards. Panics if
    /// the internal mutex is poisoned.
    pub fn try_pop_all(&self) -> Vec<T> {
        let mut guard = self.state.lock().unwrap();
        let mut drained = Vec::with_capacity(guard.items.len());
        drained.extend(guard.items.drain(..));
        self.bounded_cv.notify_all();
        drained
    }
}