std/sync/mpmc/
counter.rs

1use crate::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
2use crate::{ops, process};
3
/// Shared, heap-allocated state that tracks how many handles of each kind
/// still refer to one channel.
struct Counter<C> {
    /// How many sender handles currently exist for this channel.
    senders: AtomicUsize,

    /// How many receiver handles currently exist for this channel.
    receivers: AtomicUsize,

    /// Hand-off flag between the two sides: each side swaps it to `true` once
    /// its own count reaches zero, and the side that finds it already `true`
    /// is the one that frees the allocation.
    destroy: AtomicBool,

    /// The wrapped channel itself.
    chan: C,
}
18
19/// Wraps a channel into the reference counter.
20pub(crate) fn new<C>(chan: C) -> (Sender<C>, Receiver<C>) {
21    let counter = Box::into_raw(Box::new(Counter {
22        senders: AtomicUsize::new(1),
23        receivers: AtomicUsize::new(1),
24        destroy: AtomicBool::new(false),
25        chan,
26    }));
27    let s = Sender { counter };
28    let r = Receiver { counter };
29    (s, r)
30}
31
32/// The sending side.
33pub(crate) struct Sender<C> {
34    counter: *mut Counter<C>,
35}
36
37impl<C> Sender<C> {
38    /// Returns the internal `Counter`.
39    fn counter(&self) -> &Counter<C> {
40        unsafe { &*self.counter }
41    }
42
43    /// Acquires another sender reference.
44    pub(crate) fn acquire(&self) -> Sender<C> {
45        let count = self.counter().senders.fetch_add(1, Ordering::Relaxed);
46
47        // Cloning senders and calling `mem::forget` on the clones could potentially overflow the
48        // counter. It's very difficult to recover sensibly from such degenerate scenarios so we
49        // just abort when the count becomes very large.
50        if count > isize::MAX as usize {
51            process::abort();
52        }
53
54        Sender { counter: self.counter }
55    }
56
57    /// Releases the sender reference.
58    ///
59    /// Function `disconnect` will be called if this is the last sender reference.
60    pub(crate) unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) {
61        if self.counter().senders.fetch_sub(1, Ordering::AcqRel) == 1 {
62            disconnect(&self.counter().chan);
63
64            if self.counter().destroy.swap(true, Ordering::AcqRel) {
65                drop(unsafe { Box::from_raw(self.counter) });
66            }
67        }
68    }
69}
70
71impl<C> ops::Deref for Sender<C> {
72    type Target = C;
73
74    fn deref(&self) -> &C {
75        &self.counter().chan
76    }
77}
78
79impl<C> PartialEq for Sender<C> {
80    fn eq(&self, other: &Sender<C>) -> bool {
81        self.counter == other.counter
82    }
83}
84
85/// The receiving side.
86pub(crate) struct Receiver<C> {
87    counter: *mut Counter<C>,
88}
89
90impl<C> Receiver<C> {
91    /// Returns the internal `Counter`.
92    fn counter(&self) -> &Counter<C> {
93        unsafe { &*self.counter }
94    }
95
96    /// Acquires another receiver reference.
97    pub(crate) fn acquire(&self) -> Receiver<C> {
98        let count = self.counter().receivers.fetch_add(1, Ordering::Relaxed);
99
100        // Cloning receivers and calling `mem::forget` on the clones could potentially overflow the
101        // counter. It's very difficult to recover sensibly from such degenerate scenarios so we
102        // just abort when the count becomes very large.
103        if count > isize::MAX as usize {
104            process::abort();
105        }
106
107        Receiver { counter: self.counter }
108    }
109
110    /// Releases the receiver reference.
111    ///
112    /// Function `disconnect` will be called if this is the last receiver reference.
113    pub(crate) unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) {
114        if self.counter().receivers.fetch_sub(1, Ordering::AcqRel) == 1 {
115            disconnect(&self.counter().chan);
116
117            if self.counter().destroy.swap(true, Ordering::AcqRel) {
118                drop(unsafe { Box::from_raw(self.counter) });
119            }
120        }
121    }
122}
123
124impl<C> ops::Deref for Receiver<C> {
125    type Target = C;
126
127    fn deref(&self) -> &C {
128        &self.counter().chan
129    }
130}
131
132impl<C> PartialEq for Receiver<C> {
133    fn eq(&self, other: &Receiver<C>) -> bool {
134        self.counter == other.counter
135    }
136}