// std/sys/sync/rwlock/futex.rs

use crate::sync::atomic::Ordering::{Acquire, Relaxed, Release};
use crate::sys::futex::{Futex, Primitive, futex_wait, futex_wake, futex_wake_all};

4pub struct RwLock {
5    // The state consists of a 30-bit reader counter, a 'readers waiting' flag, and a 'writers waiting' flag.
6    // Bits 0..30:
7    //   0: Unlocked
8    //   1..=0x3FFF_FFFE: Locked by N readers
9    //   0x3FFF_FFFF: Write locked
10    // Bit 30: Readers are waiting on this futex.
11    // Bit 31: Writers are waiting on the writer_notify futex.
12    state: Futex,
13    // The 'condition variable' to notify writers through.
14    // Incremented on every signal.
15    writer_notify: Futex,
16}
17
18const READ_LOCKED: Primitive = 1;
19const MASK: Primitive = (1 << 30) - 1;
20const WRITE_LOCKED: Primitive = MASK;
21const DOWNGRADE: Primitive = READ_LOCKED.wrapping_sub(WRITE_LOCKED); // READ_LOCKED - WRITE_LOCKED
22const MAX_READERS: Primitive = MASK - 1;
23const READERS_WAITING: Primitive = 1 << 30;
24const WRITERS_WAITING: Primitive = 1 << 31;
25
26#[inline]
27fn is_unlocked(state: Primitive) -> bool {
28    state & MASK == 0
29}
30
31#[inline]
32fn is_write_locked(state: Primitive) -> bool {
33    state & MASK == WRITE_LOCKED
34}
35
36#[inline]
37fn has_readers_waiting(state: Primitive) -> bool {
38    state & READERS_WAITING != 0
39}
40
41#[inline]
42fn has_writers_waiting(state: Primitive) -> bool {
43    state & WRITERS_WAITING != 0
44}
45
46#[inline]
47fn is_read_lockable(state: Primitive) -> bool {
48    // This also returns false if the counter could overflow if we tried to read lock it.
49    //
50    // We don't allow read-locking if there's readers waiting, even if the lock is unlocked
51    // and there's no writers waiting. The only situation when this happens is after unlocking,
52    // at which point the unlocking thread might be waking up writers, which have priority over readers.
53    // The unlocking thread will clear the readers waiting bit and wake up readers, if necessary.
54    state & MASK < MAX_READERS && !has_readers_waiting(state) && !has_writers_waiting(state)
55}
56
57#[inline]
58fn is_read_lockable_after_wakeup(state: Primitive) -> bool {
59    // We make a special case for checking if we can read-lock _after_ a reader thread that went to
60    // sleep has been woken up by a call to `downgrade`.
61    //
62    // `downgrade` will wake up all readers and place the lock in read mode. Thus, there should be
63    // no readers waiting and the lock should be read-locked (not write-locked or unlocked).
64    //
65    // Note that we do not check if any writers are waiting. This is because a call to `downgrade`
66    // implies that the caller wants other readers to read the value protected by the lock. If we
67    // did not allow readers to acquire the lock before writers after a `downgrade`, then only the
68    // original writer would be able to read the value, thus defeating the purpose of `downgrade`.
69    state & MASK < MAX_READERS
70        && !has_readers_waiting(state)
71        && !is_write_locked(state)
72        && !is_unlocked(state)
73}
74
75#[inline]
76fn has_reached_max_readers(state: Primitive) -> bool {
77    state & MASK == MAX_READERS
78}
79
80impl RwLock {
81    #[inline]
82    pub const fn new() -> Self {
83        Self { state: Futex::new(0), writer_notify: Futex::new(0) }
84    }
85
86    #[inline]
87    pub fn try_read(&self) -> bool {
88        self.state
89            .fetch_update(Acquire, Relaxed, |s| is_read_lockable(s).then(|| s + READ_LOCKED))
90            .is_ok()
91    }
92
93    #[inline]
94    pub fn read(&self) {
95        let state = self.state.load(Relaxed);
96        if !is_read_lockable(state)
97            || self
98                .state
99                .compare_exchange_weak(state, state + READ_LOCKED, Acquire, Relaxed)
100                .is_err()
101        {
102            self.read_contended();
103        }
104    }
105
106    /// # Safety
107    ///
108    /// The `RwLock` must be read-locked (N readers) in order to call this.
109    #[inline]
110    pub unsafe fn read_unlock(&self) {
111        let state = self.state.fetch_sub(READ_LOCKED, Release) - READ_LOCKED;
112
113        // It's impossible for a reader to be waiting on a read-locked RwLock,
114        // except if there is also a writer waiting.
115        debug_assert!(!has_readers_waiting(state) || has_writers_waiting(state));
116
117        // Wake up a writer if we were the last reader and there's a writer waiting.
118        if is_unlocked(state) && has_writers_waiting(state) {
119            self.wake_writer_or_readers(state);
120        }
121    }
122
123    #[cold]
124    fn read_contended(&self) {
125        let mut has_slept = false;
126        let mut state = self.spin_read();
127
128        loop {
129            // If we have just been woken up, first check for a `downgrade` call.
130            // Otherwise, if we can read-lock it, lock it.
131            if (has_slept && is_read_lockable_after_wakeup(state)) || is_read_lockable(state) {
132                match self.state.compare_exchange_weak(state, state + READ_LOCKED, Acquire, Relaxed)
133                {
134                    Ok(_) => return, // Locked!
135                    Err(s) => {
136                        state = s;
137                        continue;
138                    }
139                }
140            }
141
142            // Check for overflow.
143            assert!(!has_reached_max_readers(state), "too many active read locks on RwLock");
144
145            // Make sure the readers waiting bit is set before we go to sleep.
146            if !has_readers_waiting(state) {
147                if let Err(s) =
148                    self.state.compare_exchange(state, state | READERS_WAITING, Relaxed, Relaxed)
149                {
150                    state = s;
151                    continue;
152                }
153            }
154
155            // Wait for the state to change.
156            futex_wait(&self.state, state | READERS_WAITING, None);
157            has_slept = true;
158
159            // Spin again after waking up.
160            state = self.spin_read();
161        }
162    }
163
164    #[inline]
165    pub fn try_write(&self) -> bool {
166        self.state
167            .fetch_update(Acquire, Relaxed, |s| is_unlocked(s).then(|| s + WRITE_LOCKED))
168            .is_ok()
169    }
170
171    #[inline]
172    pub fn write(&self) {
173        if self.state.compare_exchange_weak(0, WRITE_LOCKED, Acquire, Relaxed).is_err() {
174            self.write_contended();
175        }
176    }
177
178    /// # Safety
179    ///
180    /// The `RwLock` must be write-locked (single writer) in order to call this.
181    #[inline]
182    pub unsafe fn write_unlock(&self) {
183        let state = self.state.fetch_sub(WRITE_LOCKED, Release) - WRITE_LOCKED;
184
185        debug_assert!(is_unlocked(state));
186
187        if has_writers_waiting(state) || has_readers_waiting(state) {
188            self.wake_writer_or_readers(state);
189        }
190    }
191
192    /// # Safety
193    ///
194    /// The `RwLock` must be write-locked (single writer) in order to call this.
195    #[inline]
196    pub unsafe fn downgrade(&self) {
197        // Removes all write bits and adds a single read bit.
198        let state = self.state.fetch_add(DOWNGRADE, Release);
199        debug_assert!(is_write_locked(state), "RwLock must be write locked to call `downgrade`");
200
201        if has_readers_waiting(state) {
202            // Since we had the exclusive lock, nobody else can unset this bit.
203            self.state.fetch_sub(READERS_WAITING, Relaxed);
204            futex_wake_all(&self.state);
205        }
206    }
207
208    #[cold]
209    fn write_contended(&self) {
210        let mut state = self.spin_write();
211
212        let mut other_writers_waiting = 0;
213
214        loop {
215            // If it's unlocked, we try to lock it.
216            if is_unlocked(state) {
217                match self.state.compare_exchange_weak(
218                    state,
219                    state | WRITE_LOCKED | other_writers_waiting,
220                    Acquire,
221                    Relaxed,
222                ) {
223                    Ok(_) => return, // Locked!
224                    Err(s) => {
225                        state = s;
226                        continue;
227                    }
228                }
229            }
230
231            // Set the waiting bit indicating that we're waiting on it.
232            if !has_writers_waiting(state) {
233                if let Err(s) =
234                    self.state.compare_exchange(state, state | WRITERS_WAITING, Relaxed, Relaxed)
235                {
236                    state = s;
237                    continue;
238                }
239            }
240
241            // Other writers might be waiting now too, so we should make sure
242            // we keep that bit on once we manage lock it.
243            other_writers_waiting = WRITERS_WAITING;
244
245            // Examine the notification counter before we check if `state` has changed,
246            // to make sure we don't miss any notifications.
247            let seq = self.writer_notify.load(Acquire);
248
249            // Don't go to sleep if the lock has become available,
250            // or if the writers waiting bit is no longer set.
251            state = self.state.load(Relaxed);
252            if is_unlocked(state) || !has_writers_waiting(state) {
253                continue;
254            }
255
256            // Wait for the state to change.
257            futex_wait(&self.writer_notify, seq, None);
258
259            // Spin again after waking up.
260            state = self.spin_write();
261        }
262    }
263
264    /// Wakes up waiting threads after unlocking.
265    ///
266    /// If both are waiting, this will wake up only one writer, but will fall
267    /// back to waking up readers if there was no writer to wake up.
268    #[cold]
269    fn wake_writer_or_readers(&self, mut state: Primitive) {
270        assert!(is_unlocked(state));
271
272        // The readers waiting bit might be turned on at any point now,
273        // since readers will block when there's anything waiting.
274        // Writers will just lock the lock though, regardless of the waiting bits,
275        // so we don't have to worry about the writer waiting bit.
276        //
277        // If the lock gets locked in the meantime, we don't have to do
278        // anything, because then the thread that locked the lock will take
279        // care of waking up waiters when it unlocks.
280
281        // If only writers are waiting, wake one of them up.
282        if state == WRITERS_WAITING {
283            match self.state.compare_exchange(state, 0, Relaxed, Relaxed) {
284                Ok(_) => {
285                    self.wake_writer();
286                    return;
287                }
288                Err(s) => {
289                    // Maybe some readers are now waiting too. So, continue to the next `if`.
290                    state = s;
291                }
292            }
293        }
294
295        // If both writers and readers are waiting, leave the readers waiting
296        // and only wake up one writer.
297        if state == READERS_WAITING + WRITERS_WAITING {
298            if self.state.compare_exchange(state, READERS_WAITING, Relaxed, Relaxed).is_err() {
299                // The lock got locked. Not our problem anymore.
300                return;
301            }
302            if self.wake_writer() {
303                return;
304            }
305            // No writers were actually blocked on futex_wait, so we continue
306            // to wake up readers instead, since we can't be sure if we notified a writer.
307            state = READERS_WAITING;
308        }
309
310        // If readers are waiting, wake them all up.
311        if state == READERS_WAITING {
312            if self.state.compare_exchange(state, 0, Relaxed, Relaxed).is_ok() {
313                futex_wake_all(&self.state);
314            }
315        }
316    }
317
318    /// This wakes one writer and returns true if we woke up a writer that was
319    /// blocked on futex_wait.
320    ///
321    /// If this returns false, it might still be the case that we notified a
322    /// writer that was about to go to sleep.
323    fn wake_writer(&self) -> bool {
324        self.writer_notify.fetch_add(1, Release);
325        futex_wake(&self.writer_notify)
326        // Note that FreeBSD and DragonFlyBSD don't tell us whether they woke
327        // up any threads or not, and always return `false` here. That still
328        // results in correct behavior: it just means readers get woken up as
329        // well in case both readers and writers were waiting.
330    }
331
332    /// Spin for a while, but stop directly at the given condition.
333    #[inline]
334    fn spin_until(&self, f: impl Fn(Primitive) -> bool) -> Primitive {
335        let mut spin = 100; // Chosen by fair dice roll.
336        loop {
337            let state = self.state.load(Relaxed);
338            if f(state) || spin == 0 {
339                return state;
340            }
341            crate::hint::spin_loop();
342            spin -= 1;
343        }
344    }
345
346    #[inline]
347    fn spin_write(&self) -> Primitive {
348        // Stop spinning when it's unlocked or when there's waiting writers, to keep things somewhat fair.
349        self.spin_until(|state| is_unlocked(state) || has_writers_waiting(state))
350    }
351
352    #[inline]
353    fn spin_read(&self) -> Primitive {
354        // Stop spinning when it's unlocked or read locked, or when there's waiting threads.
355        self.spin_until(|state| {
356            !is_write_locked(state) || has_readers_waiting(state) || has_writers_waiting(state)
357        })
358    }
359}