std/sys/sync/rwlock/futex.rs
1use crate::sync::atomic::Ordering::{Acquire, Relaxed, Release};
2use crate::sys::futex::{Futex, Primitive, futex_wait, futex_wake, futex_wake_all};
3
/// A reader-writer lock whose whole state is kept in two futex words.
///
/// `state` packs the lock counter and both "waiting" flags into one value so
/// that they can be inspected and updated together with a single atomic
/// operation. `writer_notify` is a separate word that writers sleep on.
pub struct RwLock {
    // The state consists of a 30-bit reader counter, a 'readers waiting' flag, and a 'writers waiting' flag.
    // Bits 0..30 (the counter, selected by `MASK`):
    //   0: Unlocked
    //   1..=0x3FFF_FFFE: Locked by N readers
    //   0x3FFF_FFFF: Write locked
    // Bit 30: Readers are waiting on this futex.
    // Bit 31: Writers are waiting on the writer_notify futex.
    state: Futex,
    // The 'condition variable' to notify writers through.
    // Incremented on every signal, so a writer can compare it against the
    // value it sampled before deciding to sleep.
    writer_notify: Futex,
}
17
18const READ_LOCKED: Primitive = 1;
19const MASK: Primitive = (1 << 30) - 1;
20const WRITE_LOCKED: Primitive = MASK;
21const DOWNGRADE: Primitive = READ_LOCKED.wrapping_sub(WRITE_LOCKED); // READ_LOCKED - WRITE_LOCKED
22const MAX_READERS: Primitive = MASK - 1;
23const READERS_WAITING: Primitive = 1 << 30;
24const WRITERS_WAITING: Primitive = 1 << 31;
25
26#[inline]
27fn is_unlocked(state: Primitive) -> bool {
28 state & MASK == 0
29}
30
31#[inline]
32fn is_write_locked(state: Primitive) -> bool {
33 state & MASK == WRITE_LOCKED
34}
35
36#[inline]
37fn has_readers_waiting(state: Primitive) -> bool {
38 state & READERS_WAITING != 0
39}
40
41#[inline]
42fn has_writers_waiting(state: Primitive) -> bool {
43 state & WRITERS_WAITING != 0
44}
45
46#[inline]
47fn is_read_lockable(state: Primitive) -> bool {
48 // This also returns false if the counter could overflow if we tried to read lock it.
49 //
50 // We don't allow read-locking if there's readers waiting, even if the lock is unlocked
51 // and there's no writers waiting. The only situation when this happens is after unlocking,
52 // at which point the unlocking thread might be waking up writers, which have priority over readers.
53 // The unlocking thread will clear the readers waiting bit and wake up readers, if necessary.
54 state & MASK < MAX_READERS && !has_readers_waiting(state) && !has_writers_waiting(state)
55}
56
57#[inline]
58fn is_read_lockable_after_wakeup(state: Primitive) -> bool {
59 // We make a special case for checking if we can read-lock _after_ a reader thread that went to
60 // sleep has been woken up by a call to `downgrade`.
61 //
62 // `downgrade` will wake up all readers and place the lock in read mode. Thus, there should be
63 // no readers waiting and the lock should be read-locked (not write-locked or unlocked).
64 //
65 // Note that we do not check if any writers are waiting. This is because a call to `downgrade`
66 // implies that the caller wants other readers to read the value protected by the lock. If we
67 // did not allow readers to acquire the lock before writers after a `downgrade`, then only the
68 // original writer would be able to read the value, thus defeating the purpose of `downgrade`.
69 state & MASK < MAX_READERS
70 && !has_readers_waiting(state)
71 && !is_write_locked(state)
72 && !is_unlocked(state)
73}
74
75#[inline]
76fn has_reached_max_readers(state: Primitive) -> bool {
77 state & MASK == MAX_READERS
78}
79
80impl RwLock {
81 #[inline]
82 pub const fn new() -> Self {
83 Self { state: Futex::new(0), writer_notify: Futex::new(0) }
84 }
85
86 #[inline]
87 pub fn try_read(&self) -> bool {
88 self.state
89 .fetch_update(Acquire, Relaxed, |s| is_read_lockable(s).then(|| s + READ_LOCKED))
90 .is_ok()
91 }
92
93 #[inline]
94 pub fn read(&self) {
95 let state = self.state.load(Relaxed);
96 if !is_read_lockable(state)
97 || self
98 .state
99 .compare_exchange_weak(state, state + READ_LOCKED, Acquire, Relaxed)
100 .is_err()
101 {
102 self.read_contended();
103 }
104 }
105
106 /// # Safety
107 ///
108 /// The `RwLock` must be read-locked (N readers) in order to call this.
109 #[inline]
110 pub unsafe fn read_unlock(&self) {
111 let state = self.state.fetch_sub(READ_LOCKED, Release) - READ_LOCKED;
112
113 // It's impossible for a reader to be waiting on a read-locked RwLock,
114 // except if there is also a writer waiting.
115 debug_assert!(!has_readers_waiting(state) || has_writers_waiting(state));
116
117 // Wake up a writer if we were the last reader and there's a writer waiting.
118 if is_unlocked(state) && has_writers_waiting(state) {
119 self.wake_writer_or_readers(state);
120 }
121 }
122
123 #[cold]
124 fn read_contended(&self) {
125 let mut has_slept = false;
126 let mut state = self.spin_read();
127
128 loop {
129 // If we have just been woken up, first check for a `downgrade` call.
130 // Otherwise, if we can read-lock it, lock it.
131 if (has_slept && is_read_lockable_after_wakeup(state)) || is_read_lockable(state) {
132 match self.state.compare_exchange_weak(state, state + READ_LOCKED, Acquire, Relaxed)
133 {
134 Ok(_) => return, // Locked!
135 Err(s) => {
136 state = s;
137 continue;
138 }
139 }
140 }
141
142 // Check for overflow.
143 assert!(!has_reached_max_readers(state), "too many active read locks on RwLock");
144
145 // Make sure the readers waiting bit is set before we go to sleep.
146 if !has_readers_waiting(state) {
147 if let Err(s) =
148 self.state.compare_exchange(state, state | READERS_WAITING, Relaxed, Relaxed)
149 {
150 state = s;
151 continue;
152 }
153 }
154
155 // Wait for the state to change.
156 futex_wait(&self.state, state | READERS_WAITING, None);
157 has_slept = true;
158
159 // Spin again after waking up.
160 state = self.spin_read();
161 }
162 }
163
164 #[inline]
165 pub fn try_write(&self) -> bool {
166 self.state
167 .fetch_update(Acquire, Relaxed, |s| is_unlocked(s).then(|| s + WRITE_LOCKED))
168 .is_ok()
169 }
170
171 #[inline]
172 pub fn write(&self) {
173 if self.state.compare_exchange_weak(0, WRITE_LOCKED, Acquire, Relaxed).is_err() {
174 self.write_contended();
175 }
176 }
177
178 /// # Safety
179 ///
180 /// The `RwLock` must be write-locked (single writer) in order to call this.
181 #[inline]
182 pub unsafe fn write_unlock(&self) {
183 let state = self.state.fetch_sub(WRITE_LOCKED, Release) - WRITE_LOCKED;
184
185 debug_assert!(is_unlocked(state));
186
187 if has_writers_waiting(state) || has_readers_waiting(state) {
188 self.wake_writer_or_readers(state);
189 }
190 }
191
192 /// # Safety
193 ///
194 /// The `RwLock` must be write-locked (single writer) in order to call this.
195 #[inline]
196 pub unsafe fn downgrade(&self) {
197 // Removes all write bits and adds a single read bit.
198 let state = self.state.fetch_add(DOWNGRADE, Release);
199 debug_assert!(is_write_locked(state), "RwLock must be write locked to call `downgrade`");
200
201 if has_readers_waiting(state) {
202 // Since we had the exclusive lock, nobody else can unset this bit.
203 self.state.fetch_sub(READERS_WAITING, Relaxed);
204 futex_wake_all(&self.state);
205 }
206 }
207
/// Slow path of `write`: spins briefly, then sleeps on the `writer_notify`
/// futex, repeating until the write lock has been taken.
///
/// The order of the steps inside the loop matters: the notification counter
/// is sampled *before* `state` is re-read, and the sampled value is what gets
/// passed to `futex_wait`.
#[cold]
fn write_contended(&self) {
    let mut state = self.spin_write();

    // Becomes `WRITERS_WAITING` once this thread has gone through the
    // "about to sleep" path at least once; see below.
    let mut other_writers_waiting = 0;

    loop {
        // If it's unlocked, we try to lock it.
        if is_unlocked(state) {
            match self.state.compare_exchange_weak(
                state,
                state | WRITE_LOCKED | other_writers_waiting,
                Acquire,
                Relaxed,
            ) {
                Ok(_) => return, // Locked!
                Err(s) => {
                    // The state changed under us; retry with the fresh value.
                    state = s;
                    continue;
                }
            }
        }

        // Set the waiting bit indicating that we're waiting on it.
        if !has_writers_waiting(state) {
            if let Err(s) =
                self.state.compare_exchange(state, state | WRITERS_WAITING, Relaxed, Relaxed)
            {
                state = s;
                continue;
            }
        }

        // Other writers might be waiting now too, so we should make sure
        // we keep that bit on once we manage to lock it.
        other_writers_waiting = WRITERS_WAITING;

        // Examine the notification counter before we check if `state` has changed,
        // to make sure we don't miss any notifications.
        let seq = self.writer_notify.load(Acquire);

        // Don't go to sleep if the lock has become available,
        // or if the writers waiting bit is no longer set.
        state = self.state.load(Relaxed);
        if is_unlocked(state) || !has_writers_waiting(state) {
            continue;
        }

        // Sleep on `writer_notify`, passing the counter value sampled above.
        // NOTE(review): presumably `futex_wait` returns right away if the
        // counter no longer equals `seq` — confirm against `sys::futex`.
        futex_wait(&self.writer_notify, seq, None);

        // Spin again after waking up.
        state = self.spin_write();
    }
}
263
/// Wakes up waiting threads after unlocking.
///
/// If both are waiting, this will wake up only one writer, but will fall
/// back to waking up readers if there was no writer to wake up.
///
/// `state` is the value the caller observed right after unlocking. It must
/// have a zero lock counter; this is checked with `assert!`, so it is also
/// enforced in release builds.
#[cold]
fn wake_writer_or_readers(&self, mut state: Primitive) {
    assert!(is_unlocked(state));

    // The readers waiting bit might be turned on at any point now,
    // since readers will block when there's anything waiting.
    // Writers will just lock the lock though, regardless of the waiting bits,
    // so we don't have to worry about the writer waiting bit.
    //
    // If the lock gets locked in the meantime, we don't have to do
    // anything, because then the thread that locked the lock will take
    // care of waking up waiters when it unlocks.
    //
    // The three `if`s below are a fall-through chain: each compares `state`
    // against one exact flag combination, and an earlier step may rewrite
    // `state` so that a later step picks it up.

    // If only writers are waiting, wake one of them up.
    if state == WRITERS_WAITING {
        // Clear the writers waiting bit; the woken writer sets it again when
        // it locks, if it has been through the sleep path (see `write_contended`).
        match self.state.compare_exchange(state, 0, Relaxed, Relaxed) {
            Ok(_) => {
                self.wake_writer();
                return;
            }
            Err(s) => {
                // Maybe some readers are now waiting too. So, continue to the next `if`.
                state = s;
            }
        }
    }

    // If both writers and readers are waiting, leave the readers waiting
    // and only wake up one writer.
    if state == READERS_WAITING + WRITERS_WAITING {
        if self.state.compare_exchange(state, READERS_WAITING, Relaxed, Relaxed).is_err() {
            // The lock got locked. Not our problem anymore.
            return;
        }
        if self.wake_writer() {
            return;
        }
        // No writers were actually blocked on futex_wait, so we continue
        // to wake up readers instead, since we can't be sure if we notified a writer.
        state = READERS_WAITING;
    }

    // If readers are waiting, wake them all up.
    if state == READERS_WAITING {
        // Only wake if we are the ones who cleared the bit; a failed
        // compare-exchange means the state has changed and nothing is done here.
        if self.state.compare_exchange(state, 0, Relaxed, Relaxed).is_ok() {
            futex_wake_all(&self.state);
        }
    }
}
317
318 /// This wakes one writer and returns true if we woke up a writer that was
319 /// blocked on futex_wait.
320 ///
321 /// If this returns false, it might still be the case that we notified a
322 /// writer that was about to go to sleep.
323 fn wake_writer(&self) -> bool {
324 self.writer_notify.fetch_add(1, Release);
325 futex_wake(&self.writer_notify)
326 // Note that FreeBSD and DragonFlyBSD don't tell us whether they woke
327 // up any threads or not, and always return `false` here. That still
328 // results in correct behavior: it just means readers get woken up as
329 // well in case both readers and writers were waiting.
330 }
331
332 /// Spin for a while, but stop directly at the given condition.
333 #[inline]
334 fn spin_until(&self, f: impl Fn(Primitive) -> bool) -> Primitive {
335 let mut spin = 100; // Chosen by fair dice roll.
336 loop {
337 let state = self.state.load(Relaxed);
338 if f(state) || spin == 0 {
339 return state;
340 }
341 crate::hint::spin_loop();
342 spin -= 1;
343 }
344 }
345
346 #[inline]
347 fn spin_write(&self) -> Primitive {
348 // Stop spinning when it's unlocked or when there's waiting writers, to keep things somewhat fair.
349 self.spin_until(|state| is_unlocked(state) || has_writers_waiting(state))
350 }
351
352 #[inline]
353 fn spin_read(&self) -> Primitive {
354 // Stop spinning when it's unlocked or read locked, or when there's waiting threads.
355 self.spin_until(|state| {
356 !is_write_locked(state) || has_readers_waiting(state) || has_writers_waiting(state)
357 })
358 }
359}