core/future/
join.rs

1#![allow(unused_imports, unused_macros)] // items are used by the macro
2
3use crate::cell::UnsafeCell;
4use crate::future::{Future, poll_fn};
5use crate::mem;
6use crate::pin::Pin;
7use crate::task::{Context, Poll, ready};
8
9/// Polls multiple futures simultaneously, returning a tuple
10/// of all results once complete.
11///
12/// While `join!(a, b).await` is similar to `(a.await, b.await)`,
13/// `join!` polls both futures concurrently and is therefore more efficient.
14///
15/// # Examples
16///
17/// ```
18/// #![feature(future_join)]
19///
20/// use std::future::join;
21///
22/// async fn one() -> usize { 1 }
23/// async fn two() -> usize { 2 }
24///
25/// # let _ =  async {
26/// let x = join!(one(), two()).await;
27/// assert_eq!(x, (1, 2));
28/// # };
29/// ```
30///
31/// `join!` is variadic, so you can pass any number of futures:
32///
33/// ```
34/// #![feature(future_join)]
35///
36/// use std::future::join;
37///
38/// async fn one() -> usize { 1 }
39/// async fn two() -> usize { 2 }
40/// async fn three() -> usize { 3 }
41///
42/// # let _ = async {
43/// let x = join!(one(), two(), three()).await;
44/// assert_eq!(x, (1, 2, 3));
45/// # };
46/// ```
47#[unstable(feature = "future_join", issue = "91642")]
48pub macro join( $($fut:expr),+ $(,)? ) {
49    // Funnel through an internal macro not to leak implementation details.
50    join_internal! {
51        current_position: []
52        futures_and_positions: []
53        munching: [ $($fut)+ ]
54    }
55}
56
57// FIXME(danielhenrymantilla): a private macro should need no stability guarantee.
58#[unstable(feature = "future_join", issue = "91642")]
59/// To be able to *name* the i-th future in the tuple (say we want the .4-th),
60/// the following trick will be used: `let (_, _, _, _, it, ..) = tuple;`
61/// In order to do that, we need to generate a `i`-long repetition of `_`,
62/// for each i-th fut. Hence the recursive muncher approach.
63macro join_internal {
64    // Recursion step: map each future with its "position" (underscore count).
65    (
66        // Accumulate a token for each future that has been expanded: "_ _ _".
67        current_position: [
68            $($underscores:tt)*
69        ]
70        // Accumulate Futures and their positions in the tuple: `_0th ()   _1st ( _ ) …`.
71        futures_and_positions: [
72            $($acc:tt)*
73        ]
74        // Munch one future.
75        munching: [
76            $current:tt
77            $($rest:tt)*
78        ]
79    ) => (
80        join_internal! {
81            current_position: [
82                $($underscores)*
83                _
84            ]
85            futures_and_positions: [
86                $($acc)*
87                $current ( $($underscores)* )
88            ]
89            munching: [
90                $($rest)*
91            ]
92        }
93    ),
94
95    // End of recursion: generate the output future.
96    (
97        current_position: $_:tt
98        futures_and_positions: [
99            $(
100                $fut_expr:tt ( $($pos:tt)* )
101            )*
102        ]
103        // Nothing left to munch.
104        munching: []
105    ) => (
106        match ( $( MaybeDone::Future($fut_expr), )* ) { futures => async {
107            let mut futures = futures;
108            // SAFETY: this is `pin_mut!`.
109            let mut futures = unsafe { Pin::new_unchecked(&mut futures) };
110            poll_fn(move |cx| {
111                let mut done = true;
112                // For each `fut`, pin-project to it, and poll it.
113                $(
114                    // SAFETY: pinning projection
115                    let fut = unsafe {
116                        futures.as_mut().map_unchecked_mut(|it| {
117                            let ( $($pos,)* fut, .. ) = it;
118                            fut
119                        })
120                    };
121                    // Despite how tempting it may be to `let () = ready!(fut.poll(cx));`
122                    // doing so would defeat the point of `join!`: to start polling eagerly all
123                    // of the futures, to allow parallelizing the waits.
124                    done &= fut.poll(cx).is_ready();
125                )*
126                if !done {
127                    return Poll::Pending;
128                }
129                // All ready; time to extract all the outputs.
130
131                // SAFETY: `.take_output()` does not break the `Pin` invariants for that `fut`.
132                let futures = unsafe {
133                    futures.as_mut().get_unchecked_mut()
134                };
135                Poll::Ready(
136                    ($(
137                        {
138                            let ( $($pos,)* fut, .. ) = &mut *futures;
139                            fut.take_output().unwrap()
140                        }
141                    ),*) // <- no trailing comma since we don't want 1-tuples.
142                )
143            }).await
144        }}
145    ),
146}
147
148/// Future used by `join!` that stores it's output to
149/// be later taken and doesn't panic when polled after ready.
150///
151/// This type is public in a private module for use by the macro.
152#[allow(missing_debug_implementations)]
153#[unstable(feature = "future_join", issue = "91642")]
154pub enum MaybeDone<F: Future> {
155    Future(F),
156    Done(F::Output),
157    Taken,
158}
159
160#[unstable(feature = "future_join", issue = "91642")]
161impl<F: Future> MaybeDone<F> {
162    pub fn take_output(&mut self) -> Option<F::Output> {
163        match *self {
164            MaybeDone::Done(_) => match mem::replace(self, Self::Taken) {
165                MaybeDone::Done(val) => Some(val),
166                _ => unreachable!(),
167            },
168            _ => None,
169        }
170    }
171}
172
173#[unstable(feature = "future_join", issue = "91642")]
174impl<F: Future> Future for MaybeDone<F> {
175    type Output = ();
176
177    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
178        // SAFETY: pinning in structural for `f`
179        unsafe {
180            // Do not mix match ergonomics with unsafe.
181            match *self.as_mut().get_unchecked_mut() {
182                MaybeDone::Future(ref mut f) => {
183                    let val = ready!(Pin::new_unchecked(f).poll(cx));
184                    self.set(Self::Done(val));
185                }
186                MaybeDone::Done(_) => {}
187                MaybeDone::Taken => unreachable!(),
188            }
189        }
190
191        Poll::Ready(())
192    }
193}