cargo/util/
queue.rs
1use std::collections::VecDeque;
2use std::sync::{Condvar, Mutex};
3use std::time::Duration;
4
5pub struct Queue<T> {
17 state: Mutex<State<T>>,
18 popper_cv: Condvar,
19 bounded_cv: Condvar,
20 bound: usize,
21}
22
23struct State<T> {
24 items: VecDeque<T>,
25}
26
27impl<T> Queue<T> {
28 pub fn new(bound: usize) -> Queue<T> {
30 Queue {
31 state: Mutex::new(State {
32 items: VecDeque::new(),
33 }),
34 popper_cv: Condvar::new(),
35 bounded_cv: Condvar::new(),
36 bound,
37 }
38 }
39
40 pub fn push(&self, item: T) {
42 self.state.lock().unwrap().items.push_back(item);
43 self.popper_cv.notify_one();
44 }
45
46 pub fn push_bounded(&self, item: T) {
48 let locked_state = self.state.lock().unwrap();
49 let mut state = self
50 .bounded_cv
51 .wait_while(locked_state, |s| s.items.len() >= self.bound)
52 .unwrap();
53 state.items.push_back(item);
54 self.popper_cv.notify_one();
55 }
56
57 pub fn pop(&self, timeout: Duration) -> Option<T> {
59 let (mut state, result) = self
60 .popper_cv
61 .wait_timeout_while(self.state.lock().unwrap(), timeout, |s| s.items.is_empty())
62 .unwrap();
63 if result.timed_out() {
64 None
65 } else {
66 let value = state.items.pop_front()?;
67 if state.items.len() < self.bound {
68 self.bounded_cv.notify_one();
70 }
71 Some(value)
72 }
73 }
74
75 pub fn try_pop_all(&self) -> Vec<T> {
77 let mut state = self.state.lock().unwrap();
78 let result = state.items.drain(..).collect();
79 self.bounded_cv.notify_all();
80 result
81 }
82}