//! Unbounded channel implemented as a linked list.

use super::context::Context;
use super::error::*;
use super::select::{Operation, Selected, Token};
use super::utils::{Backoff, CachePadded};
use super::waker::SyncWaker;
use crate::cell::UnsafeCell;
use crate::marker::PhantomData;
use crate::mem::MaybeUninit;
use crate::ptr;
use crate::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};
use crate::time::Instant;

// Bits indicating the state of a slot:
// * If a message has been written into the slot, `WRITE` is set.
// * If a message has been read from the slot, `READ` is set.
// * If the block is being destroyed, `DESTROY` is set.
const WRITE: usize = 1;
const READ: usize = 2;
const DESTROY: usize = 4;

// Each block covers one "lap" of indices.
const LAP: usize = 32;
// The maximum number of messages a block can hold.
const BLOCK_CAP: usize = LAP - 1;
// How many lower bits are reserved for metadata.
const SHIFT: usize = 1;
// Has two different purposes:
// * If set in head, indicates that the block is not the last one.
// * If set in tail, indicates that the channel is disconnected.
const MARK_BIT: usize = 1;
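
// A worked example of the encoding above (illustrative only; the numbers follow
// from the constants): a channel index stores a sequence number shifted left by
// `SHIFT`, with the low bit left for `MARK_BIT`.
//
//     let index: usize = 12;              // e.g. a freshly loaded tail index
//     assert_eq!(index & MARK_BIT, 0);    // in tail: channel not disconnected
//     let seq = index >> SHIFT;           // sequence number 6
//     assert_eq!(seq % LAP, 6);           // offset 6 into the current block
//
// An offset equal to `BLOCK_CAP` (31) does not address a slot; it marks the
// block boundary at which the next block must be installed.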

/// A slot in a block.
struct Slot<T> {
    /// The message.
    msg: UnsafeCell<MaybeUninit<T>>,

    /// The state of the slot.
    state: AtomicUsize,
}

impl<T> Slot<T> {
    /// Waits until a message is written into the slot.
    fn wait_write(&self) {
        let backoff = Backoff::new();
        while self.state.load(Ordering::Acquire) & WRITE == 0 {
            backoff.spin_heavy();
        }
    }
}
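
// Slot state transitions (an illustrative summary of the logic in `write`,
// `read`, and `Block::destroy` below): a slot starts zeroed, the sender sets
// `WRITE` after storing the message, and the receiver sets `READ` after taking
// it; `DESTROY` is set only when a block is torn down while a slot is still in
// use, handing responsibility for destruction to the thread using it:
//
//     0 --write--> WRITE --read--> WRITE | READ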

/// A block in a linked list.
///
/// Each block in the list can hold up to `BLOCK_CAP` messages.
struct Block<T> {
    /// The next block in the linked list.
    next: AtomicPtr<Block<T>>,

    /// Slots for messages.
    slots: [Slot<T>; BLOCK_CAP],
}

impl<T> Block<T> {
    /// Creates an empty block.
    fn new() -> Box<Block<T>> {
        // SAFETY: This is safe because:
        // [1] `Block::next` (AtomicPtr) may be safely zero initialized.
        // [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4].
        // [3] `Slot::msg` (UnsafeCell) may be safely zero initialized because it
        //     holds a MaybeUninit.
        // [4] `Slot::state` (AtomicUsize) may be safely zero initialized.
        unsafe { Box::new_zeroed().assume_init() }
    }

    /// Waits until the next pointer is set.
    fn wait_next(&self) -> *mut Block<T> {
        let backoff = Backoff::new();
        loop {
            let next = self.next.load(Ordering::Acquire);
            if !next.is_null() {
                return next;
            }
            backoff.spin_heavy();
        }
    }

    /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
    unsafe fn destroy(this: *mut Block<T>, start: usize) {
        // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
        // begun destruction of the block.
        for i in start..BLOCK_CAP - 1 {
            let slot = unsafe { (*this).slots.get_unchecked(i) };

            // Mark the `DESTROY` bit if a thread is still using the slot.
            if slot.state.load(Ordering::Acquire) & READ == 0
                && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
            {
                // If a thread is still using the slot, it will continue destruction of the block.
                return;
            }
        }

        // No thread is using the block, now it is safe to destroy it.
        drop(unsafe { Box::from_raw(this) });
    }
}

/// A position in a channel.
#[derive(Debug)]
struct Position<T> {
    /// The index in the channel.
    index: AtomicUsize,

    /// The block in the linked list.
    block: AtomicPtr<Block<T>>,
}

/// The token type for the list flavor.
#[derive(Debug)]
pub(crate) struct ListToken {
    /// The block of slots.
    block: *const u8,

    /// The offset into the block.
    offset: usize,
}

impl Default for ListToken {
    #[inline]
    fn default() -> Self {
        ListToken { block: ptr::null(), offset: 0 }
    }
}

/// Unbounded channel implemented as a linked list.
///
/// Each message sent into the channel is assigned a sequence number, i.e. an index. Indices are
/// represented as numbers of type `usize` and wrap on overflow.
///
/// Consecutive messages are grouped into blocks in order to put less pressure on the allocator and
/// improve cache efficiency.
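///
/// A minimal usage sketch (illustrative only; this type is crate-internal and is normally driven
/// through the public `Sender`/`Receiver` wrappers):
///
/// ```ignore (illustrative use of a crate-internal API)
/// let channel = Channel::<i32>::new();
/// channel.try_send(1).unwrap();
/// assert_eq!(channel.try_recv().unwrap(), 1);
/// ```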
pub(crate) struct Channel<T> {
    /// The head of the channel.
    head: CachePadded<Position<T>>,

    /// The tail of the channel.
    tail: CachePadded<Position<T>>,

    /// Receivers waiting while the channel is empty and not disconnected.
    receivers: SyncWaker,

    /// Indicates that dropping a `Channel<T>` may drop messages of type `T`.
    _marker: PhantomData<T>,
}

impl<T> Channel<T> {
    /// Creates a new unbounded channel.
    pub(crate) fn new() -> Self {
        Channel {
            head: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            tail: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            receivers: SyncWaker::new(),
            _marker: PhantomData,
        }
    }

    /// Attempts to reserve a slot for sending a message.
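    ///
    /// On success, the reserved slot is recorded in `token`; the send is completed by a matching
    /// call to `write`, as `send` below does (illustrative sketch):
    ///
    /// ```ignore (illustrative pairing of the two-phase send)
    /// let token = &mut Token::default();
    /// if channel.start_send(token) {
    ///     unsafe { channel.write(token, msg).ok() };
    /// }
    /// ```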
    fn start_send(&self, token: &mut Token) -> bool {
        let backoff = Backoff::new();
        let mut tail = self.tail.index.load(Ordering::Acquire);
        let mut block = self.tail.block.load(Ordering::Acquire);
        let mut next_block = None;

        loop {
            // Check if the channel is disconnected.
            if tail & MARK_BIT != 0 {
                token.list.block = ptr::null();
                return true;
            }

            // Calculate the offset of the index into the block.
            let offset = (tail >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.spin_heavy();
                tail = self.tail.index.load(Ordering::Acquire);
                block = self.tail.block.load(Ordering::Acquire);
                continue;
            }

            // If we're going to have to install the next block, allocate it in advance in order to
            // make the wait for other threads as short as possible.
            if offset + 1 == BLOCK_CAP && next_block.is_none() {
                next_block = Some(Block::<T>::new());
            }

            // If this is the first message to be sent into the channel, we need to allocate the
            // first block and install it.
            if block.is_null() {
                let new = Box::into_raw(Block::<T>::new());

                if self
                    .tail
                    .block
                    .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed)
                    .is_ok()
                {
                    self.head.block.store(new, Ordering::Release);
                    block = new;
                } else {
                    next_block = unsafe { Some(Box::from_raw(new)) };
                    tail = self.tail.index.load(Ordering::Acquire);
                    block = self.tail.block.load(Ordering::Acquire);
                    continue;
                }
            }

            let new_tail = tail + (1 << SHIFT);

            // Try advancing the tail forward.
            match self.tail.index.compare_exchange_weak(
                tail,
                new_tail,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, install the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next_block = Box::into_raw(next_block.unwrap());
                        self.tail.block.store(next_block, Ordering::Release);
                        self.tail.index.fetch_add(1 << SHIFT, Ordering::Release);
                        (*block).next.store(next_block, Ordering::Release);
                    }

                    token.list.block = block as *const u8;
                    token.list.offset = offset;
                    return true;
                },
                Err(_) => {
                    backoff.spin_light();
                    tail = self.tail.index.load(Ordering::Acquire);
                    block = self.tail.block.load(Ordering::Acquire);
                }
            }
        }
    }

    /// Writes a message into the channel.
    pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
        // If there is no slot, the channel is disconnected.
        if token.list.block.is_null() {
            return Err(msg);
        }

        // Write the message into the slot.
        let block = token.list.block as *mut Block<T>;
        let offset = token.list.offset;
        unsafe {
            let slot = (*block).slots.get_unchecked(offset);
            slot.msg.get().write(MaybeUninit::new(msg));
            slot.state.fetch_or(WRITE, Ordering::Release);
        }

        // Wake a sleeping receiver.
        self.receivers.notify();
        Ok(())
    }

    /// Attempts to reserve a slot for receiving a message.
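    ///
    /// On success, the reserved slot is recorded in `token`; the receive is completed by a
    /// matching call to `read`, as `try_recv` below does (illustrative sketch):
    ///
    /// ```ignore (illustrative pairing of the two-phase receive)
    /// let token = &mut Token::default();
    /// if channel.start_recv(token) {
    ///     let msg = unsafe { channel.read(token) };
    /// }
    /// ```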
    fn start_recv(&self, token: &mut Token) -> bool {
        let backoff = Backoff::new();
        let mut head = self.head.index.load(Ordering::Acquire);
        let mut block = self.head.block.load(Ordering::Acquire);

        loop {
            // Calculate the offset of the index into the block.
            let offset = (head >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.spin_heavy();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            let mut new_head = head + (1 << SHIFT);

            if new_head & MARK_BIT == 0 {
                atomic::fence(Ordering::SeqCst);
                let tail = self.tail.index.load(Ordering::Relaxed);

                // If the tail equals the head, that means the channel is empty.
                if head >> SHIFT == tail >> SHIFT {
                    // If the channel is disconnected...
                    if tail & MARK_BIT != 0 {
                        // ...then receive an error.
                        token.list.block = ptr::null();
                        return true;
                    } else {
                        // Otherwise, the receive operation is not ready.
                        return false;
                    }
                }

                // If head and tail are not in the same block, set `MARK_BIT` in head.
                if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                    new_head |= MARK_BIT;
                }
            }

            // The block can be null here only if the first message is being sent into the channel.
            // In that case, just wait until it gets initialized.
            if block.is_null() {
                backoff.spin_heavy();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            // Try moving the head index forward.
            match self.head.index.compare_exchange_weak(
                head,
                new_head,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, move to the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next = (*block).wait_next();
                        let mut next_index = (new_head & !MARK_BIT).wrapping_add(1 << SHIFT);
                        if !(*next).next.load(Ordering::Relaxed).is_null() {
                            next_index |= MARK_BIT;
                        }

                        self.head.block.store(next, Ordering::Release);
                        self.head.index.store(next_index, Ordering::Release);
                    }

                    token.list.block = block as *const u8;
                    token.list.offset = offset;
                    return true;
                },
                Err(_) => {
                    backoff.spin_light();
                    head = self.head.index.load(Ordering::Acquire);
                    block = self.head.block.load(Ordering::Acquire);
                }
            }
        }
    }

    /// Reads a message from the channel.
    pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
        if token.list.block.is_null() {
            // The channel is disconnected.
            return Err(());
        }

        // Read the message.
        let block = token.list.block as *mut Block<T>;
        let offset = token.list.offset;
        unsafe {
            let slot = (*block).slots.get_unchecked(offset);
            slot.wait_write();
            let msg = slot.msg.get().read().assume_init();

            // Destroy the block if we've reached the end, or if another thread wanted to destroy
            // but couldn't because we were busy reading from the slot.
            if offset + 1 == BLOCK_CAP {
                Block::destroy(block, 0);
            } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
                Block::destroy(block, offset + 1);
            }

            Ok(msg)
        }
    }

    /// Attempts to send a message into the channel.
    pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
        self.send(msg, None).map_err(|err| match err {
            SendTimeoutError::Disconnected(msg) => TrySendError::Disconnected(msg),
            SendTimeoutError::Timeout(_) => unreachable!(),
        })
    }

    /// Sends a message into the channel.
    pub(crate) fn send(
        &self,
        msg: T,
        _deadline: Option<Instant>,
    ) -> Result<(), SendTimeoutError<T>> {
        let token = &mut Token::default();
        assert!(self.start_send(token));
        unsafe { self.write(token, msg).map_err(SendTimeoutError::Disconnected) }
    }

    /// Attempts to receive a message without blocking.
    pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
        let token = &mut Token::default();

        if self.start_recv(token) {
            unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
        } else {
            Err(TryRecvError::Empty)
        }
    }

    /// Receives a message from the channel.
    pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
        let token = &mut Token::default();
        loop {
            if self.start_recv(token) {
                unsafe {
                    return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
                }
            }

            if let Some(d) = deadline {
                if Instant::now() >= d {
                    return Err(RecvTimeoutError::Timeout);
                }
            }

            // Prepare for blocking until a sender wakes us up.
            Context::with(|cx| {
                let oper = Operation::hook(token);
                self.receivers.register(oper, cx);

                // Has the channel become ready just now?
                if !self.is_empty() || self.is_disconnected() {
                    let _ = cx.try_select(Selected::Aborted);
                }

                // Block the current thread.
                // SAFETY: the context belongs to the current thread.
                let sel = unsafe { cx.wait_until(deadline) };

                match sel {
                    Selected::Waiting => unreachable!(),
                    Selected::Aborted | Selected::Disconnected => {
                        self.receivers.unregister(oper).unwrap();
                        // If the channel was disconnected, we still have to check for remaining
                        // messages.
                    }
                    Selected::Operation(_) => {}
                }
            });
        }
    }

    /// Returns the current number of messages inside the channel.
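    ///
    /// A worked example of the arithmetic below (illustrative; `SHIFT = 1`, `LAP = 32`,
    /// `BLOCK_CAP = 31`): with head at sequence number 0 and tail at sequence number 33, one
    /// block boundary (offset 31, which holds no message) lies in between, so the result is
    /// `33 - 0 - 33 / 32 = 32` messages.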
    pub(crate) fn len(&self) -> usize {
        loop {
            // Load the tail index, then load the head index.
            let mut tail = self.tail.index.load(Ordering::SeqCst);
            let mut head = self.head.index.load(Ordering::SeqCst);

            // If the tail index didn't change, we've got consistent indices to work with.
            if self.tail.index.load(Ordering::SeqCst) == tail {
                // Erase the lower bits.
                tail &= !((1 << SHIFT) - 1);
                head &= !((1 << SHIFT) - 1);

                // Fix up indices if they fall onto block ends.
                if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
                    tail = tail.wrapping_add(1 << SHIFT);
                }
                if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
                    head = head.wrapping_add(1 << SHIFT);
                }

                // Rotate indices so that head falls into the first block.
                let lap = (head >> SHIFT) / LAP;
                tail = tail.wrapping_sub((lap * LAP) << SHIFT);
                head = head.wrapping_sub((lap * LAP) << SHIFT);

                // Remove the lower bits.
                tail >>= SHIFT;
                head >>= SHIFT;

                // Return the difference minus the number of blocks between tail and head.
                return tail - head - tail / LAP;
            }
        }
    }

    /// Returns the capacity of the channel.
    pub(crate) fn capacity(&self) -> Option<usize> {
        None
    }

    /// Disconnects senders and wakes up all blocked receivers.
    ///
    /// Returns `true` if this call disconnected the channel.
    pub(crate) fn disconnect_senders(&self) -> bool {
        let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);

        if tail & MARK_BIT == 0 {
            self.receivers.disconnect();
            true
        } else {
            false
        }
    }

    /// Disconnects receivers.
    ///
    /// Returns `true` if this call disconnected the channel.
    pub(crate) fn disconnect_receivers(&self) -> bool {
        let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);

        if tail & MARK_BIT == 0 {
            // If receivers are dropped first, discard all messages to free
            // memory eagerly.
            self.discard_all_messages();
            true
        } else {
            false
        }
    }

    /// Discards all messages.
    ///
    /// This method should only be called when all receivers are dropped.
    fn discard_all_messages(&self) {
        let backoff = Backoff::new();
        let mut tail = self.tail.index.load(Ordering::Acquire);
        loop {
            let offset = (tail >> SHIFT) % LAP;
            if offset != BLOCK_CAP {
                break;
            }

            // New updates to tail will be rejected by `MARK_BIT` and aborted unless the index
            // is at a block boundary. We need to wait for those updates to take effect,
            // otherwise there can be memory leaks.
            backoff.spin_heavy();
            tail = self.tail.index.load(Ordering::Acquire);
        }

        let mut head = self.head.index.load(Ordering::Acquire);
        // The channel may be uninitialized, so we have to swap to avoid overwriting any sender's
        // attempts to initialize the first block before noticing that the receivers disconnected.
        // Late allocations will be deallocated by the sender in Drop.
        let mut block = self.head.block.swap(ptr::null_mut(), Ordering::AcqRel);

        // If we're going to be dropping messages, we need to synchronize with initialization.
        if head >> SHIFT != tail >> SHIFT {
            // The block can be null here only if a sender is in the process of initializing the
            // channel while another sender managed to send a message by inserting it into the
            // semi-initialized channel and advanced the tail.
            // In that case, just wait until it gets initialized.
            while block.is_null() {
                backoff.spin_heavy();
                block = self.head.block.load(Ordering::Acquire);
            }
        }

        unsafe {
            // Drop all messages between head and tail and deallocate the heap-allocated blocks.
            while head >> SHIFT != tail >> SHIFT {
                let offset = (head >> SHIFT) % LAP;

                if offset < BLOCK_CAP {
                    // Drop the message in the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.wait_write();
                    let p = &mut *slot.msg.get();
                    p.as_mut_ptr().drop_in_place();
                } else {
                    (*block).wait_next();
                    // Deallocate the block and move to the next one.
                    let next = (*block).next.load(Ordering::Acquire);
                    drop(Box::from_raw(block));
                    block = next;
                }

                head = head.wrapping_add(1 << SHIFT);
            }

            // Deallocate the last remaining block.
            if !block.is_null() {
                drop(Box::from_raw(block));
            }
        }

        head &= !MARK_BIT;
        self.head.index.store(head, Ordering::Release);
    }

    /// Returns `true` if the channel is disconnected.
    pub(crate) fn is_disconnected(&self) -> bool {
        self.tail.index.load(Ordering::SeqCst) & MARK_BIT != 0
    }

    /// Returns `true` if the channel is empty.
    pub(crate) fn is_empty(&self) -> bool {
        let head = self.head.index.load(Ordering::SeqCst);
        let tail = self.tail.index.load(Ordering::SeqCst);
        head >> SHIFT == tail >> SHIFT
    }

    /// Returns `true` if the channel is full.
    pub(crate) fn is_full(&self) -> bool {
        false
    }
}

impl<T> Drop for Channel<T> {
    fn drop(&mut self) {
        let mut head = self.head.index.load(Ordering::Relaxed);
        let mut tail = self.tail.index.load(Ordering::Relaxed);
        let mut block = self.head.block.load(Ordering::Relaxed);

        // Erase the lower bits.
        head &= !((1 << SHIFT) - 1);
        tail &= !((1 << SHIFT) - 1);

        unsafe {
            // Drop all messages between head and tail and deallocate the heap-allocated blocks.
            while head != tail {
                let offset = (head >> SHIFT) % LAP;

                if offset < BLOCK_CAP {
                    // Drop the message in the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    let p = &mut *slot.msg.get();
                    p.as_mut_ptr().drop_in_place();
                } else {
                    // Deallocate the block and move to the next one.
                    let next = (*block).next.load(Ordering::Relaxed);
                    drop(Box::from_raw(block));
                    block = next;
                }

                head = head.wrapping_add(1 << SHIFT);
            }

            // Deallocate the last remaining block.
            if !block.is_null() {
                drop(Box::from_raw(block));
            }
        }
    }
}
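
// A minimal smoke-test sketch (an illustrative addition, not part of the original file; it
// assumes the surrounding crate's test setup can exercise this module directly).
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn send_recv_and_disconnect() {
        let channel = Channel::<i32>::new();

        // Messages come out in FIFO order.
        channel.try_send(1).unwrap();
        channel.try_send(2).unwrap();
        assert_eq!(channel.len(), 2);
        assert_eq!(channel.try_recv().unwrap(), 1);
        assert_eq!(channel.try_recv().unwrap(), 2);
        assert!(matches!(channel.try_recv(), Err(TryRecvError::Empty)));

        // Disconnecting the senders still lets buffered messages drain first.
        channel.try_send(3).unwrap();
        assert!(channel.disconnect_senders());
        assert_eq!(channel.try_recv().unwrap(), 3);
        assert!(matches!(channel.try_recv(), Err(TryRecvError::Disconnected)));
    }
}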