std/sync/mpmc/mod.rs
1//! Multi-producer, multi-consumer FIFO queue communication primitives.
2//!
3//! This module provides message-based communication over channels, concretely
4//! defined by two types:
5//!
6//! * [`Sender`]
7//! * [`Receiver`]
8//!
9//! [`Sender`]s are used to send data to a set of [`Receiver`]s where each item
10//! sent is delivered to (at most) one receiver. Both sender and receiver are
11//! cloneable (multi-producer) such that many threads can send simultaneously
12//! to receivers (multi-consumer).
13//!
14//! These channels come in two flavors:
15//!
16//! 1. An asynchronous, infinitely buffered channel. The [`channel`] function
17//! will return a `(Sender, Receiver)` tuple where all sends will be
18//! **asynchronous** (they never block). The channel conceptually has an
19//! infinite buffer.
20//!
21//! 2. A synchronous, bounded channel. The [`sync_channel`] function will
22//! return a `(Sender, Receiver)` tuple where the storage for pending
23//! messages is a pre-allocated buffer of a fixed size. All sends will be
24//! **synchronous** by blocking until there is buffer space available. Note
25//! that a bound of 0 is allowed, causing the channel to become a "rendezvous"
26//! channel where each sender atomically hands off a message to a receiver.
27//!
28//! [`send`]: Sender::send
29//!
30//! ## Disconnection
31//!
32//! The send and receive operations on channels will all return a [`Result`]
33//! indicating whether the operation succeeded or not. An unsuccessful operation
34//! is normally indicative of the other half of a channel having "hung up" by
35//! being dropped in its corresponding thread.
36//!
37//! Once half of a channel has been deallocated, most operations can no longer
38//! continue to make progress, so [`Err`] will be returned. Many applications
39//! will continue to [`unwrap`] the results returned from this module,
40//! instigating a propagation of failure among threads if one unexpectedly dies.
41//!
42//! [`unwrap`]: Result::unwrap
43//!
44//! # Examples
45//!
46//! Simple usage:
47//!
48//! ```
49//! #![feature(mpmc_channel)]
50//!
51//! use std::thread;
52//! use std::sync::mpmc::channel;
53//!
54//! // Create a simple streaming channel
55//! let (tx, rx) = channel();
56//! thread::spawn(move || {
57//! tx.send(10).unwrap();
58//! });
59//! assert_eq!(rx.recv().unwrap(), 10);
60//! ```
61//!
62//! Shared usage:
63//!
64//! ```
65//! #![feature(mpmc_channel)]
66//!
67//! use std::thread;
68//! use std::sync::mpmc::channel;
69//!
70//! thread::scope(|s| {
71//! // Create a shared channel that can be sent along from many threads
72//! // where tx is the sending half (tx for transmission), and rx is the receiving
73//! // half (rx for receiving).
74//! let (tx, rx) = channel();
75//! for i in 0..10 {
76//! let tx = tx.clone();
77//! s.spawn(move || {
78//! tx.send(i).unwrap();
79//! });
80//! }
81//!
82//! for _ in 0..5 {
83//! let rx1 = rx.clone();
84//! let rx2 = rx.clone();
85//! s.spawn(move || {
86//! let j = rx1.recv().unwrap();
87//! assert!(0 <= j && j < 10);
88//! });
89//! s.spawn(move || {
90//! let j = rx2.recv().unwrap();
91//! assert!(0 <= j && j < 10);
92//! });
93//! }
94//! })
95//! ```
96//!
97//! Propagating panics:
98//!
99//! ```
100//! #![feature(mpmc_channel)]
101//!
102//! use std::sync::mpmc::channel;
103//!
104//! // The call to recv() will return an error because the channel has already
105//! // hung up (or been deallocated)
106//! let (tx, rx) = channel::<i32>();
107//! drop(tx);
108//! assert!(rx.recv().is_err());
109//! ```
110
111// This module is used as the implementation for the channels in `sync::mpsc`.
112// The implementation comes from the crossbeam-channel crate:
113//
114// Copyright (c) 2019 The Crossbeam Project Developers
115//
116// Permission is hereby granted, free of charge, to any
117// person obtaining a copy of this software and associated
118// documentation files (the "Software"), to deal in the
119// Software without restriction, including without
120// limitation the rights to use, copy, modify, merge,
121// publish, distribute, sublicense, and/or sell copies of
122// the Software, and to permit persons to whom the Software
123// is furnished to do so, subject to the following
124// conditions:
125//
126// The above copyright notice and this permission notice
127// shall be included in all copies or substantial portions
128// of the Software.
129//
130// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
131// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
132// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
133// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
134// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
135// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
136// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
137// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
138// DEALINGS IN THE SOFTWARE.
139
140mod array;
141mod context;
142mod counter;
143mod error;
144mod list;
145mod select;
146mod utils;
147mod waker;
148mod zero;
149
150pub use error::*;
151
152use crate::fmt;
153use crate::panic::{RefUnwindSafe, UnwindSafe};
154use crate::time::{Duration, Instant};
155
156/// Creates a new asynchronous channel, returning the sender/receiver halves.
157///
158/// All data sent on the [`Sender`] will become available on the [`Receiver`] in
159/// the same order as it was sent, and no [`send`] will block the calling thread
160/// (this channel has an "infinite buffer", unlike [`sync_channel`], which will
161/// block after its buffer limit is reached). [`recv`] will block until a message
162/// is available while there is at least one [`Sender`] alive (including clones).
163///
164/// The [`Sender`] can be cloned to [`send`] to the same channel multiple times.
165/// The [`Receiver`] also can be cloned to have multi receivers.
166///
167/// If the [`Receiver`] is disconnected while trying to [`send`] with the
168/// [`Sender`], the [`send`] method will return a [`SendError`]. Similarly, if the
169/// [`Sender`] is disconnected while trying to [`recv`], the [`recv`] method will
170/// return a [`RecvError`].
171///
172/// [`send`]: Sender::send
173/// [`recv`]: Receiver::recv
174///
175/// # Examples
176///
177/// ```
178/// #![feature(mpmc_channel)]
179///
180/// use std::sync::mpmc::channel;
181/// use std::thread;
182///
183/// let (sender, receiver) = channel();
184///
185/// // Spawn off an expensive computation
186/// thread::spawn(move || {
187/// # fn expensive_computation() {}
188/// sender.send(expensive_computation()).unwrap();
189/// });
190///
191/// // Do some useful work for a while
192///
193/// // Let's see what that answer was
194/// println!("{:?}", receiver.recv().unwrap());
195/// ```
196#[must_use]
197#[unstable(feature = "mpmc_channel", issue = "126840")]
198pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
199 let (s, r) = counter::new(list::Channel::new());
200 let s = Sender { flavor: SenderFlavor::List(s) };
201 let r = Receiver { flavor: ReceiverFlavor::List(r) };
202 (s, r)
203}
204
205/// Creates a new synchronous, bounded channel.
206///
207/// All data sent on the [`Sender`] will become available on the [`Receiver`]
208/// in the same order as it was sent. Like asynchronous [`channel`]s, the
209/// [`Receiver`] will block until a message becomes available. `sync_channel`
210/// differs greatly in the semantics of the sender, however.
211///
212/// This channel has an internal buffer on which messages will be queued.
213/// `bound` specifies the buffer size. When the internal buffer becomes full,
214/// future sends will *block* waiting for the buffer to open up. Note that a
215/// buffer size of 0 is valid, in which case this becomes "rendezvous channel"
216/// where each [`send`] will not return until a [`recv`] is paired with it.
217///
218/// The [`Sender`] can be cloned to [`send`] to the same channel multiple
219/// times. The [`Receiver`] also can be cloned to have multi receivers.
220///
221/// Like asynchronous channels, if the [`Receiver`] is disconnected while trying
222/// to [`send`] with the [`Sender`], the [`send`] method will return a
223/// [`SendError`]. Similarly, If the [`Sender`] is disconnected while trying
224/// to [`recv`], the [`recv`] method will return a [`RecvError`].
225///
226/// [`send`]: Sender::send
227/// [`recv`]: Receiver::recv
228///
229/// # Examples
230///
231/// ```
232/// use std::sync::mpsc::sync_channel;
233/// use std::thread;
234///
235/// let (sender, receiver) = sync_channel(1);
236///
237/// // this returns immediately
238/// sender.send(1).unwrap();
239///
240/// thread::spawn(move || {
241/// // this will block until the previous message has been received
242/// sender.send(2).unwrap();
243/// });
244///
245/// assert_eq!(receiver.recv().unwrap(), 1);
246/// assert_eq!(receiver.recv().unwrap(), 2);
247/// ```
248#[must_use]
249#[unstable(feature = "mpmc_channel", issue = "126840")]
250pub fn sync_channel<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
251 if cap == 0 {
252 let (s, r) = counter::new(zero::Channel::new());
253 let s = Sender { flavor: SenderFlavor::Zero(s) };
254 let r = Receiver { flavor: ReceiverFlavor::Zero(r) };
255 (s, r)
256 } else {
257 let (s, r) = counter::new(array::Channel::with_capacity(cap));
258 let s = Sender { flavor: SenderFlavor::Array(s) };
259 let r = Receiver { flavor: ReceiverFlavor::Array(r) };
260 (s, r)
261 }
262}
263
264/// The sending-half of Rust's synchronous [`channel`] type.
265///
266/// Messages can be sent through this channel with [`send`].
267///
268/// Note: all senders (the original and its clones) need to be dropped for the receiver
269/// to stop blocking to receive messages with [`Receiver::recv`].
270///
271/// [`send`]: Sender::send
272///
273/// # Examples
274///
275/// ```rust
276/// #![feature(mpmc_channel)]
277///
278/// use std::sync::mpmc::channel;
279/// use std::thread;
280///
281/// let (sender, receiver) = channel();
282/// let sender2 = sender.clone();
283///
284/// // First thread owns sender
285/// thread::spawn(move || {
286/// sender.send(1).unwrap();
287/// });
288///
289/// // Second thread owns sender2
290/// thread::spawn(move || {
291/// sender2.send(2).unwrap();
292/// });
293///
294/// let msg = receiver.recv().unwrap();
295/// let msg2 = receiver.recv().unwrap();
296///
297/// assert_eq!(3, msg + msg2);
298/// ```
299#[unstable(feature = "mpmc_channel", issue = "126840")]
300#[cfg_attr(not(test), rustc_diagnostic_item = "MpmcSender")]
301pub struct Sender<T> {
302 flavor: SenderFlavor<T>,
303}
304
305/// Sender flavors.
306enum SenderFlavor<T> {
307 /// Bounded channel based on a preallocated array.
308 Array(counter::Sender<array::Channel<T>>),
309
310 /// Unbounded channel implemented as a linked list.
311 List(counter::Sender<list::Channel<T>>),
312
313 /// Zero-capacity channel.
314 Zero(counter::Sender<zero::Channel<T>>),
315}
316
317#[unstable(feature = "mpmc_channel", issue = "126840")]
318unsafe impl<T: Send> Send for Sender<T> {}
319#[unstable(feature = "mpmc_channel", issue = "126840")]
320unsafe impl<T: Send> Sync for Sender<T> {}
321
322#[unstable(feature = "mpmc_channel", issue = "126840")]
323impl<T> UnwindSafe for Sender<T> {}
324#[unstable(feature = "mpmc_channel", issue = "126840")]
325impl<T> RefUnwindSafe for Sender<T> {}
326
327impl<T> Sender<T> {
328 /// Attempts to send a message into the channel without blocking.
329 ///
330 /// This method will either send a message into the channel immediately or return an error if
331 /// the channel is full or disconnected. The returned error contains the original message.
332 ///
333 /// If called on a zero-capacity channel, this method will send the message only if there
334 /// happens to be a receive operation on the other side of the channel at the same time.
335 ///
336 /// # Examples
337 ///
338 /// ```rust
339 /// #![feature(mpmc_channel)]
340 ///
341 /// use std::sync::mpmc::{channel, Receiver, Sender};
342 ///
343 /// let (sender, _receiver): (Sender<i32>, Receiver<i32>) = channel();
344 ///
345 /// assert!(sender.try_send(1).is_ok());
346 /// ```
347 #[unstable(feature = "mpmc_channel", issue = "126840")]
348 pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
349 match &self.flavor {
350 SenderFlavor::Array(chan) => chan.try_send(msg),
351 SenderFlavor::List(chan) => chan.try_send(msg),
352 SenderFlavor::Zero(chan) => chan.try_send(msg),
353 }
354 }
355
356 /// Attempts to send a value on this channel, returning it back if it could
357 /// not be sent.
358 ///
359 /// A successful send occurs when it is determined that the other end of
360 /// the channel has not hung up already. An unsuccessful send would be one
361 /// where the corresponding receiver has already been deallocated. Note
362 /// that a return value of [`Err`] means that the data will never be
363 /// received, but a return value of [`Ok`] does *not* mean that the data
364 /// will be received. It is possible for the corresponding receiver to
365 /// hang up immediately after this function returns [`Ok`]. However, if
366 /// the channel is zero-capacity, it acts as a rendezvous channel and a
367 /// return value of [`Ok`] means that the data has been received.
368 ///
369 /// If the channel is full and not disconnected, this call will block until
370 /// the send operation can proceed. If the channel becomes disconnected,
371 /// this call will wake up and return an error. The returned error contains
372 /// the original message.
373 ///
374 /// If called on a zero-capacity channel, this method will wait for a receive
375 /// operation to appear on the other side of the channel.
376 ///
377 /// # Examples
378 ///
379 /// ```
380 /// #![feature(mpmc_channel)]
381 ///
382 /// use std::sync::mpmc::channel;
383 ///
384 /// let (tx, rx) = channel();
385 ///
386 /// // This send is always successful
387 /// tx.send(1).unwrap();
388 ///
389 /// // This send will fail because the receiver is gone
390 /// drop(rx);
391 /// assert!(tx.send(1).is_err());
392 /// ```
393 #[unstable(feature = "mpmc_channel", issue = "126840")]
394 pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
395 match &self.flavor {
396 SenderFlavor::Array(chan) => chan.send(msg, None),
397 SenderFlavor::List(chan) => chan.send(msg, None),
398 SenderFlavor::Zero(chan) => chan.send(msg, None),
399 }
400 .map_err(|err| match err {
401 SendTimeoutError::Disconnected(msg) => SendError(msg),
402 SendTimeoutError::Timeout(_) => unreachable!(),
403 })
404 }
405}
406
407impl<T> Sender<T> {
408 /// Waits for a message to be sent into the channel, but only for a limited time.
409 ///
410 /// If the channel is full and not disconnected, this call will block until the send operation
411 /// can proceed or the operation times out. If the channel becomes disconnected, this call will
412 /// wake up and return an error. The returned error contains the original message.
413 ///
414 /// If called on a zero-capacity channel, this method will wait for a receive operation to
415 /// appear on the other side of the channel.
416 ///
417 /// # Examples
418 ///
419 /// ```
420 /// #![feature(mpmc_channel)]
421 ///
422 /// use std::sync::mpmc::channel;
423 /// use std::time::Duration;
424 ///
425 /// let (tx, rx) = channel();
426 ///
427 /// tx.send_timeout(1, Duration::from_millis(400)).unwrap();
428 /// ```
429 #[unstable(feature = "mpmc_channel", issue = "126840")]
430 pub fn send_timeout(&self, msg: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> {
431 match Instant::now().checked_add(timeout) {
432 Some(deadline) => self.send_deadline(msg, deadline),
433 // So far in the future that it's practically the same as waiting indefinitely.
434 None => self.send(msg).map_err(SendTimeoutError::from),
435 }
436 }
437
438 /// Waits for a message to be sent into the channel, but only until a given deadline.
439 ///
440 /// If the channel is full and not disconnected, this call will block until the send operation
441 /// can proceed or the operation times out. If the channel becomes disconnected, this call will
442 /// wake up and return an error. The returned error contains the original message.
443 ///
444 /// If called on a zero-capacity channel, this method will wait for a receive operation to
445 /// appear on the other side of the channel.
446 ///
447 /// # Examples
448 ///
449 /// ```
450 /// #![feature(mpmc_channel)]
451 ///
452 /// use std::sync::mpmc::channel;
453 /// use std::time::{Duration, Instant};
454 ///
455 /// let (tx, rx) = channel();
456 ///
457 /// let t = Instant::now() + Duration::from_millis(400);
458 /// tx.send_deadline(1, t).unwrap();
459 /// ```
460 #[unstable(feature = "mpmc_channel", issue = "126840")]
461 pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>> {
462 match &self.flavor {
463 SenderFlavor::Array(chan) => chan.send(msg, Some(deadline)),
464 SenderFlavor::List(chan) => chan.send(msg, Some(deadline)),
465 SenderFlavor::Zero(chan) => chan.send(msg, Some(deadline)),
466 }
467 }
468
469 /// Returns `true` if the channel is empty.
470 ///
471 /// Note: Zero-capacity channels are always empty.
472 ///
473 /// # Examples
474 ///
475 /// ```
476 /// #![feature(mpmc_channel)]
477 ///
478 /// use std::sync::mpmc;
479 /// use std::thread;
480 ///
481 /// let (send, _recv) = mpmc::channel();
482 ///
483 /// let tx1 = send.clone();
484 /// let tx2 = send.clone();
485 ///
486 /// assert!(tx1.is_empty());
487 ///
488 /// let handle = thread::spawn(move || {
489 /// tx2.send(1u8).unwrap();
490 /// });
491 ///
492 /// handle.join().unwrap();
493 ///
494 /// assert!(!tx1.is_empty());
495 /// ```
496 #[unstable(feature = "mpmc_channel", issue = "126840")]
497 pub fn is_empty(&self) -> bool {
498 match &self.flavor {
499 SenderFlavor::Array(chan) => chan.is_empty(),
500 SenderFlavor::List(chan) => chan.is_empty(),
501 SenderFlavor::Zero(chan) => chan.is_empty(),
502 }
503 }
504
505 /// Returns `true` if the channel is full.
506 ///
507 /// Note: Zero-capacity channels are always full.
508 ///
509 /// # Examples
510 ///
511 /// ```
512 /// #![feature(mpmc_channel)]
513 ///
514 /// use std::sync::mpmc;
515 /// use std::thread;
516 ///
517 /// let (send, _recv) = mpmc::sync_channel(1);
518 ///
519 /// let (tx1, tx2) = (send.clone(), send.clone());
520 /// assert!(!tx1.is_full());
521 ///
522 /// let handle = thread::spawn(move || {
523 /// tx2.send(1u8).unwrap();
524 /// });
525 ///
526 /// handle.join().unwrap();
527 ///
528 /// assert!(tx1.is_full());
529 /// ```
530 #[unstable(feature = "mpmc_channel", issue = "126840")]
531 pub fn is_full(&self) -> bool {
532 match &self.flavor {
533 SenderFlavor::Array(chan) => chan.is_full(),
534 SenderFlavor::List(chan) => chan.is_full(),
535 SenderFlavor::Zero(chan) => chan.is_full(),
536 }
537 }
538
539 /// Returns the number of messages in the channel.
540 ///
541 /// # Examples
542 ///
543 /// ```
544 /// #![feature(mpmc_channel)]
545 ///
546 /// use std::sync::mpmc;
547 /// use std::thread;
548 ///
549 /// let (send, _recv) = mpmc::channel();
550 /// let (tx1, tx2) = (send.clone(), send.clone());
551 ///
552 /// assert_eq!(tx1.len(), 0);
553 ///
554 /// let handle = thread::spawn(move || {
555 /// tx2.send(1u8).unwrap();
556 /// });
557 ///
558 /// handle.join().unwrap();
559 ///
560 /// assert_eq!(tx1.len(), 1);
561 /// ```
562 #[unstable(feature = "mpmc_channel", issue = "126840")]
563 pub fn len(&self) -> usize {
564 match &self.flavor {
565 SenderFlavor::Array(chan) => chan.len(),
566 SenderFlavor::List(chan) => chan.len(),
567 SenderFlavor::Zero(chan) => chan.len(),
568 }
569 }
570
571 /// If the channel is bounded, returns its capacity.
572 ///
573 /// # Examples
574 ///
575 /// ```
576 /// #![feature(mpmc_channel)]
577 ///
578 /// use std::sync::mpmc;
579 /// use std::thread;
580 ///
581 /// let (send, _recv) = mpmc::sync_channel(3);
582 /// let (tx1, tx2) = (send.clone(), send.clone());
583 ///
584 /// assert_eq!(tx1.capacity(), Some(3));
585 ///
586 /// let handle = thread::spawn(move || {
587 /// tx2.send(1u8).unwrap();
588 /// });
589 ///
590 /// handle.join().unwrap();
591 ///
592 /// assert_eq!(tx1.capacity(), Some(3));
593 /// ```
594 #[unstable(feature = "mpmc_channel", issue = "126840")]
595 pub fn capacity(&self) -> Option<usize> {
596 match &self.flavor {
597 SenderFlavor::Array(chan) => chan.capacity(),
598 SenderFlavor::List(chan) => chan.capacity(),
599 SenderFlavor::Zero(chan) => chan.capacity(),
600 }
601 }
602
603 /// Returns `true` if senders belong to the same channel.
604 ///
605 /// # Examples
606 ///
607 /// ```
608 /// #![feature(mpmc_channel)]
609 ///
610 /// use std::sync::mpmc;
611 ///
612 /// let (tx1, _) = mpmc::channel::<i32>();
613 /// let (tx2, _) = mpmc::channel::<i32>();
614 ///
615 /// assert!(tx1.same_channel(&tx1));
616 /// assert!(!tx1.same_channel(&tx2));
617 /// ```
618 #[unstable(feature = "mpmc_channel", issue = "126840")]
619 pub fn same_channel(&self, other: &Sender<T>) -> bool {
620 match (&self.flavor, &other.flavor) {
621 (SenderFlavor::Array(a), SenderFlavor::Array(b)) => a == b,
622 (SenderFlavor::List(a), SenderFlavor::List(b)) => a == b,
623 (SenderFlavor::Zero(a), SenderFlavor::Zero(b)) => a == b,
624 _ => false,
625 }
626 }
627
628 /// Returns `true` if the channel is disconnected.
629 ///
630 /// Note that a return value of `false` does not guarantee the channel will
631 /// remain connected. The channel may be disconnected immediately after this method
632 /// returns, so a subsequent [`Sender::send`] may still fail with [`SendError`].
633 ///
634 /// # Examples
635 ///
636 /// ```
637 /// #![feature(mpmc_channel)]
638 ///
639 /// use std::sync::mpmc::channel;
640 ///
641 /// let (tx, rx) = channel::<i32>();
642 /// assert!(!tx.is_disconnected());
643 /// drop(rx);
644 /// assert!(tx.is_disconnected());
645 /// ```
646 #[unstable(feature = "mpmc_channel", issue = "126840")]
647 pub fn is_disconnected(&self) -> bool {
648 match &self.flavor {
649 SenderFlavor::Array(chan) => chan.is_disconnected(),
650 SenderFlavor::List(chan) => chan.is_disconnected(),
651 SenderFlavor::Zero(chan) => chan.is_disconnected(),
652 }
653 }
654}
655
656#[unstable(feature = "mpmc_channel", issue = "126840")]
657impl<T> Drop for Sender<T> {
658 fn drop(&mut self) {
659 unsafe {
660 match &self.flavor {
661 SenderFlavor::Array(chan) => chan.release(|c| c.disconnect_senders()),
662 SenderFlavor::List(chan) => chan.release(|c| c.disconnect_senders()),
663 SenderFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
664 }
665 }
666 }
667}
668
669#[unstable(feature = "mpmc_channel", issue = "126840")]
670impl<T> Clone for Sender<T> {
671 fn clone(&self) -> Self {
672 let flavor = match &self.flavor {
673 SenderFlavor::Array(chan) => SenderFlavor::Array(chan.acquire()),
674 SenderFlavor::List(chan) => SenderFlavor::List(chan.acquire()),
675 SenderFlavor::Zero(chan) => SenderFlavor::Zero(chan.acquire()),
676 };
677
678 Sender { flavor }
679 }
680}
681
682#[unstable(feature = "mpmc_channel", issue = "126840")]
683impl<T> fmt::Debug for Sender<T> {
684 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
685 f.debug_struct("Sender").finish_non_exhaustive()
686 }
687}
688
689/// The receiving half of Rust's [`channel`] (or [`sync_channel`]) type.
690/// Different threads can share this [`Receiver`] by cloning it.
691///
692/// Messages sent to the channel can be retrieved using [`recv`].
693///
694/// [`recv`]: Receiver::recv
695///
696/// # Examples
697///
698/// ```rust
699/// #![feature(mpmc_channel)]
700///
701/// use std::sync::mpmc::channel;
702/// use std::thread;
703/// use std::time::Duration;
704///
705/// let (send, recv) = channel();
706///
707/// let tx_thread = thread::spawn(move || {
708/// send.send("Hello world!").unwrap();
709/// thread::sleep(Duration::from_secs(2)); // block for two seconds
710/// send.send("Delayed for 2 seconds").unwrap();
711/// });
712///
713/// let (rx1, rx2) = (recv.clone(), recv.clone());
714/// let rx_thread_1 = thread::spawn(move || {
715/// println!("{}", rx1.recv().unwrap()); // Received immediately
716/// });
717/// let rx_thread_2 = thread::spawn(move || {
718/// println!("{}", rx2.recv().unwrap()); // Received after 2 seconds
719/// });
720///
721/// tx_thread.join().unwrap();
722/// rx_thread_1.join().unwrap();
723/// rx_thread_2.join().unwrap();
724/// ```
725#[unstable(feature = "mpmc_channel", issue = "126840")]
726#[cfg_attr(not(test), rustc_diagnostic_item = "MpmcReceiver")]
727pub struct Receiver<T> {
728 flavor: ReceiverFlavor<T>,
729}
730
731/// An iterator over messages on a [`Receiver`], created by [`iter`].
732///
733/// This iterator will block whenever [`next`] is called,
734/// waiting for a new message, and [`None`] will be returned
735/// when the corresponding channel has hung up.
736///
737/// [`iter`]: Receiver::iter
738/// [`next`]: Iterator::next
739///
740/// # Examples
741///
742/// ```rust
743/// #![feature(mpmc_channel)]
744///
745/// use std::sync::mpmc::channel;
746/// use std::thread;
747///
748/// let (send, recv) = channel();
749///
750/// thread::spawn(move || {
751/// send.send(1u8).unwrap();
752/// send.send(2u8).unwrap();
753/// send.send(3u8).unwrap();
754/// });
755///
756/// for x in recv.iter() {
757/// println!("Got: {x}");
758/// }
759/// ```
760#[unstable(feature = "mpmc_channel", issue = "126840")]
761#[derive(Debug)]
762pub struct Iter<'a, T: 'a> {
763 rx: &'a Receiver<T>,
764}
765
766/// An iterator that attempts to yield all pending values for a [`Receiver`],
767/// created by [`try_iter`].
768///
769/// [`None`] will be returned when there are no pending values remaining or
770/// if the corresponding channel has hung up.
771///
772/// This iterator will never block the caller in order to wait for data to
773/// become available. Instead, it will return [`None`].
774///
775/// [`try_iter`]: Receiver::try_iter
776///
777/// # Examples
778///
779/// ```rust
780/// #![feature(mpmc_channel)]
781///
782/// use std::sync::mpmc::channel;
783/// use std::thread;
784/// use std::time::Duration;
785///
786/// let (sender, receiver) = channel();
787///
788/// // Nothing is in the buffer yet
789/// assert!(receiver.try_iter().next().is_none());
790/// println!("Nothing in the buffer...");
791///
792/// thread::spawn(move || {
793/// sender.send(1).unwrap();
794/// sender.send(2).unwrap();
795/// sender.send(3).unwrap();
796/// });
797///
798/// println!("Going to sleep...");
799/// thread::sleep(Duration::from_secs(2)); // block for two seconds
800///
801/// for x in receiver.try_iter() {
802/// println!("Got: {x}");
803/// }
804/// ```
805#[unstable(feature = "mpmc_channel", issue = "126840")]
806#[derive(Debug)]
807pub struct TryIter<'a, T: 'a> {
808 rx: &'a Receiver<T>,
809}
810
811/// An owning iterator over messages on a [`Receiver`],
812/// created by [`into_iter`].
813///
814/// This iterator will block whenever [`next`]
815/// is called, waiting for a new message, and [`None`] will be
816/// returned if the corresponding channel has hung up.
817///
818/// [`into_iter`]: Receiver::into_iter
819/// [`next`]: Iterator::next
820///
821/// # Examples
822///
823/// ```rust
824/// #![feature(mpmc_channel)]
825///
826/// use std::sync::mpmc::channel;
827/// use std::thread;
828///
829/// let (send, recv) = channel();
830///
831/// thread::spawn(move || {
832/// send.send(1u8).unwrap();
833/// send.send(2u8).unwrap();
834/// send.send(3u8).unwrap();
835/// });
836///
837/// for x in recv.into_iter() {
838/// println!("Got: {x}");
839/// }
840/// ```
841#[unstable(feature = "mpmc_channel", issue = "126840")]
842#[derive(Debug)]
843pub struct IntoIter<T> {
844 rx: Receiver<T>,
845}
846
847#[unstable(feature = "mpmc_channel", issue = "126840")]
848impl<'a, T> Iterator for Iter<'a, T> {
849 type Item = T;
850
851 fn next(&mut self) -> Option<T> {
852 self.rx.recv().ok()
853 }
854}
855
856#[unstable(feature = "mpmc_channel", issue = "126840")]
857impl<'a, T> Iterator for TryIter<'a, T> {
858 type Item = T;
859
860 fn next(&mut self) -> Option<T> {
861 self.rx.try_recv().ok()
862 }
863}
864
865#[unstable(feature = "mpmc_channel", issue = "126840")]
866impl<'a, T> IntoIterator for &'a Receiver<T> {
867 type Item = T;
868 type IntoIter = Iter<'a, T>;
869
870 fn into_iter(self) -> Iter<'a, T> {
871 self.iter()
872 }
873}
874
875#[unstable(feature = "mpmc_channel", issue = "126840")]
876impl<T> Iterator for IntoIter<T> {
877 type Item = T;
878 fn next(&mut self) -> Option<T> {
879 self.rx.recv().ok()
880 }
881}
882
883#[unstable(feature = "mpmc_channel", issue = "126840")]
884impl<T> IntoIterator for Receiver<T> {
885 type Item = T;
886 type IntoIter = IntoIter<T>;
887
888 fn into_iter(self) -> IntoIter<T> {
889 IntoIter { rx: self }
890 }
891}
892
893/// Receiver flavors.
894enum ReceiverFlavor<T> {
895 /// Bounded channel based on a preallocated array.
896 Array(counter::Receiver<array::Channel<T>>),
897
898 /// Unbounded channel implemented as a linked list.
899 List(counter::Receiver<list::Channel<T>>),
900
901 /// Zero-capacity channel.
902 Zero(counter::Receiver<zero::Channel<T>>),
903}
904
905#[unstable(feature = "mpmc_channel", issue = "126840")]
906unsafe impl<T: Send> Send for Receiver<T> {}
907#[unstable(feature = "mpmc_channel", issue = "126840")]
908unsafe impl<T: Send> Sync for Receiver<T> {}
909
910#[unstable(feature = "mpmc_channel", issue = "126840")]
911impl<T> UnwindSafe for Receiver<T> {}
912#[unstable(feature = "mpmc_channel", issue = "126840")]
913impl<T> RefUnwindSafe for Receiver<T> {}
914
915impl<T> Receiver<T> {
916 /// Attempts to receive a message from the channel without blocking.
917 ///
918 /// This method will never block the caller in order to wait for data to
919 /// become available. Instead, this will always return immediately with a
920 /// possible option of pending data on the channel.
921 ///
922 /// If called on a zero-capacity channel, this method will receive a message only if there
923 /// happens to be a send operation on the other side of the channel at the same time.
924 ///
925 /// This is useful for a flavor of "optimistic check" before deciding to
926 /// block on a receiver.
927 ///
928 /// Compared with [`recv`], this function has two failure cases instead of one
929 /// (one for disconnection, one for an empty buffer).
930 ///
931 /// [`recv`]: Self::recv
932 ///
933 /// # Examples
934 ///
935 /// ```rust
936 /// #![feature(mpmc_channel)]
937 ///
938 /// use std::sync::mpmc::{Receiver, channel};
939 ///
940 /// let (_, receiver): (_, Receiver<i32>) = channel();
941 ///
942 /// assert!(receiver.try_recv().is_err());
943 /// ```
944 #[unstable(feature = "mpmc_channel", issue = "126840")]
945 pub fn try_recv(&self) -> Result<T, TryRecvError> {
946 match &self.flavor {
947 ReceiverFlavor::Array(chan) => chan.try_recv(),
948 ReceiverFlavor::List(chan) => chan.try_recv(),
949 ReceiverFlavor::Zero(chan) => chan.try_recv(),
950 }
951 }
952
953 /// Attempts to wait for a value on this receiver, returning an error if the
954 /// corresponding channel has hung up.
955 ///
956 /// This function will always block the current thread if there is no data
957 /// available and it's possible for more data to be sent (at least one sender
958 /// still exists). Once a message is sent to the corresponding [`Sender`],
959 /// this receiver will wake up and return that message.
960 ///
961 /// If the corresponding [`Sender`] has disconnected, or it disconnects while
962 /// this call is blocking, this call will wake up and return [`Err`] to
963 /// indicate that no more messages can ever be received on this channel.
964 /// However, since channels are buffered, messages sent before the disconnect
965 /// will still be properly received.
966 ///
967 /// # Examples
968 ///
969 /// ```
970 /// #![feature(mpmc_channel)]
971 ///
972 /// use std::sync::mpmc;
973 /// use std::thread;
974 ///
975 /// let (send, recv) = mpmc::channel();
976 /// let handle = thread::spawn(move || {
977 /// send.send(1u8).unwrap();
978 /// });
979 ///
980 /// handle.join().unwrap();
981 ///
982 /// assert_eq!(Ok(1), recv.recv());
983 /// ```
984 ///
985 /// Buffering behavior:
986 ///
987 /// ```
988 /// #![feature(mpmc_channel)]
989 ///
990 /// use std::sync::mpmc;
991 /// use std::thread;
992 /// use std::sync::mpmc::RecvError;
993 ///
994 /// let (send, recv) = mpmc::channel();
995 /// let handle = thread::spawn(move || {
996 /// send.send(1u8).unwrap();
997 /// send.send(2).unwrap();
998 /// send.send(3).unwrap();
999 /// drop(send);
1000 /// });
1001 ///
1002 /// // wait for the thread to join so we ensure the sender is dropped
1003 /// handle.join().unwrap();
1004 ///
1005 /// assert_eq!(Ok(1), recv.recv());
1006 /// assert_eq!(Ok(2), recv.recv());
1007 /// assert_eq!(Ok(3), recv.recv());
1008 /// assert_eq!(Err(RecvError), recv.recv());
1009 /// ```
1010 #[unstable(feature = "mpmc_channel", issue = "126840")]
1011 pub fn recv(&self) -> Result<T, RecvError> {
1012 match &self.flavor {
1013 ReceiverFlavor::Array(chan) => chan.recv(None),
1014 ReceiverFlavor::List(chan) => chan.recv(None),
1015 ReceiverFlavor::Zero(chan) => chan.recv(None),
1016 }
1017 .map_err(|_| RecvError)
1018 }
1019
1020 /// Attempts to wait for a value on this receiver, returning an error if the
1021 /// corresponding channel has hung up, or if it waits more than `timeout`.
1022 ///
1023 /// This function will always block the current thread if there is no data
1024 /// available and it's possible for more data to be sent (at least one sender
1025 /// still exists). Once a message is sent to the corresponding [`Sender`],
1026 /// this receiver will wake up and return that message.
1027 ///
1028 /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1029 /// this call is blocking, this call will wake up and return [`Err`] to
1030 /// indicate that no more messages can ever be received on this channel.
1031 /// However, since channels are buffered, messages sent before the disconnect
1032 /// will still be properly received.
1033 ///
1034 /// # Examples
1035 ///
1036 /// Successfully receiving value before encountering timeout:
1037 ///
1038 /// ```no_run
1039 /// #![feature(mpmc_channel)]
1040 ///
1041 /// use std::thread;
1042 /// use std::time::Duration;
1043 /// use std::sync::mpmc;
1044 ///
1045 /// let (send, recv) = mpmc::channel();
1046 ///
1047 /// thread::spawn(move || {
1048 /// send.send('a').unwrap();
1049 /// });
1050 ///
1051 /// assert_eq!(
1052 /// recv.recv_timeout(Duration::from_millis(400)),
1053 /// Ok('a')
1054 /// );
1055 /// ```
1056 ///
1057 /// Receiving an error upon reaching timeout:
1058 ///
1059 /// ```no_run
1060 /// #![feature(mpmc_channel)]
1061 ///
1062 /// use std::thread;
1063 /// use std::time::Duration;
1064 /// use std::sync::mpmc;
1065 ///
1066 /// let (send, recv) = mpmc::channel();
1067 ///
1068 /// thread::spawn(move || {
1069 /// thread::sleep(Duration::from_millis(800));
1070 /// send.send('a').unwrap();
1071 /// });
1072 ///
1073 /// assert_eq!(
1074 /// recv.recv_timeout(Duration::from_millis(400)),
1075 /// Err(mpmc::RecvTimeoutError::Timeout)
1076 /// );
1077 /// ```
1078 #[unstable(feature = "mpmc_channel", issue = "126840")]
1079 pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
1080 match Instant::now().checked_add(timeout) {
1081 Some(deadline) => self.recv_deadline(deadline),
1082 // So far in the future that it's practically the same as waiting indefinitely.
1083 None => self.recv().map_err(RecvTimeoutError::from),
1084 }
1085 }
1086
1087 /// Attempts to wait for a value on this receiver, returning an error if the
1088 /// corresponding channel has hung up, or if `deadline` is reached.
1089 ///
1090 /// This function will always block the current thread if there is no data
1091 /// available and it's possible for more data to be sent. Once a message is
1092 /// sent to the corresponding [`Sender`], then this receiver will wake up
1093 /// and return that message.
1094 ///
1095 /// If the corresponding [`Sender`] has disconnected, or it disconnects while
1096 /// this call is blocking, this call will wake up and return [`Err`] to
1097 /// indicate that no more messages can ever be received on this channel.
1098 /// However, since channels are buffered, messages sent before the disconnect
1099 /// will still be properly received.
1100 ///
1101 /// # Examples
1102 ///
1103 /// Successfully receiving value before reaching deadline:
1104 ///
1105 /// ```no_run
1106 /// #![feature(mpmc_channel)]
1107 ///
1108 /// use std::thread;
1109 /// use std::time::{Duration, Instant};
1110 /// use std::sync::mpmc;
1111 ///
1112 /// let (send, recv) = mpmc::channel();
1113 ///
1114 /// thread::spawn(move || {
1115 /// send.send('a').unwrap();
1116 /// });
1117 ///
1118 /// assert_eq!(
1119 /// recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
1120 /// Ok('a')
1121 /// );
1122 /// ```
1123 ///
1124 /// Receiving an error upon reaching deadline:
1125 ///
1126 /// ```no_run
1127 /// #![feature(mpmc_channel)]
1128 ///
1129 /// use std::thread;
1130 /// use std::time::{Duration, Instant};
1131 /// use std::sync::mpmc;
1132 ///
1133 /// let (send, recv) = mpmc::channel();
1134 ///
1135 /// thread::spawn(move || {
1136 /// thread::sleep(Duration::from_millis(800));
1137 /// send.send('a').unwrap();
1138 /// });
1139 ///
1140 /// assert_eq!(
1141 /// recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
1142 /// Err(mpmc::RecvTimeoutError::Timeout)
1143 /// );
1144 /// ```
1145 #[unstable(feature = "mpmc_channel", issue = "126840")]
1146 pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
1147 match &self.flavor {
1148 ReceiverFlavor::Array(chan) => chan.recv(Some(deadline)),
1149 ReceiverFlavor::List(chan) => chan.recv(Some(deadline)),
1150 ReceiverFlavor::Zero(chan) => chan.recv(Some(deadline)),
1151 }
1152 }
1153
1154 /// Returns an iterator that will attempt to yield all pending values.
1155 /// It will return `None` if there are no more pending values or if the
1156 /// channel has hung up. The iterator will never [`panic!`] or block the
1157 /// user by waiting for values.
1158 ///
1159 /// # Examples
1160 ///
1161 /// ```no_run
1162 /// #![feature(mpmc_channel)]
1163 ///
1164 /// use std::sync::mpmc::channel;
1165 /// use std::thread;
1166 /// use std::time::Duration;
1167 ///
1168 /// let (sender, receiver) = channel();
1169 ///
1170 /// // nothing is in the buffer yet
1171 /// assert!(receiver.try_iter().next().is_none());
1172 ///
1173 /// thread::spawn(move || {
1174 /// thread::sleep(Duration::from_secs(1));
1175 /// sender.send(1).unwrap();
1176 /// sender.send(2).unwrap();
1177 /// sender.send(3).unwrap();
1178 /// });
1179 ///
1180 /// // nothing is in the buffer yet
1181 /// assert!(receiver.try_iter().next().is_none());
1182 ///
1183 /// // block for two seconds
1184 /// thread::sleep(Duration::from_secs(2));
1185 ///
1186 /// let mut iter = receiver.try_iter();
1187 /// assert_eq!(iter.next(), Some(1));
1188 /// assert_eq!(iter.next(), Some(2));
1189 /// assert_eq!(iter.next(), Some(3));
1190 /// assert_eq!(iter.next(), None);
1191 /// ```
1192 #[unstable(feature = "mpmc_channel", issue = "126840")]
1193 pub fn try_iter(&self) -> TryIter<'_, T> {
1194 TryIter { rx: self }
1195 }
1196}
1197
1198impl<T> Receiver<T> {
1199 /// Returns `true` if the channel is empty.
1200 ///
1201 /// Note: Zero-capacity channels are always empty.
1202 ///
1203 /// # Examples
1204 ///
1205 /// ```
1206 /// #![feature(mpmc_channel)]
1207 ///
1208 /// use std::sync::mpmc;
1209 /// use std::thread;
1210 ///
1211 /// let (send, recv) = mpmc::channel();
1212 ///
1213 /// assert!(recv.is_empty());
1214 ///
1215 /// let handle = thread::spawn(move || {
1216 /// send.send(1u8).unwrap();
1217 /// });
1218 ///
1219 /// handle.join().unwrap();
1220 ///
1221 /// assert!(!recv.is_empty());
1222 /// ```
1223 #[unstable(feature = "mpmc_channel", issue = "126840")]
1224 pub fn is_empty(&self) -> bool {
1225 match &self.flavor {
1226 ReceiverFlavor::Array(chan) => chan.is_empty(),
1227 ReceiverFlavor::List(chan) => chan.is_empty(),
1228 ReceiverFlavor::Zero(chan) => chan.is_empty(),
1229 }
1230 }
1231
1232 /// Returns `true` if the channel is full.
1233 ///
1234 /// Note: Zero-capacity channels are always full.
1235 ///
1236 /// # Examples
1237 ///
1238 /// ```
1239 /// #![feature(mpmc_channel)]
1240 ///
1241 /// use std::sync::mpmc;
1242 /// use std::thread;
1243 ///
1244 /// let (send, recv) = mpmc::sync_channel(1);
1245 ///
1246 /// assert!(!recv.is_full());
1247 ///
1248 /// let handle = thread::spawn(move || {
1249 /// send.send(1u8).unwrap();
1250 /// });
1251 ///
1252 /// handle.join().unwrap();
1253 ///
1254 /// assert!(recv.is_full());
1255 /// ```
1256 #[unstable(feature = "mpmc_channel", issue = "126840")]
1257 pub fn is_full(&self) -> bool {
1258 match &self.flavor {
1259 ReceiverFlavor::Array(chan) => chan.is_full(),
1260 ReceiverFlavor::List(chan) => chan.is_full(),
1261 ReceiverFlavor::Zero(chan) => chan.is_full(),
1262 }
1263 }
1264
1265 /// Returns the number of messages in the channel.
1266 ///
1267 /// # Examples
1268 ///
1269 /// ```
1270 /// #![feature(mpmc_channel)]
1271 ///
1272 /// use std::sync::mpmc;
1273 /// use std::thread;
1274 ///
1275 /// let (send, recv) = mpmc::channel();
1276 ///
1277 /// assert_eq!(recv.len(), 0);
1278 ///
1279 /// let handle = thread::spawn(move || {
1280 /// send.send(1u8).unwrap();
1281 /// });
1282 ///
1283 /// handle.join().unwrap();
1284 ///
1285 /// assert_eq!(recv.len(), 1);
1286 /// ```
1287 #[unstable(feature = "mpmc_channel", issue = "126840")]
1288 pub fn len(&self) -> usize {
1289 match &self.flavor {
1290 ReceiverFlavor::Array(chan) => chan.len(),
1291 ReceiverFlavor::List(chan) => chan.len(),
1292 ReceiverFlavor::Zero(chan) => chan.len(),
1293 }
1294 }
1295
1296 /// If the channel is bounded, returns its capacity.
1297 ///
1298 /// # Examples
1299 ///
1300 /// ```
1301 /// #![feature(mpmc_channel)]
1302 ///
1303 /// use std::sync::mpmc;
1304 /// use std::thread;
1305 ///
1306 /// let (send, recv) = mpmc::sync_channel(3);
1307 ///
1308 /// assert_eq!(recv.capacity(), Some(3));
1309 ///
1310 /// let handle = thread::spawn(move || {
1311 /// send.send(1u8).unwrap();
1312 /// });
1313 ///
1314 /// handle.join().unwrap();
1315 ///
1316 /// assert_eq!(recv.capacity(), Some(3));
1317 /// ```
1318 #[unstable(feature = "mpmc_channel", issue = "126840")]
1319 pub fn capacity(&self) -> Option<usize> {
1320 match &self.flavor {
1321 ReceiverFlavor::Array(chan) => chan.capacity(),
1322 ReceiverFlavor::List(chan) => chan.capacity(),
1323 ReceiverFlavor::Zero(chan) => chan.capacity(),
1324 }
1325 }
1326
1327 /// Returns `true` if receivers belong to the same channel.
1328 ///
1329 /// # Examples
1330 ///
1331 /// ```
1332 /// #![feature(mpmc_channel)]
1333 ///
1334 /// use std::sync::mpmc;
1335 ///
1336 /// let (_, rx1) = mpmc::channel::<i32>();
1337 /// let (_, rx2) = mpmc::channel::<i32>();
1338 ///
1339 /// assert!(rx1.same_channel(&rx1));
1340 /// assert!(!rx1.same_channel(&rx2));
1341 /// ```
1342 #[unstable(feature = "mpmc_channel", issue = "126840")]
1343 pub fn same_channel(&self, other: &Receiver<T>) -> bool {
1344 match (&self.flavor, &other.flavor) {
1345 (ReceiverFlavor::Array(a), ReceiverFlavor::Array(b)) => a == b,
1346 (ReceiverFlavor::List(a), ReceiverFlavor::List(b)) => a == b,
1347 (ReceiverFlavor::Zero(a), ReceiverFlavor::Zero(b)) => a == b,
1348 _ => false,
1349 }
1350 }
1351
1352 /// Returns an iterator that will block waiting for messages, but never
1353 /// [`panic!`]. It will return [`None`] when the channel has hung up.
1354 ///
1355 /// # Examples
1356 ///
1357 /// ```rust
1358 /// #![feature(mpmc_channel)]
1359 ///
1360 /// use std::sync::mpmc::channel;
1361 /// use std::thread;
1362 ///
1363 /// let (send, recv) = channel();
1364 ///
1365 /// thread::spawn(move || {
1366 /// send.send(1).unwrap();
1367 /// send.send(2).unwrap();
1368 /// send.send(3).unwrap();
1369 /// });
1370 ///
1371 /// let mut iter = recv.iter();
1372 /// assert_eq!(iter.next(), Some(1));
1373 /// assert_eq!(iter.next(), Some(2));
1374 /// assert_eq!(iter.next(), Some(3));
1375 /// assert_eq!(iter.next(), None);
1376 /// ```
1377 #[unstable(feature = "mpmc_channel", issue = "126840")]
1378 pub fn iter(&self) -> Iter<'_, T> {
1379 Iter { rx: self }
1380 }
1381
1382 /// Returns `true` if the channel is disconnected.
1383 ///
1384 /// Note that a return value of `false` does not guarantee the channel will
1385 /// remain connected. The channel may be disconnected immediately after this method
1386 /// returns, so a subsequent [`Receiver::recv`] may still fail with [`RecvError`].
1387 ///
1388 /// # Examples
1389 ///
1390 /// ```
1391 /// #![feature(mpmc_channel)]
1392 ///
1393 /// use std::sync::mpmc::channel;
1394 ///
1395 /// let (tx, rx) = channel::<i32>();
1396 /// assert!(!rx.is_disconnected());
1397 /// drop(tx);
1398 /// assert!(rx.is_disconnected());
1399 /// ```
1400 #[unstable(feature = "mpmc_channel", issue = "126840")]
1401 pub fn is_disconnected(&self) -> bool {
1402 match &self.flavor {
1403 ReceiverFlavor::Array(chan) => chan.is_disconnected(),
1404 ReceiverFlavor::List(chan) => chan.is_disconnected(),
1405 ReceiverFlavor::Zero(chan) => chan.is_disconnected(),
1406 }
1407 }
1408}
1409
1410#[unstable(feature = "mpmc_channel", issue = "126840")]
1411impl<T> Drop for Receiver<T> {
1412 fn drop(&mut self) {
1413 unsafe {
1414 match &self.flavor {
1415 ReceiverFlavor::Array(chan) => chan.release(|c| c.disconnect_receivers()),
1416 ReceiverFlavor::List(chan) => chan.release(|c| c.disconnect_receivers()),
1417 ReceiverFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
1418 }
1419 }
1420 }
1421}
1422
1423#[unstable(feature = "mpmc_channel", issue = "126840")]
1424impl<T> Clone for Receiver<T> {
1425 fn clone(&self) -> Self {
1426 let flavor = match &self.flavor {
1427 ReceiverFlavor::Array(chan) => ReceiverFlavor::Array(chan.acquire()),
1428 ReceiverFlavor::List(chan) => ReceiverFlavor::List(chan.acquire()),
1429 ReceiverFlavor::Zero(chan) => ReceiverFlavor::Zero(chan.acquire()),
1430 };
1431
1432 Receiver { flavor }
1433 }
1434}
1435
1436#[unstable(feature = "mpmc_channel", issue = "126840")]
1437impl<T> fmt::Debug for Receiver<T> {
1438 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1439 f.debug_struct("Receiver").finish_non_exhaustive()
1440 }
1441}
1442
1443#[cfg(test)]
1444mod tests;