core/future/join.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193
#![allow(unused_imports, unused_macros)] // items are used by the macro
use crate::cell::UnsafeCell;
use crate::future::{Future, poll_fn};
use crate::mem;
use crate::pin::Pin;
use crate::task::{Context, Poll, ready};
/// Polls multiple futures simultaneously, returning a tuple
/// of all results once complete.
///
/// While `join!(a, b).await` is similar to `(a.await, b.await)`,
/// `join!` polls both futures concurrently and is therefore more efficient.
///
/// The macro accepts one or more comma-separated future expressions (a trailing
/// comma is allowed) and expands to a single future that has to be `.await`ed.
/// Each time that future is polled, every argument that has not completed yet is
/// polled as well; it resolves once all of them have produced their output.
/// The outputs appear in the resulting tuple in the same order as the arguments.
/// When a single future is passed, its output is returned as-is rather than
/// wrapped in a 1-tuple.
///
/// # Examples
///
/// ```
/// #![feature(future_join)]
///
/// use std::future::join;
///
/// async fn one() -> usize { 1 }
/// async fn two() -> usize { 2 }
///
/// # let _ = async {
/// let x = join!(one(), two()).await;
/// assert_eq!(x, (1, 2));
/// # };
/// ```
///
/// `join!` is variadic, so you can pass any number of futures:
///
/// ```
/// #![feature(future_join)]
///
/// use std::future::join;
///
/// async fn one() -> usize { 1 }
/// async fn two() -> usize { 2 }
/// async fn three() -> usize { 3 }
///
/// # let _ = async {
/// let x = join!(one(), two(), three()).await;
/// assert_eq!(x, (1, 2, 3));
/// # };
/// ```
#[unstable(feature = "future_join", issue = "91642")]
pub macro join( $($fut:expr),+ $(,)? ) {
// Funnel through an internal macro not to leak implementation details.
// The internal macro is seeded with its three labelled accumulators:
// no position tokens yet, no (future, position) pairs yet, and the full
// list of future expressions still waiting to be munched.
join_internal! {
current_position: []
futures_and_positions: []
munching: [ $($fut)+ ]
}
}
// FIXME(danielhenrymantilla): a private macro should need no stability guarantee.
#[unstable(feature = "future_join", issue = "91642")]
/// Implementation of `join!`: pairs every future with its index in a tuple and
/// then emits the future that drives all of them.
///
/// To be able to *name* the i-th future in the tuple (say we want the .4-th),
/// the following trick will be used: `let (_, _, _, _, it, ..) = tuple;`
/// In order to do that, we need to generate a `i`-long repetition of `_`,
/// for each i-th fut. Hence the recursive muncher approach.
///
/// The macro has two rules. The first one is the recursion step: it moves one
/// future from `munching` into `futures_and_positions`, tagged with the `_`
/// tokens accumulated so far, and appends one more `_` to `current_position`.
/// The second one matches once `munching` is empty and generates the output.
macro join_internal {
// Recursion step: map each future with its "position" (underscore count).
(
// Accumulate a token for each future that has been expanded: "_ _ _".
current_position: [
$($underscores:tt)*
]
// Accumulate Futures and their positions in the tuple: `_0th () _1st ( _ ) …`.
futures_and_positions: [
$($acc:tt)*
]
// Munch one future.
// Each `$fut:expr` forwarded by `join!` is matched here as one token tree.
munching: [
$current:tt
$($rest:tt)*
]
) => (
join_internal! {
// One more `_`: the next future sits one slot further in the tuple.
current_position: [
$($underscores)*
_
]
// The current future is recorded with the underscores seen *before* it,
// i.e. exactly as many `_` as there are futures preceding it.
futures_and_positions: [
$($acc)*
$current ( $($underscores)* )
]
munching: [
$($rest)*
]
}
),
// End of recursion: generate the output future.
(
// The underscore counter is no longer needed; accept and ignore it.
current_position: $_:tt
futures_and_positions: [
$(
$fut_expr:tt ( $($pos:tt)* )
)*
]
// Nothing left to munch.
munching: []
) => (
// The future expressions are evaluated here, in the `match` scrutinee, that is
// outside of the `async` block; each one is wrapped in `MaybeDone::Future`.
match ( $( MaybeDone::Future($fut_expr), )* ) { futures => async {
// Move the tuple into the `async` block so that it can be pinned in place.
let mut futures = futures;
// SAFETY: this is `pin_mut!`.
let mut futures = unsafe { Pin::new_unchecked(&mut futures) };
poll_fn(move |cx| {
// Stays `true` only if every element reports `Ready` during this pass.
let mut done = true;
// For each `fut`, pin-project to it, and poll it.
$(
// SAFETY: pinning projection
let fut = unsafe {
futures.as_mut().map_unchecked_mut(|it| {
// `$pos` expands to the `_` patterns skipping the preceding elements.
let ( $($pos,)* fut, .. ) = it;
fut
})
};
// Despite how tempting it may be to `let () = ready!(fut.poll(cx));`
// doing so would defeat the point of `join!`: to start polling eagerly all
// of the futures, to allow parallelizing the waits.
// An element that already holds its output reports `Ready` again without
// polling the wrapped future a second time (see `MaybeDone::poll`).
done &= fut.poll(cx).is_ready();
)*
if !done {
return Poll::Pending;
}
// All ready; time to extract all the outputs.
// SAFETY: `.take_output()` does not break the `Pin` invariants for that `fut`.
let futures = unsafe {
futures.as_mut().get_unchecked_mut()
};
Poll::Ready(
($(
{
let ( $($pos,)* fut, .. ) = &mut *futures;
// Every element just reported `Ready`, which `MaybeDone::poll` only does
// while it holds `Done`, so `take_output()` yields `Some` here.
fut.take_output().unwrap()
}
),*) // <- no trailing comma since we don't want 1-tuples.
)
}).await
}}
),
}
/// Slot used by `join!` to drive one future and to park its output until
/// it is collected.
///
/// Once the wrapped future has completed, polling the slot again simply reports
/// readiness instead of polling the finished future a second time.
///
/// This type is public in a private module for use by the macro.
#[allow(missing_debug_implementations)]
#[unstable(feature = "future_join", issue = "91642")]
pub enum MaybeDone<F>
where
    F: Future,
{
    /// The wrapped future has not produced its output yet.
    Future(F),
    /// The wrapped future completed; its output is stored here until taken.
    Done(F::Output),
    /// The output has already been moved out with `take_output`.
    Taken,
}
#[unstable(feature = "future_join", issue = "91642")]
impl<F: Future> MaybeDone<F> {
    /// Moves the stored output out of `self`, leaving `Taken` behind.
    ///
    /// Returns `None`, and leaves `self` untouched, unless `self` is currently
    /// in the `Done` state. A wrapped future that is still pending is therefore
    /// never moved or dropped by this method.
    pub fn take_output(&mut self) -> Option<F::Output> {
        // Guard first: only a `Done` slot has something to hand out.
        let Self::Done(_) = *self else {
            return None;
        };
        // The state was checked right above, so the swap can only yield `Done`.
        match mem::replace(self, Self::Taken) {
            Self::Done(output) => Some(output),
            _ => unreachable!(),
        }
    }
}
#[unstable(feature = "future_join", issue = "91642")]
impl<F: Future> Future for MaybeDone<F> {
    /// The output is not returned from `poll`; it is stored in `self` and has
    /// to be fetched with `take_output`.
    type Output = ();

    /// Drives the wrapped future and stores its output in place once it completes.
    ///
    /// Returns `Poll::Pending` while the wrapped future is pending, and
    /// `Poll::Ready(())` as soon as (and for as long as) the output is stored.
    /// Polling after the output has been taken is treated as unreachable.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // SAFETY: the reference is only used to inspect the current variant and to
        // re-pin the wrapped future below; nothing is moved out through it.
        let slot = unsafe { self.as_mut().get_unchecked_mut() };
        // Explicit `ref mut` binding: do not mix match ergonomics with unsafe.
        match *slot {
            MaybeDone::Taken => unreachable!(),
            // Already finished: report readiness without touching anything.
            MaybeDone::Done(_) => Poll::Ready(()),
            MaybeDone::Future(ref mut inner) => {
                // SAFETY: pinning is structural for the wrapped future; it lives
                // inside the pinned `self` and is never moved while in this variant.
                let inner = unsafe { Pin::new_unchecked(inner) };
                let output = ready!(inner.poll(cx));
                // `Pin::set` drops the finished future in place and stores the output.
                self.set(Self::Done(output));
                Poll::Ready(())
            }
        }
    }
}