Skip to main content

std/sync/mpmc/
mod.rs

1//! Multi-producer, multi-consumer FIFO queue communication primitives.
2//!
3//! This module provides message-based communication over channels, concretely
4//! defined by two types:
5//!
6//! * [`Sender`]
7//! * [`Receiver`]
8//!
9//! [`Sender`]s are used to send data to a set of [`Receiver`]s where each item
10//! sent is delivered to (at most) one receiver. Both sender and receiver are
11//! cloneable (multi-producer) such that many threads can send simultaneously
12//! to receivers (multi-consumer).
13//!
14//! These channels come in two flavors:
15//!
16//! 1. An asynchronous, infinitely buffered channel. The [`channel`] function
17//!    will return a `(Sender, Receiver)` tuple where all sends will be
18//!    **asynchronous** (they never block). The channel conceptually has an
19//!    infinite buffer.
20//!
21//! 2. A synchronous, bounded channel. The [`sync_channel`] function will
22//!    return a `(Sender, Receiver)` tuple where the storage for pending
23//!    messages is a pre-allocated buffer of a fixed size. All sends will be
24//!    **synchronous** by blocking until there is buffer space available. Note
25//!    that a bound of 0 is allowed, causing the channel to become a "rendezvous"
26//!    channel where each sender atomically hands off a message to a receiver.
27//!
28//! [`send`]: Sender::send
29//!
30//! ## Disconnection
31//!
32//! The send and receive operations on channels will all return a [`Result`]
33//! indicating whether the operation succeeded or not. An unsuccessful operation
34//! is normally indicative of the other half of a channel having "hung up" by
35//! being dropped in its corresponding thread.
36//!
37//! Once half of a channel has been deallocated, most operations can no longer
38//! continue to make progress, so [`Err`] will be returned. Many applications
39//! will continue to [`unwrap`] the results returned from this module,
40//! instigating a propagation of failure among threads if one unexpectedly dies.
41//!
42//! [`unwrap`]: Result::unwrap
43//!
44//! # Examples
45//!
46//! Simple usage:
47//!
48//! ```
49//! #![feature(mpmc_channel)]
50//!
51//! use std::thread;
52//! use std::sync::mpmc::channel;
53//!
54//! // Create a simple streaming channel
55//! let (tx, rx) = channel();
56//! thread::spawn(move || {
57//!     tx.send(10).unwrap();
58//! });
59//! assert_eq!(rx.recv().unwrap(), 10);
60//! ```
61//!
62//! Shared usage:
63//!
64//! ```
65//! #![feature(mpmc_channel)]
66//!
67//! use std::thread;
68//! use std::sync::mpmc::channel;
69//!
70//! thread::scope(|s| {
71//!     // Create a shared channel that can be sent along from many threads
72//!     // where tx is the sending half (tx for transmission), and rx is the receiving
73//!     // half (rx for receiving).
74//!     let (tx, rx) = channel();
75//!     for i in 0..10 {
76//!         let tx = tx.clone();
77//!         s.spawn(move || {
78//!             tx.send(i).unwrap();
79//!         });
80//!     }
81//!
82//!     for _ in 0..5 {
83//!         let rx1 = rx.clone();
84//!         let rx2 = rx.clone();
85//!         s.spawn(move || {
86//!             let j = rx1.recv().unwrap();
87//!             assert!(0 <= j && j < 10);
88//!         });
89//!         s.spawn(move || {
90//!             let j = rx2.recv().unwrap();
91//!             assert!(0 <= j && j < 10);
92//!         });
93//!     }
94//! })
95//! ```
96//!
97//! Propagating panics:
98//!
99//! ```
100//! #![feature(mpmc_channel)]
101//!
102//! use std::sync::mpmc::channel;
103//!
104//! // The call to recv() will return an error because the channel has already
105//! // hung up (or been deallocated)
106//! let (tx, rx) = channel::<i32>();
107//! drop(tx);
108//! assert!(rx.recv().is_err());
109//! ```
110
111// This module is used as the implementation for the channels in `sync::mpsc`.
112// The implementation comes from the crossbeam-channel crate:
113//
114// Copyright (c) 2019 The Crossbeam Project Developers
115//
116// Permission is hereby granted, free of charge, to any
117// person obtaining a copy of this software and associated
118// documentation files (the "Software"), to deal in the
119// Software without restriction, including without
120// limitation the rights to use, copy, modify, merge,
121// publish, distribute, sublicense, and/or sell copies of
122// the Software, and to permit persons to whom the Software
123// is furnished to do so, subject to the following
124// conditions:
125//
126// The above copyright notice and this permission notice
127// shall be included in all copies or substantial portions
128// of the Software.
129//
130// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
131// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
132// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
133// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
134// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
135// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
136// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
137// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
138// DEALINGS IN THE SOFTWARE.
139
140mod array;
141mod context;
142mod counter;
143mod error;
144mod list;
145mod select;
146mod utils;
147mod waker;
148mod zero;
149
150pub use error::*;
151
152use crate::fmt;
153use crate::panic::{RefUnwindSafe, UnwindSafe};
154use crate::time::{Duration, Instant};
155
156/// Creates a new asynchronous channel, returning the sender/receiver halves.
157///
158/// All data sent on the [`Sender`] will become available on the [`Receiver`] in
159/// the same order as it was sent, and no [`send`] will block the calling thread
160/// (this channel has an "infinite buffer", unlike [`sync_channel`], which will
161/// block after its buffer limit is reached). [`recv`] will block until a message
162/// is available while there is at least one [`Sender`] alive (including clones).
163///
164/// The [`Sender`] can be cloned to [`send`] to the same channel multiple times.
165/// The [`Receiver`] also can be cloned to have multi receivers.
166///
167/// If the [`Receiver`] is disconnected while trying to [`send`] with the
168/// [`Sender`], the [`send`] method will return a [`SendError`]. Similarly, if the
169/// [`Sender`] is disconnected while trying to [`recv`], the [`recv`] method will
170/// return a [`RecvError`].
171///
172/// [`send`]: Sender::send
173/// [`recv`]: Receiver::recv
174///
175/// # Examples
176///
177/// ```
178/// #![feature(mpmc_channel)]
179///
180/// use std::sync::mpmc::channel;
181/// use std::thread;
182///
183/// let (sender, receiver) = channel();
184///
185/// // Spawn off an expensive computation
186/// thread::spawn(move || {
187/// #   fn expensive_computation() {}
188///     sender.send(expensive_computation()).unwrap();
189/// });
190///
191/// // Do some useful work for a while
192///
193/// // Let's see what that answer was
194/// println!("{:?}", receiver.recv().unwrap());
195/// ```
196#[must_use]
197#[unstable(feature = "mpmc_channel", issue = "126840")]
198pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
199    let (s, r) = counter::new(list::Channel::new());
200    let s = Sender { flavor: SenderFlavor::List(s) };
201    let r = Receiver { flavor: ReceiverFlavor::List(r) };
202    (s, r)
203}
204
205/// Creates a new synchronous, bounded channel.
206///
207/// All data sent on the [`Sender`] will become available on the [`Receiver`]
208/// in the same order as it was sent. Like asynchronous [`channel`]s, the
209/// [`Receiver`] will block until a message becomes available. `sync_channel`
210/// differs greatly in the semantics of the sender, however.
211///
212/// This channel has an internal buffer on which messages will be queued.
213/// `bound` specifies the buffer size. When the internal buffer becomes full,
214/// future sends will *block* waiting for the buffer to open up. Note that a
215/// buffer size of 0 is valid, in which case this becomes "rendezvous channel"
216/// where each [`send`] will not return until a [`recv`] is paired with it.
217///
218/// The [`Sender`] can be cloned to [`send`] to the same channel multiple
219/// times. The [`Receiver`] also can be cloned to have multi receivers.
220///
221/// Like asynchronous channels, if the [`Receiver`] is disconnected while trying
222/// to [`send`] with the [`Sender`], the [`send`] method will return a
223/// [`SendError`]. Similarly, If the [`Sender`] is disconnected while trying
224/// to [`recv`], the [`recv`] method will return a [`RecvError`].
225///
226/// [`send`]: Sender::send
227/// [`recv`]: Receiver::recv
228///
229/// # Examples
230///
231/// ```
232/// use std::sync::mpsc::sync_channel;
233/// use std::thread;
234///
235/// let (sender, receiver) = sync_channel(1);
236///
237/// // this returns immediately
238/// sender.send(1).unwrap();
239///
240/// thread::spawn(move || {
241///     // this will block until the previous message has been received
242///     sender.send(2).unwrap();
243/// });
244///
245/// assert_eq!(receiver.recv().unwrap(), 1);
246/// assert_eq!(receiver.recv().unwrap(), 2);
247/// ```
248#[must_use]
249#[unstable(feature = "mpmc_channel", issue = "126840")]
250pub fn sync_channel<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
251    if cap == 0 {
252        let (s, r) = counter::new(zero::Channel::new());
253        let s = Sender { flavor: SenderFlavor::Zero(s) };
254        let r = Receiver { flavor: ReceiverFlavor::Zero(r) };
255        (s, r)
256    } else {
257        let (s, r) = counter::new(array::Channel::with_capacity(cap));
258        let s = Sender { flavor: SenderFlavor::Array(s) };
259        let r = Receiver { flavor: ReceiverFlavor::Array(r) };
260        (s, r)
261    }
262}
263
264/// The sending-half of Rust's synchronous [`channel`] type.
265///
266/// Messages can be sent through this channel with [`send`].
267///
268/// Note: all senders (the original and its clones) need to be dropped for the receiver
269/// to stop blocking to receive messages with [`Receiver::recv`].
270///
271/// [`send`]: Sender::send
272///
273/// # Examples
274///
275/// ```rust
276/// #![feature(mpmc_channel)]
277///
278/// use std::sync::mpmc::channel;
279/// use std::thread;
280///
281/// let (sender, receiver) = channel();
282/// let sender2 = sender.clone();
283///
284/// // First thread owns sender
285/// thread::spawn(move || {
286///     sender.send(1).unwrap();
287/// });
288///
289/// // Second thread owns sender2
290/// thread::spawn(move || {
291///     sender2.send(2).unwrap();
292/// });
293///
294/// let msg = receiver.recv().unwrap();
295/// let msg2 = receiver.recv().unwrap();
296///
297/// assert_eq!(3, msg + msg2);
298/// ```
299#[unstable(feature = "mpmc_channel", issue = "126840")]
300pub struct Sender<T> {
301    flavor: SenderFlavor<T>,
302}
303
304/// Sender flavors.
305enum SenderFlavor<T> {
306    /// Bounded channel based on a preallocated array.
307    Array(counter::Sender<array::Channel<T>>),
308
309    /// Unbounded channel implemented as a linked list.
310    List(counter::Sender<list::Channel<T>>),
311
312    /// Zero-capacity channel.
313    Zero(counter::Sender<zero::Channel<T>>),
314}
315
316#[unstable(feature = "mpmc_channel", issue = "126840")]
317unsafe impl<T: Send> Send for Sender<T> {}
318#[unstable(feature = "mpmc_channel", issue = "126840")]
319unsafe impl<T: Send> Sync for Sender<T> {}
320
321#[unstable(feature = "mpmc_channel", issue = "126840")]
322impl<T> UnwindSafe for Sender<T> {}
323#[unstable(feature = "mpmc_channel", issue = "126840")]
324impl<T> RefUnwindSafe for Sender<T> {}
325
326impl<T> Sender<T> {
327    /// Attempts to send a message into the channel without blocking.
328    ///
329    /// This method will either send a message into the channel immediately or return an error if
330    /// the channel is full or disconnected. The returned error contains the original message.
331    ///
332    /// If called on a zero-capacity channel, this method will send the message only if there
333    /// happens to be a receive operation on the other side of the channel at the same time.
334    ///
335    /// # Examples
336    ///
337    /// ```rust
338    /// #![feature(mpmc_channel)]
339    ///
340    /// use std::sync::mpmc::{channel, Receiver, Sender};
341    ///
342    /// let (sender, _receiver): (Sender<i32>, Receiver<i32>) = channel();
343    ///
344    /// assert!(sender.try_send(1).is_ok());
345    /// ```
346    #[unstable(feature = "mpmc_channel", issue = "126840")]
347    pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
348        match &self.flavor {
349            SenderFlavor::Array(chan) => chan.try_send(msg),
350            SenderFlavor::List(chan) => chan.try_send(msg),
351            SenderFlavor::Zero(chan) => chan.try_send(msg),
352        }
353    }
354
355    /// Attempts to send a value on this channel, returning it back if it could
356    /// not be sent.
357    ///
358    /// A successful send occurs when it is determined that the other end of
359    /// the channel has not hung up already. An unsuccessful send would be one
360    /// where the corresponding receiver has already been deallocated. Note
361    /// that a return value of [`Err`] means that the data will never be
362    /// received, but a return value of [`Ok`] does *not* mean that the data
363    /// will be received. It is possible for the corresponding receiver to
364    /// hang up immediately after this function returns [`Ok`]. However, if
365    /// the channel is zero-capacity, it acts as a rendezvous channel and a
366    /// return value of [`Ok`] means that the data has been received.
367    ///
368    /// If the channel is full and not disconnected, this call will block until
369    /// the send operation can proceed. If the channel becomes disconnected,
370    /// this call will wake up and return an error. The returned error contains
371    /// the original message.
372    ///
373    /// If called on a zero-capacity channel, this method will wait for a receive
374    /// operation to appear on the other side of the channel.
375    ///
376    /// # Examples
377    ///
378    /// ```
379    /// #![feature(mpmc_channel)]
380    ///
381    /// use std::sync::mpmc::channel;
382    ///
383    /// let (tx, rx) = channel();
384    ///
385    /// // This send is always successful
386    /// tx.send(1).unwrap();
387    ///
388    /// // This send will fail because the receiver is gone
389    /// drop(rx);
390    /// assert!(tx.send(1).is_err());
391    /// ```
392    #[unstable(feature = "mpmc_channel", issue = "126840")]
393    pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
394        match &self.flavor {
395            SenderFlavor::Array(chan) => chan.send(msg, None),
396            SenderFlavor::List(chan) => chan.send(msg, None),
397            SenderFlavor::Zero(chan) => chan.send(msg, None),
398        }
399        .map_err(|err| match err {
400            SendTimeoutError::Disconnected(msg) => SendError(msg),
401            SendTimeoutError::Timeout(_) => unreachable!(),
402        })
403    }
404}
405
406impl<T> Sender<T> {
407    /// Waits for a message to be sent into the channel, but only for a limited time.
408    ///
409    /// If the channel is full and not disconnected, this call will block until the send operation
410    /// can proceed or the operation times out. If the channel becomes disconnected, this call will
411    /// wake up and return an error. The returned error contains the original message.
412    ///
413    /// If called on a zero-capacity channel, this method will wait for a receive operation to
414    /// appear on the other side of the channel.
415    ///
416    /// # Examples
417    ///
418    /// ```
419    /// #![feature(mpmc_channel)]
420    ///
421    /// use std::sync::mpmc::channel;
422    /// use std::time::Duration;
423    ///
424    /// let (tx, rx) = channel();
425    ///
426    /// tx.send_timeout(1, Duration::from_millis(400)).unwrap();
427    /// ```
428    #[unstable(feature = "mpmc_channel", issue = "126840")]
429    pub fn send_timeout(&self, msg: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> {
430        match Instant::now().checked_add(timeout) {
431            Some(deadline) => self.send_deadline(msg, deadline),
432            // So far in the future that it's practically the same as waiting indefinitely.
433            None => self.send(msg).map_err(SendTimeoutError::from),
434        }
435    }
436
437    /// Waits for a message to be sent into the channel, but only until a given deadline.
438    ///
439    /// If the channel is full and not disconnected, this call will block until the send operation
440    /// can proceed or the operation times out. If the channel becomes disconnected, this call will
441    /// wake up and return an error. The returned error contains the original message.
442    ///
443    /// If called on a zero-capacity channel, this method will wait for a receive operation to
444    /// appear on the other side of the channel.
445    ///
446    /// # Examples
447    ///
448    /// ```
449    /// #![feature(mpmc_channel)]
450    ///
451    /// use std::sync::mpmc::channel;
452    /// use std::time::{Duration, Instant};
453    ///
454    /// let (tx, rx) = channel();
455    ///
456    /// let t = Instant::now() + Duration::from_millis(400);
457    /// tx.send_deadline(1, t).unwrap();
458    /// ```
459    #[unstable(feature = "mpmc_channel", issue = "126840")]
460    pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>> {
461        match &self.flavor {
462            SenderFlavor::Array(chan) => chan.send(msg, Some(deadline)),
463            SenderFlavor::List(chan) => chan.send(msg, Some(deadline)),
464            SenderFlavor::Zero(chan) => chan.send(msg, Some(deadline)),
465        }
466    }
467
468    /// Returns `true` if the channel is empty.
469    ///
470    /// Note: Zero-capacity channels are always empty.
471    ///
472    /// # Examples
473    ///
474    /// ```
475    /// #![feature(mpmc_channel)]
476    ///
477    /// use std::sync::mpmc;
478    /// use std::thread;
479    ///
480    /// let (send, _recv) = mpmc::channel();
481    ///
482    /// let tx1 = send.clone();
483    /// let tx2 = send.clone();
484    ///
485    /// assert!(tx1.is_empty());
486    ///
487    /// let handle = thread::spawn(move || {
488    ///     tx2.send(1u8).unwrap();
489    /// });
490    ///
491    /// handle.join().unwrap();
492    ///
493    /// assert!(!tx1.is_empty());
494    /// ```
495    #[unstable(feature = "mpmc_channel", issue = "126840")]
496    pub fn is_empty(&self) -> bool {
497        match &self.flavor {
498            SenderFlavor::Array(chan) => chan.is_empty(),
499            SenderFlavor::List(chan) => chan.is_empty(),
500            SenderFlavor::Zero(chan) => chan.is_empty(),
501        }
502    }
503
504    /// Returns `true` if the channel is full.
505    ///
506    /// Note: Zero-capacity channels are always full.
507    ///
508    /// # Examples
509    ///
510    /// ```
511    /// #![feature(mpmc_channel)]
512    ///
513    /// use std::sync::mpmc;
514    /// use std::thread;
515    ///
516    /// let (send, _recv) = mpmc::sync_channel(1);
517    ///
518    /// let (tx1, tx2) = (send.clone(), send.clone());
519    /// assert!(!tx1.is_full());
520    ///
521    /// let handle = thread::spawn(move || {
522    ///     tx2.send(1u8).unwrap();
523    /// });
524    ///
525    /// handle.join().unwrap();
526    ///
527    /// assert!(tx1.is_full());
528    /// ```
529    #[unstable(feature = "mpmc_channel", issue = "126840")]
530    pub fn is_full(&self) -> bool {
531        match &self.flavor {
532            SenderFlavor::Array(chan) => chan.is_full(),
533            SenderFlavor::List(chan) => chan.is_full(),
534            SenderFlavor::Zero(chan) => chan.is_full(),
535        }
536    }
537
538    /// Returns the number of messages in the channel.
539    ///
540    /// # Examples
541    ///
542    /// ```
543    /// #![feature(mpmc_channel)]
544    ///
545    /// use std::sync::mpmc;
546    /// use std::thread;
547    ///
548    /// let (send, _recv) = mpmc::channel();
549    /// let (tx1, tx2) = (send.clone(), send.clone());
550    ///
551    /// assert_eq!(tx1.len(), 0);
552    ///
553    /// let handle = thread::spawn(move || {
554    ///     tx2.send(1u8).unwrap();
555    /// });
556    ///
557    /// handle.join().unwrap();
558    ///
559    /// assert_eq!(tx1.len(), 1);
560    /// ```
561    #[unstable(feature = "mpmc_channel", issue = "126840")]
562    pub fn len(&self) -> usize {
563        match &self.flavor {
564            SenderFlavor::Array(chan) => chan.len(),
565            SenderFlavor::List(chan) => chan.len(),
566            SenderFlavor::Zero(chan) => chan.len(),
567        }
568    }
569
570    /// If the channel is bounded, returns its capacity.
571    ///
572    /// # Examples
573    ///
574    /// ```
575    /// #![feature(mpmc_channel)]
576    ///
577    /// use std::sync::mpmc;
578    /// use std::thread;
579    ///
580    /// let (send, _recv) = mpmc::sync_channel(3);
581    /// let (tx1, tx2) = (send.clone(), send.clone());
582    ///
583    /// assert_eq!(tx1.capacity(), Some(3));
584    ///
585    /// let handle = thread::spawn(move || {
586    ///     tx2.send(1u8).unwrap();
587    /// });
588    ///
589    /// handle.join().unwrap();
590    ///
591    /// assert_eq!(tx1.capacity(), Some(3));
592    /// ```
593    #[unstable(feature = "mpmc_channel", issue = "126840")]
594    pub fn capacity(&self) -> Option<usize> {
595        match &self.flavor {
596            SenderFlavor::Array(chan) => chan.capacity(),
597            SenderFlavor::List(chan) => chan.capacity(),
598            SenderFlavor::Zero(chan) => chan.capacity(),
599        }
600    }
601
602    /// Returns `true` if senders belong to the same channel.
603    ///
604    /// # Examples
605    ///
606    /// ```
607    /// #![feature(mpmc_channel)]
608    ///
609    /// use std::sync::mpmc;
610    ///
611    /// let (tx1, _) = mpmc::channel::<i32>();
612    /// let (tx2, _) = mpmc::channel::<i32>();
613    ///
614    /// assert!(tx1.same_channel(&tx1));
615    /// assert!(!tx1.same_channel(&tx2));
616    /// ```
617    #[unstable(feature = "mpmc_channel", issue = "126840")]
618    pub fn same_channel(&self, other: &Sender<T>) -> bool {
619        match (&self.flavor, &other.flavor) {
620            (SenderFlavor::Array(a), SenderFlavor::Array(b)) => a == b,
621            (SenderFlavor::List(a), SenderFlavor::List(b)) => a == b,
622            (SenderFlavor::Zero(a), SenderFlavor::Zero(b)) => a == b,
623            _ => false,
624        }
625    }
626
627    /// Returns `true` if the channel is disconnected.
628    ///
629    /// Note that a return value of `false` does not guarantee the channel will
630    /// remain connected. The channel may be disconnected immediately after this method
631    /// returns, so a subsequent [`Sender::send`] may still fail with [`SendError`].
632    ///
633    /// # Examples
634    ///
635    /// ```
636    /// #![feature(mpmc_channel)]
637    ///
638    /// use std::sync::mpmc::channel;
639    ///
640    /// let (tx, rx) = channel::<i32>();
641    /// assert!(!tx.is_disconnected());
642    /// drop(rx);
643    /// assert!(tx.is_disconnected());
644    /// ```
645    #[unstable(feature = "mpmc_channel", issue = "126840")]
646    pub fn is_disconnected(&self) -> bool {
647        match &self.flavor {
648            SenderFlavor::Array(chan) => chan.is_disconnected(),
649            SenderFlavor::List(chan) => chan.is_disconnected(),
650            SenderFlavor::Zero(chan) => chan.is_disconnected(),
651        }
652    }
653}
654
655#[unstable(feature = "mpmc_channel", issue = "126840")]
656impl<T> Drop for Sender<T> {
657    fn drop(&mut self) {
658        unsafe {
659            match &self.flavor {
660                SenderFlavor::Array(chan) => chan.release(|c| c.disconnect_senders()),
661                SenderFlavor::List(chan) => chan.release(|c| c.disconnect_senders()),
662                SenderFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
663            }
664        }
665    }
666}
667
668#[unstable(feature = "mpmc_channel", issue = "126840")]
669impl<T> Clone for Sender<T> {
670    fn clone(&self) -> Self {
671        let flavor = match &self.flavor {
672            SenderFlavor::Array(chan) => SenderFlavor::Array(chan.acquire()),
673            SenderFlavor::List(chan) => SenderFlavor::List(chan.acquire()),
674            SenderFlavor::Zero(chan) => SenderFlavor::Zero(chan.acquire()),
675        };
676
677        Sender { flavor }
678    }
679}
680
681#[unstable(feature = "mpmc_channel", issue = "126840")]
682impl<T> fmt::Debug for Sender<T> {
683    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
684        f.debug_struct("Sender").finish_non_exhaustive()
685    }
686}
687
688/// The receiving half of Rust's [`channel`] (or [`sync_channel`]) type.
689/// Different threads can share this [`Receiver`] by cloning it.
690///
691/// Messages sent to the channel can be retrieved using [`recv`].
692///
693/// [`recv`]: Receiver::recv
694///
695/// # Examples
696///
697/// ```rust
698/// #![feature(mpmc_channel)]
699///
700/// use std::sync::mpmc::channel;
701/// use std::thread;
702/// use std::time::Duration;
703///
704/// let (send, recv) = channel();
705///
706/// let tx_thread = thread::spawn(move || {
707///     send.send("Hello world!").unwrap();
708///     thread::sleep(Duration::from_secs(2)); // block for two seconds
709///     send.send("Delayed for 2 seconds").unwrap();
710/// });
711///
712/// let (rx1, rx2) = (recv.clone(), recv.clone());
713/// let rx_thread_1 = thread::spawn(move || {
714///     println!("{}", rx1.recv().unwrap()); // Received immediately
715/// });
716/// let rx_thread_2 = thread::spawn(move || {
717///     println!("{}", rx2.recv().unwrap()); // Received after 2 seconds
718/// });
719///
720/// tx_thread.join().unwrap();
721/// rx_thread_1.join().unwrap();
722/// rx_thread_2.join().unwrap();
723/// ```
724#[unstable(feature = "mpmc_channel", issue = "126840")]
725pub struct Receiver<T> {
726    flavor: ReceiverFlavor<T>,
727}
728
729/// An iterator over messages on a [`Receiver`], created by [`iter`].
730///
731/// This iterator will block whenever [`next`] is called,
732/// waiting for a new message, and [`None`] will be returned
733/// when the corresponding channel has hung up.
734///
735/// [`iter`]: Receiver::iter
736/// [`next`]: Iterator::next
737///
738/// # Examples
739///
740/// ```rust
741/// #![feature(mpmc_channel)]
742///
743/// use std::sync::mpmc::channel;
744/// use std::thread;
745///
746/// let (send, recv) = channel();
747///
748/// thread::spawn(move || {
749///     send.send(1u8).unwrap();
750///     send.send(2u8).unwrap();
751///     send.send(3u8).unwrap();
752/// });
753///
754/// for x in recv.iter() {
755///     println!("Got: {x}");
756/// }
757/// ```
758#[unstable(feature = "mpmc_channel", issue = "126840")]
759#[derive(Debug)]
760pub struct Iter<'a, T: 'a> {
761    rx: &'a Receiver<T>,
762}
763
764/// An iterator that attempts to yield all pending values for a [`Receiver`],
765/// created by [`try_iter`].
766///
767/// [`None`] will be returned when there are no pending values remaining or
768/// if the corresponding channel has hung up.
769///
770/// This iterator will never block the caller in order to wait for data to
771/// become available. Instead, it will return [`None`].
772///
773/// [`try_iter`]: Receiver::try_iter
774///
775/// # Examples
776///
777/// ```rust
778/// #![feature(mpmc_channel)]
779///
780/// use std::sync::mpmc::channel;
781/// use std::thread;
782/// use std::time::Duration;
783///
784/// let (sender, receiver) = channel();
785///
786/// // Nothing is in the buffer yet
787/// assert!(receiver.try_iter().next().is_none());
788/// println!("Nothing in the buffer...");
789///
790/// thread::spawn(move || {
791///     sender.send(1).unwrap();
792///     sender.send(2).unwrap();
793///     sender.send(3).unwrap();
794/// });
795///
796/// println!("Going to sleep...");
797/// thread::sleep(Duration::from_secs(2)); // block for two seconds
798///
799/// for x in receiver.try_iter() {
800///     println!("Got: {x}");
801/// }
802/// ```
803#[unstable(feature = "mpmc_channel", issue = "126840")]
804#[derive(Debug)]
805pub struct TryIter<'a, T: 'a> {
806    rx: &'a Receiver<T>,
807}
808
809/// An owning iterator over messages on a [`Receiver`],
810/// created by [`into_iter`].
811///
812/// This iterator will block whenever [`next`]
813/// is called, waiting for a new message, and [`None`] will be
814/// returned if the corresponding channel has hung up.
815///
816/// [`into_iter`]: Receiver::into_iter
817/// [`next`]: Iterator::next
818///
819/// # Examples
820///
821/// ```rust
822/// #![feature(mpmc_channel)]
823///
824/// use std::sync::mpmc::channel;
825/// use std::thread;
826///
827/// let (send, recv) = channel();
828///
829/// thread::spawn(move || {
830///     send.send(1u8).unwrap();
831///     send.send(2u8).unwrap();
832///     send.send(3u8).unwrap();
833/// });
834///
835/// for x in recv.into_iter() {
836///     println!("Got: {x}");
837/// }
838/// ```
839#[unstable(feature = "mpmc_channel", issue = "126840")]
840#[derive(Debug)]
841pub struct IntoIter<T> {
842    rx: Receiver<T>,
843}
844
845#[unstable(feature = "mpmc_channel", issue = "126840")]
846impl<'a, T> Iterator for Iter<'a, T> {
847    type Item = T;
848
849    fn next(&mut self) -> Option<T> {
850        self.rx.recv().ok()
851    }
852}
853
854#[unstable(feature = "mpmc_channel", issue = "126840")]
855impl<'a, T> Iterator for TryIter<'a, T> {
856    type Item = T;
857
858    fn next(&mut self) -> Option<T> {
859        self.rx.try_recv().ok()
860    }
861}
862
863#[unstable(feature = "mpmc_channel", issue = "126840")]
864impl<'a, T> IntoIterator for &'a Receiver<T> {
865    type Item = T;
866    type IntoIter = Iter<'a, T>;
867
868    fn into_iter(self) -> Iter<'a, T> {
869        self.iter()
870    }
871}
872
873#[unstable(feature = "mpmc_channel", issue = "126840")]
874impl<T> Iterator for IntoIter<T> {
875    type Item = T;
876    fn next(&mut self) -> Option<T> {
877        self.rx.recv().ok()
878    }
879}
880
881#[unstable(feature = "mpmc_channel", issue = "126840")]
882impl<T> IntoIterator for Receiver<T> {
883    type Item = T;
884    type IntoIter = IntoIter<T>;
885
886    fn into_iter(self) -> IntoIter<T> {
887        IntoIter { rx: self }
888    }
889}
890
891/// Receiver flavors.
892enum ReceiverFlavor<T> {
893    /// Bounded channel based on a preallocated array.
894    Array(counter::Receiver<array::Channel<T>>),
895
896    /// Unbounded channel implemented as a linked list.
897    List(counter::Receiver<list::Channel<T>>),
898
899    /// Zero-capacity channel.
900    Zero(counter::Receiver<zero::Channel<T>>),
901}
902
903#[unstable(feature = "mpmc_channel", issue = "126840")]
904unsafe impl<T: Send> Send for Receiver<T> {}
905#[unstable(feature = "mpmc_channel", issue = "126840")]
906unsafe impl<T: Send> Sync for Receiver<T> {}
907
908#[unstable(feature = "mpmc_channel", issue = "126840")]
909impl<T> UnwindSafe for Receiver<T> {}
910#[unstable(feature = "mpmc_channel", issue = "126840")]
911impl<T> RefUnwindSafe for Receiver<T> {}
912
913impl<T> Receiver<T> {
914    /// Attempts to receive a message from the channel without blocking.
915    ///
916    /// This method will never block the caller in order to wait for data to
917    /// become available. Instead, this will always return immediately with a
918    /// possible option of pending data on the channel.
919    ///
920    /// If called on a zero-capacity channel, this method will receive a message only if there
921    /// happens to be a send operation on the other side of the channel at the same time.
922    ///
923    /// This is useful for a flavor of "optimistic check" before deciding to
924    /// block on a receiver.
925    ///
926    /// Compared with [`recv`], this function has two failure cases instead of one
927    /// (one for disconnection, one for an empty buffer).
928    ///
929    /// [`recv`]: Self::recv
930    ///
931    /// # Examples
932    ///
933    /// ```rust
934    /// #![feature(mpmc_channel)]
935    ///
936    /// use std::sync::mpmc::{Receiver, channel};
937    ///
938    /// let (_, receiver): (_, Receiver<i32>) = channel();
939    ///
940    /// assert!(receiver.try_recv().is_err());
941    /// ```
942    #[unstable(feature = "mpmc_channel", issue = "126840")]
943    pub fn try_recv(&self) -> Result<T, TryRecvError> {
944        match &self.flavor {
945            ReceiverFlavor::Array(chan) => chan.try_recv(),
946            ReceiverFlavor::List(chan) => chan.try_recv(),
947            ReceiverFlavor::Zero(chan) => chan.try_recv(),
948        }
949    }
950
951    /// Attempts to wait for a value on this receiver, returning an error if the
952    /// corresponding channel has hung up.
953    ///
954    /// This function will always block the current thread if there is no data
955    /// available and it's possible for more data to be sent (at least one sender
956    /// still exists). Once a message is sent to the corresponding [`Sender`],
957    /// this receiver will wake up and return that message.
958    ///
959    /// If the corresponding [`Sender`] has disconnected, or it disconnects while
960    /// this call is blocking, this call will wake up and return [`Err`] to
961    /// indicate that no more messages can ever be received on this channel.
962    /// However, since channels are buffered, messages sent before the disconnect
963    /// will still be properly received.
964    ///
965    /// # Examples
966    ///
967    /// ```
968    /// #![feature(mpmc_channel)]
969    ///
970    /// use std::sync::mpmc;
971    /// use std::thread;
972    ///
973    /// let (send, recv) = mpmc::channel();
974    /// let handle = thread::spawn(move || {
975    ///     send.send(1u8).unwrap();
976    /// });
977    ///
978    /// handle.join().unwrap();
979    ///
980    /// assert_eq!(Ok(1), recv.recv());
981    /// ```
982    ///
983    /// Buffering behavior:
984    ///
985    /// ```
986    /// #![feature(mpmc_channel)]
987    ///
988    /// use std::sync::mpmc;
989    /// use std::thread;
990    /// use std::sync::mpmc::RecvError;
991    ///
992    /// let (send, recv) = mpmc::channel();
993    /// let handle = thread::spawn(move || {
994    ///     send.send(1u8).unwrap();
995    ///     send.send(2).unwrap();
996    ///     send.send(3).unwrap();
997    ///     drop(send);
998    /// });
999    ///
1000    /// // wait for the thread to join so we ensure the sender is dropped
1001    /// handle.join().unwrap();
1002    ///
1003    /// assert_eq!(Ok(1), recv.recv());
1004    /// assert_eq!(Ok(2), recv.recv());
1005    /// assert_eq!(Ok(3), recv.recv());
1006    /// assert_eq!(Err(RecvError), recv.recv());
1007    /// ```
1008    #[unstable(feature = "mpmc_channel", issue = "126840")]
1009    pub fn recv(&self) -> Result<T, RecvError> {
1010        match &self.flavor {
1011            ReceiverFlavor::Array(chan) => chan.recv(None),
1012            ReceiverFlavor::List(chan) => chan.recv(None),
1013            ReceiverFlavor::Zero(chan) => chan.recv(None),
1014        }
1015        .map_err(|_| RecvError)
1016    }
1017
1018    /// Attempts to wait for a value on this receiver, returning an error if the
1019    /// corresponding channel has hung up, or if it waits more than `timeout`.
1020    ///
1021    /// This function will always block the current thread if there is no data
1022    /// available and it's possible for more data to be sent (at least one sender
1023    /// still exists). Once a message is sent to the corresponding [`Sender`],
1024    /// this receiver will wake up and return that message.
1025    ///
1026    /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1027    /// this call is blocking, this call will wake up and return [`Err`] to
1028    /// indicate that no more messages can ever be received on this channel.
1029    /// However, since channels are buffered, messages sent before the disconnect
1030    /// will still be properly received.
1031    ///
1032    /// # Examples
1033    ///
1034    /// Successfully receiving value before encountering timeout:
1035    ///
1036    /// ```no_run
1037    /// #![feature(mpmc_channel)]
1038    ///
1039    /// use std::thread;
1040    /// use std::time::Duration;
1041    /// use std::sync::mpmc;
1042    ///
1043    /// let (send, recv) = mpmc::channel();
1044    ///
1045    /// thread::spawn(move || {
1046    ///     send.send('a').unwrap();
1047    /// });
1048    ///
1049    /// assert_eq!(
1050    ///     recv.recv_timeout(Duration::from_millis(400)),
1051    ///     Ok('a')
1052    /// );
1053    /// ```
1054    ///
1055    /// Receiving an error upon reaching timeout:
1056    ///
1057    /// ```no_run
1058    /// #![feature(mpmc_channel)]
1059    ///
1060    /// use std::thread;
1061    /// use std::time::Duration;
1062    /// use std::sync::mpmc;
1063    ///
1064    /// let (send, recv) = mpmc::channel();
1065    ///
1066    /// thread::spawn(move || {
1067    ///     thread::sleep(Duration::from_millis(800));
1068    ///     send.send('a').unwrap();
1069    /// });
1070    ///
1071    /// assert_eq!(
1072    ///     recv.recv_timeout(Duration::from_millis(400)),
1073    ///     Err(mpmc::RecvTimeoutError::Timeout)
1074    /// );
1075    /// ```
1076    #[unstable(feature = "mpmc_channel", issue = "126840")]
1077    pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
1078        match Instant::now().checked_add(timeout) {
1079            Some(deadline) => self.recv_deadline(deadline),
1080            // So far in the future that it's practically the same as waiting indefinitely.
1081            None => self.recv().map_err(RecvTimeoutError::from),
1082        }
1083    }
1084
1085    /// Attempts to wait for a value on this receiver, returning an error if the
1086    /// corresponding channel has hung up, or if `deadline` is reached.
1087    ///
1088    /// This function will always block the current thread if there is no data
1089    /// available and it's possible for more data to be sent. Once a message is
1090    /// sent to the corresponding [`Sender`], then this receiver will wake up
1091    /// and return that message.
1092    ///
1093    /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1094    /// this call is blocking, this call will wake up and return [`Err`] to
1095    /// indicate that no more messages can ever be received on this channel.
1096    /// However, since channels are buffered, messages sent before the disconnect
1097    /// will still be properly received.
1098    ///
1099    /// # Examples
1100    ///
1101    /// Successfully receiving value before reaching deadline:
1102    ///
1103    /// ```no_run
1104    /// #![feature(mpmc_channel)]
1105    ///
1106    /// use std::thread;
1107    /// use std::time::{Duration, Instant};
1108    /// use std::sync::mpmc;
1109    ///
1110    /// let (send, recv) = mpmc::channel();
1111    ///
1112    /// thread::spawn(move || {
1113    ///     send.send('a').unwrap();
1114    /// });
1115    ///
1116    /// assert_eq!(
1117    ///     recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
1118    ///     Ok('a')
1119    /// );
1120    /// ```
1121    ///
1122    /// Receiving an error upon reaching deadline:
1123    ///
1124    /// ```no_run
1125    /// #![feature(mpmc_channel)]
1126    ///
1127    /// use std::thread;
1128    /// use std::time::{Duration, Instant};
1129    /// use std::sync::mpmc;
1130    ///
1131    /// let (send, recv) = mpmc::channel();
1132    ///
1133    /// thread::spawn(move || {
1134    ///     thread::sleep(Duration::from_millis(800));
1135    ///     send.send('a').unwrap();
1136    /// });
1137    ///
1138    /// assert_eq!(
1139    ///     recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
1140    ///     Err(mpmc::RecvTimeoutError::Timeout)
1141    /// );
1142    /// ```
1143    #[unstable(feature = "mpmc_channel", issue = "126840")]
1144    pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
1145        match &self.flavor {
1146            ReceiverFlavor::Array(chan) => chan.recv(Some(deadline)),
1147            ReceiverFlavor::List(chan) => chan.recv(Some(deadline)),
1148            ReceiverFlavor::Zero(chan) => chan.recv(Some(deadline)),
1149        }
1150    }
1151
1152    /// Returns an iterator that will attempt to yield all pending values.
1153    /// It will return `None` if there are no more pending values or if the
1154    /// channel has hung up. The iterator will never [`panic!`] or block the
1155    /// user by waiting for values.
1156    ///
1157    /// # Examples
1158    ///
1159    /// ```no_run
1160    /// #![feature(mpmc_channel)]
1161    ///
1162    /// use std::sync::mpmc::channel;
1163    /// use std::thread;
1164    /// use std::time::Duration;
1165    ///
1166    /// let (sender, receiver) = channel();
1167    ///
1168    /// // nothing is in the buffer yet
1169    /// assert!(receiver.try_iter().next().is_none());
1170    ///
1171    /// thread::spawn(move || {
1172    ///     thread::sleep(Duration::from_secs(1));
1173    ///     sender.send(1).unwrap();
1174    ///     sender.send(2).unwrap();
1175    ///     sender.send(3).unwrap();
1176    /// });
1177    ///
1178    /// // nothing is in the buffer yet
1179    /// assert!(receiver.try_iter().next().is_none());
1180    ///
1181    /// // block for two seconds
1182    /// thread::sleep(Duration::from_secs(2));
1183    ///
1184    /// let mut iter = receiver.try_iter();
1185    /// assert_eq!(iter.next(), Some(1));
1186    /// assert_eq!(iter.next(), Some(2));
1187    /// assert_eq!(iter.next(), Some(3));
1188    /// assert_eq!(iter.next(), None);
1189    /// ```
1190    #[unstable(feature = "mpmc_channel", issue = "126840")]
1191    pub fn try_iter(&self) -> TryIter<'_, T> {
1192        TryIter { rx: self }
1193    }
1194}
1195
1196impl<T> Receiver<T> {
1197    /// Returns `true` if the channel is empty.
1198    ///
1199    /// Note: Zero-capacity channels are always empty.
1200    ///
1201    /// # Examples
1202    ///
1203    /// ```
1204    /// #![feature(mpmc_channel)]
1205    ///
1206    /// use std::sync::mpmc;
1207    /// use std::thread;
1208    ///
1209    /// let (send, recv) = mpmc::channel();
1210    ///
1211    /// assert!(recv.is_empty());
1212    ///
1213    /// let handle = thread::spawn(move || {
1214    ///     send.send(1u8).unwrap();
1215    /// });
1216    ///
1217    /// handle.join().unwrap();
1218    ///
1219    /// assert!(!recv.is_empty());
1220    /// ```
1221    #[unstable(feature = "mpmc_channel", issue = "126840")]
1222    pub fn is_empty(&self) -> bool {
1223        match &self.flavor {
1224            ReceiverFlavor::Array(chan) => chan.is_empty(),
1225            ReceiverFlavor::List(chan) => chan.is_empty(),
1226            ReceiverFlavor::Zero(chan) => chan.is_empty(),
1227        }
1228    }
1229
1230    /// Returns `true` if the channel is full.
1231    ///
1232    /// Note: Zero-capacity channels are always full.
1233    ///
1234    /// # Examples
1235    ///
1236    /// ```
1237    /// #![feature(mpmc_channel)]
1238    ///
1239    /// use std::sync::mpmc;
1240    /// use std::thread;
1241    ///
1242    /// let (send, recv) = mpmc::sync_channel(1);
1243    ///
1244    /// assert!(!recv.is_full());
1245    ///
1246    /// let handle = thread::spawn(move || {
1247    ///     send.send(1u8).unwrap();
1248    /// });
1249    ///
1250    /// handle.join().unwrap();
1251    ///
1252    /// assert!(recv.is_full());
1253    /// ```
1254    #[unstable(feature = "mpmc_channel", issue = "126840")]
1255    pub fn is_full(&self) -> bool {
1256        match &self.flavor {
1257            ReceiverFlavor::Array(chan) => chan.is_full(),
1258            ReceiverFlavor::List(chan) => chan.is_full(),
1259            ReceiverFlavor::Zero(chan) => chan.is_full(),
1260        }
1261    }
1262
1263    /// Returns the number of messages in the channel.
1264    ///
1265    /// # Examples
1266    ///
1267    /// ```
1268    /// #![feature(mpmc_channel)]
1269    ///
1270    /// use std::sync::mpmc;
1271    /// use std::thread;
1272    ///
1273    /// let (send, recv) = mpmc::channel();
1274    ///
1275    /// assert_eq!(recv.len(), 0);
1276    ///
1277    /// let handle = thread::spawn(move || {
1278    ///     send.send(1u8).unwrap();
1279    /// });
1280    ///
1281    /// handle.join().unwrap();
1282    ///
1283    /// assert_eq!(recv.len(), 1);
1284    /// ```
1285    #[unstable(feature = "mpmc_channel", issue = "126840")]
1286    pub fn len(&self) -> usize {
1287        match &self.flavor {
1288            ReceiverFlavor::Array(chan) => chan.len(),
1289            ReceiverFlavor::List(chan) => chan.len(),
1290            ReceiverFlavor::Zero(chan) => chan.len(),
1291        }
1292    }
1293
1294    /// If the channel is bounded, returns its capacity.
1295    ///
1296    /// # Examples
1297    ///
1298    /// ```
1299    /// #![feature(mpmc_channel)]
1300    ///
1301    /// use std::sync::mpmc;
1302    /// use std::thread;
1303    ///
1304    /// let (send, recv) = mpmc::sync_channel(3);
1305    ///
1306    /// assert_eq!(recv.capacity(), Some(3));
1307    ///
1308    /// let handle = thread::spawn(move || {
1309    ///     send.send(1u8).unwrap();
1310    /// });
1311    ///
1312    /// handle.join().unwrap();
1313    ///
1314    /// assert_eq!(recv.capacity(), Some(3));
1315    /// ```
1316    #[unstable(feature = "mpmc_channel", issue = "126840")]
1317    pub fn capacity(&self) -> Option<usize> {
1318        match &self.flavor {
1319            ReceiverFlavor::Array(chan) => chan.capacity(),
1320            ReceiverFlavor::List(chan) => chan.capacity(),
1321            ReceiverFlavor::Zero(chan) => chan.capacity(),
1322        }
1323    }
1324
1325    /// Returns `true` if receivers belong to the same channel.
1326    ///
1327    /// # Examples
1328    ///
1329    /// ```
1330    /// #![feature(mpmc_channel)]
1331    ///
1332    /// use std::sync::mpmc;
1333    ///
1334    /// let (_, rx1) = mpmc::channel::<i32>();
1335    /// let (_, rx2) = mpmc::channel::<i32>();
1336    ///
1337    /// assert!(rx1.same_channel(&rx1));
1338    /// assert!(!rx1.same_channel(&rx2));
1339    /// ```
1340    #[unstable(feature = "mpmc_channel", issue = "126840")]
1341    pub fn same_channel(&self, other: &Receiver<T>) -> bool {
1342        match (&self.flavor, &other.flavor) {
1343            (ReceiverFlavor::Array(a), ReceiverFlavor::Array(b)) => a == b,
1344            (ReceiverFlavor::List(a), ReceiverFlavor::List(b)) => a == b,
1345            (ReceiverFlavor::Zero(a), ReceiverFlavor::Zero(b)) => a == b,
1346            _ => false,
1347        }
1348    }
1349
1350    /// Returns an iterator that will block waiting for messages, but never
1351    /// [`panic!`]. It will return [`None`] when the channel has hung up.
1352    ///
1353    /// # Examples
1354    ///
1355    /// ```rust
1356    /// #![feature(mpmc_channel)]
1357    ///
1358    /// use std::sync::mpmc::channel;
1359    /// use std::thread;
1360    ///
1361    /// let (send, recv) = channel();
1362    ///
1363    /// thread::spawn(move || {
1364    ///     send.send(1).unwrap();
1365    ///     send.send(2).unwrap();
1366    ///     send.send(3).unwrap();
1367    /// });
1368    ///
1369    /// let mut iter = recv.iter();
1370    /// assert_eq!(iter.next(), Some(1));
1371    /// assert_eq!(iter.next(), Some(2));
1372    /// assert_eq!(iter.next(), Some(3));
1373    /// assert_eq!(iter.next(), None);
1374    /// ```
1375    #[unstable(feature = "mpmc_channel", issue = "126840")]
1376    pub fn iter(&self) -> Iter<'_, T> {
1377        Iter { rx: self }
1378    }
1379
1380    /// Returns `true` if the channel is disconnected.
1381    ///
1382    /// Note that a return value of `false` does not guarantee the channel will
1383    /// remain connected. The channel may be disconnected immediately after this method
1384    /// returns, so a subsequent [`Receiver::recv`] may still fail with [`RecvError`].
1385    ///
1386    /// # Examples
1387    ///
1388    /// ```
1389    /// #![feature(mpmc_channel)]
1390    ///
1391    /// use std::sync::mpmc::channel;
1392    ///
1393    /// let (tx, rx) = channel::<i32>();
1394    /// assert!(!rx.is_disconnected());
1395    /// drop(tx);
1396    /// assert!(rx.is_disconnected());
1397    /// ```
1398    #[unstable(feature = "mpmc_channel", issue = "126840")]
1399    pub fn is_disconnected(&self) -> bool {
1400        match &self.flavor {
1401            ReceiverFlavor::Array(chan) => chan.is_disconnected(),
1402            ReceiverFlavor::List(chan) => chan.is_disconnected(),
1403            ReceiverFlavor::Zero(chan) => chan.is_disconnected(),
1404        }
1405    }
1406}
1407
1408#[unstable(feature = "mpmc_channel", issue = "126840")]
1409impl<T> Drop for Receiver<T> {
1410    fn drop(&mut self) {
1411        unsafe {
1412            match &self.flavor {
1413                ReceiverFlavor::Array(chan) => chan.release(|c| c.disconnect_receivers()),
1414                ReceiverFlavor::List(chan) => chan.release(|c| c.disconnect_receivers()),
1415                ReceiverFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
1416            }
1417        }
1418    }
1419}
1420
1421#[unstable(feature = "mpmc_channel", issue = "126840")]
1422impl<T> Clone for Receiver<T> {
1423    fn clone(&self) -> Self {
1424        let flavor = match &self.flavor {
1425            ReceiverFlavor::Array(chan) => ReceiverFlavor::Array(chan.acquire()),
1426            ReceiverFlavor::List(chan) => ReceiverFlavor::List(chan.acquire()),
1427            ReceiverFlavor::Zero(chan) => ReceiverFlavor::Zero(chan.acquire()),
1428        };
1429
1430        Receiver { flavor }
1431    }
1432}
1433
1434#[unstable(feature = "mpmc_channel", issue = "126840")]
1435impl<T> fmt::Debug for Receiver<T> {
1436    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1437        f.debug_struct("Receiver").finish_non_exhaustive()
1438    }
1439}
1440
1441#[cfg(test)]
1442mod tests;