Skip to main content

std/sync/
mpsc.rs

1//! Multi-producer, single-consumer FIFO queue communication primitives.
2//!
3//! This module provides message-based communication over channels, concretely
4//! defined among three types:
5//!
6//! * [`Sender`]
7//! * [`SyncSender`]
8//! * [`Receiver`]
9//!
10//! A [`Sender`] or [`SyncSender`] is used to send data to a [`Receiver`]. Both
11//! senders are clone-able (multi-producer) such that many threads can send
12//! simultaneously to one receiver (single-consumer).
13//!
14//! These channels come in two flavors:
15//!
16//! 1. An asynchronous, infinitely buffered channel. The [`channel`] function
17//!    will return a `(Sender, Receiver)` tuple where all sends will be
18//!    **asynchronous** (they never block). The channel conceptually has an
19//!    infinite buffer.
20//!
21//! 2. A synchronous, bounded channel. The [`sync_channel`] function will
22//!    return a `(SyncSender, Receiver)` tuple where the storage for pending
23//!    messages is a pre-allocated buffer of a fixed size. All sends will be
24//!    **synchronous** by blocking until there is buffer space available. Note
25//!    that a bound of 0 is allowed, causing the channel to become a "rendezvous"
26//!    channel where each sender atomically hands off a message to a receiver.
27//!
28//! [`send`]: Sender::send
29//!
30//! ## Disconnection
31//!
32//! The send and receive operations on channels will all return a [`Result`]
33//! indicating whether the operation succeeded or not. An unsuccessful operation
34//! is normally indicative of the other half of a channel having "hung up" by
35//! being dropped in its corresponding thread.
36//!
37//! Once half of a channel has been deallocated, most operations can no longer
38//! continue to make progress, so [`Err`] will be returned. Many applications
39//! will continue to [`unwrap`] the results returned from this module,
40//! instigating a propagation of failure among threads if one unexpectedly dies.
41//!
42//! [`unwrap`]: Result::unwrap
43//!
44//! # Examples
45//!
46//! Simple usage:
47//!
48//! ```
49//! use std::thread;
50//! use std::sync::mpsc::channel;
51//!
52//! // Create a simple streaming channel
53//! let (tx, rx) = channel();
54//! thread::spawn(move || {
55//!     tx.send(10).unwrap();
56//! });
57//! assert_eq!(rx.recv().unwrap(), 10);
58//! ```
59//!
60//! Shared usage:
61//!
62//! ```
63//! use std::thread;
64//! use std::sync::mpsc::channel;
65//!
66//! // Create a shared channel that can be sent along from many threads
67//! // where tx is the sending half (tx for transmission), and rx is the receiving
68//! // half (rx for receiving).
69//! let (tx, rx) = channel();
70//! for i in 0..10 {
71//!     let tx = tx.clone();
72//!     thread::spawn(move || {
73//!         tx.send(i).unwrap();
74//!     });
75//! }
76//!
77//! for _ in 0..10 {
78//!     let j = rx.recv().unwrap();
79//!     assert!(0 <= j && j < 10);
80//! }
81//! ```
82//!
83//! Propagating panics:
84//!
85//! ```
86//! use std::sync::mpsc::channel;
87//!
88//! // The call to recv() will return an error because the channel has already
89//! // hung up (or been deallocated)
90//! let (tx, rx) = channel::<i32>();
91//! drop(tx);
92//! assert!(rx.recv().is_err());
93//! ```
94//!
95//! Synchronous channels:
96//!
97//! ```
98//! use std::thread;
99//! use std::sync::mpsc::sync_channel;
100//!
101//! let (tx, rx) = sync_channel::<i32>(0);
102//! thread::spawn(move || {
103//!     // This will wait for the parent thread to start receiving
104//!     tx.send(53).unwrap();
105//! });
106//! rx.recv().unwrap();
107//! ```
108//!
109//! Unbounded receive loop:
110//!
111//! ```
112//! use std::sync::mpsc::sync_channel;
113//! use std::thread;
114//!
115//! let (tx, rx) = sync_channel(3);
116//!
117//! for _ in 0..3 {
118//!     // It would be the same without thread and clone here
119//!     // since there will still be one `tx` left.
120//!     let tx = tx.clone();
121//!     // cloned tx dropped within thread
122//!     thread::spawn(move || tx.send("ok").unwrap());
123//! }
124//!
125//! // Drop the last sender to stop `rx` waiting for message.
126//! // The program will not complete if we comment this out.
127//! // **All** `tx` needs to be dropped for `rx` to have `Err`.
128//! drop(tx);
129//!
130//! // Unbounded receiver waiting for all senders to complete.
131//! while let Ok(msg) = rx.recv() {
132//!     println!("{msg}");
133//! }
134//!
135//! println!("completed");
136//! ```
137
138#![stable(feature = "rust1", since = "1.0.0")]
139
140// MPSC channels are built as a wrapper around MPMC channels, which
141// were ported from the `crossbeam-channel` crate. MPMC channels are
142// not exposed publicly, but if you are curious about the implementation,
143// that's where everything is.
144
145use crate::sync::mpmc;
146use crate::time::{Duration, Instant};
147use crate::{error, fmt};
148
149/// The receiving half of Rust's [`channel`] (or [`sync_channel`]) type.
150/// This half can only be owned by one thread.
151///
152/// Messages sent to the channel can be retrieved using [`recv`].
153///
154/// [`recv`]: Receiver::recv
155///
156/// # Examples
157///
158/// ```rust
159/// use std::sync::mpsc::channel;
160/// use std::thread;
161/// use std::time::Duration;
162///
163/// let (send, recv) = channel();
164///
165/// thread::spawn(move || {
166///     send.send("Hello world!").unwrap();
167///     thread::sleep(Duration::from_secs(2)); // block for two seconds
168///     send.send("Delayed for 2 seconds").unwrap();
169/// });
170///
171/// println!("{}", recv.recv().unwrap()); // Received immediately
172/// println!("Waiting...");
173/// println!("{}", recv.recv().unwrap()); // Received after 2 seconds
174/// ```
175#[stable(feature = "rust1", since = "1.0.0")]
176#[cfg_attr(not(test), rustc_diagnostic_item = "MpscReceiver")]
177pub struct Receiver<T> {
178    inner: mpmc::Receiver<T>,
179}
180
181// The receiver port can be sent from place to place, so long as it
182// is not used to receive non-sendable things.
183#[stable(feature = "rust1", since = "1.0.0")]
184unsafe impl<T: Send> Send for Receiver<T> {}
185
186#[stable(feature = "rust1", since = "1.0.0")]
187impl<T> !Sync for Receiver<T> {}
188
189/// An iterator over messages on a [`Receiver`], created by [`iter`].
190///
191/// This iterator will block whenever [`next`] is called,
192/// waiting for a new message, and [`None`] will be returned
193/// when the corresponding channel has hung up.
194///
195/// [`iter`]: Receiver::iter
196/// [`next`]: Iterator::next
197///
198/// # Examples
199///
200/// ```rust
201/// use std::sync::mpsc::channel;
202/// use std::thread;
203///
204/// let (send, recv) = channel();
205///
206/// thread::spawn(move || {
207///     send.send(1u8).unwrap();
208///     send.send(2u8).unwrap();
209///     send.send(3u8).unwrap();
210/// });
211///
212/// for x in recv.iter() {
213///     println!("Got: {x}");
214/// }
215/// ```
216#[stable(feature = "rust1", since = "1.0.0")]
217#[derive(Debug)]
218pub struct Iter<'a, T: 'a> {
219    rx: &'a Receiver<T>,
220}
221
222/// An iterator that attempts to yield all pending values for a [`Receiver`],
223/// created by [`try_iter`].
224///
225/// [`None`] will be returned when there are no pending values remaining or
226/// if the corresponding channel has hung up.
227///
228/// This iterator will never block the caller in order to wait for data to
229/// become available. Instead, it will return [`None`].
230///
231/// [`try_iter`]: Receiver::try_iter
232///
233/// # Examples
234///
235/// ```rust
236/// use std::sync::mpsc::channel;
237/// use std::thread;
238/// use std::time::Duration;
239///
240/// let (sender, receiver) = channel();
241///
242/// // Nothing is in the buffer yet
243/// assert!(receiver.try_iter().next().is_none());
244/// println!("Nothing in the buffer...");
245///
246/// thread::spawn(move || {
247///     sender.send(1).unwrap();
248///     sender.send(2).unwrap();
249///     sender.send(3).unwrap();
250/// });
251///
252/// println!("Going to sleep...");
253/// thread::sleep(Duration::from_secs(2)); // block for two seconds
254///
255/// for x in receiver.try_iter() {
256///     println!("Got: {x}");
257/// }
258/// ```
259#[stable(feature = "receiver_try_iter", since = "1.15.0")]
260#[derive(Debug)]
261pub struct TryIter<'a, T: 'a> {
262    rx: &'a Receiver<T>,
263}
264
265/// An owning iterator over messages on a [`Receiver`],
266/// created by [`into_iter`].
267///
268/// This iterator will block whenever [`next`]
269/// is called, waiting for a new message, and [`None`] will be
270/// returned if the corresponding channel has hung up.
271///
272/// [`into_iter`]: Receiver::into_iter
273/// [`next`]: Iterator::next
274///
275/// # Examples
276///
277/// ```rust
278/// use std::sync::mpsc::channel;
279/// use std::thread;
280///
281/// let (send, recv) = channel();
282///
283/// thread::spawn(move || {
284///     send.send(1u8).unwrap();
285///     send.send(2u8).unwrap();
286///     send.send(3u8).unwrap();
287/// });
288///
289/// for x in recv.into_iter() {
290///     println!("Got: {x}");
291/// }
292/// ```
293#[stable(feature = "receiver_into_iter", since = "1.1.0")]
294#[derive(Debug)]
295pub struct IntoIter<T> {
296    rx: Receiver<T>,
297}
298
299/// The sending-half of Rust's asynchronous [`channel`] type.
300///
301/// Messages can be sent through this channel with [`send`].
302///
303/// Note: all senders (the original and its clones) need to be dropped for the receiver
304/// to stop blocking to receive messages with [`Receiver::recv`].
305///
306/// [`send`]: Sender::send
307///
308/// # Examples
309///
310/// ```rust
311/// use std::sync::mpsc::channel;
312/// use std::thread;
313///
314/// let (sender, receiver) = channel();
315/// let sender2 = sender.clone();
316///
317/// // First thread owns sender
318/// thread::spawn(move || {
319///     sender.send(1).unwrap();
320/// });
321///
322/// // Second thread owns sender2
323/// thread::spawn(move || {
324///     sender2.send(2).unwrap();
325/// });
326///
327/// let msg = receiver.recv().unwrap();
328/// let msg2 = receiver.recv().unwrap();
329///
330/// assert_eq!(3, msg + msg2);
331/// ```
332#[stable(feature = "rust1", since = "1.0.0")]
333#[cfg_attr(not(test), rustc_diagnostic_item = "MpscSender")]
334pub struct Sender<T> {
335    inner: mpmc::Sender<T>,
336}
337
338// The send port can be sent from place to place, so long as it
339// is not used to send non-sendable things.
340#[stable(feature = "rust1", since = "1.0.0")]
341unsafe impl<T: Send> Send for Sender<T> {}
342
343#[stable(feature = "mpsc_sender_sync", since = "1.72.0")]
344unsafe impl<T: Send> Sync for Sender<T> {}
345
346/// The sending-half of Rust's synchronous [`sync_channel`] type.
347///
348/// Messages can be sent through this channel with [`send`] or [`try_send`].
349///
350/// [`send`] will block if there is no space in the internal buffer.
351///
352/// [`send`]: SyncSender::send
353/// [`try_send`]: SyncSender::try_send
354///
355/// # Examples
356///
357/// ```rust
358/// use std::sync::mpsc::sync_channel;
359/// use std::thread;
360///
361/// // Create a sync_channel with buffer size 2
362/// let (sync_sender, receiver) = sync_channel(2);
363/// let sync_sender2 = sync_sender.clone();
364///
365/// // First thread owns sync_sender
366/// thread::spawn(move || {
367///     sync_sender.send(1).unwrap();
368///     sync_sender.send(2).unwrap();
369/// });
370///
371/// // Second thread owns sync_sender2
372/// thread::spawn(move || {
373///     sync_sender2.send(3).unwrap();
374///     // thread will now block since the buffer is full
375///     println!("Thread unblocked!");
376/// });
377///
378/// let mut msg;
379///
380/// msg = receiver.recv().unwrap();
381/// println!("message {msg} received");
382///
383/// // "Thread unblocked!" will be printed now
384///
385/// msg = receiver.recv().unwrap();
386/// println!("message {msg} received");
387///
388/// msg = receiver.recv().unwrap();
389///
390/// println!("message {msg} received");
391/// ```
392#[stable(feature = "rust1", since = "1.0.0")]
393pub struct SyncSender<T> {
394    inner: mpmc::Sender<T>,
395}
396
397#[stable(feature = "rust1", since = "1.0.0")]
398unsafe impl<T: Send> Send for SyncSender<T> {}
399
400/// An error returned from the [`Sender::send`] or [`SyncSender::send`]
401/// function on **channel**s.
402///
403/// A **send** operation can only fail if the receiving end of a channel is
404/// disconnected, implying that the data could never be received. The error
405/// contains the data being sent as a payload so it can be recovered.
406#[stable(feature = "rust1", since = "1.0.0")]
407#[derive(PartialEq, Eq, Clone, Copy)]
408pub struct SendError<T>(#[stable(feature = "rust1", since = "1.0.0")] pub T);
409
410/// An error returned from the [`recv`] function on a [`Receiver`].
411///
412/// The [`recv`] operation can only fail if the sending half of a
413/// [`channel`] (or [`sync_channel`]) is disconnected, implying that no further
414/// messages will ever be received.
415///
416/// [`recv`]: Receiver::recv
417#[derive(PartialEq, Eq, Clone, Copy, Debug)]
418#[stable(feature = "rust1", since = "1.0.0")]
419pub struct RecvError;
420
421/// This enumeration is the list of the possible reasons that [`try_recv`] could
422/// not return data when called. This can occur with both a [`channel`] and
423/// a [`sync_channel`].
424///
425/// [`try_recv`]: Receiver::try_recv
426#[derive(PartialEq, Eq, Clone, Copy, Debug)]
427#[stable(feature = "rust1", since = "1.0.0")]
428pub enum TryRecvError {
429    /// This **channel** is currently empty, but the **Sender**(s) have not yet
430    /// disconnected, so data may yet become available.
431    #[stable(feature = "rust1", since = "1.0.0")]
432    Empty,
433
434    /// The **channel**'s sending half has become disconnected, and there will
435    /// never be any more data received on it.
436    #[stable(feature = "rust1", since = "1.0.0")]
437    Disconnected,
438}
439
440/// This enumeration is the list of possible errors that made [`recv_timeout`]
441/// unable to return data when called. This can occur with both a [`channel`] and
442/// a [`sync_channel`].
443///
444/// [`recv_timeout`]: Receiver::recv_timeout
445#[derive(PartialEq, Eq, Clone, Copy, Debug)]
446#[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
447pub enum RecvTimeoutError {
448    /// This **channel** is currently empty, but the **Sender**(s) have not yet
449    /// disconnected, so data may yet become available.
450    #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
451    Timeout,
452    /// The **channel**'s sending half has become disconnected, and there will
453    /// never be any more data received on it.
454    #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
455    Disconnected,
456}
457
458/// This enumeration is the list of the possible error outcomes for the
459/// [`try_send`] method.
460///
461/// [`try_send`]: SyncSender::try_send
462#[stable(feature = "rust1", since = "1.0.0")]
463#[derive(PartialEq, Eq, Clone, Copy)]
464pub enum TrySendError<T> {
465    /// The data could not be sent on the [`sync_channel`] because it would require that
466    /// the callee block to send the data.
467    ///
468    /// If this is a buffered channel, then the buffer is full at this time. If
469    /// this is not a buffered channel, then there is no [`Receiver`] available to
470    /// acquire the data.
471    #[stable(feature = "rust1", since = "1.0.0")]
472    Full(#[stable(feature = "rust1", since = "1.0.0")] T),
473
474    /// This [`sync_channel`]'s receiving half has disconnected, so the data could not be
475    /// sent. The data is returned back to the callee in this case.
476    #[stable(feature = "rust1", since = "1.0.0")]
477    Disconnected(#[stable(feature = "rust1", since = "1.0.0")] T),
478}
479
480/// Creates a new asynchronous channel, returning the sender/receiver halves.
481///
482/// All data sent on the [`Sender`] will become available on the [`Receiver`] in
483/// the same order as it was sent, and no [`send`] will block the calling thread
484/// (this channel has an "infinite buffer", unlike [`sync_channel`], which will
485/// block after its buffer limit is reached). [`recv`] will block until a message
486/// is available while there is at least one [`Sender`] alive (including clones).
487///
488/// The [`Sender`] can be cloned to [`send`] to the same channel multiple times, but
489/// only one [`Receiver`] is supported.
490///
491/// If the [`Receiver`] is disconnected while trying to [`send`] with the
492/// [`Sender`], the [`send`] method will return a [`SendError`]. Similarly, if the
493/// [`Sender`] is disconnected while trying to [`recv`], the [`recv`] method will
494/// return a [`RecvError`].
495///
496/// [`send`]: Sender::send
497/// [`recv`]: Receiver::recv
498///
499/// # Examples
500///
501/// ```
502/// use std::sync::mpsc::channel;
503/// use std::thread;
504///
505/// let (sender, receiver) = channel();
506///
507/// // Spawn off an expensive computation
508/// thread::spawn(move || {
509/// #   fn expensive_computation() {}
510///     sender.send(expensive_computation()).unwrap();
511/// });
512///
513/// // Do some useful work for a while
514///
515/// // Let's see what that answer was
516/// println!("{:?}", receiver.recv().unwrap());
517/// ```
518#[must_use]
519#[stable(feature = "rust1", since = "1.0.0")]
520pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
521    let (tx, rx) = mpmc::channel();
522    (Sender { inner: tx }, Receiver { inner: rx })
523}
524
525/// Creates a new synchronous, bounded channel.
526///
527/// All data sent on the [`SyncSender`] will become available on the [`Receiver`]
528/// in the same order as it was sent. Like asynchronous [`channel`]s, the
529/// [`Receiver`] will block until a message becomes available. `sync_channel`
530/// differs greatly in the semantics of the sender, however.
531///
532/// This channel has an internal buffer on which messages will be queued.
533/// `bound` specifies the buffer size. When the internal buffer becomes full,
534/// future sends will *block* waiting for the buffer to open up. Note that a
535/// buffer size of 0 is valid, in which case this becomes "rendezvous channel"
536/// where each [`send`] will not return until a [`recv`] is paired with it.
537///
538/// The [`SyncSender`] can be cloned to [`send`] to the same channel multiple
539/// times, but only one [`Receiver`] is supported.
540///
541/// Like asynchronous channels, if the [`Receiver`] is disconnected while trying
542/// to [`send`] with the [`SyncSender`], the [`send`] method will return a
543/// [`SendError`]. Similarly, If the [`SyncSender`] is disconnected while trying
544/// to [`recv`], the [`recv`] method will return a [`RecvError`].
545///
546/// [`send`]: SyncSender::send
547/// [`recv`]: Receiver::recv
548///
549/// # Examples
550///
551/// ```
552/// use std::sync::mpsc::sync_channel;
553/// use std::thread;
554///
555/// let (sender, receiver) = sync_channel(1);
556///
557/// // this returns immediately
558/// sender.send(1).unwrap();
559///
560/// thread::spawn(move || {
561///     // this will block until the previous message has been received
562///     sender.send(2).unwrap();
563/// });
564///
565/// assert_eq!(receiver.recv().unwrap(), 1);
566/// assert_eq!(receiver.recv().unwrap(), 2);
567/// ```
568#[must_use]
569#[stable(feature = "rust1", since = "1.0.0")]
570pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
571    let (tx, rx) = mpmc::sync_channel(bound);
572    (SyncSender { inner: tx }, Receiver { inner: rx })
573}
574
575////////////////////////////////////////////////////////////////////////////////
576// Sender
577////////////////////////////////////////////////////////////////////////////////
578
579impl<T> Sender<T> {
580    /// Attempts to send a value on this channel, returning it back if it could
581    /// not be sent.
582    ///
583    /// A successful send occurs when it is determined that the other end of
584    /// the channel has not hung up already. An unsuccessful send would be one
585    /// where the corresponding receiver has already been deallocated. Note
586    /// that a return value of [`Err`] means that the data will never be
587    /// received, but a return value of [`Ok`] does *not* mean that the data
588    /// will be received. It is possible for the corresponding receiver to
589    /// hang up immediately after this function returns [`Ok`].
590    ///
591    /// This method will never block the current thread.
592    ///
593    /// # Examples
594    ///
595    /// ```
596    /// use std::sync::mpsc::channel;
597    ///
598    /// let (tx, rx) = channel();
599    ///
600    /// // This send is always successful
601    /// tx.send(1).unwrap();
602    ///
603    /// // This send will fail because the receiver is gone
604    /// drop(rx);
605    /// assert_eq!(tx.send(1).unwrap_err().0, 1);
606    /// ```
607    #[stable(feature = "rust1", since = "1.0.0")]
608    pub fn send(&self, t: T) -> Result<(), SendError<T>> {
609        self.inner.send(t)
610    }
611
612    /// Returns `true` if the channel is disconnected.
613    ///
614    /// Note that a return value of `false` does not guarantee the channel will
615    /// remain connected. The channel may be disconnected immediately after this method
616    /// returns, so a subsequent [`Sender::send`] may still fail with [`SendError`].
617    ///
618    /// # Examples
619    ///
620    /// ```
621    /// #![feature(mpsc_is_disconnected)]
622    ///
623    /// use std::sync::mpsc::channel;
624    ///
625    /// let (tx, rx) = channel::<i32>();
626    /// assert!(!tx.is_disconnected());
627    /// drop(rx);
628    /// assert!(tx.is_disconnected());
629    /// ```
630    #[unstable(feature = "mpsc_is_disconnected", issue = "153668")]
631    pub fn is_disconnected(&self) -> bool {
632        self.inner.is_disconnected()
633    }
634}
635
636#[stable(feature = "rust1", since = "1.0.0")]
637impl<T> Clone for Sender<T> {
638    /// Clone a sender to send to other threads.
639    ///
640    /// Note, be aware of the lifetime of the sender because all senders
641    /// (including the original) need to be dropped in order for
642    /// [`Receiver::recv`] to stop blocking.
643    fn clone(&self) -> Sender<T> {
644        Sender { inner: self.inner.clone() }
645    }
646}
647
648#[stable(feature = "mpsc_debug", since = "1.8.0")]
649impl<T> fmt::Debug for Sender<T> {
650    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
651        f.debug_struct("Sender").finish_non_exhaustive()
652    }
653}
654
655////////////////////////////////////////////////////////////////////////////////
656// SyncSender
657////////////////////////////////////////////////////////////////////////////////
658
659impl<T> SyncSender<T> {
660    /// Sends a value on this synchronous channel.
661    ///
662    /// This function will *block* until space in the internal buffer becomes
663    /// available or a receiver is available to hand off the message to.
664    ///
665    /// Note that a successful send does *not* guarantee that the receiver will
666    /// ever see the data if there is a buffer on this channel. Items may be
667    /// enqueued in the internal buffer for the receiver to receive at a later
668    /// time. If the buffer size is 0, however, the channel becomes a rendezvous
669    /// channel and it guarantees that the receiver has indeed received
670    /// the data if this function returns success.
671    ///
672    /// This function will never panic, but it may return [`Err`] if the
673    /// [`Receiver`] has disconnected and is no longer able to receive
674    /// information.
675    ///
676    /// # Examples
677    ///
678    /// ```rust
679    /// use std::sync::mpsc::sync_channel;
680    /// use std::thread;
681    ///
682    /// // Create a rendezvous sync_channel with buffer size 0
683    /// let (sync_sender, receiver) = sync_channel(0);
684    ///
685    /// thread::spawn(move || {
686    ///    println!("sending message...");
687    ///    sync_sender.send(1).unwrap();
688    ///    // Thread is now blocked until the message is received
689    ///
690    ///    println!("...message received!");
691    /// });
692    ///
693    /// let msg = receiver.recv().unwrap();
694    /// assert_eq!(1, msg);
695    /// ```
696    #[stable(feature = "rust1", since = "1.0.0")]
697    pub fn send(&self, t: T) -> Result<(), SendError<T>> {
698        self.inner.send(t)
699    }
700
701    /// Attempts to send a value on this channel without blocking.
702    ///
703    /// This method differs from [`send`] by returning immediately if the
704    /// channel's buffer is full or no receiver is waiting to acquire some
705    /// data. Compared with [`send`], this function has two failure cases
706    /// instead of one (one for disconnection, one for a full buffer).
707    ///
708    /// See [`send`] for notes about guarantees of whether the
709    /// receiver has received the data or not if this function is successful.
710    ///
711    /// [`send`]: Self::send
712    ///
713    /// # Examples
714    ///
715    /// ```rust
716    /// use std::sync::mpsc::sync_channel;
717    /// use std::thread;
718    ///
719    /// // Create a sync_channel with buffer size 1
720    /// let (sync_sender, receiver) = sync_channel(1);
721    /// let sync_sender2 = sync_sender.clone();
722    ///
723    /// // First thread owns sync_sender
724    /// let handle1 = thread::spawn(move || {
725    ///     sync_sender.send(1).unwrap();
726    ///     sync_sender.send(2).unwrap();
727    ///     // Thread blocked
728    /// });
729    ///
730    /// // Second thread owns sync_sender2
731    /// let handle2 = thread::spawn(move || {
732    ///     // This will return an error and send
733    ///     // no message if the buffer is full
734    ///     let _ = sync_sender2.try_send(3);
735    /// });
736    ///
737    /// let mut msg;
738    /// msg = receiver.recv().unwrap();
739    /// println!("message {msg} received");
740    ///
741    /// msg = receiver.recv().unwrap();
742    /// println!("message {msg} received");
743    ///
744    /// // Third message may have never been sent
745    /// match receiver.try_recv() {
746    ///     Ok(msg) => println!("message {msg} received"),
747    ///     Err(_) => println!("the third message was never sent"),
748    /// }
749    ///
750    /// // Wait for threads to complete
751    /// handle1.join().unwrap();
752    /// handle2.join().unwrap();
753    /// ```
754    #[stable(feature = "rust1", since = "1.0.0")]
755    pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
756        self.inner.try_send(t)
757    }
758
759    // Attempts to send for a value on this receiver, returning an error if the
760    // corresponding channel has hung up, or if it waits more than `timeout`.
761    //
762    // This method is currently only used for tests.
763    #[unstable(issue = "none", feature = "std_internals")]
764    #[doc(hidden)]
765    pub fn send_timeout(&self, t: T, timeout: Duration) -> Result<(), mpmc::SendTimeoutError<T>> {
766        self.inner.send_timeout(t, timeout)
767    }
768}
769
770#[stable(feature = "rust1", since = "1.0.0")]
771impl<T> Clone for SyncSender<T> {
772    fn clone(&self) -> SyncSender<T> {
773        SyncSender { inner: self.inner.clone() }
774    }
775}
776
777#[stable(feature = "mpsc_debug", since = "1.8.0")]
778impl<T> fmt::Debug for SyncSender<T> {
779    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
780        f.debug_struct("SyncSender").finish_non_exhaustive()
781    }
782}
783
784////////////////////////////////////////////////////////////////////////////////
785// Receiver
786////////////////////////////////////////////////////////////////////////////////
787
788impl<T> Receiver<T> {
789    /// Attempts to return a pending value on this receiver without blocking.
790    ///
791    /// This method will never block the caller in order to wait for data to
792    /// become available. Instead, this will always return immediately with a
793    /// possible option of pending data on the channel.
794    ///
795    /// This is useful for a flavor of "optimistic check" before deciding to
796    /// block on a receiver.
797    ///
798    /// Compared with [`recv`], this function has two failure cases instead of one
799    /// (one for disconnection, one for an empty buffer).
800    ///
801    /// [`recv`]: Self::recv
802    ///
803    /// # Examples
804    ///
805    /// ```rust
806    /// use std::sync::mpsc::{Receiver, channel};
807    ///
808    /// let (_, receiver): (_, Receiver<i32>) = channel();
809    ///
810    /// assert!(receiver.try_recv().is_err());
811    /// ```
812    #[stable(feature = "rust1", since = "1.0.0")]
813    pub fn try_recv(&self) -> Result<T, TryRecvError> {
814        self.inner.try_recv()
815    }
816
817    /// Attempts to wait for a value on this receiver, returning an error if the
818    /// corresponding channel has hung up.
819    ///
820    /// This function will always block the current thread if there is no data
821    /// available and it's possible for more data to be sent (at least one sender
822    /// still exists). Once a message is sent to the corresponding [`Sender`]
823    /// (or [`SyncSender`]), this receiver will wake up and return that
824    /// message.
825    ///
826    /// If the corresponding [`Sender`] has disconnected, or it disconnects while
827    /// this call is blocking, this call will wake up and return [`Err`] to
828    /// indicate that no more messages can ever be received on this channel.
829    /// However, since channels are buffered, messages sent before the disconnect
830    /// will still be properly received.
831    ///
832    /// # Examples
833    ///
834    /// ```
835    /// use std::sync::mpsc;
836    /// use std::thread;
837    ///
838    /// let (send, recv) = mpsc::channel();
839    /// let handle = thread::spawn(move || {
840    ///     send.send(1u8).unwrap();
841    /// });
842    ///
843    /// handle.join().unwrap();
844    ///
845    /// assert_eq!(Ok(1), recv.recv());
846    /// ```
847    ///
848    /// Buffering behavior:
849    ///
850    /// ```
851    /// use std::sync::mpsc;
852    /// use std::thread;
853    /// use std::sync::mpsc::RecvError;
854    ///
855    /// let (send, recv) = mpsc::channel();
856    /// let handle = thread::spawn(move || {
857    ///     send.send(1u8).unwrap();
858    ///     send.send(2).unwrap();
859    ///     send.send(3).unwrap();
860    ///     drop(send);
861    /// });
862    ///
863    /// // wait for the thread to join so we ensure the sender is dropped
864    /// handle.join().unwrap();
865    ///
866    /// assert_eq!(Ok(1), recv.recv());
867    /// assert_eq!(Ok(2), recv.recv());
868    /// assert_eq!(Ok(3), recv.recv());
869    /// assert_eq!(Err(RecvError), recv.recv());
870    /// ```
871    #[stable(feature = "rust1", since = "1.0.0")]
872    pub fn recv(&self) -> Result<T, RecvError> {
873        self.inner.recv()
874    }
875
876    /// Attempts to wait for a value on this receiver, returning an error if the
877    /// corresponding channel has hung up, or if it waits more than `timeout`.
878    ///
879    /// This function will always block the current thread if there is no data
880    /// available and it's possible for more data to be sent (at least one sender
881    /// still exists). Once a message is sent to the corresponding [`Sender`]
882    /// (or [`SyncSender`]), this receiver will wake up and return that
883    /// message.
884    ///
885    /// If the corresponding [`Sender`] has disconnected, or it disconnects while
886    /// this call is blocking, this call will wake up and return [`Err`] to
887    /// indicate that no more messages can ever be received on this channel.
888    /// However, since channels are buffered, messages sent before the disconnect
889    /// will still be properly received.
890    ///
891    /// # Examples
892    ///
893    /// Successfully receiving value before encountering timeout:
894    ///
895    /// ```no_run
896    /// use std::thread;
897    /// use std::time::Duration;
898    /// use std::sync::mpsc;
899    ///
900    /// let (send, recv) = mpsc::channel();
901    ///
902    /// thread::spawn(move || {
903    ///     send.send('a').unwrap();
904    /// });
905    ///
906    /// assert_eq!(
907    ///     recv.recv_timeout(Duration::from_millis(400)),
908    ///     Ok('a')
909    /// );
910    /// ```
911    ///
912    /// Receiving an error upon reaching timeout:
913    ///
914    /// ```no_run
915    /// use std::thread;
916    /// use std::time::Duration;
917    /// use std::sync::mpsc;
918    ///
919    /// let (send, recv) = mpsc::channel();
920    ///
921    /// thread::spawn(move || {
922    ///     thread::sleep(Duration::from_millis(800));
923    ///     send.send('a').unwrap();
924    /// });
925    ///
926    /// assert_eq!(
927    ///     recv.recv_timeout(Duration::from_millis(400)),
928    ///     Err(mpsc::RecvTimeoutError::Timeout)
929    /// );
930    /// ```
931    #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
932    pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
933        self.inner.recv_timeout(timeout)
934    }
935
936    /// Attempts to wait for a value on this receiver, returning an error if the
937    /// corresponding channel has hung up, or if `deadline` is reached.
938    ///
939    /// This function will always block the current thread if there is no data
940    /// available and it's possible for more data to be sent. Once a message is
941    /// sent to the corresponding [`Sender`] (or [`SyncSender`]), then this
942    /// receiver will wake up and return that message.
943    ///
944    /// If the corresponding [`Sender`] has disconnected, or it disconnects while
945    /// this call is blocking, this call will wake up and return [`Err`] to
946    /// indicate that no more messages can ever be received on this channel.
947    /// However, since channels are buffered, messages sent before the disconnect
948    /// will still be properly received.
949    ///
950    /// # Examples
951    ///
952    /// Successfully receiving value before reaching deadline:
953    ///
954    /// ```no_run
955    /// #![feature(deadline_api)]
956    /// use std::thread;
957    /// use std::time::{Duration, Instant};
958    /// use std::sync::mpsc;
959    ///
960    /// let (send, recv) = mpsc::channel();
961    ///
962    /// thread::spawn(move || {
963    ///     send.send('a').unwrap();
964    /// });
965    ///
966    /// assert_eq!(
967    ///     recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
968    ///     Ok('a')
969    /// );
970    /// ```
971    ///
972    /// Receiving an error upon reaching deadline:
973    ///
974    /// ```no_run
975    /// #![feature(deadline_api)]
976    /// use std::thread;
977    /// use std::time::{Duration, Instant};
978    /// use std::sync::mpsc;
979    ///
980    /// let (send, recv) = mpsc::channel();
981    ///
982    /// thread::spawn(move || {
983    ///     thread::sleep(Duration::from_millis(800));
984    ///     send.send('a').unwrap();
985    /// });
986    ///
987    /// assert_eq!(
988    ///     recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
989    ///     Err(mpsc::RecvTimeoutError::Timeout)
990    /// );
991    /// ```
992    #[unstable(feature = "deadline_api", issue = "46316")]
993    pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
994        self.inner.recv_deadline(deadline)
995    }
996
997    /// Returns an iterator that will block waiting for messages, but never
998    /// [`panic!`]. It will return [`None`] when the channel has hung up.
999    ///
1000    /// # Examples
1001    ///
1002    /// ```rust
1003    /// use std::sync::mpsc::channel;
1004    /// use std::thread;
1005    ///
1006    /// let (send, recv) = channel();
1007    ///
1008    /// thread::spawn(move || {
1009    ///     send.send(1).unwrap();
1010    ///     send.send(2).unwrap();
1011    ///     send.send(3).unwrap();
1012    /// });
1013    ///
1014    /// let mut iter = recv.iter();
1015    /// assert_eq!(iter.next(), Some(1));
1016    /// assert_eq!(iter.next(), Some(2));
1017    /// assert_eq!(iter.next(), Some(3));
1018    /// assert_eq!(iter.next(), None);
1019    /// ```
1020    #[stable(feature = "rust1", since = "1.0.0")]
1021    pub fn iter(&self) -> Iter<'_, T> {
1022        Iter { rx: self }
1023    }
1024
1025    /// Returns an iterator that will attempt to yield all pending values.
1026    /// It will return `None` if there are no more pending values or if the
1027    /// channel has hung up. The iterator will never [`panic!`] or block the
1028    /// user by waiting for values.
1029    ///
1030    /// # Examples
1031    ///
1032    /// ```no_run
1033    /// use std::sync::mpsc::channel;
1034    /// use std::thread;
1035    /// use std::time::Duration;
1036    ///
1037    /// let (sender, receiver) = channel();
1038    ///
1039    /// // nothing is in the buffer yet
1040    /// assert!(receiver.try_iter().next().is_none());
1041    ///
1042    /// thread::spawn(move || {
1043    ///     thread::sleep(Duration::from_secs(1));
1044    ///     sender.send(1).unwrap();
1045    ///     sender.send(2).unwrap();
1046    ///     sender.send(3).unwrap();
1047    /// });
1048    ///
1049    /// // nothing is in the buffer yet
1050    /// assert!(receiver.try_iter().next().is_none());
1051    ///
1052    /// // block for two seconds
1053    /// thread::sleep(Duration::from_secs(2));
1054    ///
1055    /// let mut iter = receiver.try_iter();
1056    /// assert_eq!(iter.next(), Some(1));
1057    /// assert_eq!(iter.next(), Some(2));
1058    /// assert_eq!(iter.next(), Some(3));
1059    /// assert_eq!(iter.next(), None);
1060    /// ```
1061    #[stable(feature = "receiver_try_iter", since = "1.15.0")]
1062    pub fn try_iter(&self) -> TryIter<'_, T> {
1063        TryIter { rx: self }
1064    }
1065
1066    /// Returns `true` if the channel is disconnected.
1067    ///
1068    /// Note that a return value of `false` does not guarantee the channel will
1069    /// remain connected. The channel may be disconnected immediately after this method
1070    /// returns, so a subsequent [`Receiver::recv`] may still fail with [`RecvError`].
1071    ///
1072    /// # Examples
1073    ///
1074    /// ```
1075    /// #![feature(mpsc_is_disconnected)]
1076    ///
1077    /// use std::sync::mpsc::channel;
1078    ///
1079    /// let (tx, rx) = channel::<i32>();
1080    /// assert!(!rx.is_disconnected());
1081    /// drop(tx);
1082    /// assert!(rx.is_disconnected());
1083    /// ```
1084    #[unstable(feature = "mpsc_is_disconnected", issue = "153668")]
1085    pub fn is_disconnected(&self) -> bool {
1086        self.inner.is_disconnected()
1087    }
1088}
1089
1090#[stable(feature = "rust1", since = "1.0.0")]
1091impl<'a, T> Iterator for Iter<'a, T> {
1092    type Item = T;
1093
1094    fn next(&mut self) -> Option<T> {
1095        self.rx.recv().ok()
1096    }
1097}
1098
1099#[stable(feature = "receiver_try_iter", since = "1.15.0")]
1100impl<'a, T> Iterator for TryIter<'a, T> {
1101    type Item = T;
1102
1103    fn next(&mut self) -> Option<T> {
1104        self.rx.try_recv().ok()
1105    }
1106}
1107
1108#[stable(feature = "receiver_into_iter", since = "1.1.0")]
1109impl<'a, T> IntoIterator for &'a Receiver<T> {
1110    type Item = T;
1111    type IntoIter = Iter<'a, T>;
1112
1113    fn into_iter(self) -> Iter<'a, T> {
1114        self.iter()
1115    }
1116}
1117
1118#[stable(feature = "receiver_into_iter", since = "1.1.0")]
1119impl<T> Iterator for IntoIter<T> {
1120    type Item = T;
1121    fn next(&mut self) -> Option<T> {
1122        self.rx.recv().ok()
1123    }
1124}
1125
1126#[stable(feature = "receiver_into_iter", since = "1.1.0")]
1127impl<T> IntoIterator for Receiver<T> {
1128    type Item = T;
1129    type IntoIter = IntoIter<T>;
1130
1131    fn into_iter(self) -> IntoIter<T> {
1132        IntoIter { rx: self }
1133    }
1134}
1135
1136#[stable(feature = "mpsc_debug", since = "1.8.0")]
1137impl<T> fmt::Debug for Receiver<T> {
1138    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1139        f.debug_struct("Receiver").finish_non_exhaustive()
1140    }
1141}
1142
1143#[stable(feature = "rust1", since = "1.0.0")]
1144impl<T> fmt::Debug for SendError<T> {
1145    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1146        f.debug_struct("SendError").finish_non_exhaustive()
1147    }
1148}
1149
1150#[stable(feature = "rust1", since = "1.0.0")]
1151impl<T> fmt::Display for SendError<T> {
1152    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1153        "sending on a closed channel".fmt(f)
1154    }
1155}
1156
1157#[stable(feature = "rust1", since = "1.0.0")]
1158impl<T> error::Error for SendError<T> {}
1159
1160#[stable(feature = "rust1", since = "1.0.0")]
1161impl<T> fmt::Debug for TrySendError<T> {
1162    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1163        match *self {
1164            TrySendError::Full(..) => f.debug_tuple("TrySendError::Full").finish_non_exhaustive(),
1165            TrySendError::Disconnected(..) => {
1166                f.debug_tuple("TrySendError::Disconnected").finish_non_exhaustive()
1167            }
1168        }
1169    }
1170}
1171
1172#[stable(feature = "rust1", since = "1.0.0")]
1173impl<T> fmt::Display for TrySendError<T> {
1174    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1175        match *self {
1176            TrySendError::Full(..) => "sending on a full channel".fmt(f),
1177            TrySendError::Disconnected(..) => "sending on a closed channel".fmt(f),
1178        }
1179    }
1180}
1181
1182#[stable(feature = "rust1", since = "1.0.0")]
1183impl<T> error::Error for TrySendError<T> {}
1184
1185#[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1186impl<T> From<SendError<T>> for TrySendError<T> {
1187    /// Converts a `SendError<T>` into a `TrySendError<T>`.
1188    ///
1189    /// This conversion always returns a `TrySendError::Disconnected` containing the data in the `SendError<T>`.
1190    ///
1191    /// No data is allocated on the heap.
1192    fn from(err: SendError<T>) -> TrySendError<T> {
1193        match err {
1194            SendError(t) => TrySendError::Disconnected(t),
1195        }
1196    }
1197}
1198
1199#[stable(feature = "rust1", since = "1.0.0")]
1200impl fmt::Display for RecvError {
1201    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1202        "receiving on a closed channel".fmt(f)
1203    }
1204}
1205
1206#[stable(feature = "rust1", since = "1.0.0")]
1207impl error::Error for RecvError {}
1208
1209#[stable(feature = "rust1", since = "1.0.0")]
1210impl fmt::Display for TryRecvError {
1211    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1212        match *self {
1213            TryRecvError::Empty => "receiving on an empty channel".fmt(f),
1214            TryRecvError::Disconnected => "receiving on a closed channel".fmt(f),
1215        }
1216    }
1217}
1218
1219#[stable(feature = "rust1", since = "1.0.0")]
1220impl error::Error for TryRecvError {}
1221
1222#[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1223impl From<RecvError> for TryRecvError {
1224    /// Converts a `RecvError` into a `TryRecvError`.
1225    ///
1226    /// This conversion always returns `TryRecvError::Disconnected`.
1227    ///
1228    /// No data is allocated on the heap.
1229    fn from(err: RecvError) -> TryRecvError {
1230        match err {
1231            RecvError => TryRecvError::Disconnected,
1232        }
1233    }
1234}
1235
1236#[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
1237impl fmt::Display for RecvTimeoutError {
1238    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1239        match *self {
1240            RecvTimeoutError::Timeout => "timed out waiting on channel".fmt(f),
1241            RecvTimeoutError::Disconnected => "channel is empty and sending half is closed".fmt(f),
1242        }
1243    }
1244}
1245
1246#[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
1247impl error::Error for RecvTimeoutError {}
1248
1249#[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1250impl From<RecvError> for RecvTimeoutError {
1251    /// Converts a `RecvError` into a `RecvTimeoutError`.
1252    ///
1253    /// This conversion always returns `RecvTimeoutError::Disconnected`.
1254    ///
1255    /// No data is allocated on the heap.
1256    fn from(err: RecvError) -> RecvTimeoutError {
1257        match err {
1258            RecvError => RecvTimeoutError::Disconnected,
1259        }
1260    }
1261}