std/sync/mpmc/
list.rs

1//! Unbounded channel implemented as a linked list.
2
3use super::context::Context;
4use super::error::*;
5use super::select::{Operation, Selected, Token};
6use super::utils::{Backoff, CachePadded};
7use super::waker::SyncWaker;
8use crate::cell::UnsafeCell;
9use crate::marker::PhantomData;
10use crate::mem::MaybeUninit;
11use crate::ptr;
12use crate::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};
13use crate::time::Instant;
14
15// Bits indicating the state of a slot:
16// * If a message has been written into the slot, `WRITE` is set.
17// * If a message has been read from the slot, `READ` is set.
18// * If the block is being destroyed, `DESTROY` is set.
19const WRITE: usize = 1;
20const READ: usize = 2;
21const DESTROY: usize = 4;
22
23// Each block covers one "lap" of indices.
24const LAP: usize = 32;
25// The maximum number of messages a block can hold.
26const BLOCK_CAP: usize = LAP - 1;
27// How many lower bits are reserved for metadata.
28const SHIFT: usize = 1;
29// Has two different purposes:
30// * If set in head, indicates that the block is not the last one.
31// * If set in tail, indicates that the channel is disconnected.
32const MARK_BIT: usize = 1;
33
34/// A slot in a block.
35struct Slot<T> {
36    /// The message.
37    msg: UnsafeCell<MaybeUninit<T>>,
38
39    /// The state of the slot.
40    state: AtomicUsize,
41}
42
43impl<T> Slot<T> {
44    /// Waits until a message is written into the slot.
45    fn wait_write(&self) {
46        let backoff = Backoff::new();
47        while self.state.load(Ordering::Acquire) & WRITE == 0 {
48            backoff.spin_heavy();
49        }
50    }
51}
52
53/// A block in a linked list.
54///
55/// Each block in the list can hold up to `BLOCK_CAP` messages.
56struct Block<T> {
57    /// The next block in the linked list.
58    next: AtomicPtr<Block<T>>,
59
60    /// Slots for messages.
61    slots: [Slot<T>; BLOCK_CAP],
62}
63
64impl<T> Block<T> {
65    /// Creates an empty block.
66    fn new() -> Box<Block<T>> {
67        // SAFETY: This is safe because:
68        //  [1] `Block::next` (AtomicPtr) may be safely zero initialized.
69        //  [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4].
70        //  [3] `Slot::msg` (UnsafeCell) may be safely zero initialized because it
71        //       holds a MaybeUninit.
72        //  [4] `Slot::state` (AtomicUsize) may be safely zero initialized.
73        unsafe { Box::new_zeroed().assume_init() }
74    }
75
76    /// Waits until the next pointer is set.
77    fn wait_next(&self) -> *mut Block<T> {
78        let backoff = Backoff::new();
79        loop {
80            let next = self.next.load(Ordering::Acquire);
81            if !next.is_null() {
82                return next;
83            }
84            backoff.spin_heavy();
85        }
86    }
87
88    /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
89    unsafe fn destroy(this: *mut Block<T>, start: usize) {
90        // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
91        // begun destruction of the block.
92        for i in start..BLOCK_CAP - 1 {
93            let slot = unsafe { (*this).slots.get_unchecked(i) };
94
95            // Mark the `DESTROY` bit if a thread is still using the slot.
96            if slot.state.load(Ordering::Acquire) & READ == 0
97                && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
98            {
99                // If a thread is still using the slot, it will continue destruction of the block.
100                return;
101            }
102        }
103
104        // No thread is using the block, now it is safe to destroy it.
105        drop(unsafe { Box::from_raw(this) });
106    }
107}
108
109/// A position in a channel.
110#[derive(Debug)]
111struct Position<T> {
112    /// The index in the channel.
113    index: AtomicUsize,
114
115    /// The block in the linked list.
116    block: AtomicPtr<Block<T>>,
117}
118
119/// The token type for the list flavor.
120#[derive(Debug)]
121pub(crate) struct ListToken {
122    /// The block of slots.
123    block: *const u8,
124
125    /// The offset into the block.
126    offset: usize,
127}
128
129impl Default for ListToken {
130    #[inline]
131    fn default() -> Self {
132        ListToken { block: ptr::null(), offset: 0 }
133    }
134}
135
136/// Unbounded channel implemented as a linked list.
137///
138/// Each message sent into the channel is assigned a sequence number, i.e. an index. Indices are
139/// represented as numbers of type `usize` and wrap on overflow.
140///
141/// Consecutive messages are grouped into blocks in order to put less pressure on the allocator and
142/// improve cache efficiency.
143pub(crate) struct Channel<T> {
144    /// The head of the channel.
145    head: CachePadded<Position<T>>,
146
147    /// The tail of the channel.
148    tail: CachePadded<Position<T>>,
149
150    /// Receivers waiting while the channel is empty and not disconnected.
151    receivers: SyncWaker,
152
153    /// Indicates that dropping a `Channel<T>` may drop messages of type `T`.
154    _marker: PhantomData<T>,
155}
156
157impl<T> Channel<T> {
158    /// Creates a new unbounded channel.
159    pub(crate) fn new() -> Self {
160        Channel {
161            head: CachePadded::new(Position {
162                block: AtomicPtr::new(ptr::null_mut()),
163                index: AtomicUsize::new(0),
164            }),
165            tail: CachePadded::new(Position {
166                block: AtomicPtr::new(ptr::null_mut()),
167                index: AtomicUsize::new(0),
168            }),
169            receivers: SyncWaker::new(),
170            _marker: PhantomData,
171        }
172    }
173
174    /// Attempts to reserve a slot for sending a message.
175    fn start_send(&self, token: &mut Token) -> bool {
176        let backoff = Backoff::new();
177        let mut tail = self.tail.index.load(Ordering::Acquire);
178        let mut block = self.tail.block.load(Ordering::Acquire);
179        let mut next_block = None;
180
181        loop {
182            // Check if the channel is disconnected.
183            if tail & MARK_BIT != 0 {
184                token.list.block = ptr::null();
185                return true;
186            }
187
188            // Calculate the offset of the index into the block.
189            let offset = (tail >> SHIFT) % LAP;
190
191            // If we reached the end of the block, wait until the next one is installed.
192            if offset == BLOCK_CAP {
193                backoff.spin_heavy();
194                tail = self.tail.index.load(Ordering::Acquire);
195                block = self.tail.block.load(Ordering::Acquire);
196                continue;
197            }
198
199            // If we're going to have to install the next block, allocate it in advance in order to
200            // make the wait for other threads as short as possible.
201            if offset + 1 == BLOCK_CAP && next_block.is_none() {
202                next_block = Some(Block::<T>::new());
203            }
204
205            // If this is the first message to be sent into the channel, we need to allocate the
206            // first block and install it.
207            if block.is_null() {
208                let new = Box::into_raw(Block::<T>::new());
209
210                if self
211                    .tail
212                    .block
213                    .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed)
214                    .is_ok()
215                {
216                    self.head.block.store(new, Ordering::Release);
217                    block = new;
218                } else {
219                    next_block = unsafe { Some(Box::from_raw(new)) };
220                    tail = self.tail.index.load(Ordering::Acquire);
221                    block = self.tail.block.load(Ordering::Acquire);
222                    continue;
223                }
224            }
225
226            let new_tail = tail + (1 << SHIFT);
227
228            // Try advancing the tail forward.
229            match self.tail.index.compare_exchange_weak(
230                tail,
231                new_tail,
232                Ordering::SeqCst,
233                Ordering::Acquire,
234            ) {
235                Ok(_) => unsafe {
236                    // If we've reached the end of the block, install the next one.
237                    if offset + 1 == BLOCK_CAP {
238                        let next_block = Box::into_raw(next_block.unwrap());
239                        self.tail.block.store(next_block, Ordering::Release);
240                        self.tail.index.fetch_add(1 << SHIFT, Ordering::Release);
241                        (*block).next.store(next_block, Ordering::Release);
242                    }
243
244                    token.list.block = block as *const u8;
245                    token.list.offset = offset;
246                    return true;
247                },
248                Err(_) => {
249                    backoff.spin_light();
250                    tail = self.tail.index.load(Ordering::Acquire);
251                    block = self.tail.block.load(Ordering::Acquire);
252                }
253            }
254        }
255    }
256
257    /// Writes a message into the channel.
258    pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
259        // If there is no slot, the channel is disconnected.
260        if token.list.block.is_null() {
261            return Err(msg);
262        }
263
264        // Write the message into the slot.
265        let block = token.list.block as *mut Block<T>;
266        let offset = token.list.offset;
267        unsafe {
268            let slot = (*block).slots.get_unchecked(offset);
269            slot.msg.get().write(MaybeUninit::new(msg));
270            slot.state.fetch_or(WRITE, Ordering::Release);
271        }
272
273        // Wake a sleeping receiver.
274        self.receivers.notify();
275        Ok(())
276    }
277
278    /// Attempts to reserve a slot for receiving a message.
279    fn start_recv(&self, token: &mut Token) -> bool {
280        let backoff = Backoff::new();
281        let mut head = self.head.index.load(Ordering::Acquire);
282        let mut block = self.head.block.load(Ordering::Acquire);
283
284        loop {
285            // Calculate the offset of the index into the block.
286            let offset = (head >> SHIFT) % LAP;
287
288            // If we reached the end of the block, wait until the next one is installed.
289            if offset == BLOCK_CAP {
290                backoff.spin_heavy();
291                head = self.head.index.load(Ordering::Acquire);
292                block = self.head.block.load(Ordering::Acquire);
293                continue;
294            }
295
296            let mut new_head = head + (1 << SHIFT);
297
298            if new_head & MARK_BIT == 0 {
299                atomic::fence(Ordering::SeqCst);
300                let tail = self.tail.index.load(Ordering::Relaxed);
301
302                // If the tail equals the head, that means the channel is empty.
303                if head >> SHIFT == tail >> SHIFT {
304                    // If the channel is disconnected...
305                    if tail & MARK_BIT != 0 {
306                        // ...then receive an error.
307                        token.list.block = ptr::null();
308                        return true;
309                    } else {
310                        // Otherwise, the receive operation is not ready.
311                        return false;
312                    }
313                }
314
315                // If head and tail are not in the same block, set `MARK_BIT` in head.
316                if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
317                    new_head |= MARK_BIT;
318                }
319            }
320
321            // The block can be null here only if the first message is being sent into the channel.
322            // In that case, just wait until it gets initialized.
323            if block.is_null() {
324                backoff.spin_heavy();
325                head = self.head.index.load(Ordering::Acquire);
326                block = self.head.block.load(Ordering::Acquire);
327                continue;
328            }
329
330            // Try moving the head index forward.
331            match self.head.index.compare_exchange_weak(
332                head,
333                new_head,
334                Ordering::SeqCst,
335                Ordering::Acquire,
336            ) {
337                Ok(_) => unsafe {
338                    // If we've reached the end of the block, move to the next one.
339                    if offset + 1 == BLOCK_CAP {
340                        let next = (*block).wait_next();
341                        let mut next_index = (new_head & !MARK_BIT).wrapping_add(1 << SHIFT);
342                        if !(*next).next.load(Ordering::Relaxed).is_null() {
343                            next_index |= MARK_BIT;
344                        }
345
346                        self.head.block.store(next, Ordering::Release);
347                        self.head.index.store(next_index, Ordering::Release);
348                    }
349
350                    token.list.block = block as *const u8;
351                    token.list.offset = offset;
352                    return true;
353                },
354                Err(_) => {
355                    backoff.spin_light();
356                    head = self.head.index.load(Ordering::Acquire);
357                    block = self.head.block.load(Ordering::Acquire);
358                }
359            }
360        }
361    }
362
363    /// Reads a message from the channel.
364    pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
365        if token.list.block.is_null() {
366            // The channel is disconnected.
367            return Err(());
368        }
369
370        // Read the message.
371        let block = token.list.block as *mut Block<T>;
372        let offset = token.list.offset;
373        unsafe {
374            let slot = (*block).slots.get_unchecked(offset);
375            slot.wait_write();
376            let msg = slot.msg.get().read().assume_init();
377
378            // Destroy the block if we've reached the end, or if another thread wanted to destroy but
379            // couldn't because we were busy reading from the slot.
380            if offset + 1 == BLOCK_CAP {
381                Block::destroy(block, 0);
382            } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
383                Block::destroy(block, offset + 1);
384            }
385
386            Ok(msg)
387        }
388    }
389
390    /// Attempts to send a message into the channel.
391    pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
392        self.send(msg, None).map_err(|err| match err {
393            SendTimeoutError::Disconnected(msg) => TrySendError::Disconnected(msg),
394            SendTimeoutError::Timeout(_) => unreachable!(),
395        })
396    }
397
398    /// Sends a message into the channel.
399    pub(crate) fn send(
400        &self,
401        msg: T,
402        _deadline: Option<Instant>,
403    ) -> Result<(), SendTimeoutError<T>> {
404        let token = &mut Token::default();
405        assert!(self.start_send(token));
406        unsafe { self.write(token, msg).map_err(SendTimeoutError::Disconnected) }
407    }
408
409    /// Attempts to receive a message without blocking.
410    pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
411        let token = &mut Token::default();
412
413        if self.start_recv(token) {
414            unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
415        } else {
416            Err(TryRecvError::Empty)
417        }
418    }
419
420    /// Receives a message from the channel.
421    pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
422        let token = &mut Token::default();
423        loop {
424            if self.start_recv(token) {
425                unsafe {
426                    return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
427                }
428            }
429
430            if let Some(d) = deadline {
431                if Instant::now() >= d {
432                    return Err(RecvTimeoutError::Timeout);
433                }
434            }
435
436            // Prepare for blocking until a sender wakes us up.
437            Context::with(|cx| {
438                let oper = Operation::hook(token);
439                self.receivers.register(oper, cx);
440
441                // Has the channel become ready just now?
442                if !self.is_empty() || self.is_disconnected() {
443                    let _ = cx.try_select(Selected::Aborted);
444                }
445
446                // Block the current thread.
447                // SAFETY: the context belongs to the current thread.
448                let sel = unsafe { cx.wait_until(deadline) };
449
450                match sel {
451                    Selected::Waiting => unreachable!(),
452                    Selected::Aborted | Selected::Disconnected => {
453                        self.receivers.unregister(oper).unwrap();
454                        // If the channel was disconnected, we still have to check for remaining
455                        // messages.
456                    }
457                    Selected::Operation(_) => {}
458                }
459            });
460        }
461    }
462
463    /// Returns the current number of messages inside the channel.
464    pub(crate) fn len(&self) -> usize {
465        loop {
466            // Load the tail index, then load the head index.
467            let mut tail = self.tail.index.load(Ordering::SeqCst);
468            let mut head = self.head.index.load(Ordering::SeqCst);
469
470            // If the tail index didn't change, we've got consistent indices to work with.
471            if self.tail.index.load(Ordering::SeqCst) == tail {
472                // Erase the lower bits.
473                tail &= !((1 << SHIFT) - 1);
474                head &= !((1 << SHIFT) - 1);
475
476                // Fix up indices if they fall onto block ends.
477                if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
478                    tail = tail.wrapping_add(1 << SHIFT);
479                }
480                if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
481                    head = head.wrapping_add(1 << SHIFT);
482                }
483
484                // Rotate indices so that head falls into the first block.
485                let lap = (head >> SHIFT) / LAP;
486                tail = tail.wrapping_sub((lap * LAP) << SHIFT);
487                head = head.wrapping_sub((lap * LAP) << SHIFT);
488
489                // Remove the lower bits.
490                tail >>= SHIFT;
491                head >>= SHIFT;
492
493                // Return the difference minus the number of blocks between tail and head.
494                return tail - head - tail / LAP;
495            }
496        }
497    }
498
499    /// Returns the capacity of the channel.
500    pub(crate) fn capacity(&self) -> Option<usize> {
501        None
502    }
503
504    /// Disconnects senders and wakes up all blocked receivers.
505    ///
506    /// Returns `true` if this call disconnected the channel.
507    pub(crate) fn disconnect_senders(&self) -> bool {
508        let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);
509
510        if tail & MARK_BIT == 0 {
511            self.receivers.disconnect();
512            true
513        } else {
514            false
515        }
516    }
517
518    /// Disconnects receivers.
519    ///
520    /// Returns `true` if this call disconnected the channel.
521    pub(crate) fn disconnect_receivers(&self) -> bool {
522        let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);
523
524        if tail & MARK_BIT == 0 {
525            // If receivers are dropped first, discard all messages to free
526            // memory eagerly.
527            self.discard_all_messages();
528            true
529        } else {
530            false
531        }
532    }
533
534    /// Discards all messages.
535    ///
536    /// This method should only be called when all receivers are dropped.
537    fn discard_all_messages(&self) {
538        let backoff = Backoff::new();
539        let mut tail = self.tail.index.load(Ordering::Acquire);
540        loop {
541            let offset = (tail >> SHIFT) % LAP;
542            if offset != BLOCK_CAP {
543                break;
544            }
545
546            // New updates to tail will be rejected by MARK_BIT and aborted unless it's
547            // at boundary. We need to wait for the updates take affect otherwise there
548            // can be memory leaks.
549            backoff.spin_heavy();
550            tail = self.tail.index.load(Ordering::Acquire);
551        }
552
553        let mut head = self.head.index.load(Ordering::Acquire);
554        // The channel may be uninitialized, so we have to swap to avoid overwriting any sender's attempts
555        // to initialize the first block before noticing that the receivers disconnected. Late allocations
556        // will be deallocated by the sender in Drop.
557        let mut block = self.head.block.swap(ptr::null_mut(), Ordering::AcqRel);
558
559        // If we're going to be dropping messages we need to synchronize with initialization
560        if head >> SHIFT != tail >> SHIFT {
561            // The block can be null here only if a sender is in the process of initializing the
562            // channel while another sender managed to send a message by inserting it into the
563            // semi-initialized channel and advanced the tail.
564            // In that case, just wait until it gets initialized.
565            while block.is_null() {
566                backoff.spin_heavy();
567                block = self.head.block.load(Ordering::Acquire);
568            }
569        }
570
571        unsafe {
572            // Drop all messages between head and tail and deallocate the heap-allocated blocks.
573            while head >> SHIFT != tail >> SHIFT {
574                let offset = (head >> SHIFT) % LAP;
575
576                if offset < BLOCK_CAP {
577                    // Drop the message in the slot.
578                    let slot = (*block).slots.get_unchecked(offset);
579                    slot.wait_write();
580                    let p = &mut *slot.msg.get();
581                    p.as_mut_ptr().drop_in_place();
582                } else {
583                    (*block).wait_next();
584                    // Deallocate the block and move to the next one.
585                    let next = (*block).next.load(Ordering::Acquire);
586                    drop(Box::from_raw(block));
587                    block = next;
588                }
589
590                head = head.wrapping_add(1 << SHIFT);
591            }
592
593            // Deallocate the last remaining block.
594            if !block.is_null() {
595                drop(Box::from_raw(block));
596            }
597        }
598
599        head &= !MARK_BIT;
600        self.head.index.store(head, Ordering::Release);
601    }
602
603    /// Returns `true` if the channel is disconnected.
604    pub(crate) fn is_disconnected(&self) -> bool {
605        self.tail.index.load(Ordering::SeqCst) & MARK_BIT != 0
606    }
607
608    /// Returns `true` if the channel is empty.
609    pub(crate) fn is_empty(&self) -> bool {
610        let head = self.head.index.load(Ordering::SeqCst);
611        let tail = self.tail.index.load(Ordering::SeqCst);
612        head >> SHIFT == tail >> SHIFT
613    }
614
615    /// Returns `true` if the channel is full.
616    pub(crate) fn is_full(&self) -> bool {
617        false
618    }
619}
620
621impl<T> Drop for Channel<T> {
622    fn drop(&mut self) {
623        let mut head = self.head.index.load(Ordering::Relaxed);
624        let mut tail = self.tail.index.load(Ordering::Relaxed);
625        let mut block = self.head.block.load(Ordering::Relaxed);
626
627        // Erase the lower bits.
628        head &= !((1 << SHIFT) - 1);
629        tail &= !((1 << SHIFT) - 1);
630
631        unsafe {
632            // Drop all messages between head and tail and deallocate the heap-allocated blocks.
633            while head != tail {
634                let offset = (head >> SHIFT) % LAP;
635
636                if offset < BLOCK_CAP {
637                    // Drop the message in the slot.
638                    let slot = (*block).slots.get_unchecked(offset);
639                    let p = &mut *slot.msg.get();
640                    p.as_mut_ptr().drop_in_place();
641                } else {
642                    // Deallocate the block and move to the next one.
643                    let next = (*block).next.load(Ordering::Relaxed);
644                    drop(Box::from_raw(block));
645                    block = next;
646                }
647
648                head = head.wrapping_add(1 << SHIFT);
649            }
650
651            // Deallocate the last remaining block.
652            if !block.is_null() {
653                drop(Box::from_raw(block));
654            }
655        }
656    }
657}