std/sync/mpmc/mod.rs
1//! Multi-producer, multi-consumer FIFO queue communication primitives.
2//!
3//! This module provides message-based communication over channels, concretely
4//! defined by two types:
5//!
6//! * [`Sender`]
7//! * [`Receiver`]
8//!
9//! [`Sender`]s are used to send data to a set of [`Receiver`]s where each item
10//! sent is delivered to (at most) one receiver. Both sender and receiver are
11//! cloneable (multi-producer) such that many threads can send simultaneously
12//! to receivers (multi-consumer).
13//!
14//! These channels come in two flavors:
15//!
16//! 1. An asynchronous, infinitely buffered channel. The [`channel`] function
17//! will return a `(Sender, Receiver)` tuple where all sends will be
18//! **asynchronous** (they never block). The channel conceptually has an
19//! infinite buffer.
20//!
21//! 2. A synchronous, bounded channel. The [`sync_channel`] function will
22//! return a `(Sender, Receiver)` tuple where the storage for pending
23//! messages is a pre-allocated buffer of a fixed size. All sends will be
24//! **synchronous** by blocking until there is buffer space available. Note
25//! that a bound of 0 is allowed, causing the channel to become a "rendezvous"
26//! channel where each sender atomically hands off a message to a receiver.
27//!
28//! [`send`]: Sender::send
29//!
30//! ## Disconnection
31//!
32//! The send and receive operations on channels will all return a [`Result`]
33//! indicating whether the operation succeeded or not. An unsuccessful operation
34//! is normally indicative of the other half of a channel having "hung up" by
35//! being dropped in its corresponding thread.
36//!
37//! Once half of a channel has been deallocated, most operations can no longer
38//! continue to make progress, so [`Err`] will be returned. Many applications
39//! will continue to [`unwrap`] the results returned from this module,
40//! instigating a propagation of failure among threads if one unexpectedly dies.
41//!
42//! [`unwrap`]: Result::unwrap
43//!
44//! # Examples
45//!
46//! Simple usage:
47//!
48//! ```
49//! #![feature(mpmc_channel)]
50//!
51//! use std::thread;
52//! use std::sync::mpmc::channel;
53//!
54//! // Create a simple streaming channel
55//! let (tx, rx) = channel();
56//! thread::spawn(move || {
57//! tx.send(10).unwrap();
58//! });
59//! assert_eq!(rx.recv().unwrap(), 10);
60//! ```
61//!
62//! Shared usage:
63//!
64//! ```
65//! #![feature(mpmc_channel)]
66//!
67//! use std::thread;
68//! use std::sync::mpmc::channel;
69//!
70//! thread::scope(|s| {
71//! // Create a shared channel that can be sent along from many threads
72//! // where tx is the sending half (tx for transmission), and rx is the receiving
73//! // half (rx for receiving).
74//! let (tx, rx) = channel();
75//! for i in 0..10 {
76//! let tx = tx.clone();
77//! s.spawn(move || {
78//! tx.send(i).unwrap();
79//! });
80//! }
81//!
82//! for _ in 0..5 {
83//! let rx1 = rx.clone();
84//! let rx2 = rx.clone();
85//! s.spawn(move || {
86//! let j = rx1.recv().unwrap();
87//! assert!(0 <= j && j < 10);
88//! });
89//! s.spawn(move || {
90//! let j = rx2.recv().unwrap();
91//! assert!(0 <= j && j < 10);
92//! });
93//! }
94//! })
95//! ```
96//!
97//! Propagating panics:
98//!
99//! ```
100//! #![feature(mpmc_channel)]
101//!
102//! use std::sync::mpmc::channel;
103//!
104//! // The call to recv() will return an error because the channel has already
105//! // hung up (or been deallocated)
106//! let (tx, rx) = channel::<i32>();
107//! drop(tx);
108//! assert!(rx.recv().is_err());
109//! ```
110
111// This module is used as the implementation for the channels in `sync::mpsc`.
112// The implementation comes from the crossbeam-channel crate:
113//
114// Copyright (c) 2019 The Crossbeam Project Developers
115//
116// Permission is hereby granted, free of charge, to any
117// person obtaining a copy of this software and associated
118// documentation files (the "Software"), to deal in the
119// Software without restriction, including without
120// limitation the rights to use, copy, modify, merge,
121// publish, distribute, sublicense, and/or sell copies of
122// the Software, and to permit persons to whom the Software
123// is furnished to do so, subject to the following
124// conditions:
125//
126// The above copyright notice and this permission notice
127// shall be included in all copies or substantial portions
128// of the Software.
129//
130// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
131// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
132// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
133// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
134// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
135// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
136// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
137// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
138// DEALINGS IN THE SOFTWARE.
139
140mod array;
141mod context;
142mod counter;
143mod error;
144mod list;
145mod select;
146mod utils;
147mod waker;
148mod zero;
149
150pub use error::*;
151
152use crate::fmt;
153use crate::panic::{RefUnwindSafe, UnwindSafe};
154use crate::time::{Duration, Instant};
155
156/// Creates a new asynchronous channel, returning the sender/receiver halves.
157///
158/// All data sent on the [`Sender`] will become available on the [`Receiver`] in
159/// the same order as it was sent, and no [`send`] will block the calling thread
160/// (this channel has an "infinite buffer", unlike [`sync_channel`], which will
161/// block after its buffer limit is reached). [`recv`] will block until a message
162/// is available while there is at least one [`Sender`] alive (including clones).
163///
164/// The [`Sender`] can be cloned to [`send`] to the same channel multiple times.
165/// The [`Receiver`] also can be cloned to have multi receivers.
166///
167/// If the [`Receiver`] is disconnected while trying to [`send`] with the
168/// [`Sender`], the [`send`] method will return a [`SendError`]. Similarly, if the
169/// [`Sender`] is disconnected while trying to [`recv`], the [`recv`] method will
170/// return a [`RecvError`].
171///
172/// [`send`]: Sender::send
173/// [`recv`]: Receiver::recv
174///
175/// # Examples
176///
177/// ```
178/// #![feature(mpmc_channel)]
179///
180/// use std::sync::mpmc::channel;
181/// use std::thread;
182///
183/// let (sender, receiver) = channel();
184///
185/// // Spawn off an expensive computation
186/// thread::spawn(move || {
187/// # fn expensive_computation() {}
188/// sender.send(expensive_computation()).unwrap();
189/// });
190///
191/// // Do some useful work for a while
192///
193/// // Let's see what that answer was
194/// println!("{:?}", receiver.recv().unwrap());
195/// ```
196#[must_use]
197#[unstable(feature = "mpmc_channel", issue = "126840")]
198pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
199 let (s, r) = counter::new(list::Channel::new());
200 let s = Sender { flavor: SenderFlavor::List(s) };
201 let r = Receiver { flavor: ReceiverFlavor::List(r) };
202 (s, r)
203}
204
205/// Creates a new synchronous, bounded channel.
206///
207/// All data sent on the [`Sender`] will become available on the [`Receiver`]
208/// in the same order as it was sent. Like asynchronous [`channel`]s, the
209/// [`Receiver`] will block until a message becomes available. `sync_channel`
210/// differs greatly in the semantics of the sender, however.
211///
212/// This channel has an internal buffer on which messages will be queued.
213/// `bound` specifies the buffer size. When the internal buffer becomes full,
214/// future sends will *block* waiting for the buffer to open up. Note that a
215/// buffer size of 0 is valid, in which case this becomes "rendezvous channel"
216/// where each [`send`] will not return until a [`recv`] is paired with it.
217///
218/// The [`Sender`] can be cloned to [`send`] to the same channel multiple
219/// times. The [`Receiver`] also can be cloned to have multi receivers.
220///
221/// Like asynchronous channels, if the [`Receiver`] is disconnected while trying
222/// to [`send`] with the [`Sender`], the [`send`] method will return a
223/// [`SendError`]. Similarly, If the [`Sender`] is disconnected while trying
224/// to [`recv`], the [`recv`] method will return a [`RecvError`].
225///
226/// [`send`]: Sender::send
227/// [`recv`]: Receiver::recv
228///
229/// # Examples
230///
231/// ```
232/// use std::sync::mpsc::sync_channel;
233/// use std::thread;
234///
235/// let (sender, receiver) = sync_channel(1);
236///
237/// // this returns immediately
238/// sender.send(1).unwrap();
239///
240/// thread::spawn(move || {
241/// // this will block until the previous message has been received
242/// sender.send(2).unwrap();
243/// });
244///
245/// assert_eq!(receiver.recv().unwrap(), 1);
246/// assert_eq!(receiver.recv().unwrap(), 2);
247/// ```
248#[must_use]
249#[unstable(feature = "mpmc_channel", issue = "126840")]
250pub fn sync_channel<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
251 if cap == 0 {
252 let (s, r) = counter::new(zero::Channel::new());
253 let s = Sender { flavor: SenderFlavor::Zero(s) };
254 let r = Receiver { flavor: ReceiverFlavor::Zero(r) };
255 (s, r)
256 } else {
257 let (s, r) = counter::new(array::Channel::with_capacity(cap));
258 let s = Sender { flavor: SenderFlavor::Array(s) };
259 let r = Receiver { flavor: ReceiverFlavor::Array(r) };
260 (s, r)
261 }
262}
263
264/// The sending-half of Rust's synchronous [`channel`] type.
265///
266/// Messages can be sent through this channel with [`send`].
267///
268/// Note: all senders (the original and its clones) need to be dropped for the receiver
269/// to stop blocking to receive messages with [`Receiver::recv`].
270///
271/// [`send`]: Sender::send
272///
273/// # Examples
274///
275/// ```rust
276/// #![feature(mpmc_channel)]
277///
278/// use std::sync::mpmc::channel;
279/// use std::thread;
280///
281/// let (sender, receiver) = channel();
282/// let sender2 = sender.clone();
283///
284/// // First thread owns sender
285/// thread::spawn(move || {
286/// sender.send(1).unwrap();
287/// });
288///
289/// // Second thread owns sender2
290/// thread::spawn(move || {
291/// sender2.send(2).unwrap();
292/// });
293///
294/// let msg = receiver.recv().unwrap();
295/// let msg2 = receiver.recv().unwrap();
296///
297/// assert_eq!(3, msg + msg2);
298/// ```
299#[unstable(feature = "mpmc_channel", issue = "126840")]
300pub struct Sender<T> {
301 flavor: SenderFlavor<T>,
302}
303
304/// Sender flavors.
305enum SenderFlavor<T> {
306 /// Bounded channel based on a preallocated array.
307 Array(counter::Sender<array::Channel<T>>),
308
309 /// Unbounded channel implemented as a linked list.
310 List(counter::Sender<list::Channel<T>>),
311
312 /// Zero-capacity channel.
313 Zero(counter::Sender<zero::Channel<T>>),
314}
315
316#[unstable(feature = "mpmc_channel", issue = "126840")]
317unsafe impl<T: Send> Send for Sender<T> {}
318#[unstable(feature = "mpmc_channel", issue = "126840")]
319unsafe impl<T: Send> Sync for Sender<T> {}
320
321#[unstable(feature = "mpmc_channel", issue = "126840")]
322impl<T> UnwindSafe for Sender<T> {}
323#[unstable(feature = "mpmc_channel", issue = "126840")]
324impl<T> RefUnwindSafe for Sender<T> {}
325
326impl<T> Sender<T> {
327 /// Attempts to send a message into the channel without blocking.
328 ///
329 /// This method will either send a message into the channel immediately or return an error if
330 /// the channel is full or disconnected. The returned error contains the original message.
331 ///
332 /// If called on a zero-capacity channel, this method will send the message only if there
333 /// happens to be a receive operation on the other side of the channel at the same time.
334 ///
335 /// # Examples
336 ///
337 /// ```rust
338 /// #![feature(mpmc_channel)]
339 ///
340 /// use std::sync::mpmc::{channel, Receiver, Sender};
341 ///
342 /// let (sender, _receiver): (Sender<i32>, Receiver<i32>) = channel();
343 ///
344 /// assert!(sender.try_send(1).is_ok());
345 /// ```
346 #[unstable(feature = "mpmc_channel", issue = "126840")]
347 pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
348 match &self.flavor {
349 SenderFlavor::Array(chan) => chan.try_send(msg),
350 SenderFlavor::List(chan) => chan.try_send(msg),
351 SenderFlavor::Zero(chan) => chan.try_send(msg),
352 }
353 }
354
355 /// Attempts to send a value on this channel, returning it back if it could
356 /// not be sent.
357 ///
358 /// A successful send occurs when it is determined that the other end of
359 /// the channel has not hung up already. An unsuccessful send would be one
360 /// where the corresponding receiver has already been deallocated. Note
361 /// that a return value of [`Err`] means that the data will never be
362 /// received, but a return value of [`Ok`] does *not* mean that the data
363 /// will be received. It is possible for the corresponding receiver to
364 /// hang up immediately after this function returns [`Ok`]. However, if
365 /// the channel is zero-capacity, it acts as a rendezvous channel and a
366 /// return value of [`Ok`] means that the data has been received.
367 ///
368 /// If the channel is full and not disconnected, this call will block until
369 /// the send operation can proceed. If the channel becomes disconnected,
370 /// this call will wake up and return an error. The returned error contains
371 /// the original message.
372 ///
373 /// If called on a zero-capacity channel, this method will wait for a receive
374 /// operation to appear on the other side of the channel.
375 ///
376 /// # Examples
377 ///
378 /// ```
379 /// #![feature(mpmc_channel)]
380 ///
381 /// use std::sync::mpmc::channel;
382 ///
383 /// let (tx, rx) = channel();
384 ///
385 /// // This send is always successful
386 /// tx.send(1).unwrap();
387 ///
388 /// // This send will fail because the receiver is gone
389 /// drop(rx);
390 /// assert!(tx.send(1).is_err());
391 /// ```
392 #[unstable(feature = "mpmc_channel", issue = "126840")]
393 pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
394 match &self.flavor {
395 SenderFlavor::Array(chan) => chan.send(msg, None),
396 SenderFlavor::List(chan) => chan.send(msg, None),
397 SenderFlavor::Zero(chan) => chan.send(msg, None),
398 }
399 .map_err(|err| match err {
400 SendTimeoutError::Disconnected(msg) => SendError(msg),
401 SendTimeoutError::Timeout(_) => unreachable!(),
402 })
403 }
404}
405
406impl<T> Sender<T> {
407 /// Waits for a message to be sent into the channel, but only for a limited time.
408 ///
409 /// If the channel is full and not disconnected, this call will block until the send operation
410 /// can proceed or the operation times out. If the channel becomes disconnected, this call will
411 /// wake up and return an error. The returned error contains the original message.
412 ///
413 /// If called on a zero-capacity channel, this method will wait for a receive operation to
414 /// appear on the other side of the channel.
415 ///
416 /// # Examples
417 ///
418 /// ```
419 /// #![feature(mpmc_channel)]
420 ///
421 /// use std::sync::mpmc::channel;
422 /// use std::time::Duration;
423 ///
424 /// let (tx, rx) = channel();
425 ///
426 /// tx.send_timeout(1, Duration::from_millis(400)).unwrap();
427 /// ```
428 #[unstable(feature = "mpmc_channel", issue = "126840")]
429 pub fn send_timeout(&self, msg: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> {
430 match Instant::now().checked_add(timeout) {
431 Some(deadline) => self.send_deadline(msg, deadline),
432 // So far in the future that it's practically the same as waiting indefinitely.
433 None => self.send(msg).map_err(SendTimeoutError::from),
434 }
435 }
436
437 /// Waits for a message to be sent into the channel, but only until a given deadline.
438 ///
439 /// If the channel is full and not disconnected, this call will block until the send operation
440 /// can proceed or the operation times out. If the channel becomes disconnected, this call will
441 /// wake up and return an error. The returned error contains the original message.
442 ///
443 /// If called on a zero-capacity channel, this method will wait for a receive operation to
444 /// appear on the other side of the channel.
445 ///
446 /// # Examples
447 ///
448 /// ```
449 /// #![feature(mpmc_channel)]
450 ///
451 /// use std::sync::mpmc::channel;
452 /// use std::time::{Duration, Instant};
453 ///
454 /// let (tx, rx) = channel();
455 ///
456 /// let t = Instant::now() + Duration::from_millis(400);
457 /// tx.send_deadline(1, t).unwrap();
458 /// ```
459 #[unstable(feature = "mpmc_channel", issue = "126840")]
460 pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>> {
461 match &self.flavor {
462 SenderFlavor::Array(chan) => chan.send(msg, Some(deadline)),
463 SenderFlavor::List(chan) => chan.send(msg, Some(deadline)),
464 SenderFlavor::Zero(chan) => chan.send(msg, Some(deadline)),
465 }
466 }
467
468 /// Returns `true` if the channel is empty.
469 ///
470 /// Note: Zero-capacity channels are always empty.
471 ///
472 /// # Examples
473 ///
474 /// ```
475 /// #![feature(mpmc_channel)]
476 ///
477 /// use std::sync::mpmc;
478 /// use std::thread;
479 ///
480 /// let (send, _recv) = mpmc::channel();
481 ///
482 /// let tx1 = send.clone();
483 /// let tx2 = send.clone();
484 ///
485 /// assert!(tx1.is_empty());
486 ///
487 /// let handle = thread::spawn(move || {
488 /// tx2.send(1u8).unwrap();
489 /// });
490 ///
491 /// handle.join().unwrap();
492 ///
493 /// assert!(!tx1.is_empty());
494 /// ```
495 #[unstable(feature = "mpmc_channel", issue = "126840")]
496 pub fn is_empty(&self) -> bool {
497 match &self.flavor {
498 SenderFlavor::Array(chan) => chan.is_empty(),
499 SenderFlavor::List(chan) => chan.is_empty(),
500 SenderFlavor::Zero(chan) => chan.is_empty(),
501 }
502 }
503
504 /// Returns `true` if the channel is full.
505 ///
506 /// Note: Zero-capacity channels are always full.
507 ///
508 /// # Examples
509 ///
510 /// ```
511 /// #![feature(mpmc_channel)]
512 ///
513 /// use std::sync::mpmc;
514 /// use std::thread;
515 ///
516 /// let (send, _recv) = mpmc::sync_channel(1);
517 ///
518 /// let (tx1, tx2) = (send.clone(), send.clone());
519 /// assert!(!tx1.is_full());
520 ///
521 /// let handle = thread::spawn(move || {
522 /// tx2.send(1u8).unwrap();
523 /// });
524 ///
525 /// handle.join().unwrap();
526 ///
527 /// assert!(tx1.is_full());
528 /// ```
529 #[unstable(feature = "mpmc_channel", issue = "126840")]
530 pub fn is_full(&self) -> bool {
531 match &self.flavor {
532 SenderFlavor::Array(chan) => chan.is_full(),
533 SenderFlavor::List(chan) => chan.is_full(),
534 SenderFlavor::Zero(chan) => chan.is_full(),
535 }
536 }
537
538 /// Returns the number of messages in the channel.
539 ///
540 /// # Examples
541 ///
542 /// ```
543 /// #![feature(mpmc_channel)]
544 ///
545 /// use std::sync::mpmc;
546 /// use std::thread;
547 ///
548 /// let (send, _recv) = mpmc::channel();
549 /// let (tx1, tx2) = (send.clone(), send.clone());
550 ///
551 /// assert_eq!(tx1.len(), 0);
552 ///
553 /// let handle = thread::spawn(move || {
554 /// tx2.send(1u8).unwrap();
555 /// });
556 ///
557 /// handle.join().unwrap();
558 ///
559 /// assert_eq!(tx1.len(), 1);
560 /// ```
561 #[unstable(feature = "mpmc_channel", issue = "126840")]
562 pub fn len(&self) -> usize {
563 match &self.flavor {
564 SenderFlavor::Array(chan) => chan.len(),
565 SenderFlavor::List(chan) => chan.len(),
566 SenderFlavor::Zero(chan) => chan.len(),
567 }
568 }
569
570 /// If the channel is bounded, returns its capacity.
571 ///
572 /// # Examples
573 ///
574 /// ```
575 /// #![feature(mpmc_channel)]
576 ///
577 /// use std::sync::mpmc;
578 /// use std::thread;
579 ///
580 /// let (send, _recv) = mpmc::sync_channel(3);
581 /// let (tx1, tx2) = (send.clone(), send.clone());
582 ///
583 /// assert_eq!(tx1.capacity(), Some(3));
584 ///
585 /// let handle = thread::spawn(move || {
586 /// tx2.send(1u8).unwrap();
587 /// });
588 ///
589 /// handle.join().unwrap();
590 ///
591 /// assert_eq!(tx1.capacity(), Some(3));
592 /// ```
593 #[unstable(feature = "mpmc_channel", issue = "126840")]
594 pub fn capacity(&self) -> Option<usize> {
595 match &self.flavor {
596 SenderFlavor::Array(chan) => chan.capacity(),
597 SenderFlavor::List(chan) => chan.capacity(),
598 SenderFlavor::Zero(chan) => chan.capacity(),
599 }
600 }
601
602 /// Returns `true` if senders belong to the same channel.
603 ///
604 /// # Examples
605 ///
606 /// ```
607 /// #![feature(mpmc_channel)]
608 ///
609 /// use std::sync::mpmc;
610 ///
611 /// let (tx1, _) = mpmc::channel::<i32>();
612 /// let (tx2, _) = mpmc::channel::<i32>();
613 ///
614 /// assert!(tx1.same_channel(&tx1));
615 /// assert!(!tx1.same_channel(&tx2));
616 /// ```
617 #[unstable(feature = "mpmc_channel", issue = "126840")]
618 pub fn same_channel(&self, other: &Sender<T>) -> bool {
619 match (&self.flavor, &other.flavor) {
620 (SenderFlavor::Array(a), SenderFlavor::Array(b)) => a == b,
621 (SenderFlavor::List(a), SenderFlavor::List(b)) => a == b,
622 (SenderFlavor::Zero(a), SenderFlavor::Zero(b)) => a == b,
623 _ => false,
624 }
625 }
626
627 /// Returns `true` if the channel is disconnected.
628 ///
629 /// Note that a return value of `false` does not guarantee the channel will
630 /// remain connected. The channel may be disconnected immediately after this method
631 /// returns, so a subsequent [`Sender::send`] may still fail with [`SendError`].
632 ///
633 /// # Examples
634 ///
635 /// ```
636 /// #![feature(mpmc_channel)]
637 ///
638 /// use std::sync::mpmc::channel;
639 ///
640 /// let (tx, rx) = channel::<i32>();
641 /// assert!(!tx.is_disconnected());
642 /// drop(rx);
643 /// assert!(tx.is_disconnected());
644 /// ```
645 #[unstable(feature = "mpmc_channel", issue = "126840")]
646 pub fn is_disconnected(&self) -> bool {
647 match &self.flavor {
648 SenderFlavor::Array(chan) => chan.is_disconnected(),
649 SenderFlavor::List(chan) => chan.is_disconnected(),
650 SenderFlavor::Zero(chan) => chan.is_disconnected(),
651 }
652 }
653}
654
655#[unstable(feature = "mpmc_channel", issue = "126840")]
656impl<T> Drop for Sender<T> {
657 fn drop(&mut self) {
658 unsafe {
659 match &self.flavor {
660 SenderFlavor::Array(chan) => chan.release(|c| c.disconnect_senders()),
661 SenderFlavor::List(chan) => chan.release(|c| c.disconnect_senders()),
662 SenderFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
663 }
664 }
665 }
666}
667
668#[unstable(feature = "mpmc_channel", issue = "126840")]
669impl<T> Clone for Sender<T> {
670 fn clone(&self) -> Self {
671 let flavor = match &self.flavor {
672 SenderFlavor::Array(chan) => SenderFlavor::Array(chan.acquire()),
673 SenderFlavor::List(chan) => SenderFlavor::List(chan.acquire()),
674 SenderFlavor::Zero(chan) => SenderFlavor::Zero(chan.acquire()),
675 };
676
677 Sender { flavor }
678 }
679}
680
681#[unstable(feature = "mpmc_channel", issue = "126840")]
682impl<T> fmt::Debug for Sender<T> {
683 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
684 f.debug_struct("Sender").finish_non_exhaustive()
685 }
686}
687
688/// The receiving half of Rust's [`channel`] (or [`sync_channel`]) type.
689/// Different threads can share this [`Receiver`] by cloning it.
690///
691/// Messages sent to the channel can be retrieved using [`recv`].
692///
693/// [`recv`]: Receiver::recv
694///
695/// # Examples
696///
697/// ```rust
698/// #![feature(mpmc_channel)]
699///
700/// use std::sync::mpmc::channel;
701/// use std::thread;
702/// use std::time::Duration;
703///
704/// let (send, recv) = channel();
705///
706/// let tx_thread = thread::spawn(move || {
707/// send.send("Hello world!").unwrap();
708/// thread::sleep(Duration::from_secs(2)); // block for two seconds
709/// send.send("Delayed for 2 seconds").unwrap();
710/// });
711///
712/// let (rx1, rx2) = (recv.clone(), recv.clone());
713/// let rx_thread_1 = thread::spawn(move || {
714/// println!("{}", rx1.recv().unwrap()); // Received immediately
715/// });
716/// let rx_thread_2 = thread::spawn(move || {
717/// println!("{}", rx2.recv().unwrap()); // Received after 2 seconds
718/// });
719///
720/// tx_thread.join().unwrap();
721/// rx_thread_1.join().unwrap();
722/// rx_thread_2.join().unwrap();
723/// ```
724#[unstable(feature = "mpmc_channel", issue = "126840")]
725pub struct Receiver<T> {
726 flavor: ReceiverFlavor<T>,
727}
728
729/// An iterator over messages on a [`Receiver`], created by [`iter`].
730///
731/// This iterator will block whenever [`next`] is called,
732/// waiting for a new message, and [`None`] will be returned
733/// when the corresponding channel has hung up.
734///
735/// [`iter`]: Receiver::iter
736/// [`next`]: Iterator::next
737///
738/// # Examples
739///
740/// ```rust
741/// #![feature(mpmc_channel)]
742///
743/// use std::sync::mpmc::channel;
744/// use std::thread;
745///
746/// let (send, recv) = channel();
747///
748/// thread::spawn(move || {
749/// send.send(1u8).unwrap();
750/// send.send(2u8).unwrap();
751/// send.send(3u8).unwrap();
752/// });
753///
754/// for x in recv.iter() {
755/// println!("Got: {x}");
756/// }
757/// ```
758#[unstable(feature = "mpmc_channel", issue = "126840")]
759#[derive(Debug)]
760pub struct Iter<'a, T: 'a> {
761 rx: &'a Receiver<T>,
762}
763
764/// An iterator that attempts to yield all pending values for a [`Receiver`],
765/// created by [`try_iter`].
766///
767/// [`None`] will be returned when there are no pending values remaining or
768/// if the corresponding channel has hung up.
769///
770/// This iterator will never block the caller in order to wait for data to
771/// become available. Instead, it will return [`None`].
772///
773/// [`try_iter`]: Receiver::try_iter
774///
775/// # Examples
776///
777/// ```rust
778/// #![feature(mpmc_channel)]
779///
780/// use std::sync::mpmc::channel;
781/// use std::thread;
782/// use std::time::Duration;
783///
784/// let (sender, receiver) = channel();
785///
786/// // Nothing is in the buffer yet
787/// assert!(receiver.try_iter().next().is_none());
788/// println!("Nothing in the buffer...");
789///
790/// thread::spawn(move || {
791/// sender.send(1).unwrap();
792/// sender.send(2).unwrap();
793/// sender.send(3).unwrap();
794/// });
795///
796/// println!("Going to sleep...");
797/// thread::sleep(Duration::from_secs(2)); // block for two seconds
798///
799/// for x in receiver.try_iter() {
800/// println!("Got: {x}");
801/// }
802/// ```
803#[unstable(feature = "mpmc_channel", issue = "126840")]
804#[derive(Debug)]
805pub struct TryIter<'a, T: 'a> {
806 rx: &'a Receiver<T>,
807}
808
809/// An owning iterator over messages on a [`Receiver`],
810/// created by [`into_iter`].
811///
812/// This iterator will block whenever [`next`]
813/// is called, waiting for a new message, and [`None`] will be
814/// returned if the corresponding channel has hung up.
815///
816/// [`into_iter`]: Receiver::into_iter
817/// [`next`]: Iterator::next
818///
819/// # Examples
820///
821/// ```rust
822/// #![feature(mpmc_channel)]
823///
824/// use std::sync::mpmc::channel;
825/// use std::thread;
826///
827/// let (send, recv) = channel();
828///
829/// thread::spawn(move || {
830/// send.send(1u8).unwrap();
831/// send.send(2u8).unwrap();
832/// send.send(3u8).unwrap();
833/// });
834///
835/// for x in recv.into_iter() {
836/// println!("Got: {x}");
837/// }
838/// ```
839#[unstable(feature = "mpmc_channel", issue = "126840")]
840#[derive(Debug)]
841pub struct IntoIter<T> {
842 rx: Receiver<T>,
843}
844
845#[unstable(feature = "mpmc_channel", issue = "126840")]
846impl<'a, T> Iterator for Iter<'a, T> {
847 type Item = T;
848
849 fn next(&mut self) -> Option<T> {
850 self.rx.recv().ok()
851 }
852}
853
854#[unstable(feature = "mpmc_channel", issue = "126840")]
855impl<'a, T> Iterator for TryIter<'a, T> {
856 type Item = T;
857
858 fn next(&mut self) -> Option<T> {
859 self.rx.try_recv().ok()
860 }
861}
862
863#[unstable(feature = "mpmc_channel", issue = "126840")]
864impl<'a, T> IntoIterator for &'a Receiver<T> {
865 type Item = T;
866 type IntoIter = Iter<'a, T>;
867
868 fn into_iter(self) -> Iter<'a, T> {
869 self.iter()
870 }
871}
872
873#[unstable(feature = "mpmc_channel", issue = "126840")]
874impl<T> Iterator for IntoIter<T> {
875 type Item = T;
876 fn next(&mut self) -> Option<T> {
877 self.rx.recv().ok()
878 }
879}
880
881#[unstable(feature = "mpmc_channel", issue = "126840")]
882impl<T> IntoIterator for Receiver<T> {
883 type Item = T;
884 type IntoIter = IntoIter<T>;
885
886 fn into_iter(self) -> IntoIter<T> {
887 IntoIter { rx: self }
888 }
889}
890
891/// Receiver flavors.
892enum ReceiverFlavor<T> {
893 /// Bounded channel based on a preallocated array.
894 Array(counter::Receiver<array::Channel<T>>),
895
896 /// Unbounded channel implemented as a linked list.
897 List(counter::Receiver<list::Channel<T>>),
898
899 /// Zero-capacity channel.
900 Zero(counter::Receiver<zero::Channel<T>>),
901}
902
903#[unstable(feature = "mpmc_channel", issue = "126840")]
904unsafe impl<T: Send> Send for Receiver<T> {}
905#[unstable(feature = "mpmc_channel", issue = "126840")]
906unsafe impl<T: Send> Sync for Receiver<T> {}
907
908#[unstable(feature = "mpmc_channel", issue = "126840")]
909impl<T> UnwindSafe for Receiver<T> {}
910#[unstable(feature = "mpmc_channel", issue = "126840")]
911impl<T> RefUnwindSafe for Receiver<T> {}
912
913impl<T> Receiver<T> {
914 /// Attempts to receive a message from the channel without blocking.
915 ///
916 /// This method will never block the caller in order to wait for data to
917 /// become available. Instead, this will always return immediately with a
918 /// possible option of pending data on the channel.
919 ///
920 /// If called on a zero-capacity channel, this method will receive a message only if there
921 /// happens to be a send operation on the other side of the channel at the same time.
922 ///
923 /// This is useful for a flavor of "optimistic check" before deciding to
924 /// block on a receiver.
925 ///
926 /// Compared with [`recv`], this function has two failure cases instead of one
927 /// (one for disconnection, one for an empty buffer).
928 ///
929 /// [`recv`]: Self::recv
930 ///
931 /// # Examples
932 ///
933 /// ```rust
934 /// #![feature(mpmc_channel)]
935 ///
936 /// use std::sync::mpmc::{Receiver, channel};
937 ///
938 /// let (_, receiver): (_, Receiver<i32>) = channel();
939 ///
940 /// assert!(receiver.try_recv().is_err());
941 /// ```
942 #[unstable(feature = "mpmc_channel", issue = "126840")]
943 pub fn try_recv(&self) -> Result<T, TryRecvError> {
944 match &self.flavor {
945 ReceiverFlavor::Array(chan) => chan.try_recv(),
946 ReceiverFlavor::List(chan) => chan.try_recv(),
947 ReceiverFlavor::Zero(chan) => chan.try_recv(),
948 }
949 }
950
951 /// Attempts to wait for a value on this receiver, returning an error if the
952 /// corresponding channel has hung up.
953 ///
954 /// This function will always block the current thread if there is no data
955 /// available and it's possible for more data to be sent (at least one sender
956 /// still exists). Once a message is sent to the corresponding [`Sender`],
957 /// this receiver will wake up and return that message.
958 ///
959 /// If the corresponding [`Sender`] has disconnected, or it disconnects while
960 /// this call is blocking, this call will wake up and return [`Err`] to
961 /// indicate that no more messages can ever be received on this channel.
962 /// However, since channels are buffered, messages sent before the disconnect
963 /// will still be properly received.
964 ///
965 /// # Examples
966 ///
967 /// ```
968 /// #![feature(mpmc_channel)]
969 ///
970 /// use std::sync::mpmc;
971 /// use std::thread;
972 ///
973 /// let (send, recv) = mpmc::channel();
974 /// let handle = thread::spawn(move || {
975 /// send.send(1u8).unwrap();
976 /// });
977 ///
978 /// handle.join().unwrap();
979 ///
980 /// assert_eq!(Ok(1), recv.recv());
981 /// ```
982 ///
983 /// Buffering behavior:
984 ///
985 /// ```
986 /// #![feature(mpmc_channel)]
987 ///
988 /// use std::sync::mpmc;
989 /// use std::thread;
990 /// use std::sync::mpmc::RecvError;
991 ///
992 /// let (send, recv) = mpmc::channel();
993 /// let handle = thread::spawn(move || {
994 /// send.send(1u8).unwrap();
995 /// send.send(2).unwrap();
996 /// send.send(3).unwrap();
997 /// drop(send);
998 /// });
999 ///
1000 /// // wait for the thread to join so we ensure the sender is dropped
1001 /// handle.join().unwrap();
1002 ///
1003 /// assert_eq!(Ok(1), recv.recv());
1004 /// assert_eq!(Ok(2), recv.recv());
1005 /// assert_eq!(Ok(3), recv.recv());
1006 /// assert_eq!(Err(RecvError), recv.recv());
1007 /// ```
1008 #[unstable(feature = "mpmc_channel", issue = "126840")]
1009 pub fn recv(&self) -> Result<T, RecvError> {
1010 match &self.flavor {
1011 ReceiverFlavor::Array(chan) => chan.recv(None),
1012 ReceiverFlavor::List(chan) => chan.recv(None),
1013 ReceiverFlavor::Zero(chan) => chan.recv(None),
1014 }
1015 .map_err(|_| RecvError)
1016 }
1017
1018 /// Attempts to wait for a value on this receiver, returning an error if the
1019 /// corresponding channel has hung up, or if it waits more than `timeout`.
1020 ///
1021 /// This function will always block the current thread if there is no data
1022 /// available and it's possible for more data to be sent (at least one sender
1023 /// still exists). Once a message is sent to the corresponding [`Sender`],
1024 /// this receiver will wake up and return that message.
1025 ///
1026 /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1027 /// this call is blocking, this call will wake up and return [`Err`] to
1028 /// indicate that no more messages can ever be received on this channel.
1029 /// However, since channels are buffered, messages sent before the disconnect
1030 /// will still be properly received.
1031 ///
1032 /// # Examples
1033 ///
1034 /// Successfully receiving value before encountering timeout:
1035 ///
1036 /// ```no_run
1037 /// #![feature(mpmc_channel)]
1038 ///
1039 /// use std::thread;
1040 /// use std::time::Duration;
1041 /// use std::sync::mpmc;
1042 ///
1043 /// let (send, recv) = mpmc::channel();
1044 ///
1045 /// thread::spawn(move || {
1046 /// send.send('a').unwrap();
1047 /// });
1048 ///
1049 /// assert_eq!(
1050 /// recv.recv_timeout(Duration::from_millis(400)),
1051 /// Ok('a')
1052 /// );
1053 /// ```
1054 ///
1055 /// Receiving an error upon reaching timeout:
1056 ///
1057 /// ```no_run
1058 /// #![feature(mpmc_channel)]
1059 ///
1060 /// use std::thread;
1061 /// use std::time::Duration;
1062 /// use std::sync::mpmc;
1063 ///
1064 /// let (send, recv) = mpmc::channel();
1065 ///
1066 /// thread::spawn(move || {
1067 /// thread::sleep(Duration::from_millis(800));
1068 /// send.send('a').unwrap();
1069 /// });
1070 ///
1071 /// assert_eq!(
1072 /// recv.recv_timeout(Duration::from_millis(400)),
1073 /// Err(mpmc::RecvTimeoutError::Timeout)
1074 /// );
1075 /// ```
1076 #[unstable(feature = "mpmc_channel", issue = "126840")]
1077 pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
1078 match Instant::now().checked_add(timeout) {
1079 Some(deadline) => self.recv_deadline(deadline),
1080 // So far in the future that it's practically the same as waiting indefinitely.
1081 None => self.recv().map_err(RecvTimeoutError::from),
1082 }
1083 }
1084
1085 /// Attempts to wait for a value on this receiver, returning an error if the
1086 /// corresponding channel has hung up, or if `deadline` is reached.
1087 ///
1088 /// This function will always block the current thread if there is no data
1089 /// available and it's possible for more data to be sent. Once a message is
1090 /// sent to the corresponding [`Sender`], then this receiver will wake up
1091 /// and return that message.
1092 ///
1093 /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1094 /// this call is blocking, this call will wake up and return [`Err`] to
1095 /// indicate that no more messages can ever be received on this channel.
1096 /// However, since channels are buffered, messages sent before the disconnect
1097 /// will still be properly received.
1098 ///
1099 /// # Examples
1100 ///
1101 /// Successfully receiving value before reaching deadline:
1102 ///
1103 /// ```no_run
1104 /// #![feature(mpmc_channel)]
1105 ///
1106 /// use std::thread;
1107 /// use std::time::{Duration, Instant};
1108 /// use std::sync::mpmc;
1109 ///
1110 /// let (send, recv) = mpmc::channel();
1111 ///
1112 /// thread::spawn(move || {
1113 /// send.send('a').unwrap();
1114 /// });
1115 ///
1116 /// assert_eq!(
1117 /// recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
1118 /// Ok('a')
1119 /// );
1120 /// ```
1121 ///
1122 /// Receiving an error upon reaching deadline:
1123 ///
1124 /// ```no_run
1125 /// #![feature(mpmc_channel)]
1126 ///
1127 /// use std::thread;
1128 /// use std::time::{Duration, Instant};
1129 /// use std::sync::mpmc;
1130 ///
1131 /// let (send, recv) = mpmc::channel();
1132 ///
1133 /// thread::spawn(move || {
1134 /// thread::sleep(Duration::from_millis(800));
1135 /// send.send('a').unwrap();
1136 /// });
1137 ///
1138 /// assert_eq!(
1139 /// recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
1140 /// Err(mpmc::RecvTimeoutError::Timeout)
1141 /// );
1142 /// ```
1143 #[unstable(feature = "mpmc_channel", issue = "126840")]
1144 pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
1145 match &self.flavor {
1146 ReceiverFlavor::Array(chan) => chan.recv(Some(deadline)),
1147 ReceiverFlavor::List(chan) => chan.recv(Some(deadline)),
1148 ReceiverFlavor::Zero(chan) => chan.recv(Some(deadline)),
1149 }
1150 }
1151
1152 /// Returns an iterator that will attempt to yield all pending values.
1153 /// It will return `None` if there are no more pending values or if the
1154 /// channel has hung up. The iterator will never [`panic!`] or block the
1155 /// user by waiting for values.
1156 ///
1157 /// # Examples
1158 ///
1159 /// ```no_run
1160 /// #![feature(mpmc_channel)]
1161 ///
1162 /// use std::sync::mpmc::channel;
1163 /// use std::thread;
1164 /// use std::time::Duration;
1165 ///
1166 /// let (sender, receiver) = channel();
1167 ///
1168 /// // nothing is in the buffer yet
1169 /// assert!(receiver.try_iter().next().is_none());
1170 ///
1171 /// thread::spawn(move || {
1172 /// thread::sleep(Duration::from_secs(1));
1173 /// sender.send(1).unwrap();
1174 /// sender.send(2).unwrap();
1175 /// sender.send(3).unwrap();
1176 /// });
1177 ///
1178 /// // nothing is in the buffer yet
1179 /// assert!(receiver.try_iter().next().is_none());
1180 ///
1181 /// // block for two seconds
1182 /// thread::sleep(Duration::from_secs(2));
1183 ///
1184 /// let mut iter = receiver.try_iter();
1185 /// assert_eq!(iter.next(), Some(1));
1186 /// assert_eq!(iter.next(), Some(2));
1187 /// assert_eq!(iter.next(), Some(3));
1188 /// assert_eq!(iter.next(), None);
1189 /// ```
1190 #[unstable(feature = "mpmc_channel", issue = "126840")]
1191 pub fn try_iter(&self) -> TryIter<'_, T> {
1192 TryIter { rx: self }
1193 }
1194}
1195
1196impl<T> Receiver<T> {
1197 /// Returns `true` if the channel is empty.
1198 ///
1199 /// Note: Zero-capacity channels are always empty.
1200 ///
1201 /// # Examples
1202 ///
1203 /// ```
1204 /// #![feature(mpmc_channel)]
1205 ///
1206 /// use std::sync::mpmc;
1207 /// use std::thread;
1208 ///
1209 /// let (send, recv) = mpmc::channel();
1210 ///
1211 /// assert!(recv.is_empty());
1212 ///
1213 /// let handle = thread::spawn(move || {
1214 /// send.send(1u8).unwrap();
1215 /// });
1216 ///
1217 /// handle.join().unwrap();
1218 ///
1219 /// assert!(!recv.is_empty());
1220 /// ```
1221 #[unstable(feature = "mpmc_channel", issue = "126840")]
1222 pub fn is_empty(&self) -> bool {
1223 match &self.flavor {
1224 ReceiverFlavor::Array(chan) => chan.is_empty(),
1225 ReceiverFlavor::List(chan) => chan.is_empty(),
1226 ReceiverFlavor::Zero(chan) => chan.is_empty(),
1227 }
1228 }
1229
1230 /// Returns `true` if the channel is full.
1231 ///
1232 /// Note: Zero-capacity channels are always full.
1233 ///
1234 /// # Examples
1235 ///
1236 /// ```
1237 /// #![feature(mpmc_channel)]
1238 ///
1239 /// use std::sync::mpmc;
1240 /// use std::thread;
1241 ///
1242 /// let (send, recv) = mpmc::sync_channel(1);
1243 ///
1244 /// assert!(!recv.is_full());
1245 ///
1246 /// let handle = thread::spawn(move || {
1247 /// send.send(1u8).unwrap();
1248 /// });
1249 ///
1250 /// handle.join().unwrap();
1251 ///
1252 /// assert!(recv.is_full());
1253 /// ```
1254 #[unstable(feature = "mpmc_channel", issue = "126840")]
1255 pub fn is_full(&self) -> bool {
1256 match &self.flavor {
1257 ReceiverFlavor::Array(chan) => chan.is_full(),
1258 ReceiverFlavor::List(chan) => chan.is_full(),
1259 ReceiverFlavor::Zero(chan) => chan.is_full(),
1260 }
1261 }
1262
1263 /// Returns the number of messages in the channel.
1264 ///
1265 /// # Examples
1266 ///
1267 /// ```
1268 /// #![feature(mpmc_channel)]
1269 ///
1270 /// use std::sync::mpmc;
1271 /// use std::thread;
1272 ///
1273 /// let (send, recv) = mpmc::channel();
1274 ///
1275 /// assert_eq!(recv.len(), 0);
1276 ///
1277 /// let handle = thread::spawn(move || {
1278 /// send.send(1u8).unwrap();
1279 /// });
1280 ///
1281 /// handle.join().unwrap();
1282 ///
1283 /// assert_eq!(recv.len(), 1);
1284 /// ```
1285 #[unstable(feature = "mpmc_channel", issue = "126840")]
1286 pub fn len(&self) -> usize {
1287 match &self.flavor {
1288 ReceiverFlavor::Array(chan) => chan.len(),
1289 ReceiverFlavor::List(chan) => chan.len(),
1290 ReceiverFlavor::Zero(chan) => chan.len(),
1291 }
1292 }
1293
1294 /// If the channel is bounded, returns its capacity.
1295 ///
1296 /// # Examples
1297 ///
1298 /// ```
1299 /// #![feature(mpmc_channel)]
1300 ///
1301 /// use std::sync::mpmc;
1302 /// use std::thread;
1303 ///
1304 /// let (send, recv) = mpmc::sync_channel(3);
1305 ///
1306 /// assert_eq!(recv.capacity(), Some(3));
1307 ///
1308 /// let handle = thread::spawn(move || {
1309 /// send.send(1u8).unwrap();
1310 /// });
1311 ///
1312 /// handle.join().unwrap();
1313 ///
1314 /// assert_eq!(recv.capacity(), Some(3));
1315 /// ```
1316 #[unstable(feature = "mpmc_channel", issue = "126840")]
1317 pub fn capacity(&self) -> Option<usize> {
1318 match &self.flavor {
1319 ReceiverFlavor::Array(chan) => chan.capacity(),
1320 ReceiverFlavor::List(chan) => chan.capacity(),
1321 ReceiverFlavor::Zero(chan) => chan.capacity(),
1322 }
1323 }
1324
1325 /// Returns `true` if receivers belong to the same channel.
1326 ///
1327 /// # Examples
1328 ///
1329 /// ```
1330 /// #![feature(mpmc_channel)]
1331 ///
1332 /// use std::sync::mpmc;
1333 ///
1334 /// let (_, rx1) = mpmc::channel::<i32>();
1335 /// let (_, rx2) = mpmc::channel::<i32>();
1336 ///
1337 /// assert!(rx1.same_channel(&rx1));
1338 /// assert!(!rx1.same_channel(&rx2));
1339 /// ```
1340 #[unstable(feature = "mpmc_channel", issue = "126840")]
1341 pub fn same_channel(&self, other: &Receiver<T>) -> bool {
1342 match (&self.flavor, &other.flavor) {
1343 (ReceiverFlavor::Array(a), ReceiverFlavor::Array(b)) => a == b,
1344 (ReceiverFlavor::List(a), ReceiverFlavor::List(b)) => a == b,
1345 (ReceiverFlavor::Zero(a), ReceiverFlavor::Zero(b)) => a == b,
1346 _ => false,
1347 }
1348 }
1349
1350 /// Returns an iterator that will block waiting for messages, but never
1351 /// [`panic!`]. It will return [`None`] when the channel has hung up.
1352 ///
1353 /// # Examples
1354 ///
1355 /// ```rust
1356 /// #![feature(mpmc_channel)]
1357 ///
1358 /// use std::sync::mpmc::channel;
1359 /// use std::thread;
1360 ///
1361 /// let (send, recv) = channel();
1362 ///
1363 /// thread::spawn(move || {
1364 /// send.send(1).unwrap();
1365 /// send.send(2).unwrap();
1366 /// send.send(3).unwrap();
1367 /// });
1368 ///
1369 /// let mut iter = recv.iter();
1370 /// assert_eq!(iter.next(), Some(1));
1371 /// assert_eq!(iter.next(), Some(2));
1372 /// assert_eq!(iter.next(), Some(3));
1373 /// assert_eq!(iter.next(), None);
1374 /// ```
1375 #[unstable(feature = "mpmc_channel", issue = "126840")]
1376 pub fn iter(&self) -> Iter<'_, T> {
1377 Iter { rx: self }
1378 }
1379
1380 /// Returns `true` if the channel is disconnected.
1381 ///
1382 /// Note that a return value of `false` does not guarantee the channel will
1383 /// remain connected. The channel may be disconnected immediately after this method
1384 /// returns, so a subsequent [`Receiver::recv`] may still fail with [`RecvError`].
1385 ///
1386 /// # Examples
1387 ///
1388 /// ```
1389 /// #![feature(mpmc_channel)]
1390 ///
1391 /// use std::sync::mpmc::channel;
1392 ///
1393 /// let (tx, rx) = channel::<i32>();
1394 /// assert!(!rx.is_disconnected());
1395 /// drop(tx);
1396 /// assert!(rx.is_disconnected());
1397 /// ```
1398 #[unstable(feature = "mpmc_channel", issue = "126840")]
1399 pub fn is_disconnected(&self) -> bool {
1400 match &self.flavor {
1401 ReceiverFlavor::Array(chan) => chan.is_disconnected(),
1402 ReceiverFlavor::List(chan) => chan.is_disconnected(),
1403 ReceiverFlavor::Zero(chan) => chan.is_disconnected(),
1404 }
1405 }
1406}
1407
1408#[unstable(feature = "mpmc_channel", issue = "126840")]
1409impl<T> Drop for Receiver<T> {
1410 fn drop(&mut self) {
1411 unsafe {
1412 match &self.flavor {
1413 ReceiverFlavor::Array(chan) => chan.release(|c| c.disconnect_receivers()),
1414 ReceiverFlavor::List(chan) => chan.release(|c| c.disconnect_receivers()),
1415 ReceiverFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
1416 }
1417 }
1418 }
1419}
1420
1421#[unstable(feature = "mpmc_channel", issue = "126840")]
1422impl<T> Clone for Receiver<T> {
1423 fn clone(&self) -> Self {
1424 let flavor = match &self.flavor {
1425 ReceiverFlavor::Array(chan) => ReceiverFlavor::Array(chan.acquire()),
1426 ReceiverFlavor::List(chan) => ReceiverFlavor::List(chan.acquire()),
1427 ReceiverFlavor::Zero(chan) => ReceiverFlavor::Zero(chan.acquire()),
1428 };
1429
1430 Receiver { flavor }
1431 }
1432}
1433
1434#[unstable(feature = "mpmc_channel", issue = "126840")]
1435impl<T> fmt::Debug for Receiver<T> {
1436 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1437 f.debug_struct("Receiver").finish_non_exhaustive()
1438 }
1439}
1440
1441#[cfg(test)]
1442mod tests;