std/sync/
oneshot.rs

1//! A single-producer, single-consumer (oneshot) channel.
2//!
3//! This is an experimental module, so the API will likely change.
4
5use crate::sync::mpmc;
6use crate::sync::mpsc::{RecvError, SendError};
7use crate::time::{Duration, Instant};
8use crate::{error, fmt};
9
10/// Creates a new oneshot channel, returning the sender/receiver halves.
11///
12/// # Examples
13///
14/// ```
15/// #![feature(oneshot_channel)]
16/// use std::sync::oneshot;
17/// use std::thread;
18///
19/// let (sender, receiver) = oneshot::channel();
20///
21/// // Spawn off an expensive computation.
22/// thread::spawn(move || {
23/// #   fn expensive_computation() -> i32 { 42 }
24///     sender.send(expensive_computation()).unwrap();
25///     // `sender` is consumed by `send`, so we cannot use it anymore.
26/// });
27///
28/// # fn do_other_work() -> i32 { 42 }
29/// do_other_work();
30///
31/// // Let's see what that answer was...
32/// println!("{:?}", receiver.recv().unwrap());
33/// // `receiver` is consumed by `recv`, so we cannot use it anymore.
34/// ```
35#[must_use]
36#[unstable(feature = "oneshot_channel", issue = "143674")]
37pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
38    // Using a `sync_channel` with capacity 1 means that the internal implementation will use the
39    // `Array`-flavored channel implementation.
40    let (sender, receiver) = mpmc::sync_channel(1);
41    (Sender { inner: sender }, Receiver { inner: receiver })
42}
43
44////////////////////////////////////////////////////////////////////////////////////////////////////
45// Sender
46////////////////////////////////////////////////////////////////////////////////////////////////////
47
48/// The sending half of a oneshot channel.
49///
50/// # Examples
51///
52/// ```
53/// #![feature(oneshot_channel)]
54/// use std::sync::oneshot;
55/// use std::thread;
56///
57/// let (sender, receiver) = oneshot::channel();
58///
59/// thread::spawn(move || {
60///     sender.send("Hello from thread!").unwrap();
61/// });
62///
63/// assert_eq!(receiver.recv().unwrap(), "Hello from thread!");
64/// ```
65///
66/// `Sender` cannot be sent between threads if it is sending non-`Send` types.
67///
68/// ```compile_fail
69/// #![feature(oneshot_channel)]
70/// use std::sync::oneshot;
71/// use std::thread;
72/// use std::ptr;
73///
74/// let (sender, receiver) = oneshot::channel();
75///
76/// struct NotSend(*mut ());
77/// thread::spawn(move || {
78///     sender.send(NotSend(ptr::null_mut()));
79/// });
80///
81/// let reply = receiver.try_recv().unwrap();
82/// ```
#[unstable(feature = "oneshot_channel", issue = "143674")]
pub struct Sender<T> {
    /// The `oneshot` channel is simply a wrapper around a `mpmc` channel; this is the sending
    /// handle of the capacity-1 `mpmc` channel created by [`channel`].
    inner: mpmc::Sender<T>,
}
88
// SAFETY: Since the only methods in which synchronization must occur take full ownership of the
// [`Sender`], it is perfectly safe to share a `&Sender` between threads (as it is effectively
// useless without ownership). Concretely, `send` consumes `self`, and the only `&self` method
// visible in this file, `Debug::fmt`, never reads `inner`.
#[unstable(feature = "oneshot_channel", issue = "143674")]
unsafe impl<T> Sync for Sender<T> {}
94
95impl<T> Sender<T> {
96    /// Attempts to send a value through this channel. This can only fail if the corresponding
97    /// [`Receiver<T>`] has been dropped.
98    ///
99    /// This method is non-blocking (wait-free).
100    ///
101    /// # Examples
102    ///
103    /// ```
104    /// #![feature(oneshot_channel)]
105    /// use std::sync::oneshot;
106    /// use std::thread;
107    ///
108    /// let (tx, rx) = oneshot::channel();
109    ///
110    /// thread::spawn(move || {
111    ///     // Perform some computation.
112    ///     let result = 2 + 2;
113    ///     tx.send(result).unwrap();
114    /// });
115    ///
116    /// assert_eq!(rx.recv().unwrap(), 4);
117    /// ```
118    #[unstable(feature = "oneshot_channel", issue = "143674")]
119    pub fn send(self, t: T) -> Result<(), SendError<T>> {
120        self.inner.send(t)
121    }
122}
123
124#[unstable(feature = "oneshot_channel", issue = "143674")]
125impl<T> fmt::Debug for Sender<T> {
126    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
127        f.debug_struct("Sender").finish_non_exhaustive()
128    }
129}
130
131////////////////////////////////////////////////////////////////////////////////////////////////////
132// Receiver
133////////////////////////////////////////////////////////////////////////////////////////////////////
134
135/// The receiving half of a oneshot channel.
136///
137/// # Examples
138///
139/// ```
140/// #![feature(oneshot_channel)]
141/// use std::sync::oneshot;
142/// use std::thread;
143/// use std::time::Duration;
144///
145/// let (sender, receiver) = oneshot::channel();
146///
147/// thread::spawn(move || {
148///     thread::sleep(Duration::from_millis(100));
149///     sender.send("Hello after delay!").unwrap();
150/// });
151///
152/// println!("Waiting for message...");
153/// println!("{}", receiver.recv().unwrap());
154/// ```
155///
156/// `Receiver` cannot be sent between threads if it is receiving non-`Send` types.
157///
158/// ```compile_fail
159/// # #![feature(oneshot_channel)]
160/// # use std::sync::oneshot;
161/// # use std::thread;
162/// # use std::ptr;
163/// #
164/// let (sender, receiver) = oneshot::channel();
165///
166/// struct NotSend(*mut ());
167/// sender.send(NotSend(ptr::null_mut()));
168///
169/// thread::spawn(move || {
170///     let reply = receiver.try_recv().unwrap();
171/// });
172/// ```
#[unstable(feature = "oneshot_channel", issue = "143674")]
pub struct Receiver<T> {
    /// The `oneshot` channel is simply a wrapper around a `mpmc` channel; this is the receiving
    /// handle of the capacity-1 `mpmc` channel created by [`channel`].
    inner: mpmc::Receiver<T>,
}
178
// SAFETY: Since the only methods in which synchronization must occur take full ownership of the
// [`Receiver`], it is perfectly safe to share a `&Receiver` between threads (as it is unable to
// receive any values without ownership). Concretely, every receiving method (`recv`, `try_recv`,
// `recv_timeout`, `recv_deadline`) consumes `self`, and the only `&self` method visible in this
// file, `Debug::fmt`, never reads `inner`.
#[unstable(feature = "oneshot_channel", issue = "143674")]
unsafe impl<T> Sync for Receiver<T> {}
184
185impl<T> Receiver<T> {
186    /// Receives the value from the sending end, blocking the calling thread until it gets it.
187    ///
188    /// Can only fail if the corresponding [`Sender<T>`] has been dropped.
189    ///
190    /// # Examples
191    ///
192    /// ```
193    /// #![feature(oneshot_channel)]
194    /// use std::sync::oneshot;
195    /// use std::thread;
196    /// use std::time::Duration;
197    ///
198    /// let (tx, rx) = oneshot::channel();
199    ///
200    /// thread::spawn(move || {
201    ///     thread::sleep(Duration::from_millis(500));
202    ///     tx.send("Done!").unwrap();
203    /// });
204    ///
205    /// // This will block until the message arrives.
206    /// println!("{}", rx.recv().unwrap());
207    /// ```
208    #[unstable(feature = "oneshot_channel", issue = "143674")]
209    pub fn recv(self) -> Result<T, RecvError> {
210        self.inner.recv()
211    }
212
213    // Fallible methods.
214
215    /// Attempts to return a pending value on this receiver without blocking.
216    ///
217    /// # Examples
218    ///
219    /// ```
220    /// #![feature(oneshot_channel)]
221    /// use std::sync::oneshot;
222    /// use std::thread;
223    /// use std::time::Duration;
224    ///
225    /// let (sender, mut receiver) = oneshot::channel();
226    ///
227    /// thread::spawn(move || {
228    ///     thread::sleep(Duration::from_millis(100));
229    ///     sender.send(42).unwrap();
230    /// });
231    ///
232    /// // Keep trying until we get the message, doing other work in the process.
233    /// loop {
234    ///     match receiver.try_recv() {
235    ///         Ok(value) => {
236    ///             assert_eq!(value, 42);
237    ///             break;
238    ///         }
239    ///         Err(oneshot::TryRecvError::Empty(rx)) => {
240    ///             // Retake ownership of the receiver.
241    ///             receiver = rx;
242    /// #           fn do_other_work() { thread::sleep(Duration::from_millis(25)); }
243    ///             do_other_work();
244    ///         }
245    ///         Err(oneshot::TryRecvError::Disconnected) => panic!("Sender disconnected"),
246    ///     }
247    /// }
248    /// ```
249    #[unstable(feature = "oneshot_channel", issue = "143674")]
250    pub fn try_recv(self) -> Result<T, TryRecvError<T>> {
251        self.inner.try_recv().map_err(|err| match err {
252            mpmc::TryRecvError::Empty => TryRecvError::Empty(self),
253            mpmc::TryRecvError::Disconnected => TryRecvError::Disconnected,
254        })
255    }
256
257    /// Attempts to wait for a value on this receiver, returning an error if the corresponding
258    /// [`Sender`] half of this channel has been dropped, or if it waits more than `timeout`.
259    ///
260    /// # Examples
261    ///
262    /// ```
263    /// #![feature(oneshot_channel)]
264    /// use std::sync::oneshot;
265    /// use std::thread;
266    /// use std::time::Duration;
267    ///
268    /// let (sender, receiver) = oneshot::channel();
269    ///
270    /// thread::spawn(move || {
271    ///     thread::sleep(Duration::from_millis(500));
272    ///     sender.send("Success!").unwrap();
273    /// });
274    ///
275    /// // Wait up to 1 second for the message
276    /// match receiver.recv_timeout(Duration::from_secs(1)) {
277    ///     Ok(msg) => println!("Received: {}", msg),
278    ///     Err(oneshot::RecvTimeoutError::Timeout(_)) => println!("Timed out!"),
279    ///     Err(oneshot::RecvTimeoutError::Disconnected) => println!("Sender dropped!"),
280    /// }
281    /// ```
282    #[unstable(feature = "oneshot_channel", issue = "143674")]
283    pub fn recv_timeout(self, timeout: Duration) -> Result<T, RecvTimeoutError<T>> {
284        self.inner.recv_timeout(timeout).map_err(|err| match err {
285            mpmc::RecvTimeoutError::Timeout => RecvTimeoutError::Timeout(self),
286            mpmc::RecvTimeoutError::Disconnected => RecvTimeoutError::Disconnected,
287        })
288    }
289
290    /// Attempts to wait for a value on this receiver, returning an error if the corresponding
291    /// [`Sender`] half of this channel has been dropped, or if `deadline` is reached.
292    ///
293    /// # Examples
294    ///
295    /// ```
296    /// #![feature(oneshot_channel)]
297    /// use std::sync::oneshot;
298    /// use std::thread;
299    /// use std::time::{Duration, Instant};
300    ///
301    /// let (sender, receiver) = oneshot::channel();
302    ///
303    /// thread::spawn(move || {
304    ///     thread::sleep(Duration::from_millis(100));
305    ///     sender.send("Just in time!").unwrap();
306    /// });
307    ///
308    /// let deadline = Instant::now() + Duration::from_millis(500);
309    /// match receiver.recv_deadline(deadline) {
310    ///     Ok(msg) => println!("Received: {}", msg),
311    ///     Err(oneshot::RecvTimeoutError::Timeout(_)) => println!("Missed deadline!"),
312    ///     Err(oneshot::RecvTimeoutError::Disconnected) => println!("Sender dropped!"),
313    /// }
314    /// ```
315    #[unstable(feature = "oneshot_channel", issue = "143674")]
316    pub fn recv_deadline(self, deadline: Instant) -> Result<T, RecvTimeoutError<T>> {
317        self.inner.recv_deadline(deadline).map_err(|err| match err {
318            mpmc::RecvTimeoutError::Timeout => RecvTimeoutError::Timeout(self),
319            mpmc::RecvTimeoutError::Disconnected => RecvTimeoutError::Disconnected,
320        })
321    }
322}
323
324#[unstable(feature = "oneshot_channel", issue = "143674")]
325impl<T> fmt::Debug for Receiver<T> {
326    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
327        f.debug_struct("Receiver").finish_non_exhaustive()
328    }
329}
330
331////////////////////////////////////////////////////////////////////////////////////////////////////
332// Receiver Errors
333////////////////////////////////////////////////////////////////////////////////////////////////////
334
/// An error returned from the [`try_recv`](Receiver::try_recv) method.
///
/// See the documentation for [`try_recv`] for more information on how to use this error.
///
/// [`try_recv`]: Receiver::try_recv
#[unstable(feature = "oneshot_channel", issue = "143674")]
pub enum TryRecvError<T> {
    /// The [`Sender`] has not sent a message yet, but it might in the future (as it has not yet
    /// disconnected). This variant contains the [`Receiver`] that [`try_recv`](Receiver::try_recv)
    /// took ownership over, so the caller can retry with it.
    Empty(Receiver<T>),
    /// The corresponding [`Sender`] half of this channel has become disconnected, and there will
    /// never be any more data sent over the channel. No [`Receiver`] is handed back in this case.
    Disconnected,
}
350
/// An error returned from the [`recv_timeout`](Receiver::recv_timeout) or
/// [`recv_deadline`](Receiver::recv_deadline) methods.
///
/// # Examples
///
/// Usage of this error is similar to [`TryRecvError`].
///
/// ```
/// #![feature(oneshot_channel)]
/// use std::sync::oneshot::{self, RecvTimeoutError};
/// use std::thread;
/// use std::time::Duration;
///
/// let (sender, receiver) = oneshot::channel();
///
/// let send_failure = thread::spawn(move || {
///     // Simulate a long computation that takes longer than our timeout.
///     thread::sleep(Duration::from_millis(250));
///
///     // This will likely fail to send because we drop the receiver in the main thread.
///     sender.send("Goodbye!".to_string()).unwrap();
/// });
///
/// // Try to receive the message with a short timeout.
/// match receiver.recv_timeout(Duration::from_millis(10)) {
///     Ok(msg) => println!("Received: {}", msg),
///     Err(RecvTimeoutError::Timeout(rx)) => {
///         println!("Timed out waiting for message!");
///
///         // The receiver is handed back, so you could keep waiting on `rx` here. This example
///         // drops it instead, which is what makes the sender's `send` fail.
///         drop(rx);
///     },
///     Err(RecvTimeoutError::Disconnected) => println!("Sender dropped!"),
/// }
///
/// send_failure.join().unwrap_err();
/// ```
#[unstable(feature = "oneshot_channel", issue = "143674")]
pub enum RecvTimeoutError<T> {
    /// The [`Sender`] has not sent a message yet, but it might in the future (as it has not yet
    /// disconnected). This variant contains the [`Receiver`] that either
    /// [`recv_timeout`](Receiver::recv_timeout) or [`recv_deadline`](Receiver::recv_deadline) took
    /// ownership over, so the caller can keep waiting with it.
    Timeout(Receiver<T>),
    /// The corresponding [`Sender`] half of this channel has become disconnected, and there will
    /// never be any more data sent over the channel. No [`Receiver`] is handed back in this case.
    Disconnected,
}
399
400#[unstable(feature = "oneshot_channel", issue = "143674")]
401impl<T> fmt::Debug for TryRecvError<T> {
402    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
403        f.debug_tuple("TryRecvError").finish_non_exhaustive()
404    }
405}
406
407#[unstable(feature = "oneshot_channel", issue = "143674")]
408impl<T> fmt::Display for TryRecvError<T> {
409    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
410        match *self {
411            TryRecvError::Empty(..) => "receiving on an empty oneshot channel".fmt(f),
412            TryRecvError::Disconnected => "receiving on a closed oneshot channel".fmt(f),
413        }
414    }
415}
416
// Empty impl: every `Error` method keeps its default. The `Debug` and `Display` impls for this
// type are the hand-written ones in this file.
#[unstable(feature = "oneshot_channel", issue = "143674")]
impl<T> error::Error for TryRecvError<T> {}
419
420#[unstable(feature = "oneshot_channel", issue = "143674")]
421impl<T> From<RecvError> for TryRecvError<T> {
422    /// Converts a `RecvError` into a `TryRecvError`.
423    ///
424    /// This conversion always returns `TryRecvError::Disconnected`.
425    ///
426    /// No data is allocated on the heap.
427    fn from(err: RecvError) -> TryRecvError<T> {
428        match err {
429            RecvError => TryRecvError::Disconnected,
430        }
431    }
432}
433
434#[unstable(feature = "oneshot_channel", issue = "143674")]
435impl<T> fmt::Debug for RecvTimeoutError<T> {
436    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
437        f.debug_tuple("RecvTimeoutError").finish_non_exhaustive()
438    }
439}
440
441#[unstable(feature = "oneshot_channel", issue = "143674")]
442impl<T> fmt::Display for RecvTimeoutError<T> {
443    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
444        match *self {
445            RecvTimeoutError::Timeout(..) => "timed out waiting on oneshot channel".fmt(f),
446            RecvTimeoutError::Disconnected => "receiving on a closed oneshot channel".fmt(f),
447        }
448    }
449}
450
// Empty impl: every `Error` method keeps its default. The `Debug` and `Display` impls for this
// type are the hand-written ones in this file.
#[unstable(feature = "oneshot_channel", issue = "143674")]
impl<T> error::Error for RecvTimeoutError<T> {}
453
454#[unstable(feature = "oneshot_channel", issue = "143674")]
455impl<T> From<RecvError> for RecvTimeoutError<T> {
456    /// Converts a `RecvError` into a `RecvTimeoutError`.
457    ///
458    /// This conversion always returns `RecvTimeoutError::Disconnected`.
459    ///
460    /// No data is allocated on the heap.
461    fn from(err: RecvError) -> RecvTimeoutError<T> {
462        match err {
463            RecvError => RecvTimeoutError::Disconnected,
464        }
465    }
466}