std/sys/sync/thread_parking/
futex.rs

1#![forbid(unsafe_op_in_unsafe_fn)]
2use crate::pin::Pin;
3use crate::sync::atomic::Ordering::{Acquire, Release};
4use crate::sys::futex::{self, futex_wait, futex_wake};
5use crate::time::Duration;
6
7type Futex = futex::SmallFutex;
8type State = futex::SmallPrimitive;
9
10const PARKED: State = State::MAX;
11const EMPTY: State = 0;
12const NOTIFIED: State = 1;
13
14pub struct Parker {
15    state: Futex,
16}
17
18// Notes about memory ordering:
19//
20// Memory ordering is only relevant for the relative ordering of operations
21// between different variables. Even Ordering::Relaxed guarantees a
22// monotonic/consistent order when looking at just a single atomic variable.
23//
24// So, since this parker is just a single atomic variable, we only need to look
25// at the ordering guarantees we need to provide to the 'outside world'.
26//
27// The only memory ordering guarantee that parking and unparking provide, is
28// that things which happened before unpark() are visible on the thread
29// returning from park() afterwards. Otherwise, it was effectively unparked
30// before unpark() was called while still consuming the 'token'.
31//
32// In other words, unpark() needs to synchronize with the part of park() that
33// consumes the token and returns.
34//
35// This is done with a release-acquire synchronization, by using
36// Ordering::Release when writing NOTIFIED (the 'token') in unpark(), and using
37// Ordering::Acquire when checking for this state in park().
38impl Parker {
39    /// Constructs the futex parker. The UNIX parker implementation
40    /// requires this to happen in-place.
41    pub unsafe fn new_in_place(parker: *mut Parker) {
42        unsafe { parker.write(Self { state: Futex::new(EMPTY) }) };
43    }
44
45    // Assumes this is only called by the thread that owns the Parker,
46    // which means that `self.state != PARKED`.
47    pub unsafe fn park(self: Pin<&Self>) {
48        // Change NOTIFIED=>EMPTY or EMPTY=>PARKED, and directly return in the
49        // first case.
50        if self.state.fetch_sub(1, Acquire) == NOTIFIED {
51            return;
52        }
53        loop {
54            // Wait for something to happen, assuming it's still set to PARKED.
55            futex_wait(&self.state, PARKED, None);
56            // Change NOTIFIED=>EMPTY and return in that case.
57            if self.state.compare_exchange(NOTIFIED, EMPTY, Acquire, Acquire).is_ok() {
58                return;
59            } else {
60                // Spurious wake up. We loop to try again.
61            }
62        }
63    }
64
65    // Assumes this is only called by the thread that owns the Parker,
66    // which means that `self.state != PARKED`. This implementation doesn't
67    // require `Pin`, but other implementations do.
68    pub unsafe fn park_timeout(self: Pin<&Self>, timeout: Duration) {
69        // Change NOTIFIED=>EMPTY or EMPTY=>PARKED, and directly return in the
70        // first case.
71        if self.state.fetch_sub(1, Acquire) == NOTIFIED {
72            return;
73        }
74        // Wait for something to happen, assuming it's still set to PARKED.
75        futex_wait(&self.state, PARKED, Some(timeout));
76        // This is not just a store, because we need to establish a
77        // release-acquire ordering with unpark().
78        if self.state.swap(EMPTY, Acquire) == NOTIFIED {
79            // Woke up because of unpark().
80        } else {
81            // Timeout or spurious wake up.
82            // We return either way, because we can't easily tell if it was the
83            // timeout or not.
84        }
85    }
86
87    // This implementation doesn't require `Pin`, but other implementations do.
88    #[inline]
89    pub fn unpark(self: Pin<&Self>) {
90        // Change PARKED=>NOTIFIED, EMPTY=>NOTIFIED, or NOTIFIED=>NOTIFIED, and
91        // wake the thread in the first case.
92        //
93        // Note that even NOTIFIED=>NOTIFIED results in a write. This is on
94        // purpose, to make sure every unpark() has a release-acquire ordering
95        // with park().
96        if self.state.swap(NOTIFIED, Release) == PARKED {
97            futex_wake(&self.state);
98        }
99    }
100}