std/sync/
mpsc.rs

1//! Multi-producer, single-consumer FIFO queue communication primitives.
2//!
3//! This module provides message-based communication over channels, concretely
4//! defined among three types:
5//!
6//! * [`Sender`]
7//! * [`SyncSender`]
8//! * [`Receiver`]
9//!
10//! A [`Sender`] or [`SyncSender`] is used to send data to a [`Receiver`]. Both
11//! senders are clone-able (multi-producer) such that many threads can send
12//! simultaneously to one receiver (single-consumer).
13//!
14//! These channels come in two flavors:
15//!
16//! 1. An asynchronous, infinitely buffered channel. The [`channel`] function
17//!    will return a `(Sender, Receiver)` tuple where all sends will be
18//!    **asynchronous** (they never block). The channel conceptually has an
19//!    infinite buffer.
20//!
21//! 2. A synchronous, bounded channel. The [`sync_channel`] function will
22//!    return a `(SyncSender, Receiver)` tuple where the storage for pending
23//!    messages is a pre-allocated buffer of a fixed size. All sends will be
24//!    **synchronous** by blocking until there is buffer space available. Note
25//!    that a bound of 0 is allowed, causing the channel to become a "rendezvous"
26//!    channel where each sender atomically hands off a message to a receiver.
27//!
28//! [`send`]: Sender::send
29//!
30//! ## Disconnection
31//!
32//! The send and receive operations on channels will all return a [`Result`]
33//! indicating whether the operation succeeded or not. An unsuccessful operation
34//! is normally indicative of the other half of a channel having "hung up" by
35//! being dropped in its corresponding thread.
36//!
37//! Once half of a channel has been deallocated, most operations can no longer
38//! continue to make progress, so [`Err`] will be returned. Many applications
39//! will continue to [`unwrap`] the results returned from this module,
40//! instigating a propagation of failure among threads if one unexpectedly dies.
41//!
42//! [`unwrap`]: Result::unwrap
43//!
44//! # Examples
45//!
46//! Simple usage:
47//!
48//! ```
49//! use std::thread;
50//! use std::sync::mpsc::channel;
51//!
52//! // Create a simple streaming channel
53//! let (tx, rx) = channel();
54//! thread::spawn(move || {
55//!     tx.send(10).unwrap();
56//! });
57//! assert_eq!(rx.recv().unwrap(), 10);
58//! ```
59//!
60//! Shared usage:
61//!
62//! ```
63//! use std::thread;
64//! use std::sync::mpsc::channel;
65//!
66//! // Create a shared channel that can be sent along from many threads
67//! // where tx is the sending half (tx for transmission), and rx is the receiving
68//! // half (rx for receiving).
69//! let (tx, rx) = channel();
70//! for i in 0..10 {
71//!     let tx = tx.clone();
72//!     thread::spawn(move || {
73//!         tx.send(i).unwrap();
74//!     });
75//! }
76//!
77//! for _ in 0..10 {
78//!     let j = rx.recv().unwrap();
79//!     assert!(0 <= j && j < 10);
80//! }
81//! ```
82//!
83//! Propagating panics:
84//!
85//! ```
86//! use std::sync::mpsc::channel;
87//!
88//! // The call to recv() will return an error because the channel has already
89//! // hung up (or been deallocated)
90//! let (tx, rx) = channel::<i32>();
91//! drop(tx);
92//! assert!(rx.recv().is_err());
93//! ```
94//!
95//! Synchronous channels:
96//!
97//! ```
98//! use std::thread;
99//! use std::sync::mpsc::sync_channel;
100//!
101//! let (tx, rx) = sync_channel::<i32>(0);
102//! thread::spawn(move || {
103//!     // This will wait for the parent thread to start receiving
104//!     tx.send(53).unwrap();
105//! });
106//! rx.recv().unwrap();
107//! ```
108//!
109//! Unbounded receive loop:
110//!
111//! ```
112//! use std::sync::mpsc::sync_channel;
113//! use std::thread;
114//!
115//! let (tx, rx) = sync_channel(3);
116//!
117//! for _ in 0..3 {
118//!     // It would be the same without thread and clone here
119//!     // since there will still be one `tx` left.
120//!     let tx = tx.clone();
121//!     // cloned tx dropped within thread
122//!     thread::spawn(move || tx.send("ok").unwrap());
123//! }
124//!
//! // Drop the last sender so that `rx` stops waiting for messages.
//! // The program will not complete if we comment this out.
//! // **All** `tx` handles need to be dropped for `rx` to return `Err`.
128//! drop(tx);
129//!
130//! // Unbounded receiver waiting for all senders to complete.
131//! while let Ok(msg) = rx.recv() {
132//!     println!("{msg}");
133//! }
134//!
135//! println!("completed");
136//! ```
137
138#![stable(feature = "rust1", since = "1.0.0")]
139
140// MPSC channels are built as a wrapper around MPMC channels, which
141// were ported from the `crossbeam-channel` crate. MPMC channels are
142// not exposed publicly, but if you are curious about the implementation,
143// that's where everything is.
144
145use crate::sync::mpmc;
146use crate::time::{Duration, Instant};
147use crate::{error, fmt};
148
149/// The receiving half of Rust's [`channel`] (or [`sync_channel`]) type.
150/// This half can only be owned by one thread.
151///
152/// Messages sent to the channel can be retrieved using [`recv`].
153///
154/// [`recv`]: Receiver::recv
155///
156/// # Examples
157///
158/// ```rust
159/// use std::sync::mpsc::channel;
160/// use std::thread;
161/// use std::time::Duration;
162///
163/// let (send, recv) = channel();
164///
165/// thread::spawn(move || {
166///     send.send("Hello world!").unwrap();
167///     thread::sleep(Duration::from_secs(2)); // block for two seconds
168///     send.send("Delayed for 2 seconds").unwrap();
169/// });
170///
171/// println!("{}", recv.recv().unwrap()); // Received immediately
172/// println!("Waiting...");
173/// println!("{}", recv.recv().unwrap()); // Received after 2 seconds
174/// ```
175#[stable(feature = "rust1", since = "1.0.0")]
176#[cfg_attr(not(test), rustc_diagnostic_item = "Receiver")]
177pub struct Receiver<T> {
178    inner: mpmc::Receiver<T>,
179}
180
181// The receiver port can be sent from place to place, so long as it
182// is not used to receive non-sendable things.
183#[stable(feature = "rust1", since = "1.0.0")]
184unsafe impl<T: Send> Send for Receiver<T> {}
185
186#[stable(feature = "rust1", since = "1.0.0")]
187impl<T> !Sync for Receiver<T> {}
188
189/// An iterator over messages on a [`Receiver`], created by [`iter`].
190///
191/// This iterator will block whenever [`next`] is called,
192/// waiting for a new message, and [`None`] will be returned
193/// when the corresponding channel has hung up.
194///
195/// [`iter`]: Receiver::iter
196/// [`next`]: Iterator::next
197///
198/// # Examples
199///
200/// ```rust
201/// use std::sync::mpsc::channel;
202/// use std::thread;
203///
204/// let (send, recv) = channel();
205///
206/// thread::spawn(move || {
207///     send.send(1u8).unwrap();
208///     send.send(2u8).unwrap();
209///     send.send(3u8).unwrap();
210/// });
211///
212/// for x in recv.iter() {
213///     println!("Got: {x}");
214/// }
215/// ```
216#[stable(feature = "rust1", since = "1.0.0")]
217#[derive(Debug)]
218pub struct Iter<'a, T: 'a> {
219    rx: &'a Receiver<T>,
220}
221
222/// An iterator that attempts to yield all pending values for a [`Receiver`],
223/// created by [`try_iter`].
224///
225/// [`None`] will be returned when there are no pending values remaining or
226/// if the corresponding channel has hung up.
227///
228/// This iterator will never block the caller in order to wait for data to
229/// become available. Instead, it will return [`None`].
230///
231/// [`try_iter`]: Receiver::try_iter
232///
233/// # Examples
234///
235/// ```rust
236/// use std::sync::mpsc::channel;
237/// use std::thread;
238/// use std::time::Duration;
239///
240/// let (sender, receiver) = channel();
241///
242/// // Nothing is in the buffer yet
243/// assert!(receiver.try_iter().next().is_none());
244/// println!("Nothing in the buffer...");
245///
246/// thread::spawn(move || {
247///     sender.send(1).unwrap();
248///     sender.send(2).unwrap();
249///     sender.send(3).unwrap();
250/// });
251///
252/// println!("Going to sleep...");
253/// thread::sleep(Duration::from_secs(2)); // block for two seconds
254///
255/// for x in receiver.try_iter() {
256///     println!("Got: {x}");
257/// }
258/// ```
259#[stable(feature = "receiver_try_iter", since = "1.15.0")]
260#[derive(Debug)]
261pub struct TryIter<'a, T: 'a> {
262    rx: &'a Receiver<T>,
263}
264
265/// An owning iterator over messages on a [`Receiver`],
266/// created by [`into_iter`].
267///
268/// This iterator will block whenever [`next`]
269/// is called, waiting for a new message, and [`None`] will be
270/// returned if the corresponding channel has hung up.
271///
272/// [`into_iter`]: Receiver::into_iter
273/// [`next`]: Iterator::next
274///
275/// # Examples
276///
277/// ```rust
278/// use std::sync::mpsc::channel;
279/// use std::thread;
280///
281/// let (send, recv) = channel();
282///
283/// thread::spawn(move || {
284///     send.send(1u8).unwrap();
285///     send.send(2u8).unwrap();
286///     send.send(3u8).unwrap();
287/// });
288///
289/// for x in recv.into_iter() {
290///     println!("Got: {x}");
291/// }
292/// ```
293#[stable(feature = "receiver_into_iter", since = "1.1.0")]
294#[derive(Debug)]
295pub struct IntoIter<T> {
296    rx: Receiver<T>,
297}
298
299/// The sending-half of Rust's asynchronous [`channel`] type.
300///
301/// Messages can be sent through this channel with [`send`].
302///
303/// Note: all senders (the original and its clones) need to be dropped for the receiver
304/// to stop blocking to receive messages with [`Receiver::recv`].
305///
306/// [`send`]: Sender::send
307///
308/// # Examples
309///
310/// ```rust
311/// use std::sync::mpsc::channel;
312/// use std::thread;
313///
314/// let (sender, receiver) = channel();
315/// let sender2 = sender.clone();
316///
317/// // First thread owns sender
318/// thread::spawn(move || {
319///     sender.send(1).unwrap();
320/// });
321///
322/// // Second thread owns sender2
323/// thread::spawn(move || {
324///     sender2.send(2).unwrap();
325/// });
326///
327/// let msg = receiver.recv().unwrap();
328/// let msg2 = receiver.recv().unwrap();
329///
330/// assert_eq!(3, msg + msg2);
331/// ```
332#[stable(feature = "rust1", since = "1.0.0")]
333pub struct Sender<T> {
334    inner: mpmc::Sender<T>,
335}
336
337// The send port can be sent from place to place, so long as it
338// is not used to send non-sendable things.
339#[stable(feature = "rust1", since = "1.0.0")]
340unsafe impl<T: Send> Send for Sender<T> {}
341
342#[stable(feature = "mpsc_sender_sync", since = "1.72.0")]
343unsafe impl<T: Send> Sync for Sender<T> {}
344
345/// The sending-half of Rust's synchronous [`sync_channel`] type.
346///
347/// Messages can be sent through this channel with [`send`] or [`try_send`].
348///
349/// [`send`] will block if there is no space in the internal buffer.
350///
351/// [`send`]: SyncSender::send
352/// [`try_send`]: SyncSender::try_send
353///
354/// # Examples
355///
356/// ```rust
357/// use std::sync::mpsc::sync_channel;
358/// use std::thread;
359///
360/// // Create a sync_channel with buffer size 2
361/// let (sync_sender, receiver) = sync_channel(2);
362/// let sync_sender2 = sync_sender.clone();
363///
364/// // First thread owns sync_sender
365/// thread::spawn(move || {
366///     sync_sender.send(1).unwrap();
367///     sync_sender.send(2).unwrap();
368/// });
369///
370/// // Second thread owns sync_sender2
371/// thread::spawn(move || {
372///     sync_sender2.send(3).unwrap();
373///     // thread will now block since the buffer is full
374///     println!("Thread unblocked!");
375/// });
376///
377/// let mut msg;
378///
379/// msg = receiver.recv().unwrap();
380/// println!("message {msg} received");
381///
382/// // "Thread unblocked!" will be printed now
383///
384/// msg = receiver.recv().unwrap();
385/// println!("message {msg} received");
386///
387/// msg = receiver.recv().unwrap();
388///
389/// println!("message {msg} received");
390/// ```
391#[stable(feature = "rust1", since = "1.0.0")]
392pub struct SyncSender<T> {
393    inner: mpmc::Sender<T>,
394}
395
396#[stable(feature = "rust1", since = "1.0.0")]
397unsafe impl<T: Send> Send for SyncSender<T> {}
398
399/// An error returned from the [`Sender::send`] or [`SyncSender::send`]
400/// function on **channel**s.
401///
402/// A **send** operation can only fail if the receiving end of a channel is
403/// disconnected, implying that the data could never be received. The error
404/// contains the data being sent as a payload so it can be recovered.
405#[stable(feature = "rust1", since = "1.0.0")]
406#[derive(PartialEq, Eq, Clone, Copy)]
407pub struct SendError<T>(#[stable(feature = "rust1", since = "1.0.0")] pub T);
408
409/// An error returned from the [`recv`] function on a [`Receiver`].
410///
411/// The [`recv`] operation can only fail if the sending half of a
412/// [`channel`] (or [`sync_channel`]) is disconnected, implying that no further
413/// messages will ever be received.
414///
415/// [`recv`]: Receiver::recv
416#[derive(PartialEq, Eq, Clone, Copy, Debug)]
417#[stable(feature = "rust1", since = "1.0.0")]
418pub struct RecvError;
419
420/// This enumeration is the list of the possible reasons that [`try_recv`] could
421/// not return data when called. This can occur with both a [`channel`] and
422/// a [`sync_channel`].
423///
424/// [`try_recv`]: Receiver::try_recv
425#[derive(PartialEq, Eq, Clone, Copy, Debug)]
426#[stable(feature = "rust1", since = "1.0.0")]
427pub enum TryRecvError {
428    /// This **channel** is currently empty, but the **Sender**(s) have not yet
429    /// disconnected, so data may yet become available.
430    #[stable(feature = "rust1", since = "1.0.0")]
431    Empty,
432
433    /// The **channel**'s sending half has become disconnected, and there will
434    /// never be any more data received on it.
435    #[stable(feature = "rust1", since = "1.0.0")]
436    Disconnected,
437}
438
439/// This enumeration is the list of possible errors that made [`recv_timeout`]
440/// unable to return data when called. This can occur with both a [`channel`] and
441/// a [`sync_channel`].
442///
443/// [`recv_timeout`]: Receiver::recv_timeout
444#[derive(PartialEq, Eq, Clone, Copy, Debug)]
445#[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
446pub enum RecvTimeoutError {
447    /// This **channel** is currently empty, but the **Sender**(s) have not yet
448    /// disconnected, so data may yet become available.
449    #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
450    Timeout,
451    /// The **channel**'s sending half has become disconnected, and there will
452    /// never be any more data received on it.
453    #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
454    Disconnected,
455}
456
457/// This enumeration is the list of the possible error outcomes for the
458/// [`try_send`] method.
459///
460/// [`try_send`]: SyncSender::try_send
461#[stable(feature = "rust1", since = "1.0.0")]
462#[derive(PartialEq, Eq, Clone, Copy)]
463pub enum TrySendError<T> {
464    /// The data could not be sent on the [`sync_channel`] because it would require that
465    /// the callee block to send the data.
466    ///
467    /// If this is a buffered channel, then the buffer is full at this time. If
468    /// this is not a buffered channel, then there is no [`Receiver`] available to
469    /// acquire the data.
470    #[stable(feature = "rust1", since = "1.0.0")]
471    Full(#[stable(feature = "rust1", since = "1.0.0")] T),
472
473    /// This [`sync_channel`]'s receiving half has disconnected, so the data could not be
474    /// sent. The data is returned back to the callee in this case.
475    #[stable(feature = "rust1", since = "1.0.0")]
476    Disconnected(#[stable(feature = "rust1", since = "1.0.0")] T),
477}
478
479/// Creates a new asynchronous channel, returning the sender/receiver halves.
480///
481/// All data sent on the [`Sender`] will become available on the [`Receiver`] in
482/// the same order as it was sent, and no [`send`] will block the calling thread
483/// (this channel has an "infinite buffer", unlike [`sync_channel`], which will
484/// block after its buffer limit is reached). [`recv`] will block until a message
485/// is available while there is at least one [`Sender`] alive (including clones).
486///
487/// The [`Sender`] can be cloned to [`send`] to the same channel multiple times, but
488/// only one [`Receiver`] is supported.
489///
490/// If the [`Receiver`] is disconnected while trying to [`send`] with the
491/// [`Sender`], the [`send`] method will return a [`SendError`]. Similarly, if the
492/// [`Sender`] is disconnected while trying to [`recv`], the [`recv`] method will
493/// return a [`RecvError`].
494///
495/// [`send`]: Sender::send
496/// [`recv`]: Receiver::recv
497///
498/// # Examples
499///
500/// ```
501/// use std::sync::mpsc::channel;
502/// use std::thread;
503///
504/// let (sender, receiver) = channel();
505///
506/// // Spawn off an expensive computation
507/// thread::spawn(move || {
508/// #   fn expensive_computation() {}
509///     sender.send(expensive_computation()).unwrap();
510/// });
511///
512/// // Do some useful work for awhile
513///
514/// // Let's see what that answer was
515/// println!("{:?}", receiver.recv().unwrap());
516/// ```
517#[must_use]
518#[stable(feature = "rust1", since = "1.0.0")]
519pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
520    let (tx, rx) = mpmc::channel();
521    (Sender { inner: tx }, Receiver { inner: rx })
522}
523
524/// Creates a new synchronous, bounded channel.
525///
526/// All data sent on the [`SyncSender`] will become available on the [`Receiver`]
527/// in the same order as it was sent. Like asynchronous [`channel`]s, the
528/// [`Receiver`] will block until a message becomes available. `sync_channel`
529/// differs greatly in the semantics of the sender, however.
530///
531/// This channel has an internal buffer on which messages will be queued.
532/// `bound` specifies the buffer size. When the internal buffer becomes full,
533/// future sends will *block* waiting for the buffer to open up. Note that a
534/// buffer size of 0 is valid, in which case this becomes "rendezvous channel"
535/// where each [`send`] will not return until a [`recv`] is paired with it.
536///
537/// The [`SyncSender`] can be cloned to [`send`] to the same channel multiple
538/// times, but only one [`Receiver`] is supported.
539///
540/// Like asynchronous channels, if the [`Receiver`] is disconnected while trying
541/// to [`send`] with the [`SyncSender`], the [`send`] method will return a
542/// [`SendError`]. Similarly, If the [`SyncSender`] is disconnected while trying
543/// to [`recv`], the [`recv`] method will return a [`RecvError`].
544///
545/// [`send`]: SyncSender::send
546/// [`recv`]: Receiver::recv
547///
548/// # Examples
549///
550/// ```
551/// use std::sync::mpsc::sync_channel;
552/// use std::thread;
553///
554/// let (sender, receiver) = sync_channel(1);
555///
556/// // this returns immediately
557/// sender.send(1).unwrap();
558///
559/// thread::spawn(move || {
560///     // this will block until the previous message has been received
561///     sender.send(2).unwrap();
562/// });
563///
564/// assert_eq!(receiver.recv().unwrap(), 1);
565/// assert_eq!(receiver.recv().unwrap(), 2);
566/// ```
567#[must_use]
568#[stable(feature = "rust1", since = "1.0.0")]
569pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
570    let (tx, rx) = mpmc::sync_channel(bound);
571    (SyncSender { inner: tx }, Receiver { inner: rx })
572}
573
574////////////////////////////////////////////////////////////////////////////////
575// Sender
576////////////////////////////////////////////////////////////////////////////////
577
578impl<T> Sender<T> {
579    /// Attempts to send a value on this channel, returning it back if it could
580    /// not be sent.
581    ///
582    /// A successful send occurs when it is determined that the other end of
583    /// the channel has not hung up already. An unsuccessful send would be one
584    /// where the corresponding receiver has already been deallocated. Note
585    /// that a return value of [`Err`] means that the data will never be
586    /// received, but a return value of [`Ok`] does *not* mean that the data
587    /// will be received. It is possible for the corresponding receiver to
588    /// hang up immediately after this function returns [`Ok`].
589    ///
590    /// This method will never block the current thread.
591    ///
592    /// # Examples
593    ///
594    /// ```
595    /// use std::sync::mpsc::channel;
596    ///
597    /// let (tx, rx) = channel();
598    ///
599    /// // This send is always successful
600    /// tx.send(1).unwrap();
601    ///
602    /// // This send will fail because the receiver is gone
603    /// drop(rx);
604    /// assert_eq!(tx.send(1).unwrap_err().0, 1);
605    /// ```
606    #[stable(feature = "rust1", since = "1.0.0")]
607    pub fn send(&self, t: T) -> Result<(), SendError<T>> {
608        self.inner.send(t)
609    }
610}
611
612#[stable(feature = "rust1", since = "1.0.0")]
613impl<T> Clone for Sender<T> {
614    /// Clone a sender to send to other threads.
615    ///
616    /// Note, be aware of the lifetime of the sender because all senders
617    /// (including the original) need to be dropped in order for
618    /// [`Receiver::recv`] to stop blocking.
619    fn clone(&self) -> Sender<T> {
620        Sender { inner: self.inner.clone() }
621    }
622}
623
624#[stable(feature = "mpsc_debug", since = "1.8.0")]
625impl<T> fmt::Debug for Sender<T> {
626    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
627        f.debug_struct("Sender").finish_non_exhaustive()
628    }
629}
630
631////////////////////////////////////////////////////////////////////////////////
632// SyncSender
633////////////////////////////////////////////////////////////////////////////////
634
635impl<T> SyncSender<T> {
636    /// Sends a value on this synchronous channel.
637    ///
638    /// This function will *block* until space in the internal buffer becomes
639    /// available or a receiver is available to hand off the message to.
640    ///
641    /// Note that a successful send does *not* guarantee that the receiver will
642    /// ever see the data if there is a buffer on this channel. Items may be
643    /// enqueued in the internal buffer for the receiver to receive at a later
644    /// time. If the buffer size is 0, however, the channel becomes a rendezvous
645    /// channel and it guarantees that the receiver has indeed received
646    /// the data if this function returns success.
647    ///
648    /// This function will never panic, but it may return [`Err`] if the
649    /// [`Receiver`] has disconnected and is no longer able to receive
650    /// information.
651    ///
652    /// # Examples
653    ///
654    /// ```rust
655    /// use std::sync::mpsc::sync_channel;
656    /// use std::thread;
657    ///
658    /// // Create a rendezvous sync_channel with buffer size 0
659    /// let (sync_sender, receiver) = sync_channel(0);
660    ///
661    /// thread::spawn(move || {
662    ///    println!("sending message...");
663    ///    sync_sender.send(1).unwrap();
664    ///    // Thread is now blocked until the message is received
665    ///
666    ///    println!("...message received!");
667    /// });
668    ///
669    /// let msg = receiver.recv().unwrap();
670    /// assert_eq!(1, msg);
671    /// ```
672    #[stable(feature = "rust1", since = "1.0.0")]
673    pub fn send(&self, t: T) -> Result<(), SendError<T>> {
674        self.inner.send(t)
675    }
676
677    /// Attempts to send a value on this channel without blocking.
678    ///
679    /// This method differs from [`send`] by returning immediately if the
680    /// channel's buffer is full or no receiver is waiting to acquire some
681    /// data. Compared with [`send`], this function has two failure cases
682    /// instead of one (one for disconnection, one for a full buffer).
683    ///
684    /// See [`send`] for notes about guarantees of whether the
685    /// receiver has received the data or not if this function is successful.
686    ///
687    /// [`send`]: Self::send
688    ///
689    /// # Examples
690    ///
691    /// ```rust
692    /// use std::sync::mpsc::sync_channel;
693    /// use std::thread;
694    ///
695    /// // Create a sync_channel with buffer size 1
696    /// let (sync_sender, receiver) = sync_channel(1);
697    /// let sync_sender2 = sync_sender.clone();
698    ///
699    /// // First thread owns sync_sender
700    /// thread::spawn(move || {
701    ///     sync_sender.send(1).unwrap();
702    ///     sync_sender.send(2).unwrap();
703    ///     // Thread blocked
704    /// });
705    ///
706    /// // Second thread owns sync_sender2
707    /// thread::spawn(move || {
708    ///     // This will return an error and send
709    ///     // no message if the buffer is full
710    ///     let _ = sync_sender2.try_send(3);
711    /// });
712    ///
713    /// let mut msg;
714    /// msg = receiver.recv().unwrap();
715    /// println!("message {msg} received");
716    ///
717    /// msg = receiver.recv().unwrap();
718    /// println!("message {msg} received");
719    ///
720    /// // Third message may have never been sent
721    /// match receiver.try_recv() {
722    ///     Ok(msg) => println!("message {msg} received"),
723    ///     Err(_) => println!("the third message was never sent"),
724    /// }
725    /// ```
726    #[stable(feature = "rust1", since = "1.0.0")]
727    pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
728        self.inner.try_send(t)
729    }
730
731    // Attempts to send for a value on this receiver, returning an error if the
732    // corresponding channel has hung up, or if it waits more than `timeout`.
733    //
734    // This method is currently only used for tests.
735    #[unstable(issue = "none", feature = "std_internals")]
736    #[doc(hidden)]
737    pub fn send_timeout(&self, t: T, timeout: Duration) -> Result<(), mpmc::SendTimeoutError<T>> {
738        self.inner.send_timeout(t, timeout)
739    }
740}
741
742#[stable(feature = "rust1", since = "1.0.0")]
743impl<T> Clone for SyncSender<T> {
744    fn clone(&self) -> SyncSender<T> {
745        SyncSender { inner: self.inner.clone() }
746    }
747}
748
749#[stable(feature = "mpsc_debug", since = "1.8.0")]
750impl<T> fmt::Debug for SyncSender<T> {
751    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
752        f.debug_struct("SyncSender").finish_non_exhaustive()
753    }
754}
755
756////////////////////////////////////////////////////////////////////////////////
757// Receiver
758////////////////////////////////////////////////////////////////////////////////
759
760impl<T> Receiver<T> {
761    /// Attempts to return a pending value on this receiver without blocking.
762    ///
763    /// This method will never block the caller in order to wait for data to
764    /// become available. Instead, this will always return immediately with a
765    /// possible option of pending data on the channel.
766    ///
767    /// This is useful for a flavor of "optimistic check" before deciding to
768    /// block on a receiver.
769    ///
770    /// Compared with [`recv`], this function has two failure cases instead of one
771    /// (one for disconnection, one for an empty buffer).
772    ///
773    /// [`recv`]: Self::recv
774    ///
775    /// # Examples
776    ///
777    /// ```rust
778    /// use std::sync::mpsc::{Receiver, channel};
779    ///
780    /// let (_, receiver): (_, Receiver<i32>) = channel();
781    ///
782    /// assert!(receiver.try_recv().is_err());
783    /// ```
784    #[stable(feature = "rust1", since = "1.0.0")]
785    pub fn try_recv(&self) -> Result<T, TryRecvError> {
786        self.inner.try_recv()
787    }
788
789    /// Attempts to wait for a value on this receiver, returning an error if the
790    /// corresponding channel has hung up.
791    ///
792    /// This function will always block the current thread if there is no data
793    /// available and it's possible for more data to be sent (at least one sender
794    /// still exists). Once a message is sent to the corresponding [`Sender`]
795    /// (or [`SyncSender`]), this receiver will wake up and return that
796    /// message.
797    ///
798    /// If the corresponding [`Sender`] has disconnected, or it disconnects while
799    /// this call is blocking, this call will wake up and return [`Err`] to
800    /// indicate that no more messages can ever be received on this channel.
801    /// However, since channels are buffered, messages sent before the disconnect
802    /// will still be properly received.
803    ///
804    /// # Examples
805    ///
806    /// ```
807    /// use std::sync::mpsc;
808    /// use std::thread;
809    ///
810    /// let (send, recv) = mpsc::channel();
811    /// let handle = thread::spawn(move || {
812    ///     send.send(1u8).unwrap();
813    /// });
814    ///
815    /// handle.join().unwrap();
816    ///
817    /// assert_eq!(Ok(1), recv.recv());
818    /// ```
819    ///
820    /// Buffering behavior:
821    ///
822    /// ```
823    /// use std::sync::mpsc;
824    /// use std::thread;
825    /// use std::sync::mpsc::RecvError;
826    ///
827    /// let (send, recv) = mpsc::channel();
828    /// let handle = thread::spawn(move || {
829    ///     send.send(1u8).unwrap();
830    ///     send.send(2).unwrap();
831    ///     send.send(3).unwrap();
832    ///     drop(send);
833    /// });
834    ///
835    /// // wait for the thread to join so we ensure the sender is dropped
836    /// handle.join().unwrap();
837    ///
838    /// assert_eq!(Ok(1), recv.recv());
839    /// assert_eq!(Ok(2), recv.recv());
840    /// assert_eq!(Ok(3), recv.recv());
841    /// assert_eq!(Err(RecvError), recv.recv());
842    /// ```
843    #[stable(feature = "rust1", since = "1.0.0")]
844    pub fn recv(&self) -> Result<T, RecvError> {
845        self.inner.recv()
846    }
847
848    /// Attempts to wait for a value on this receiver, returning an error if the
849    /// corresponding channel has hung up, or if it waits more than `timeout`.
850    ///
851    /// This function will always block the current thread if there is no data
852    /// available and it's possible for more data to be sent (at least one sender
853    /// still exists). Once a message is sent to the corresponding [`Sender`]
854    /// (or [`SyncSender`]), this receiver will wake up and return that
855    /// message.
856    ///
857    /// If the corresponding [`Sender`] has disconnected, or it disconnects while
858    /// this call is blocking, this call will wake up and return [`Err`] to
859    /// indicate that no more messages can ever be received on this channel.
860    /// However, since channels are buffered, messages sent before the disconnect
861    /// will still be properly received.
862    ///
863    /// # Examples
864    ///
865    /// Successfully receiving value before encountering timeout:
866    ///
867    /// ```no_run
868    /// use std::thread;
869    /// use std::time::Duration;
870    /// use std::sync::mpsc;
871    ///
872    /// let (send, recv) = mpsc::channel();
873    ///
874    /// thread::spawn(move || {
875    ///     send.send('a').unwrap();
876    /// });
877    ///
878    /// assert_eq!(
879    ///     recv.recv_timeout(Duration::from_millis(400)),
880    ///     Ok('a')
881    /// );
882    /// ```
883    ///
884    /// Receiving an error upon reaching timeout:
885    ///
886    /// ```no_run
887    /// use std::thread;
888    /// use std::time::Duration;
889    /// use std::sync::mpsc;
890    ///
891    /// let (send, recv) = mpsc::channel();
892    ///
893    /// thread::spawn(move || {
894    ///     thread::sleep(Duration::from_millis(800));
895    ///     send.send('a').unwrap();
896    /// });
897    ///
898    /// assert_eq!(
899    ///     recv.recv_timeout(Duration::from_millis(400)),
900    ///     Err(mpsc::RecvTimeoutError::Timeout)
901    /// );
902    /// ```
903    #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
904    pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
905        self.inner.recv_timeout(timeout)
906    }
907
908    /// Attempts to wait for a value on this receiver, returning an error if the
909    /// corresponding channel has hung up, or if `deadline` is reached.
910    ///
911    /// This function will always block the current thread if there is no data
912    /// available and it's possible for more data to be sent. Once a message is
913    /// sent to the corresponding [`Sender`] (or [`SyncSender`]), then this
914    /// receiver will wake up and return that message.
915    ///
916    /// If the corresponding [`Sender`] has disconnected, or it disconnects while
917    /// this call is blocking, this call will wake up and return [`Err`] to
918    /// indicate that no more messages can ever be received on this channel.
919    /// However, since channels are buffered, messages sent before the disconnect
920    /// will still be properly received.
921    ///
922    /// # Examples
923    ///
924    /// Successfully receiving value before reaching deadline:
925    ///
926    /// ```no_run
927    /// #![feature(deadline_api)]
928    /// use std::thread;
929    /// use std::time::{Duration, Instant};
930    /// use std::sync::mpsc;
931    ///
932    /// let (send, recv) = mpsc::channel();
933    ///
934    /// thread::spawn(move || {
935    ///     send.send('a').unwrap();
936    /// });
937    ///
938    /// assert_eq!(
939    ///     recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
940    ///     Ok('a')
941    /// );
942    /// ```
943    ///
944    /// Receiving an error upon reaching deadline:
945    ///
946    /// ```no_run
947    /// #![feature(deadline_api)]
948    /// use std::thread;
949    /// use std::time::{Duration, Instant};
950    /// use std::sync::mpsc;
951    ///
952    /// let (send, recv) = mpsc::channel();
953    ///
954    /// thread::spawn(move || {
955    ///     thread::sleep(Duration::from_millis(800));
956    ///     send.send('a').unwrap();
957    /// });
958    ///
959    /// assert_eq!(
960    ///     recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
961    ///     Err(mpsc::RecvTimeoutError::Timeout)
962    /// );
963    /// ```
964    #[unstable(feature = "deadline_api", issue = "46316")]
965    pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
966        self.inner.recv_deadline(deadline)
967    }
968
969    /// Returns an iterator that will block waiting for messages, but never
970    /// [`panic!`]. It will return [`None`] when the channel has hung up.
971    ///
972    /// # Examples
973    ///
974    /// ```rust
975    /// use std::sync::mpsc::channel;
976    /// use std::thread;
977    ///
978    /// let (send, recv) = channel();
979    ///
980    /// thread::spawn(move || {
981    ///     send.send(1).unwrap();
982    ///     send.send(2).unwrap();
983    ///     send.send(3).unwrap();
984    /// });
985    ///
986    /// let mut iter = recv.iter();
987    /// assert_eq!(iter.next(), Some(1));
988    /// assert_eq!(iter.next(), Some(2));
989    /// assert_eq!(iter.next(), Some(3));
990    /// assert_eq!(iter.next(), None);
991    /// ```
992    #[stable(feature = "rust1", since = "1.0.0")]
993    pub fn iter(&self) -> Iter<'_, T> {
994        Iter { rx: self }
995    }
996
997    /// Returns an iterator that will attempt to yield all pending values.
998    /// It will return `None` if there are no more pending values or if the
999    /// channel has hung up. The iterator will never [`panic!`] or block the
1000    /// user by waiting for values.
1001    ///
1002    /// # Examples
1003    ///
1004    /// ```no_run
1005    /// use std::sync::mpsc::channel;
1006    /// use std::thread;
1007    /// use std::time::Duration;
1008    ///
1009    /// let (sender, receiver) = channel();
1010    ///
1011    /// // nothing is in the buffer yet
1012    /// assert!(receiver.try_iter().next().is_none());
1013    ///
1014    /// thread::spawn(move || {
1015    ///     thread::sleep(Duration::from_secs(1));
1016    ///     sender.send(1).unwrap();
1017    ///     sender.send(2).unwrap();
1018    ///     sender.send(3).unwrap();
1019    /// });
1020    ///
1021    /// // nothing is in the buffer yet
1022    /// assert!(receiver.try_iter().next().is_none());
1023    ///
1024    /// // block for two seconds
1025    /// thread::sleep(Duration::from_secs(2));
1026    ///
1027    /// let mut iter = receiver.try_iter();
1028    /// assert_eq!(iter.next(), Some(1));
1029    /// assert_eq!(iter.next(), Some(2));
1030    /// assert_eq!(iter.next(), Some(3));
1031    /// assert_eq!(iter.next(), None);
1032    /// ```
1033    #[stable(feature = "receiver_try_iter", since = "1.15.0")]
1034    pub fn try_iter(&self) -> TryIter<'_, T> {
1035        TryIter { rx: self }
1036    }
1037}
1038
1039#[stable(feature = "rust1", since = "1.0.0")]
1040impl<'a, T> Iterator for Iter<'a, T> {
1041    type Item = T;
1042
1043    fn next(&mut self) -> Option<T> {
1044        self.rx.recv().ok()
1045    }
1046}
1047
1048#[stable(feature = "receiver_try_iter", since = "1.15.0")]
1049impl<'a, T> Iterator for TryIter<'a, T> {
1050    type Item = T;
1051
1052    fn next(&mut self) -> Option<T> {
1053        self.rx.try_recv().ok()
1054    }
1055}
1056
1057#[stable(feature = "receiver_into_iter", since = "1.1.0")]
1058impl<'a, T> IntoIterator for &'a Receiver<T> {
1059    type Item = T;
1060    type IntoIter = Iter<'a, T>;
1061
1062    fn into_iter(self) -> Iter<'a, T> {
1063        self.iter()
1064    }
1065}
1066
1067#[stable(feature = "receiver_into_iter", since = "1.1.0")]
1068impl<T> Iterator for IntoIter<T> {
1069    type Item = T;
1070    fn next(&mut self) -> Option<T> {
1071        self.rx.recv().ok()
1072    }
1073}
1074
1075#[stable(feature = "receiver_into_iter", since = "1.1.0")]
1076impl<T> IntoIterator for Receiver<T> {
1077    type Item = T;
1078    type IntoIter = IntoIter<T>;
1079
1080    fn into_iter(self) -> IntoIter<T> {
1081        IntoIter { rx: self }
1082    }
1083}
1084
1085#[stable(feature = "mpsc_debug", since = "1.8.0")]
1086impl<T> fmt::Debug for Receiver<T> {
1087    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1088        f.debug_struct("Receiver").finish_non_exhaustive()
1089    }
1090}
1091
1092#[stable(feature = "rust1", since = "1.0.0")]
1093impl<T> fmt::Debug for SendError<T> {
1094    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1095        f.debug_struct("SendError").finish_non_exhaustive()
1096    }
1097}
1098
1099#[stable(feature = "rust1", since = "1.0.0")]
1100impl<T> fmt::Display for SendError<T> {
1101    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1102        "sending on a closed channel".fmt(f)
1103    }
1104}
1105
1106#[stable(feature = "rust1", since = "1.0.0")]
1107impl<T> error::Error for SendError<T> {
1108    #[allow(deprecated)]
1109    fn description(&self) -> &str {
1110        "sending on a closed channel"
1111    }
1112}
1113
1114#[stable(feature = "rust1", since = "1.0.0")]
1115impl<T> fmt::Debug for TrySendError<T> {
1116    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1117        match *self {
1118            TrySendError::Full(..) => "Full(..)".fmt(f),
1119            TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f),
1120        }
1121    }
1122}
1123
1124#[stable(feature = "rust1", since = "1.0.0")]
1125impl<T> fmt::Display for TrySendError<T> {
1126    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1127        match *self {
1128            TrySendError::Full(..) => "sending on a full channel".fmt(f),
1129            TrySendError::Disconnected(..) => "sending on a closed channel".fmt(f),
1130        }
1131    }
1132}
1133
1134#[stable(feature = "rust1", since = "1.0.0")]
1135impl<T> error::Error for TrySendError<T> {
1136    #[allow(deprecated)]
1137    fn description(&self) -> &str {
1138        match *self {
1139            TrySendError::Full(..) => "sending on a full channel",
1140            TrySendError::Disconnected(..) => "sending on a closed channel",
1141        }
1142    }
1143}
1144
1145#[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1146impl<T> From<SendError<T>> for TrySendError<T> {
1147    /// Converts a `SendError<T>` into a `TrySendError<T>`.
1148    ///
1149    /// This conversion always returns a `TrySendError::Disconnected` containing the data in the `SendError<T>`.
1150    ///
1151    /// No data is allocated on the heap.
1152    fn from(err: SendError<T>) -> TrySendError<T> {
1153        match err {
1154            SendError(t) => TrySendError::Disconnected(t),
1155        }
1156    }
1157}
1158
1159#[stable(feature = "rust1", since = "1.0.0")]
1160impl fmt::Display for RecvError {
1161    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1162        "receiving on a closed channel".fmt(f)
1163    }
1164}
1165
1166#[stable(feature = "rust1", since = "1.0.0")]
1167impl error::Error for RecvError {
1168    #[allow(deprecated)]
1169    fn description(&self) -> &str {
1170        "receiving on a closed channel"
1171    }
1172}
1173
1174#[stable(feature = "rust1", since = "1.0.0")]
1175impl fmt::Display for TryRecvError {
1176    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1177        match *self {
1178            TryRecvError::Empty => "receiving on an empty channel".fmt(f),
1179            TryRecvError::Disconnected => "receiving on a closed channel".fmt(f),
1180        }
1181    }
1182}
1183
1184#[stable(feature = "rust1", since = "1.0.0")]
1185impl error::Error for TryRecvError {
1186    #[allow(deprecated)]
1187    fn description(&self) -> &str {
1188        match *self {
1189            TryRecvError::Empty => "receiving on an empty channel",
1190            TryRecvError::Disconnected => "receiving on a closed channel",
1191        }
1192    }
1193}
1194
1195#[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1196impl From<RecvError> for TryRecvError {
1197    /// Converts a `RecvError` into a `TryRecvError`.
1198    ///
1199    /// This conversion always returns `TryRecvError::Disconnected`.
1200    ///
1201    /// No data is allocated on the heap.
1202    fn from(err: RecvError) -> TryRecvError {
1203        match err {
1204            RecvError => TryRecvError::Disconnected,
1205        }
1206    }
1207}
1208
1209#[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
1210impl fmt::Display for RecvTimeoutError {
1211    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1212        match *self {
1213            RecvTimeoutError::Timeout => "timed out waiting on channel".fmt(f),
1214            RecvTimeoutError::Disconnected => "channel is empty and sending half is closed".fmt(f),
1215        }
1216    }
1217}
1218
1219#[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
1220impl error::Error for RecvTimeoutError {
1221    #[allow(deprecated)]
1222    fn description(&self) -> &str {
1223        match *self {
1224            RecvTimeoutError::Timeout => "timed out waiting on channel",
1225            RecvTimeoutError::Disconnected => "channel is empty and sending half is closed",
1226        }
1227    }
1228}
1229
1230#[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
1231impl From<RecvError> for RecvTimeoutError {
1232    /// Converts a `RecvError` into a `RecvTimeoutError`.
1233    ///
1234    /// This conversion always returns `RecvTimeoutError::Disconnected`.
1235    ///
1236    /// No data is allocated on the heap.
1237    fn from(err: RecvError) -> RecvTimeoutError {
1238        match err {
1239            RecvError => RecvTimeoutError::Disconnected,
1240        }
1241    }
1242}