std/sync/mpmc/
mod.rs

1//! Multi-producer, multi-consumer FIFO queue communication primitives.
2//!
3//! This module provides message-based communication over channels, concretely
4//! defined by two types:
5//!
6//! * [`Sender`]
7//! * [`Receiver`]
8//!
9//! [`Sender`]s are used to send data to a set of [`Receiver`]s. Both
10//! sender and receiver are cloneable (multi-producer) such that many threads can send
11//! simultaneously to receivers (multi-consumer).
12//!
13//! These channels come in two flavors:
14//!
15//! 1. An asynchronous, infinitely buffered channel. The [`channel`] function
16//!    will return a `(Sender, Receiver)` tuple where all sends will be
17//!    **asynchronous** (they never block). The channel conceptually has an
18//!    infinite buffer.
19//!
20//! 2. A synchronous, bounded channel. The [`sync_channel`] function will
21//!    return a `(Sender, Receiver)` tuple where the storage for pending
22//!    messages is a pre-allocated buffer of a fixed size. All sends will be
23//!    **synchronous** by blocking until there is buffer space available. Note
24//!    that a bound of 0 is allowed, causing the channel to become a "rendezvous"
25//!    channel where each sender atomically hands off a message to a receiver.
26//!
27//! [`send`]: Sender::send
28//!
29//! ## Disconnection
30//!
31//! The send and receive operations on channels will all return a [`Result`]
32//! indicating whether the operation succeeded or not. An unsuccessful operation
33//! is normally indicative of the other half of a channel having "hung up" by
34//! being dropped in its corresponding thread.
35//!
36//! Once half of a channel has been deallocated, most operations can no longer
37//! continue to make progress, so [`Err`] will be returned. Many applications
38//! will continue to [`unwrap`] the results returned from this module,
39//! instigating a propagation of failure among threads if one unexpectedly dies.
40//!
41//! [`unwrap`]: Result::unwrap
42//!
43//! # Examples
44//!
45//! Simple usage:
46//!
47//! ```
48//! #![feature(mpmc_channel)]
49//!
50//! use std::thread;
51//! use std::sync::mpmc::channel;
52//!
53//! // Create a simple streaming channel
54//! let (tx, rx) = channel();
55//! thread::spawn(move || {
56//!     tx.send(10).unwrap();
57//! });
58//! assert_eq!(rx.recv().unwrap(), 10);
59//! ```
60//!
61//! Shared usage:
62//!
63//! ```
64//! #![feature(mpmc_channel)]
65//!
66//! use std::thread;
67//! use std::sync::mpmc::channel;
68//!
69//! thread::scope(|s| {
70//!     // Create a shared channel that can be sent along from many threads
71//!     // where tx is the sending half (tx for transmission), and rx is the receiving
72//!     // half (rx for receiving).
73//!     let (tx, rx) = channel();
74//!     for i in 0..10 {
75//!         let tx = tx.clone();
76//!         s.spawn(move || {
77//!             tx.send(i).unwrap();
78//!         });
79//!     }
80//!
81//!     for _ in 0..5 {
82//!         let rx1 = rx.clone();
83//!         let rx2 = rx.clone();
84//!         s.spawn(move || {
85//!             let j = rx1.recv().unwrap();
86//!             assert!(0 <= j && j < 10);
87//!         });
88//!         s.spawn(move || {
89//!             let j = rx2.recv().unwrap();
90//!             assert!(0 <= j && j < 10);
91//!         });
92//!     }
93//! })
94//! ```
95//!
96//! Propagating panics:
97//!
98//! ```
99//! #![feature(mpmc_channel)]
100//!
101//! use std::sync::mpmc::channel;
102//!
103//! // The call to recv() will return an error because the channel has already
104//! // hung up (or been deallocated)
105//! let (tx, rx) = channel::<i32>();
106//! drop(tx);
107//! assert!(rx.recv().is_err());
108//! ```
109
110// This module is used as the implementation for the channels in `sync::mpsc`.
111// The implementation comes from the crossbeam-channel crate:
112//
113// Copyright (c) 2019 The Crossbeam Project Developers
114//
115// Permission is hereby granted, free of charge, to any
116// person obtaining a copy of this software and associated
117// documentation files (the "Software"), to deal in the
118// Software without restriction, including without
119// limitation the rights to use, copy, modify, merge,
120// publish, distribute, sublicense, and/or sell copies of
121// the Software, and to permit persons to whom the Software
122// is furnished to do so, subject to the following
123// conditions:
124//
125// The above copyright notice and this permission notice
126// shall be included in all copies or substantial portions
127// of the Software.
128//
129// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
130// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
131// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
132// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
133// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
134// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
135// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
136// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
137// DEALINGS IN THE SOFTWARE.
138
139mod array;
140mod context;
141mod counter;
142mod error;
143mod list;
144mod select;
145mod utils;
146mod waker;
147mod zero;
148
149pub use error::*;
150
151use crate::fmt;
152use crate::panic::{RefUnwindSafe, UnwindSafe};
153use crate::time::{Duration, Instant};
154
155/// Creates a new asynchronous channel, returning the sender/receiver halves.
156///
157/// All data sent on the [`Sender`] will become available on the [`Receiver`] in
158/// the same order as it was sent, and no [`send`] will block the calling thread
159/// (this channel has an "infinite buffer", unlike [`sync_channel`], which will
160/// block after its buffer limit is reached). [`recv`] will block until a message
161/// is available while there is at least one [`Sender`] alive (including clones).
162///
163/// The [`Sender`] can be cloned to [`send`] to the same channel multiple times.
164/// The [`Receiver`] also can be cloned to have multi receivers.
165///
166/// If the [`Receiver`] is disconnected while trying to [`send`] with the
167/// [`Sender`], the [`send`] method will return a [`SendError`]. Similarly, if the
168/// [`Sender`] is disconnected while trying to [`recv`], the [`recv`] method will
169/// return a [`RecvError`].
170///
171/// [`send`]: Sender::send
172/// [`recv`]: Receiver::recv
173///
174/// # Examples
175///
176/// ```
177/// #![feature(mpmc_channel)]
178///
179/// use std::sync::mpmc::channel;
180/// use std::thread;
181///
182/// let (sender, receiver) = channel();
183///
184/// // Spawn off an expensive computation
185/// thread::spawn(move || {
186/// #   fn expensive_computation() {}
187///     sender.send(expensive_computation()).unwrap();
188/// });
189///
190/// // Do some useful work for awhile
191///
192/// // Let's see what that answer was
193/// println!("{:?}", receiver.recv().unwrap());
194/// ```
195#[must_use]
196#[unstable(feature = "mpmc_channel", issue = "126840")]
197pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
198    let (s, r) = counter::new(list::Channel::new());
199    let s = Sender { flavor: SenderFlavor::List(s) };
200    let r = Receiver { flavor: ReceiverFlavor::List(r) };
201    (s, r)
202}
203
204/// Creates a new synchronous, bounded channel.
205///
206/// All data sent on the [`Sender`] will become available on the [`Receiver`]
207/// in the same order as it was sent. Like asynchronous [`channel`]s, the
208/// [`Receiver`] will block until a message becomes available. `sync_channel`
209/// differs greatly in the semantics of the sender, however.
210///
211/// This channel has an internal buffer on which messages will be queued.
212/// `bound` specifies the buffer size. When the internal buffer becomes full,
213/// future sends will *block* waiting for the buffer to open up. Note that a
214/// buffer size of 0 is valid, in which case this becomes "rendezvous channel"
215/// where each [`send`] will not return until a [`recv`] is paired with it.
216///
217/// The [`Sender`] can be cloned to [`send`] to the same channel multiple
218/// times. The [`Receiver`] also can be cloned to have multi receivers.
219///
220/// Like asynchronous channels, if the [`Receiver`] is disconnected while trying
221/// to [`send`] with the [`Sender`], the [`send`] method will return a
222/// [`SendError`]. Similarly, If the [`Sender`] is disconnected while trying
223/// to [`recv`], the [`recv`] method will return a [`RecvError`].
224///
225/// [`send`]: Sender::send
226/// [`recv`]: Receiver::recv
227///
228/// # Examples
229///
230/// ```
231/// use std::sync::mpsc::sync_channel;
232/// use std::thread;
233///
234/// let (sender, receiver) = sync_channel(1);
235///
236/// // this returns immediately
237/// sender.send(1).unwrap();
238///
239/// thread::spawn(move || {
240///     // this will block until the previous message has been received
241///     sender.send(2).unwrap();
242/// });
243///
244/// assert_eq!(receiver.recv().unwrap(), 1);
245/// assert_eq!(receiver.recv().unwrap(), 2);
246/// ```
247#[must_use]
248#[unstable(feature = "mpmc_channel", issue = "126840")]
249pub fn sync_channel<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
250    if cap == 0 {
251        let (s, r) = counter::new(zero::Channel::new());
252        let s = Sender { flavor: SenderFlavor::Zero(s) };
253        let r = Receiver { flavor: ReceiverFlavor::Zero(r) };
254        (s, r)
255    } else {
256        let (s, r) = counter::new(array::Channel::with_capacity(cap));
257        let s = Sender { flavor: SenderFlavor::Array(s) };
258        let r = Receiver { flavor: ReceiverFlavor::Array(r) };
259        (s, r)
260    }
261}
262
263/// The sending-half of Rust's synchronous [`channel`] type.
264///
265/// Messages can be sent through this channel with [`send`].
266///
267/// Note: all senders (the original and its clones) need to be dropped for the receiver
268/// to stop blocking to receive messages with [`Receiver::recv`].
269///
270/// [`send`]: Sender::send
271///
272/// # Examples
273///
274/// ```rust
275/// #![feature(mpmc_channel)]
276///
277/// use std::sync::mpmc::channel;
278/// use std::thread;
279///
280/// let (sender, receiver) = channel();
281/// let sender2 = sender.clone();
282///
283/// // First thread owns sender
284/// thread::spawn(move || {
285///     sender.send(1).unwrap();
286/// });
287///
288/// // Second thread owns sender2
289/// thread::spawn(move || {
290///     sender2.send(2).unwrap();
291/// });
292///
293/// let msg = receiver.recv().unwrap();
294/// let msg2 = receiver.recv().unwrap();
295///
296/// assert_eq!(3, msg + msg2);
297/// ```
298#[unstable(feature = "mpmc_channel", issue = "126840")]
299pub struct Sender<T> {
300    flavor: SenderFlavor<T>,
301}
302
303/// Sender flavors.
304enum SenderFlavor<T> {
305    /// Bounded channel based on a preallocated array.
306    Array(counter::Sender<array::Channel<T>>),
307
308    /// Unbounded channel implemented as a linked list.
309    List(counter::Sender<list::Channel<T>>),
310
311    /// Zero-capacity channel.
312    Zero(counter::Sender<zero::Channel<T>>),
313}
314
315#[unstable(feature = "mpmc_channel", issue = "126840")]
316unsafe impl<T: Send> Send for Sender<T> {}
317#[unstable(feature = "mpmc_channel", issue = "126840")]
318unsafe impl<T: Send> Sync for Sender<T> {}
319
320#[unstable(feature = "mpmc_channel", issue = "126840")]
321impl<T> UnwindSafe for Sender<T> {}
322#[unstable(feature = "mpmc_channel", issue = "126840")]
323impl<T> RefUnwindSafe for Sender<T> {}
324
325impl<T> Sender<T> {
326    /// Attempts to send a message into the channel without blocking.
327    ///
328    /// This method will either send a message into the channel immediately or return an error if
329    /// the channel is full or disconnected. The returned error contains the original message.
330    ///
331    /// If called on a zero-capacity channel, this method will send the message only if there
332    /// happens to be a receive operation on the other side of the channel at the same time.
333    ///
334    /// # Examples
335    ///
336    /// ```rust
337    /// #![feature(mpmc_channel)]
338    ///
339    /// use std::sync::mpmc::{channel, Receiver, Sender};
340    ///
341    /// let (sender, _receiver): (Sender<i32>, Receiver<i32>) = channel();
342    ///
343    /// assert!(sender.try_send(1).is_ok());
344    /// ```
345    #[unstable(feature = "mpmc_channel", issue = "126840")]
346    pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
347        match &self.flavor {
348            SenderFlavor::Array(chan) => chan.try_send(msg),
349            SenderFlavor::List(chan) => chan.try_send(msg),
350            SenderFlavor::Zero(chan) => chan.try_send(msg),
351        }
352    }
353
354    /// Attempts to send a value on this channel, returning it back if it could
355    /// not be sent.
356    ///
357    /// A successful send occurs when it is determined that the other end of
358    /// the channel has not hung up already. An unsuccessful send would be one
359    /// where the corresponding receiver has already been deallocated. Note
360    /// that a return value of [`Err`] means that the data will never be
361    /// received, but a return value of [`Ok`] does *not* mean that the data
362    /// will be received. It is possible for the corresponding receiver to
363    /// hang up immediately after this function returns [`Ok`]. However, if
364    /// the channel is zero-capacity, it acts as a rendezvous channel and a
365    /// return value of [`Ok`] means that the data has been received.
366    ///
367    /// If the channel is full and not disconnected, this call will block until
368    /// the send operation can proceed. If the channel becomes disconnected,
369    /// this call will wake up and return an error. The returned error contains
370    /// the original message.
371    ///
372    /// If called on a zero-capacity channel, this method will wait for a receive
373    /// operation to appear on the other side of the channel.
374    ///
375    /// # Examples
376    ///
377    /// ```
378    /// #![feature(mpmc_channel)]
379    ///
380    /// use std::sync::mpmc::channel;
381    ///
382    /// let (tx, rx) = channel();
383    ///
384    /// // This send is always successful
385    /// tx.send(1).unwrap();
386    ///
387    /// // This send will fail because the receiver is gone
388    /// drop(rx);
389    /// assert!(tx.send(1).is_err());
390    /// ```
391    #[unstable(feature = "mpmc_channel", issue = "126840")]
392    pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
393        match &self.flavor {
394            SenderFlavor::Array(chan) => chan.send(msg, None),
395            SenderFlavor::List(chan) => chan.send(msg, None),
396            SenderFlavor::Zero(chan) => chan.send(msg, None),
397        }
398        .map_err(|err| match err {
399            SendTimeoutError::Disconnected(msg) => SendError(msg),
400            SendTimeoutError::Timeout(_) => unreachable!(),
401        })
402    }
403}
404
405impl<T> Sender<T> {
406    /// Waits for a message to be sent into the channel, but only for a limited time.
407    ///
408    /// If the channel is full and not disconnected, this call will block until the send operation
409    /// can proceed or the operation times out. If the channel becomes disconnected, this call will
410    /// wake up and return an error. The returned error contains the original message.
411    ///
412    /// If called on a zero-capacity channel, this method will wait for a receive operation to
413    /// appear on the other side of the channel.
414    ///
415    /// # Examples
416    ///
417    /// ```
418    /// #![feature(mpmc_channel)]
419    ///
420    /// use std::sync::mpmc::channel;
421    /// use std::time::Duration;
422    ///
423    /// let (tx, rx) = channel();
424    ///
425    /// tx.send_timeout(1, Duration::from_millis(400)).unwrap();
426    /// ```
427    #[unstable(feature = "mpmc_channel", issue = "126840")]
428    pub fn send_timeout(&self, msg: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> {
429        match Instant::now().checked_add(timeout) {
430            Some(deadline) => self.send_deadline(msg, deadline),
431            // So far in the future that it's practically the same as waiting indefinitely.
432            None => self.send(msg).map_err(SendTimeoutError::from),
433        }
434    }
435
436    /// Waits for a message to be sent into the channel, but only until a given deadline.
437    ///
438    /// If the channel is full and not disconnected, this call will block until the send operation
439    /// can proceed or the operation times out. If the channel becomes disconnected, this call will
440    /// wake up and return an error. The returned error contains the original message.
441    ///
442    /// If called on a zero-capacity channel, this method will wait for a receive operation to
443    /// appear on the other side of the channel.
444    ///
445    /// # Examples
446    ///
447    /// ```
448    /// #![feature(mpmc_channel)]
449    ///
450    /// use std::sync::mpmc::channel;
451    /// use std::time::{Duration, Instant};
452    ///
453    /// let (tx, rx) = channel();
454    ///
455    /// let t = Instant::now() + Duration::from_millis(400);
456    /// tx.send_deadline(1, t).unwrap();
457    /// ```
458    #[unstable(feature = "mpmc_channel", issue = "126840")]
459    pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>> {
460        match &self.flavor {
461            SenderFlavor::Array(chan) => chan.send(msg, Some(deadline)),
462            SenderFlavor::List(chan) => chan.send(msg, Some(deadline)),
463            SenderFlavor::Zero(chan) => chan.send(msg, Some(deadline)),
464        }
465    }
466
467    /// Returns `true` if the channel is empty.
468    ///
469    /// Note: Zero-capacity channels are always empty.
470    ///
471    /// # Examples
472    ///
473    /// ```
474    /// #![feature(mpmc_channel)]
475    ///
476    /// use std::sync::mpmc;
477    /// use std::thread;
478    ///
479    /// let (send, _recv) = mpmc::channel();
480    ///
481    /// let tx1 = send.clone();
482    /// let tx2 = send.clone();
483    ///
484    /// assert!(tx1.is_empty());
485    ///
486    /// let handle = thread::spawn(move || {
487    ///     tx2.send(1u8).unwrap();
488    /// });
489    ///
490    /// handle.join().unwrap();
491    ///
492    /// assert!(!tx1.is_empty());
493    /// ```
494    #[unstable(feature = "mpmc_channel", issue = "126840")]
495    pub fn is_empty(&self) -> bool {
496        match &self.flavor {
497            SenderFlavor::Array(chan) => chan.is_empty(),
498            SenderFlavor::List(chan) => chan.is_empty(),
499            SenderFlavor::Zero(chan) => chan.is_empty(),
500        }
501    }
502
503    /// Returns `true` if the channel is full.
504    ///
505    /// Note: Zero-capacity channels are always full.
506    ///
507    /// # Examples
508    ///
509    /// ```
510    /// #![feature(mpmc_channel)]
511    ///
512    /// use std::sync::mpmc;
513    /// use std::thread;
514    ///
515    /// let (send, _recv) = mpmc::sync_channel(1);
516    ///
517    /// let (tx1, tx2) = (send.clone(), send.clone());
518    /// assert!(!tx1.is_full());
519    ///
520    /// let handle = thread::spawn(move || {
521    ///     tx2.send(1u8).unwrap();
522    /// });
523    ///
524    /// handle.join().unwrap();
525    ///
526    /// assert!(tx1.is_full());
527    /// ```
528    #[unstable(feature = "mpmc_channel", issue = "126840")]
529    pub fn is_full(&self) -> bool {
530        match &self.flavor {
531            SenderFlavor::Array(chan) => chan.is_full(),
532            SenderFlavor::List(chan) => chan.is_full(),
533            SenderFlavor::Zero(chan) => chan.is_full(),
534        }
535    }
536
537    /// Returns the number of messages in the channel.
538    ///
539    /// # Examples
540    ///
541    /// ```
542    /// #![feature(mpmc_channel)]
543    ///
544    /// use std::sync::mpmc;
545    /// use std::thread;
546    ///
547    /// let (send, _recv) = mpmc::channel();
548    /// let (tx1, tx2) = (send.clone(), send.clone());
549    ///
550    /// assert_eq!(tx1.len(), 0);
551    ///
552    /// let handle = thread::spawn(move || {
553    ///     tx2.send(1u8).unwrap();
554    /// });
555    ///
556    /// handle.join().unwrap();
557    ///
558    /// assert_eq!(tx1.len(), 1);
559    /// ```
560    #[unstable(feature = "mpmc_channel", issue = "126840")]
561    pub fn len(&self) -> usize {
562        match &self.flavor {
563            SenderFlavor::Array(chan) => chan.len(),
564            SenderFlavor::List(chan) => chan.len(),
565            SenderFlavor::Zero(chan) => chan.len(),
566        }
567    }
568
569    /// If the channel is bounded, returns its capacity.
570    ///
571    /// # Examples
572    ///
573    /// ```
574    /// #![feature(mpmc_channel)]
575    ///
576    /// use std::sync::mpmc;
577    /// use std::thread;
578    ///
579    /// let (send, _recv) = mpmc::sync_channel(3);
580    /// let (tx1, tx2) = (send.clone(), send.clone());
581    ///
582    /// assert_eq!(tx1.capacity(), Some(3));
583    ///
584    /// let handle = thread::spawn(move || {
585    ///     tx2.send(1u8).unwrap();
586    /// });
587    ///
588    /// handle.join().unwrap();
589    ///
590    /// assert_eq!(tx1.capacity(), Some(3));
591    /// ```
592    #[unstable(feature = "mpmc_channel", issue = "126840")]
593    pub fn capacity(&self) -> Option<usize> {
594        match &self.flavor {
595            SenderFlavor::Array(chan) => chan.capacity(),
596            SenderFlavor::List(chan) => chan.capacity(),
597            SenderFlavor::Zero(chan) => chan.capacity(),
598        }
599    }
600
601    /// Returns `true` if senders belong to the same channel.
602    ///
603    /// # Examples
604    ///
605    /// ```
606    /// #![feature(mpmc_channel)]
607    ///
608    /// use std::sync::mpmc;
609    ///
610    /// let (tx1, _) = mpmc::channel::<i32>();
611    /// let (tx2, _) = mpmc::channel::<i32>();
612    ///
613    /// assert!(tx1.same_channel(&tx1));
614    /// assert!(!tx1.same_channel(&tx2));
615    /// ```
616    #[unstable(feature = "mpmc_channel", issue = "126840")]
617    pub fn same_channel(&self, other: &Sender<T>) -> bool {
618        match (&self.flavor, &other.flavor) {
619            (SenderFlavor::Array(a), SenderFlavor::Array(b)) => a == b,
620            (SenderFlavor::List(a), SenderFlavor::List(b)) => a == b,
621            (SenderFlavor::Zero(a), SenderFlavor::Zero(b)) => a == b,
622            _ => false,
623        }
624    }
625}
626
627#[unstable(feature = "mpmc_channel", issue = "126840")]
628impl<T> Drop for Sender<T> {
629    fn drop(&mut self) {
630        unsafe {
631            match &self.flavor {
632                SenderFlavor::Array(chan) => chan.release(|c| c.disconnect_senders()),
633                SenderFlavor::List(chan) => chan.release(|c| c.disconnect_senders()),
634                SenderFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
635            }
636        }
637    }
638}
639
640#[unstable(feature = "mpmc_channel", issue = "126840")]
641impl<T> Clone for Sender<T> {
642    fn clone(&self) -> Self {
643        let flavor = match &self.flavor {
644            SenderFlavor::Array(chan) => SenderFlavor::Array(chan.acquire()),
645            SenderFlavor::List(chan) => SenderFlavor::List(chan.acquire()),
646            SenderFlavor::Zero(chan) => SenderFlavor::Zero(chan.acquire()),
647        };
648
649        Sender { flavor }
650    }
651}
652
653#[unstable(feature = "mpmc_channel", issue = "126840")]
654impl<T> fmt::Debug for Sender<T> {
655    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
656        f.pad("Sender { .. }")
657    }
658}
659
660/// The receiving half of Rust's [`channel`] (or [`sync_channel`]) type.
661/// Different threads can share this [`Receiver`] by cloning it.
662///
663/// Messages sent to the channel can be retrieved using [`recv`].
664///
665/// [`recv`]: Receiver::recv
666///
667/// # Examples
668///
669/// ```rust
670/// #![feature(mpmc_channel)]
671///
672/// use std::sync::mpmc::channel;
673/// use std::thread;
674/// use std::time::Duration;
675///
676/// let (send, recv) = channel();
677///
678/// let tx_thread = thread::spawn(move || {
679///     send.send("Hello world!").unwrap();
680///     thread::sleep(Duration::from_secs(2)); // block for two seconds
681///     send.send("Delayed for 2 seconds").unwrap();
682/// });
683///
684/// let (rx1, rx2) = (recv.clone(), recv.clone());
685/// let rx_thread_1 = thread::spawn(move || {
686///     println!("{}", rx1.recv().unwrap()); // Received immediately
687/// });
688/// let rx_thread_2 = thread::spawn(move || {
689///     println!("{}", rx2.recv().unwrap()); // Received after 2 seconds
690/// });
691///
692/// tx_thread.join().unwrap();
693/// rx_thread_1.join().unwrap();
694/// rx_thread_2.join().unwrap();
695/// ```
696#[unstable(feature = "mpmc_channel", issue = "126840")]
697pub struct Receiver<T> {
698    flavor: ReceiverFlavor<T>,
699}
700
701/// An iterator over messages on a [`Receiver`], created by [`iter`].
702///
703/// This iterator will block whenever [`next`] is called,
704/// waiting for a new message, and [`None`] will be returned
705/// when the corresponding channel has hung up.
706///
707/// [`iter`]: Receiver::iter
708/// [`next`]: Iterator::next
709///
710/// # Examples
711///
712/// ```rust
713/// #![feature(mpmc_channel)]
714///
715/// use std::sync::mpmc::channel;
716/// use std::thread;
717///
718/// let (send, recv) = channel();
719///
720/// thread::spawn(move || {
721///     send.send(1u8).unwrap();
722///     send.send(2u8).unwrap();
723///     send.send(3u8).unwrap();
724/// });
725///
726/// for x in recv.iter() {
727///     println!("Got: {x}");
728/// }
729/// ```
730#[unstable(feature = "mpmc_channel", issue = "126840")]
731#[derive(Debug)]
732pub struct Iter<'a, T: 'a> {
733    rx: &'a Receiver<T>,
734}
735
736/// An iterator that attempts to yield all pending values for a [`Receiver`],
737/// created by [`try_iter`].
738///
739/// [`None`] will be returned when there are no pending values remaining or
740/// if the corresponding channel has hung up.
741///
742/// This iterator will never block the caller in order to wait for data to
743/// become available. Instead, it will return [`None`].
744///
745/// [`try_iter`]: Receiver::try_iter
746///
747/// # Examples
748///
749/// ```rust
750/// #![feature(mpmc_channel)]
751///
752/// use std::sync::mpmc::channel;
753/// use std::thread;
754/// use std::time::Duration;
755///
756/// let (sender, receiver) = channel();
757///
758/// // Nothing is in the buffer yet
759/// assert!(receiver.try_iter().next().is_none());
760/// println!("Nothing in the buffer...");
761///
762/// thread::spawn(move || {
763///     sender.send(1).unwrap();
764///     sender.send(2).unwrap();
765///     sender.send(3).unwrap();
766/// });
767///
768/// println!("Going to sleep...");
769/// thread::sleep(Duration::from_secs(2)); // block for two seconds
770///
771/// for x in receiver.try_iter() {
772///     println!("Got: {x}");
773/// }
774/// ```
775#[unstable(feature = "mpmc_channel", issue = "126840")]
776#[derive(Debug)]
777pub struct TryIter<'a, T: 'a> {
778    rx: &'a Receiver<T>,
779}
780
781/// An owning iterator over messages on a [`Receiver`],
782/// created by [`into_iter`].
783///
784/// This iterator will block whenever [`next`]
785/// is called, waiting for a new message, and [`None`] will be
786/// returned if the corresponding channel has hung up.
787///
788/// [`into_iter`]: Receiver::into_iter
789/// [`next`]: Iterator::next
790///
791/// # Examples
792///
793/// ```rust
794/// #![feature(mpmc_channel)]
795///
796/// use std::sync::mpmc::channel;
797/// use std::thread;
798///
799/// let (send, recv) = channel();
800///
801/// thread::spawn(move || {
802///     send.send(1u8).unwrap();
803///     send.send(2u8).unwrap();
804///     send.send(3u8).unwrap();
805/// });
806///
807/// for x in recv.into_iter() {
808///     println!("Got: {x}");
809/// }
810/// ```
811#[unstable(feature = "mpmc_channel", issue = "126840")]
812#[derive(Debug)]
813pub struct IntoIter<T> {
814    rx: Receiver<T>,
815}
816
817#[unstable(feature = "mpmc_channel", issue = "126840")]
818impl<'a, T> Iterator for Iter<'a, T> {
819    type Item = T;
820
821    fn next(&mut self) -> Option<T> {
822        self.rx.recv().ok()
823    }
824}
825
826#[unstable(feature = "mpmc_channel", issue = "126840")]
827impl<'a, T> Iterator for TryIter<'a, T> {
828    type Item = T;
829
830    fn next(&mut self) -> Option<T> {
831        self.rx.try_recv().ok()
832    }
833}
834
835#[unstable(feature = "mpmc_channel", issue = "126840")]
836impl<'a, T> IntoIterator for &'a Receiver<T> {
837    type Item = T;
838    type IntoIter = Iter<'a, T>;
839
840    fn into_iter(self) -> Iter<'a, T> {
841        self.iter()
842    }
843}
844
845#[unstable(feature = "mpmc_channel", issue = "126840")]
846impl<T> Iterator for IntoIter<T> {
847    type Item = T;
848    fn next(&mut self) -> Option<T> {
849        self.rx.recv().ok()
850    }
851}
852
853#[unstable(feature = "mpmc_channel", issue = "126840")]
854impl<T> IntoIterator for Receiver<T> {
855    type Item = T;
856    type IntoIter = IntoIter<T>;
857
858    fn into_iter(self) -> IntoIter<T> {
859        IntoIter { rx: self }
860    }
861}
862
863/// Receiver flavors.
864enum ReceiverFlavor<T> {
865    /// Bounded channel based on a preallocated array.
866    Array(counter::Receiver<array::Channel<T>>),
867
868    /// Unbounded channel implemented as a linked list.
869    List(counter::Receiver<list::Channel<T>>),
870
871    /// Zero-capacity channel.
872    Zero(counter::Receiver<zero::Channel<T>>),
873}
874
875#[unstable(feature = "mpmc_channel", issue = "126840")]
876unsafe impl<T: Send> Send for Receiver<T> {}
877#[unstable(feature = "mpmc_channel", issue = "126840")]
878unsafe impl<T: Send> Sync for Receiver<T> {}
879
880#[unstable(feature = "mpmc_channel", issue = "126840")]
881impl<T> UnwindSafe for Receiver<T> {}
882#[unstable(feature = "mpmc_channel", issue = "126840")]
883impl<T> RefUnwindSafe for Receiver<T> {}
884
885impl<T> Receiver<T> {
886    /// Attempts to receive a message from the channel without blocking.
887    ///
888    /// This method will never block the caller in order to wait for data to
889    /// become available. Instead, this will always return immediately with a
890    /// possible option of pending data on the channel.
891    ///
892    /// If called on a zero-capacity channel, this method will receive a message only if there
893    /// happens to be a send operation on the other side of the channel at the same time.
894    ///
895    /// This is useful for a flavor of "optimistic check" before deciding to
896    /// block on a receiver.
897    ///
898    /// Compared with [`recv`], this function has two failure cases instead of one
899    /// (one for disconnection, one for an empty buffer).
900    ///
901    /// [`recv`]: Self::recv
902    ///
903    /// # Examples
904    ///
905    /// ```rust
906    /// #![feature(mpmc_channel)]
907    ///
908    /// use std::sync::mpmc::{Receiver, channel};
909    ///
910    /// let (_, receiver): (_, Receiver<i32>) = channel();
911    ///
912    /// assert!(receiver.try_recv().is_err());
913    /// ```
914    #[unstable(feature = "mpmc_channel", issue = "126840")]
915    pub fn try_recv(&self) -> Result<T, TryRecvError> {
916        match &self.flavor {
917            ReceiverFlavor::Array(chan) => chan.try_recv(),
918            ReceiverFlavor::List(chan) => chan.try_recv(),
919            ReceiverFlavor::Zero(chan) => chan.try_recv(),
920        }
921    }
922
923    /// Attempts to wait for a value on this receiver, returning an error if the
924    /// corresponding channel has hung up.
925    ///
926    /// This function will always block the current thread if there is no data
927    /// available and it's possible for more data to be sent (at least one sender
928    /// still exists). Once a message is sent to the corresponding [`Sender`],
929    /// this receiver will wake up and return that message.
930    ///
931    /// If the corresponding [`Sender`] has disconnected, or it disconnects while
932    /// this call is blocking, this call will wake up and return [`Err`] to
933    /// indicate that no more messages can ever be received on this channel.
934    /// However, since channels are buffered, messages sent before the disconnect
935    /// will still be properly received.
936    ///
937    /// # Examples
938    ///
939    /// ```
940    /// #![feature(mpmc_channel)]
941    ///
942    /// use std::sync::mpmc;
943    /// use std::thread;
944    ///
945    /// let (send, recv) = mpmc::channel();
946    /// let handle = thread::spawn(move || {
947    ///     send.send(1u8).unwrap();
948    /// });
949    ///
950    /// handle.join().unwrap();
951    ///
952    /// assert_eq!(Ok(1), recv.recv());
953    /// ```
954    ///
955    /// Buffering behavior:
956    ///
957    /// ```
958    /// #![feature(mpmc_channel)]
959    ///
960    /// use std::sync::mpmc;
961    /// use std::thread;
962    /// use std::sync::mpmc::RecvError;
963    ///
964    /// let (send, recv) = mpmc::channel();
965    /// let handle = thread::spawn(move || {
966    ///     send.send(1u8).unwrap();
967    ///     send.send(2).unwrap();
968    ///     send.send(3).unwrap();
969    ///     drop(send);
970    /// });
971    ///
972    /// // wait for the thread to join so we ensure the sender is dropped
973    /// handle.join().unwrap();
974    ///
975    /// assert_eq!(Ok(1), recv.recv());
976    /// assert_eq!(Ok(2), recv.recv());
977    /// assert_eq!(Ok(3), recv.recv());
978    /// assert_eq!(Err(RecvError), recv.recv());
979    /// ```
980    #[unstable(feature = "mpmc_channel", issue = "126840")]
981    pub fn recv(&self) -> Result<T, RecvError> {
982        match &self.flavor {
983            ReceiverFlavor::Array(chan) => chan.recv(None),
984            ReceiverFlavor::List(chan) => chan.recv(None),
985            ReceiverFlavor::Zero(chan) => chan.recv(None),
986        }
987        .map_err(|_| RecvError)
988    }
989
990    /// Attempts to wait for a value on this receiver, returning an error if the
991    /// corresponding channel has hung up, or if it waits more than `timeout`.
992    ///
993    /// This function will always block the current thread if there is no data
994    /// available and it's possible for more data to be sent (at least one sender
995    /// still exists). Once a message is sent to the corresponding [`Sender`],
996    /// this receiver will wake up and return that message.
997    ///
998    /// If the corresponding [`Sender`] has disconnected, or it disconnects while
999    /// this call is blocking, this call will wake up and return [`Err`] to
1000    /// indicate that no more messages can ever be received on this channel.
1001    /// However, since channels are buffered, messages sent before the disconnect
1002    /// will still be properly received.
1003    ///
1004    /// # Examples
1005    ///
1006    /// Successfully receiving value before encountering timeout:
1007    ///
1008    /// ```no_run
1009    /// #![feature(mpmc_channel)]
1010    ///
1011    /// use std::thread;
1012    /// use std::time::Duration;
1013    /// use std::sync::mpmc;
1014    ///
1015    /// let (send, recv) = mpmc::channel();
1016    ///
1017    /// thread::spawn(move || {
1018    ///     send.send('a').unwrap();
1019    /// });
1020    ///
1021    /// assert_eq!(
1022    ///     recv.recv_timeout(Duration::from_millis(400)),
1023    ///     Ok('a')
1024    /// );
1025    /// ```
1026    ///
1027    /// Receiving an error upon reaching timeout:
1028    ///
1029    /// ```no_run
1030    /// #![feature(mpmc_channel)]
1031    ///
1032    /// use std::thread;
1033    /// use std::time::Duration;
1034    /// use std::sync::mpmc;
1035    ///
1036    /// let (send, recv) = mpmc::channel();
1037    ///
1038    /// thread::spawn(move || {
1039    ///     thread::sleep(Duration::from_millis(800));
1040    ///     send.send('a').unwrap();
1041    /// });
1042    ///
1043    /// assert_eq!(
1044    ///     recv.recv_timeout(Duration::from_millis(400)),
1045    ///     Err(mpmc::RecvTimeoutError::Timeout)
1046    /// );
1047    /// ```
1048    #[unstable(feature = "mpmc_channel", issue = "126840")]
1049    pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
1050        match Instant::now().checked_add(timeout) {
1051            Some(deadline) => self.recv_deadline(deadline),
1052            // So far in the future that it's practically the same as waiting indefinitely.
1053            None => self.recv().map_err(RecvTimeoutError::from),
1054        }
1055    }
1056
1057    /// Attempts to wait for a value on this receiver, returning an error if the
1058    /// corresponding channel has hung up, or if `deadline` is reached.
1059    ///
1060    /// This function will always block the current thread if there is no data
1061    /// available and it's possible for more data to be sent. Once a message is
1062    /// sent to the corresponding [`Sender`], then this receiver will wake up
1063    /// and return that message.
1064    ///
1065    /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1066    /// this call is blocking, this call will wake up and return [`Err`] to
1067    /// indicate that no more messages can ever be received on this channel.
1068    /// However, since channels are buffered, messages sent before the disconnect
1069    /// will still be properly received.
1070    ///
1071    /// # Examples
1072    ///
1073    /// Successfully receiving value before reaching deadline:
1074    ///
1075    /// ```no_run
1076    /// #![feature(mpmc_channel)]
1077    ///
1078    /// use std::thread;
1079    /// use std::time::{Duration, Instant};
1080    /// use std::sync::mpmc;
1081    ///
1082    /// let (send, recv) = mpmc::channel();
1083    ///
1084    /// thread::spawn(move || {
1085    ///     send.send('a').unwrap();
1086    /// });
1087    ///
1088    /// assert_eq!(
1089    ///     recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
1090    ///     Ok('a')
1091    /// );
1092    /// ```
1093    ///
1094    /// Receiving an error upon reaching deadline:
1095    ///
1096    /// ```no_run
1097    /// #![feature(mpmc_channel)]
1098    ///
1099    /// use std::thread;
1100    /// use std::time::{Duration, Instant};
1101    /// use std::sync::mpmc;
1102    ///
1103    /// let (send, recv) = mpmc::channel();
1104    ///
1105    /// thread::spawn(move || {
1106    ///     thread::sleep(Duration::from_millis(800));
1107    ///     send.send('a').unwrap();
1108    /// });
1109    ///
1110    /// assert_eq!(
1111    ///     recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
1112    ///     Err(mpmc::RecvTimeoutError::Timeout)
1113    /// );
1114    /// ```
1115    #[unstable(feature = "mpmc_channel", issue = "126840")]
1116    pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
1117        match &self.flavor {
1118            ReceiverFlavor::Array(chan) => chan.recv(Some(deadline)),
1119            ReceiverFlavor::List(chan) => chan.recv(Some(deadline)),
1120            ReceiverFlavor::Zero(chan) => chan.recv(Some(deadline)),
1121        }
1122    }
1123
1124    /// Returns an iterator that will attempt to yield all pending values.
1125    /// It will return `None` if there are no more pending values or if the
1126    /// channel has hung up. The iterator will never [`panic!`] or block the
1127    /// user by waiting for values.
1128    ///
1129    /// # Examples
1130    ///
1131    /// ```no_run
1132    /// #![feature(mpmc_channel)]
1133    ///
1134    /// use std::sync::mpmc::channel;
1135    /// use std::thread;
1136    /// use std::time::Duration;
1137    ///
1138    /// let (sender, receiver) = channel();
1139    ///
1140    /// // nothing is in the buffer yet
1141    /// assert!(receiver.try_iter().next().is_none());
1142    ///
1143    /// thread::spawn(move || {
1144    ///     thread::sleep(Duration::from_secs(1));
1145    ///     sender.send(1).unwrap();
1146    ///     sender.send(2).unwrap();
1147    ///     sender.send(3).unwrap();
1148    /// });
1149    ///
1150    /// // nothing is in the buffer yet
1151    /// assert!(receiver.try_iter().next().is_none());
1152    ///
1153    /// // block for two seconds
1154    /// thread::sleep(Duration::from_secs(2));
1155    ///
1156    /// let mut iter = receiver.try_iter();
1157    /// assert_eq!(iter.next(), Some(1));
1158    /// assert_eq!(iter.next(), Some(2));
1159    /// assert_eq!(iter.next(), Some(3));
1160    /// assert_eq!(iter.next(), None);
1161    /// ```
1162    #[unstable(feature = "mpmc_channel", issue = "126840")]
1163    pub fn try_iter(&self) -> TryIter<'_, T> {
1164        TryIter { rx: self }
1165    }
1166}
1167
1168impl<T> Receiver<T> {
1169    /// Returns `true` if the channel is empty.
1170    ///
1171    /// Note: Zero-capacity channels are always empty.
1172    ///
1173    /// # Examples
1174    ///
1175    /// ```
1176    /// #![feature(mpmc_channel)]
1177    ///
1178    /// use std::sync::mpmc;
1179    /// use std::thread;
1180    ///
1181    /// let (send, recv) = mpmc::channel();
1182    ///
1183    /// assert!(recv.is_empty());
1184    ///
1185    /// let handle = thread::spawn(move || {
1186    ///     send.send(1u8).unwrap();
1187    /// });
1188    ///
1189    /// handle.join().unwrap();
1190    ///
1191    /// assert!(!recv.is_empty());
1192    /// ```
1193    #[unstable(feature = "mpmc_channel", issue = "126840")]
1194    pub fn is_empty(&self) -> bool {
1195        match &self.flavor {
1196            ReceiverFlavor::Array(chan) => chan.is_empty(),
1197            ReceiverFlavor::List(chan) => chan.is_empty(),
1198            ReceiverFlavor::Zero(chan) => chan.is_empty(),
1199        }
1200    }
1201
1202    /// Returns `true` if the channel is full.
1203    ///
1204    /// Note: Zero-capacity channels are always full.
1205    ///
1206    /// # Examples
1207    ///
1208    /// ```
1209    /// #![feature(mpmc_channel)]
1210    ///
1211    /// use std::sync::mpmc;
1212    /// use std::thread;
1213    ///
1214    /// let (send, recv) = mpmc::sync_channel(1);
1215    ///
1216    /// assert!(!recv.is_full());
1217    ///
1218    /// let handle = thread::spawn(move || {
1219    ///     send.send(1u8).unwrap();
1220    /// });
1221    ///
1222    /// handle.join().unwrap();
1223    ///
1224    /// assert!(recv.is_full());
1225    /// ```
1226    #[unstable(feature = "mpmc_channel", issue = "126840")]
1227    pub fn is_full(&self) -> bool {
1228        match &self.flavor {
1229            ReceiverFlavor::Array(chan) => chan.is_full(),
1230            ReceiverFlavor::List(chan) => chan.is_full(),
1231            ReceiverFlavor::Zero(chan) => chan.is_full(),
1232        }
1233    }
1234
1235    /// Returns the number of messages in the channel.
1236    ///
1237    /// # Examples
1238    ///
1239    /// ```
1240    /// #![feature(mpmc_channel)]
1241    ///
1242    /// use std::sync::mpmc;
1243    /// use std::thread;
1244    ///
1245    /// let (send, recv) = mpmc::channel();
1246    ///
1247    /// assert_eq!(recv.len(), 0);
1248    ///
1249    /// let handle = thread::spawn(move || {
1250    ///     send.send(1u8).unwrap();
1251    /// });
1252    ///
1253    /// handle.join().unwrap();
1254    ///
1255    /// assert_eq!(recv.len(), 1);
1256    /// ```
1257    #[unstable(feature = "mpmc_channel", issue = "126840")]
1258    pub fn len(&self) -> usize {
1259        match &self.flavor {
1260            ReceiverFlavor::Array(chan) => chan.len(),
1261            ReceiverFlavor::List(chan) => chan.len(),
1262            ReceiverFlavor::Zero(chan) => chan.len(),
1263        }
1264    }
1265
1266    /// If the channel is bounded, returns its capacity.
1267    ///
1268    /// # Examples
1269    ///
1270    /// ```
1271    /// #![feature(mpmc_channel)]
1272    ///
1273    /// use std::sync::mpmc;
1274    /// use std::thread;
1275    ///
1276    /// let (send, recv) = mpmc::sync_channel(3);
1277    ///
1278    /// assert_eq!(recv.capacity(), Some(3));
1279    ///
1280    /// let handle = thread::spawn(move || {
1281    ///     send.send(1u8).unwrap();
1282    /// });
1283    ///
1284    /// handle.join().unwrap();
1285    ///
1286    /// assert_eq!(recv.capacity(), Some(3));
1287    /// ```
1288    #[unstable(feature = "mpmc_channel", issue = "126840")]
1289    pub fn capacity(&self) -> Option<usize> {
1290        match &self.flavor {
1291            ReceiverFlavor::Array(chan) => chan.capacity(),
1292            ReceiverFlavor::List(chan) => chan.capacity(),
1293            ReceiverFlavor::Zero(chan) => chan.capacity(),
1294        }
1295    }
1296
1297    /// Returns `true` if receivers belong to the same channel.
1298    ///
1299    /// # Examples
1300    ///
1301    /// ```
1302    /// #![feature(mpmc_channel)]
1303    ///
1304    /// use std::sync::mpmc;
1305    ///
1306    /// let (_, rx1) = mpmc::channel::<i32>();
1307    /// let (_, rx2) = mpmc::channel::<i32>();
1308    ///
1309    /// assert!(rx1.same_channel(&rx1));
1310    /// assert!(!rx1.same_channel(&rx2));
1311    /// ```
1312    #[unstable(feature = "mpmc_channel", issue = "126840")]
1313    pub fn same_channel(&self, other: &Receiver<T>) -> bool {
1314        match (&self.flavor, &other.flavor) {
1315            (ReceiverFlavor::Array(a), ReceiverFlavor::Array(b)) => a == b,
1316            (ReceiverFlavor::List(a), ReceiverFlavor::List(b)) => a == b,
1317            (ReceiverFlavor::Zero(a), ReceiverFlavor::Zero(b)) => a == b,
1318            _ => false,
1319        }
1320    }
1321
1322    /// Returns an iterator that will block waiting for messages, but never
1323    /// [`panic!`]. It will return [`None`] when the channel has hung up.
1324    ///
1325    /// # Examples
1326    ///
1327    /// ```rust
1328    /// #![feature(mpmc_channel)]
1329    ///
1330    /// use std::sync::mpmc::channel;
1331    /// use std::thread;
1332    ///
1333    /// let (send, recv) = channel();
1334    ///
1335    /// thread::spawn(move || {
1336    ///     send.send(1).unwrap();
1337    ///     send.send(2).unwrap();
1338    ///     send.send(3).unwrap();
1339    /// });
1340    ///
1341    /// let mut iter = recv.iter();
1342    /// assert_eq!(iter.next(), Some(1));
1343    /// assert_eq!(iter.next(), Some(2));
1344    /// assert_eq!(iter.next(), Some(3));
1345    /// assert_eq!(iter.next(), None);
1346    /// ```
1347    #[unstable(feature = "mpmc_channel", issue = "126840")]
1348    pub fn iter(&self) -> Iter<'_, T> {
1349        Iter { rx: self }
1350    }
1351}
1352
1353#[unstable(feature = "mpmc_channel", issue = "126840")]
1354impl<T> Drop for Receiver<T> {
1355    fn drop(&mut self) {
1356        unsafe {
1357            match &self.flavor {
1358                ReceiverFlavor::Array(chan) => chan.release(|c| c.disconnect_receivers()),
1359                ReceiverFlavor::List(chan) => chan.release(|c| c.disconnect_receivers()),
1360                ReceiverFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
1361            }
1362        }
1363    }
1364}
1365
1366#[unstable(feature = "mpmc_channel", issue = "126840")]
1367impl<T> Clone for Receiver<T> {
1368    fn clone(&self) -> Self {
1369        let flavor = match &self.flavor {
1370            ReceiverFlavor::Array(chan) => ReceiverFlavor::Array(chan.acquire()),
1371            ReceiverFlavor::List(chan) => ReceiverFlavor::List(chan.acquire()),
1372            ReceiverFlavor::Zero(chan) => ReceiverFlavor::Zero(chan.acquire()),
1373        };
1374
1375        Receiver { flavor }
1376    }
1377}
1378
1379#[unstable(feature = "mpmc_channel", issue = "126840")]
1380impl<T> fmt::Debug for Receiver<T> {
1381    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1382        f.pad("Receiver { .. }")
1383    }
1384}