wasmtime_runtime/parking_spot.rs

//! Implements thread wait and notify primitives with `std::sync` primitives.
//!
//! This is a simplified version of the `parking_lot_core` crate.
//!
//! There are two main operations that can be performed:
//!
//! - *Parking* refers to suspending the thread while simultaneously enqueuing it
//!   on a queue keyed by some address.
//! - *Unparking* refers to dequeuing a thread from a queue keyed by some address
//!   and resuming it.
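//!
//! A rough usage sketch (hypothetical caller, not compiled as a doc-test; the
//! key is an arbitrary `u64`, typically the address of the value being waited
//! on):
//!
//! ```ignore
//! let spot = ParkingSpot::default();
//! let key = 0x1000;
//!
//! // Waiter: sleep until notified; the closure re-checks the wait condition
//! // with the internal lock held.
//! let result = spot.park(key, || true, None);
//!
//! // Notifier: wake up at most one thread parked on `key`.
//! let woken = spot.unpark(key, 1);
//! ```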

#![deny(clippy::all)]
#![deny(clippy::pedantic)]
#![deny(missing_docs)]
#![deny(unsafe_code)]

use crate::WaitResult;
use std::collections::BTreeMap;
use std::sync::{Arc, Condvar, Mutex};
use std::time::Instant;

#[derive(Default, Debug)]
struct Spot {
    /// The number of threads parked on this spot.
    num_parked: u32,

    /// The number of threads that have been unparked but not yet woken up.
    /// This is used to avoid spurious wakeups.
    to_unpark: u32,

    /// The [`Condvar`] used to notify parked threads.
    cvar: Arc<Condvar>,
}

/// The thread-global table of parking spots, keyed by a `u64` address.
#[derive(Default, Debug)]
pub struct ParkingSpot {
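    /// Maps a key (typically an address) to the state of the threads parked on it.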
    inner: Mutex<BTreeMap<u64, Spot>>,
}

impl ParkingSpot {
    /// Park the current thread until it is unparked or the timeout, if provided,
    /// is reached.
    ///
    /// The `key` is used to identify the parking spot. If another thread calls
    /// `unpark` with the same key, the current thread will be unparked.
    ///
    /// The `validate` callback is called with the lock held before parking.
    /// If it returns `false`, the thread is not parked and `WaitResult::Mismatch` is returned.
    ///
    /// The `timeout` argument, if provided, is the deadline past which the wait
    /// gives up and `WaitResult::TimedOut` is returned.
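    ///
    /// A minimal sketch of a timed wait (hypothetical caller and flag, not a
    /// doc-test):
    ///
    /// ```ignore
    /// let deadline = Instant::now() + Duration::from_millis(10);
    /// match spot.park(key, || flag.load(Ordering::SeqCst) == expected, deadline) {
    ///     WaitResult::Ok => { /* unparked by another thread */ }
    ///     WaitResult::Mismatch => { /* `validate` returned false */ }
    ///     WaitResult::TimedOut => { /* the deadline passed */ }
    /// }
    /// ```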
    pub fn park(
        &self,
        key: u64,
        validate: impl FnOnce() -> bool,
        timeout: impl Into<Option<Instant>>,
    ) -> WaitResult {
        self.park_inner(key, validate, timeout.into())
    }

    fn park_inner(
        &self,
        key: u64,
        validate: impl FnOnce() -> bool,
        timeout: Option<Instant>,
    ) -> WaitResult {
        let mut inner = self
            .inner
            .lock()
            .expect("failed to lock inner parking table");

        // check validation with lock held
        if !validate() {
            return WaitResult::Mismatch;
        }

        // Clone the condvar out of the map entry so that no borrow of the map
        // is held when the lock guard is handed to `Condvar::wait` below.
        let cvar = {
            let spot = inner.entry(key).or_insert_with(Spot::default);
            spot.num_parked = spot
                .num_parked
                .checked_add(1)
                .expect("parking spot number overflow");
            spot.cvar.clone()
        };

        loop {
            let timed_out = if let Some(timeout) = timeout {
                let now = Instant::now();
                if now >= timeout {
                    true
                } else {
                    let dur = timeout - now;
                    let (lock, result) = cvar
                        .wait_timeout(inner, dur)
                        .expect("failed to wait for condition");
                    inner = lock;
                    result.timed_out()
                }
            } else {
                inner = cvar.wait(inner).expect("failed to wait for condition");
                false
            };

            let spot = inner.get_mut(&key).expect("failed to get spot");

            if timed_out {
                // If waiting on the cvar timed out then due to how system cvars
                // are implemented we may need to continue to sleep longer. If
                // the deadline has not been reached then turn the crank again
                // and go back to sleep.
                if Instant::now() < timeout.unwrap() {
                    continue;
                }

                // Opportunistically consume `to_unpark` signals even on
                // timeout. From the perspective of `unpark` this "agent" raced
                // between its own timeout and receiving the unpark signal, but
                // from unpark's perspective it's definitely going to wake up N
                // agents as returned from the `unpark` return value.
                //
                // Note that this may actually prevent other threads from
                // getting unparked. For example:
                //
                // * Thread A parks with a timeout
                // * Thread B parks with no timeout
                // * Thread C decides to unpark 1 thread
                // * Thread A's cvar wakes up due to a timeout, blocks on the
                //   lock
                // * Thread C finishes unpark and signals the cvar once
                // * Thread B wakes up
                // * Thread A and B contend for the lock and A wins
                // * A consumes the "to_unpark" value
                // * B goes back to sleep since `to_unpark == 0`, thinking that
                //   a spurious wakeup happened.
                //
                // It's believed that this is ok, however, since from C's
                // perspective one agent was still woken up and is allowed to
                // continue, notably A in this case. C doesn't know that A raced
                // with B and "stole" its wakeup signal.
                if spot.to_unpark > 0 {
                    spot.to_unpark -= 1;
                }
            } else {
                if spot.to_unpark == 0 {
                    // If no timeout happened but nothing has unparked this spot
                    // (as signaled through `to_unpark`) then this is indicative
                    // of a spurious wakeup. In this situation turn the crank
                    // again and go back to sleep as this interface doesn't
                    // allow for spurious wakeups.
                    continue;
                }
                // No timeout happened, and some other thread registered to
                // unpark this thread, so consume one unpark notification.
                spot.to_unpark -= 1;
            }

            spot.num_parked = spot
                .num_parked
                .checked_sub(1)
                .expect("corrupted parking spot state");

            if spot.num_parked == 0 {
                assert_eq!(spot.to_unpark, 0);
                inner
                    .remove(&key)
                    .expect("failed to remove spot from inner parking table");
            }

            if timed_out {
                return WaitResult::TimedOut;
            }

            return WaitResult::Ok;
        }
    }

    /// Unpark at most `n` threads that are parked with the given key.
    ///
    /// Returns the number of threads that were actually unparked.
    pub fn unpark(&self, key: u64, n: u32) -> u32 {
        if n == 0 {
            return 0;
        }
        let mut num_unpark = 0;

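        // `num_unpark` is capped at the number of parked threads that have not
        // already been promised a wakeup via `to_unpark`, so the return value
        // never over-counts the threads that will actually be woken.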
        self.with_lot(key, |spot| {
            num_unpark = n.min(spot.num_parked - spot.to_unpark);
            spot.to_unpark += num_unpark;
            if n >= num_unpark {
                spot.cvar.notify_all();
            } else {
                for _ in 0..num_unpark {
                    spot.cvar.notify_one();
                }
            }
        });

        num_unpark
    }

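    /// Runs `f` on the [`Spot`] for `key`, if any thread is currently parked there.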
    fn with_lot<F: FnMut(&mut Spot)>(&self, key: u64, mut f: F) {
        let mut inner = self
            .inner
            .lock()
            .expect("failed to lock inner parking table");
        if let Some(spot) = inner.get_mut(&key) {
            f(spot);
        }
    }
}

#[cfg(test)]
mod tests {
    use super::ParkingSpot;
    use std::ptr::addr_of;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::thread;
    use std::time::{Duration, Instant};

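    // Each thread advances the shared counter by one stage (1, 2, 3, then 4 from
    // the main thread), unparking all waiters after its store and parking until
    // the next stage is reached, exercising park/unpark hand-offs between threads.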
    #[test]
    fn atomic_wait_notify() {
        let parking_spot = &ParkingSpot::default();
        let atomic = &AtomicU64::new(0);

        thread::scope(|s| {
            let atomic_key = addr_of!(atomic) as u64;
            let thread1 = s.spawn(move || {
                atomic.store(1, Ordering::SeqCst);
                parking_spot.unpark(atomic_key, u32::MAX);
                parking_spot.park(atomic_key, || atomic.load(Ordering::SeqCst) == 1, None);
            });

            let thread2 = s.spawn(move || {
                while atomic.load(Ordering::SeqCst) != 1 {
                    parking_spot.park(atomic_key, || atomic.load(Ordering::SeqCst) != 1, None);
                }
                atomic.store(2, Ordering::SeqCst);
                parking_spot.unpark(atomic_key, u32::MAX);
                parking_spot.park(atomic_key, || atomic.load(Ordering::SeqCst) == 2, None);
            });

            let thread3 = s.spawn(move || {
                while atomic.load(Ordering::SeqCst) != 2 {
                    parking_spot.park(atomic_key, || atomic.load(Ordering::SeqCst) != 2, None);
                }
                atomic.store(3, Ordering::SeqCst);
                parking_spot.unpark(atomic_key, u32::MAX);

                parking_spot.park(atomic_key, || atomic.load(Ordering::SeqCst) == 3, None);
            });

            while atomic.load(Ordering::SeqCst) != 3 {
                parking_spot.park(atomic_key, || atomic.load(Ordering::SeqCst) != 3, None);
            }
            atomic.store(4, Ordering::SeqCst);
            parking_spot.unpark(atomic_key, u32::MAX);

            thread1.join().unwrap();
            thread2.join().unwrap();
            thread3.join().unwrap();
        });
    }

    mod parking_lot {
        // This is a modified version of the parking_lot_core tests,
        // which are licensed under the MIT and Apache 2.0 licenses.
        use super::*;
        use std::sync::atomic::{AtomicIsize, AtomicU32};
        use std::sync::Arc;
        use std::time::Duration;

        macro_rules! test {
            ( $( $name:ident(
                repeats: $repeats:expr,
                latches: $latches:expr,
                delay: $delay:expr,
                threads: $threads:expr,
                single_unparks: $single_unparks:expr);
            )* ) => {
                $(
                #[test]
                fn $name() {
                    if std::env::var("WASMTIME_TEST_NO_HOG_MEMORY").is_ok() {
                        return;
                    }
                    let delay = Duration::from_micros($delay);
                    for _ in 0..$repeats {
                        run_parking_test($latches, delay, $threads, $single_unparks);
                    }
                })*
            };
        }

        test! {
            unpark_all_one_fast(
                repeats: 10000, latches: 1, delay: 0, threads: 1, single_unparks: 0
            );
            unpark_all_hundred_fast(
                repeats: 100, latches: 1, delay: 0, threads: 100, single_unparks: 0
            );
            unpark_one_one_fast(
                repeats: 1000, latches: 1, delay: 0, threads: 1, single_unparks: 1
            );
            unpark_one_hundred_fast(
                repeats: 20, latches: 1, delay: 0, threads: 100, single_unparks: 100
            );
            unpark_one_fifty_then_fifty_all_fast(
                repeats: 50, latches: 1, delay: 0, threads: 100, single_unparks: 50
            );
            unpark_all_one(
                repeats: 100, latches: 1, delay: 10000, threads: 1, single_unparks: 0
            );
            unpark_all_hundred(
                repeats: 100, latches: 1, delay: 10000, threads: 100, single_unparks: 0
            );
            unpark_one_one(
                repeats: 10, latches: 1, delay: 10000, threads: 1, single_unparks: 1
            );
            unpark_one_fifty(
                repeats: 1, latches: 1, delay: 10000, threads: 50, single_unparks: 50
            );
            unpark_one_fifty_then_fifty_all(
                repeats: 2, latches: 1, delay: 10000, threads: 100, single_unparks: 50
            );
            hundred_unpark_all_one_fast(
                repeats: 100, latches: 100, delay: 0, threads: 1, single_unparks: 0
            );
            hundred_unpark_all_one(
                repeats: 1, latches: 100, delay: 10000, threads: 1, single_unparks: 0
            );
        }

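        // Spawns `num_threads` waiters per latch, wakes `num_single_unparks` of
        // them one at a time (sleeping `delay` between unparks), then wakes the
        // remainder in bulk via `finish`.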
        fn run_parking_test(
            num_latches: usize,
            delay: Duration,
            num_threads: u32,
            num_single_unparks: u32,
        ) {
            let spot = ParkingSpot::default();

            thread::scope(|s| {
                let mut tests = Vec::with_capacity(num_latches);

                for _ in 0..num_latches {
                    let test = Arc::new(SingleLatchTest::new(num_threads, &spot));
                    let mut threads = Vec::with_capacity(num_threads as _);
                    for _ in 0..num_threads {
                        let test = test.clone();
                        threads.push(s.spawn(move || test.run()));
                    }
                    tests.push((test, threads));
                }

                for unpark_index in 0..num_single_unparks {
                    thread::sleep(delay);
                    for (test, _) in &tests {
                        test.unpark_one(unpark_index);
                    }
                }

                for (test, threads) in tests {
                    test.finish(num_single_unparks);
                    for thread in threads {
                        thread.join().expect("Test thread panic");
                    }
                }
            });
        }

        struct SingleLatchTest<'a> {
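            /// Fair semaphore count: a positive value is the number of free
            /// slots, a negative value means that many threads are waiting
            /// (parked or about to park).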
            semaphore: AtomicIsize,
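            /// Number of threads that have finished `run`, i.e. have been woken up.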
            num_awake: AtomicU32,
            /// Total number of threads participating in this test.
            num_threads: u32,
            spot: &'a ParkingSpot,
        }

        impl<'a> SingleLatchTest<'a> {
            pub fn new(num_threads: u32, spot: &'a ParkingSpot) -> Self {
                Self {
                    // This implements a fair (FIFO) semaphore, and it starts out unavailable.
                    semaphore: AtomicIsize::new(0),
                    num_awake: AtomicU32::new(0),
                    num_threads,
                    spot,
                }
            }

            pub fn run(&self) {
                // Get one slot from the semaphore
                self.down();

                self.num_awake.fetch_add(1, Ordering::SeqCst);
            }

            pub fn unpark_one(&self, _single_unpark_index: u32) {
                let num_awake_before_up = self.num_awake.load(Ordering::SeqCst);

                self.up();

                // Wait for a parked thread to wake up and update num_awake.
                while self.num_awake.load(Ordering::SeqCst) != num_awake_before_up + 1 {
                    thread::yield_now();
                }
            }

            pub fn finish(&self, num_single_unparks: u32) {
                // The number of threads not yet unparked via `unpark_one`.
                let mut num_threads_left =
                    self.num_threads.checked_sub(num_single_unparks).unwrap();

                // Wake the remaining threads up with a bulk `unpark`. This has to
                // be done in a loop because there might still be threads that
                // have not yet parked.
                while num_threads_left > 0 {
                    let mut num_waiting_on_address = 0;
                    self.spot.with_lot(self.semaphore_addr(), |thread_data| {
                        num_waiting_on_address = thread_data.num_parked;
                    });
                    assert!(num_waiting_on_address <= num_threads_left);

                    let num_awake_before_unpark = self.num_awake.load(Ordering::SeqCst);

                    let num_unparked = self.spot.unpark(self.semaphore_addr(), u32::MAX);
                    assert!(num_unparked >= num_waiting_on_address);
                    assert!(num_unparked <= num_threads_left);

                    // Wait for all unparked threads to wake up and update num_awake.
                    while self.num_awake.load(Ordering::SeqCst)
                        != num_awake_before_unpark + num_unparked
                    {
                        thread::yield_now();
                    }

                    num_threads_left = num_threads_left.checked_sub(num_unparked).unwrap();
                }
                // By now, all threads should have been woken up
                assert_eq!(self.num_awake.load(Ordering::SeqCst), self.num_threads);

                // Make sure no thread is parked on our semaphore address
                let mut num_waiting_on_address = 0;
                self.spot.with_lot(self.semaphore_addr(), |thread_data| {
                    num_waiting_on_address = thread_data.num_parked;
                });
                assert_eq!(num_waiting_on_address, 0);
            }

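            /// Acquires one permit from the semaphore, parking the thread if none
            /// is currently available.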
            pub fn down(&self) {
                let old_semaphore_value = self.semaphore.fetch_sub(1, Ordering::SeqCst);

                if old_semaphore_value > 0 {
                    // We acquired the semaphore. Done.
                    return;
                }

                // We need to wait.
                let validate = || true;
                self.spot.park(self.semaphore_addr(), validate, None);
            }

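            /// Releases one permit and, if a thread had already registered as a
            /// waiter, keeps unparking until exactly one thread has been woken.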
            pub fn up(&self) {
                let old_semaphore_value = self.semaphore.fetch_add(1, Ordering::SeqCst);

                // Check if anyone was waiting on the semaphore. If they were, then pass ownership to them.
                if old_semaphore_value < 0 {
                    // We need to continue until we have actually unparked someone. It might be that
                    // the thread we want to pass ownership to has decremented the semaphore counter,
                    // but not yet parked.
                    loop {
                        match self.spot.unpark(self.semaphore_addr(), 1) {
                            1 => break,
                            0 => (),
                            i => panic!("Should not wake up {i} threads"),
                        }
                    }
                }
            }

            fn semaphore_addr(&self) -> u64 {
                addr_of!(self.semaphore) as _
            }
        }
    }

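    // Stress-tests timed parking: one thread repeatedly parks with a short
    // deadline while several other threads bump the counter and unpark it.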
    #[test]
    fn wait_with_timeout() {
        let parking_spot = &ParkingSpot::default();
        let atomic = &AtomicU64::new(0);

        thread::scope(|s| {
            let atomic_key = addr_of!(atomic) as u64;

            const N: u64 = 5;
            const M: u64 = 1000;

            let thread = s.spawn(move || {
                while atomic.load(Ordering::SeqCst) != N * M {
                    let timeout = Instant::now() + Duration::from_millis(1);
                    parking_spot.park(
                        atomic_key,
                        || atomic.load(Ordering::SeqCst) != N * M,
                        Some(timeout),
                    );
                }
            });

            let mut threads = vec![thread];
            for _ in 0..N {
                threads.push(s.spawn(move || {
                    for _ in 0..M {
                        atomic.fetch_add(1, Ordering::SeqCst);
                        parking_spot.unpark(atomic_key, 1);
                    }
                }));
            }

            for thread in threads {
                thread.join().unwrap();
            }
        });
    }
}