1#![deny(clippy::all)]
13#![deny(clippy::pedantic)]
14#![deny(missing_docs)]
15#![deny(unsafe_code)]
16
17use crate::WaitResult;
18use std::collections::BTreeMap;
19use std::sync::{Arc, Condvar, Mutex};
20use std::time::Instant;
21
22#[derive(Default, Debug)]
23struct Spot {
24 num_parked: u32,
26
27 to_unpark: u32,
30
31 cvar: Arc<Condvar>,
33}
34
35#[derive(Default, Debug)]
37pub struct ParkingSpot {
38 inner: Mutex<BTreeMap<u64, Spot>>,
39}
40
41impl ParkingSpot {
42 pub fn park(
52 &self,
53 key: u64,
54 validate: impl FnOnce() -> bool,
55 timeout: impl Into<Option<Instant>>,
56 ) -> WaitResult {
57 self.park_inner(key, validate, timeout.into())
58 }
59
60 fn park_inner(
61 &self,
62 key: u64,
63 validate: impl FnOnce() -> bool,
64 timeout: Option<Instant>,
65 ) -> WaitResult {
66 let mut inner = self
67 .inner
68 .lock()
69 .expect("failed to lock inner parking table");
70
71 if !validate() {
73 return WaitResult::Mismatch;
74 }
75
76 let cvar = {
78 let spot = inner.entry(key).or_insert_with(Spot::default);
79 spot.num_parked = spot
80 .num_parked
81 .checked_add(1)
82 .expect("parking spot number overflow");
83 spot.cvar.clone()
84 };
85
86 loop {
87 let timed_out = if let Some(timeout) = timeout {
88 let now = Instant::now();
89 if now >= timeout {
90 true
91 } else {
92 let dur = timeout - now;
93 let (lock, result) = cvar
94 .wait_timeout(inner, dur)
95 .expect("failed to wait for condition");
96 inner = lock;
97 result.timed_out()
98 }
99 } else {
100 inner = cvar.wait(inner).expect("failed to wait for condition");
101 false
102 };
103
104 let spot = inner.get_mut(&key).expect("failed to get spot");
105
106 if timed_out {
107 if Instant::now() < timeout.unwrap() {
112 continue;
113 }
114
115 if spot.to_unpark > 0 {
141 spot.to_unpark -= 1;
142 }
143 } else {
144 if spot.to_unpark == 0 {
145 continue;
151 }
152 spot.to_unpark -= 1;
155 }
156
157 spot.num_parked = spot
158 .num_parked
159 .checked_sub(1)
160 .expect("corrupted parking spot state");
161
162 if spot.num_parked == 0 {
163 assert_eq!(spot.to_unpark, 0);
164 inner
165 .remove(&key)
166 .expect("failed to remove spot from inner parking table");
167 }
168
169 if timed_out {
170 return WaitResult::TimedOut;
171 }
172
173 return WaitResult::Ok;
174 }
175 }
176
177 pub fn unpark(&self, key: u64, n: u32) -> u32 {
181 if n == 0 {
182 return 0;
183 }
184 let mut num_unpark = 0;
185
186 self.with_lot(key, |spot| {
187 num_unpark = n.min(spot.num_parked - spot.to_unpark);
188 spot.to_unpark += num_unpark;
189 if n >= num_unpark {
190 spot.cvar.notify_all();
191 } else {
192 for _ in 0..num_unpark {
193 spot.cvar.notify_one();
194 }
195 }
196 });
197
198 num_unpark
199 }
200
201 fn with_lot<F: FnMut(&mut Spot)>(&self, key: u64, mut f: F) {
202 let mut inner = self
203 .inner
204 .lock()
205 .expect("failed to lock inner parking table");
206 if let Some(spot) = inner.get_mut(&key) {
207 f(spot);
208 }
209 }
210}
211
212#[cfg(test)]
213mod tests {
214 use super::ParkingSpot;
215 use std::ptr::addr_of;
216 use std::sync::atomic::{AtomicU64, Ordering};
217 use std::thread;
218 use std::time::{Duration, Instant};
219
220 #[test]
221 fn atomic_wait_notify() {
222 let parking_spot = &ParkingSpot::default();
223 let atomic = &AtomicU64::new(0);
224
225 thread::scope(|s| {
226 let atomic_key = addr_of!(atomic) as u64;
227 let thread1 = s.spawn(move || {
228 atomic.store(1, Ordering::SeqCst);
229 parking_spot.unpark(atomic_key, u32::MAX);
230 parking_spot.park(atomic_key, || atomic.load(Ordering::SeqCst) == 1, None);
231 });
232
233 let thread2 = s.spawn(move || {
234 while atomic.load(Ordering::SeqCst) != 1 {
235 parking_spot.park(atomic_key, || atomic.load(Ordering::SeqCst) != 1, None);
236 }
237 atomic.store(2, Ordering::SeqCst);
238 parking_spot.unpark(atomic_key, u32::MAX);
239 parking_spot.park(atomic_key, || atomic.load(Ordering::SeqCst) == 2, None);
240 });
241
242 let thread3 = s.spawn(move || {
243 while atomic.load(Ordering::SeqCst) != 2 {
244 parking_spot.park(atomic_key, || atomic.load(Ordering::SeqCst) != 2, None);
245 }
246 atomic.store(3, Ordering::SeqCst);
247 parking_spot.unpark(atomic_key, u32::MAX);
248
249 parking_spot.park(atomic_key, || atomic.load(Ordering::SeqCst) == 3, None);
250 });
251
252 while atomic.load(Ordering::SeqCst) != 3 {
253 parking_spot.park(atomic_key, || atomic.load(Ordering::SeqCst) != 3, None);
254 }
255 atomic.store(4, Ordering::SeqCst);
256 parking_spot.unpark(atomic_key, u32::MAX);
257
258 thread1.join().unwrap();
259 thread2.join().unwrap();
260 thread3.join().unwrap();
261 });
262 }
263
264 mod parking_lot {
265 use super::*;
268 use std::sync::atomic::{AtomicIsize, AtomicU32};
269 use std::sync::Arc;
270 use std::time::Duration;
271
272 macro_rules! test {
273 ( $( $name:ident(
274 repeats: $repeats:expr,
275 latches: $latches:expr,
276 delay: $delay:expr,
277 threads: $threads:expr,
278 single_unparks: $single_unparks:expr);
279 )* ) => {
280 $(
281 #[test]
282 fn $name() {
283 if std::env::var("WASMTIME_TEST_NO_HOG_MEMORY").is_ok() {
284 return;
285 }
286 let delay = Duration::from_micros($delay);
287 for _ in 0..$repeats {
288 run_parking_test($latches, delay, $threads, $single_unparks);
289 }
290 })*
291 };
292 }
293
294 test! {
295 unpark_all_one_fast(
296 repeats: 10000, latches: 1, delay: 0, threads: 1, single_unparks: 0
297 );
298 unpark_all_hundred_fast(
299 repeats: 100, latches: 1, delay: 0, threads: 100, single_unparks: 0
300 );
301 unpark_one_one_fast(
302 repeats: 1000, latches: 1, delay: 0, threads: 1, single_unparks: 1
303 );
304 unpark_one_hundred_fast(
305 repeats: 20, latches: 1, delay: 0, threads: 100, single_unparks: 100
306 );
307 unpark_one_fifty_then_fifty_all_fast(
308 repeats: 50, latches: 1, delay: 0, threads: 100, single_unparks: 50
309 );
310 unpark_all_one(
311 repeats: 100, latches: 1, delay: 10000, threads: 1, single_unparks: 0
312 );
313 unpark_all_hundred(
314 repeats: 100, latches: 1, delay: 10000, threads: 100, single_unparks: 0
315 );
316 unpark_one_one(
317 repeats: 10, latches: 1, delay: 10000, threads: 1, single_unparks: 1
318 );
319 unpark_one_fifty(
320 repeats: 1, latches: 1, delay: 10000, threads: 50, single_unparks: 50
321 );
322 unpark_one_fifty_then_fifty_all(
323 repeats: 2, latches: 1, delay: 10000, threads: 100, single_unparks: 50
324 );
325 hundred_unpark_all_one_fast(
326 repeats: 100, latches: 100, delay: 0, threads: 1, single_unparks: 0
327 );
328 hundred_unpark_all_one(
329 repeats: 1, latches: 100, delay: 10000, threads: 1, single_unparks: 0
330 );
331 }
332
333 fn run_parking_test(
334 num_latches: usize,
335 delay: Duration,
336 num_threads: u32,
337 num_single_unparks: u32,
338 ) {
339 let spot = ParkingSpot::default();
340
341 thread::scope(|s| {
342 let mut tests = Vec::with_capacity(num_latches);
343
344 for _ in 0..num_latches {
345 let test = Arc::new(SingleLatchTest::new(num_threads, &spot));
346 let mut threads = Vec::with_capacity(num_threads as _);
347 for _ in 0..num_threads {
348 let test = test.clone();
349 threads.push(s.spawn(move || test.run()));
350 }
351 tests.push((test, threads));
352 }
353
354 for unpark_index in 0..num_single_unparks {
355 thread::sleep(delay);
356 for (test, _) in &tests {
357 test.unpark_one(unpark_index);
358 }
359 }
360
361 for (test, threads) in tests {
362 test.finish(num_single_unparks);
363 for thread in threads {
364 thread.join().expect("Test thread panic");
365 }
366 }
367 });
368 }
369
370 struct SingleLatchTest<'a> {
371 semaphore: AtomicIsize,
372 num_awake: AtomicU32,
373 num_threads: u32,
375 spot: &'a ParkingSpot,
376 }
377
378 impl<'a> SingleLatchTest<'a> {
379 pub fn new(num_threads: u32, spot: &'a ParkingSpot) -> Self {
380 Self {
381 semaphore: AtomicIsize::new(0),
383 num_awake: AtomicU32::new(0),
384 num_threads,
385 spot,
386 }
387 }
388
389 pub fn run(&self) {
390 self.down();
392
393 self.num_awake.fetch_add(1, Ordering::SeqCst);
394 }
395
396 pub fn unpark_one(&self, _single_unpark_index: u32) {
397 let num_awake_before_up = self.num_awake.load(Ordering::SeqCst);
398
399 self.up();
400
401 while self.num_awake.load(Ordering::SeqCst) != num_awake_before_up + 1 {
403 thread::yield_now();
404 }
405 }
406
407 pub fn finish(&self, num_single_unparks: u32) {
408 let mut num_threads_left =
410 self.num_threads.checked_sub(num_single_unparks).unwrap();
411
412 while num_threads_left > 0 {
415 let mut num_waiting_on_address = 0;
416 self.spot.with_lot(self.semaphore_addr(), |thread_data| {
417 num_waiting_on_address = thread_data.num_parked;
418 });
419 assert!(num_waiting_on_address <= num_threads_left);
420
421 let num_awake_before_unpark = self.num_awake.load(Ordering::SeqCst);
422
423 let num_unparked = self.spot.unpark(self.semaphore_addr(), u32::MAX);
424 assert!(num_unparked >= num_waiting_on_address);
425 assert!(num_unparked <= num_threads_left);
426
427 while self.num_awake.load(Ordering::SeqCst)
429 != num_awake_before_unpark + num_unparked
430 {
431 thread::yield_now();
432 }
433
434 num_threads_left = num_threads_left.checked_sub(num_unparked).unwrap();
435 }
436 assert_eq!(self.num_awake.load(Ordering::SeqCst), self.num_threads);
438
439 let mut num_waiting_on_address = 0;
441 self.spot.with_lot(self.semaphore_addr(), |thread_data| {
442 num_waiting_on_address = thread_data.num_parked;
443 });
444 assert_eq!(num_waiting_on_address, 0);
445 }
446
447 pub fn down(&self) {
448 let old_semaphore_value = self.semaphore.fetch_sub(1, Ordering::SeqCst);
449
450 if old_semaphore_value > 0 {
451 return;
453 }
454
455 let validate = || true;
457 self.spot.park(self.semaphore_addr(), validate, None);
458 }
459
460 pub fn up(&self) {
461 let old_semaphore_value = self.semaphore.fetch_add(1, Ordering::SeqCst);
462
463 if old_semaphore_value < 0 {
465 loop {
469 match self.spot.unpark(self.semaphore_addr(), 1) {
470 1 => break,
471 0 => (),
472 i => panic!("Should not wake up {i} threads"),
473 }
474 }
475 }
476 }
477
478 fn semaphore_addr(&self) -> u64 {
479 addr_of!(self.semaphore) as _
480 }
481 }
482 }
483
484 #[test]
485 fn wait_with_timeout() {
486 let parking_spot = &ParkingSpot::default();
487 let atomic = &AtomicU64::new(0);
488
489 thread::scope(|s| {
490 let atomic_key = addr_of!(atomic) as u64;
491
492 const N: u64 = 5;
493 const M: u64 = 1000;
494
495 let thread = s.spawn(move || {
496 while atomic.load(Ordering::SeqCst) != N * M {
497 let timeout = Instant::now() + Duration::from_millis(1);
498 parking_spot.park(
499 atomic_key,
500 || atomic.load(Ordering::SeqCst) != N * M,
501 Some(timeout),
502 );
503 }
504 });
505
506 let mut threads = vec![thread];
507 for _ in 0..N {
508 threads.push(s.spawn(move || {
509 for _ in 0..M {
510 atomic.fetch_add(1, Ordering::SeqCst);
511 parking_spot.unpark(atomic_key, 1);
512 }
513 }));
514 }
515
516 for thread in threads {
517 thread.join().unwrap();
518 }
519 });
520 }
521}