static_init/phase_locker/
futex.rs

//TODO: this adjustment may need to be adapted to
//      the platform's processor
3//
/// The lock gives priority to write locks when
/// one is waiting, in order to be fair with write locks. This
/// fairness somewhat balances the fact that read locks
/// are extremely fair with other read locks.
///
/// But this mitigation can lead to the opposite problem. If there
/// are many attempts to get a write lock, read locks will
/// never be awakened. So once `READ_FAIRNESS_PERIOD` consecutive
/// write locks have been awakened, priority is given to awakening read locks.
13const READ_FAIRNESS_PERIOD: u16 = 32;
14#[cfg(all(
15    not(feature = "parking_lot_core"),
16    any(target_os = "linux", target_os = "android")
17))]
18mod linux {
19    use super::READ_FAIRNESS_PERIOD;
20    use crate::phase::*;
21    use core::ops::{Deref, DerefMut};
22    use core::ptr;
23    use core::sync::atomic::{compiler_fence, AtomicU16, AtomicU32, Ordering};
24    use libc::{syscall, SYS_futex, FUTEX_PRIVATE_FLAG, FUTEX_WAIT_BITSET, FUTEX_WAKE_BITSET};
25
    /// Lock word bundled with the bookkeeping used to block and wake threads.
    ///
    /// This variant blocks through the raw `futex` system call.
    pub(crate) struct Futex {
        /// The 32-bit word whose address is handed to the `futex` system call.
        futex: AtomicU32,
        /// Incremented before a thread waits as a writer and decremented once
        /// its wait has returned.
        writer_count: AtomicU32,
        /// Reset to 1 when readers are woken and incremented on each writer
        /// wake-up; read by `prefer_wake_one_writer`.
        fairness: AtomicU16,
    }
31
    /// Futex bitset under which reader threads wait and are woken.
    const READER_BIT: u32 = 0b01;
    /// Futex bitset under which writer threads wait and are woken.
    const WRITER_BIT: u32 = 0b10;
34
35    impl Futex {
36        pub(crate) const fn new(value: u32) -> Self {
37            Self {
38                futex: AtomicU32::new(value),
39                writer_count: AtomicU32::new(0),
40                fairness: AtomicU16::new(0),
41                //to allow the static to be placed zeroed segment
42                //and fairness with threads who attempted but failed to
43                //initialize the static
44            }
45        }
46
47        pub(crate) fn prefer_wake_one_writer(&self) -> bool {
48            self.fairness.load(Ordering::Relaxed) % READ_FAIRNESS_PERIOD != 0
49        }
50
51        pub(crate) fn compare_and_wait_as_reader(&self, value: u32) -> bool {
52            unsafe {
53                syscall(
54                    SYS_futex,
55                    &self.futex as *const _ as *const _,
56                    FUTEX_WAIT_BITSET | FUTEX_PRIVATE_FLAG,
57                    value,
58                    ptr::null::<u32>(),
59                    ptr::null::<u32>(),
60                    READER_BIT,
61                ) == 0
62            }
63        }
64        pub(crate) fn compare_and_wait_as_writer(&self, value: u32) -> bool {
65            assert_ne!(self.writer_count.fetch_add(1, Ordering::Relaxed), u32::MAX);
66            compiler_fence(Ordering::AcqRel);
67            let res = unsafe {
68                syscall(
69                    SYS_futex,
70                    &self.futex as *const _ as *const _,
71                    FUTEX_WAIT_BITSET | FUTEX_PRIVATE_FLAG,
72                    value,
73                    ptr::null::<u32>(),
74                    ptr::null::<u32>(),
75                    WRITER_BIT,
76                ) == 0
77            };
78            compiler_fence(Ordering::AcqRel);
79            let prev_count = self.writer_count.fetch_sub(1, Ordering::Relaxed);
80            assert_ne!(prev_count, 0);
81            //// count = number of threads waiting at the time of wake + those
82            ////         for which the futex syscall as been interrupted but count not
83            ////         yet substracted + those that are in the process of waiting
84            //// so here count is larger than the number of waiting threads
85            if res && prev_count > 1 {
86                self.futex.fetch_or(WRITE_WAITER_BIT, Ordering::Relaxed);
87            }
88            res
89        }
90        pub(crate) fn wake_readers(&self) -> usize {
91            self.fairness.store(1, Ordering::Relaxed);
92            let count = unsafe {
93                syscall(
94                    SYS_futex,
95                    &self.futex as *const _ as *const _,
96                    FUTEX_WAKE_BITSET | FUTEX_PRIVATE_FLAG,
97                    MAX_WAKED_READERS as i32,
98                    ptr::null::<u32>(),
99                    ptr::null::<u32>(),
100                    READER_BIT,
101                ) as usize
102            };
103            if count == MAX_WAKED_READERS {
104                self.futex.fetch_or(READ_WAITER_BIT, Ordering::Relaxed);
105            }
106            count
107        }
108        pub(crate) fn wake_one_writer(&self) -> bool {
109            self.fairness.fetch_add(1, Ordering::Relaxed);
110            unsafe {
111                syscall(
112                    SYS_futex,
113                    &self.futex as *const _ as *const _,
114                    FUTEX_WAKE_BITSET | FUTEX_PRIVATE_FLAG,
115                    1,
116                    ptr::null::<u32>(),
117                    ptr::null::<u32>(),
118                    WRITER_BIT,
119                ) == 1
120            }
121        }
122    }
123
124    impl Deref for Futex {
125        type Target = AtomicU32;
126        fn deref(&self) -> &Self::Target {
127            &self.futex
128        }
129    }
130    impl DerefMut for Futex {
131        fn deref_mut(&mut self) -> &mut Self::Target {
132            &mut self.futex
133        }
134    }
135}
136#[cfg(all(
137    not(feature = "parking_lot_core"),
138    any(target_os = "linux", target_os = "android")
139))]
140pub(crate) use linux::Futex;
141
142#[cfg(any(
143    feature = "parking_lot_core",
144    not(any(target_os = "linux", target_os = "android"))
145))]
146mod other {
147    use super::READ_FAIRNESS_PERIOD;
148    use crate::phase::*;
149    use core::ops::{Deref, DerefMut};
150    use core::sync::atomic::{compiler_fence, AtomicU16, AtomicU32, Ordering};
151    use parking_lot_core::{
152        park, unpark_filter, unpark_one, FilterOp, ParkResult, DEFAULT_PARK_TOKEN,
153        DEFAULT_UNPARK_TOKEN,
154    };
155
    /// Lock word bundled with the bookkeeping used to park and unpark threads.
    ///
    /// This variant blocks through `parking_lot_core`.
    pub(crate) struct Futex {
        /// The 32-bit lock word; its address (and that address plus one) are
        /// used as parking keys.
        futex: AtomicU32,
        /// Incremented before a thread parks as a writer and decremented once
        /// its park call has returned.
        writer_count: AtomicU32,
        /// Reset to 1 when readers are woken and incremented on each writer
        /// wake-up; read by `prefer_wake_one_writer`.
        fairness: AtomicU16,
    }
161
162    impl Futex {
163        pub(crate) const fn new(value: u32) -> Self {
164            Self {
165                futex: AtomicU32::new(value),
166                writer_count: AtomicU32::new(0),
167                fairness: AtomicU16::new(0),
168            }
169        }
170
171        pub(crate) fn prefer_wake_one_writer(&self) -> bool {
172            self.fairness.load(Ordering::Relaxed) % READ_FAIRNESS_PERIOD == 0
173        }
174
175        pub(crate) fn compare_and_wait_as_reader(&self, value: u32) -> bool {
176            unsafe {
177                matches!(
178                    park(
179                        self.reader_key(),
180                        || self.futex.load(Ordering::Relaxed) == value,
181                        || {},
182                        |_, _| {},
183                        DEFAULT_PARK_TOKEN,
184                        None,
185                    ),
186                    ParkResult::Unparked(_)
187                )
188            }
189        }
190        pub(crate) fn compare_and_wait_as_writer(&self, value: u32) -> bool {
191            assert_ne!(self.writer_count.fetch_add(1, Ordering::Relaxed), u32::MAX);
192            compiler_fence(Ordering::AcqRel);
193            let res = unsafe {
194                matches!(
195                    park(
196                        self.writer_key(),
197                        || self.futex.load(Ordering::Relaxed) == value,
198                        || {},
199                        |_, _| {},
200                        DEFAULT_PARK_TOKEN,
201                        None,
202                    ),
203                    ParkResult::Unparked(_)
204                )
205            };
206            compiler_fence(Ordering::AcqRel);
207            let prev_count = self.writer_count.fetch_sub(1, Ordering::Relaxed);
208            assert_ne!(prev_count, 0);
209            //// count = number of threads waiting at the time of unpark + those
210            ////         for which the futex syscall as been interrupted but count not
211            ////         yet substracted + those that are in the process of waiting
212            //// so here count is larger than the number of waiting threads
213            if res && prev_count > 1 {
214                self.futex.fetch_or(WRITE_WAITER_BIT, Ordering::Relaxed);
215            }
216            res
217        }
218        pub(crate) fn wake_readers(&self) -> usize {
219            self.fairness.store(1, Ordering::Relaxed);
220            let mut c = 0;
221            let r = unsafe {
222                unpark_filter(
223                    self.reader_key(),
224                    |_| {
225                        if c < MAX_WAKED_READERS {
226                            c += 1;
227                            FilterOp::Unpark
228                        } else {
229                            FilterOp::Stop
230                        }
231                    },
232                    |_| DEFAULT_UNPARK_TOKEN,
233                )
234            };
235
236            if c == MAX_WAKED_READERS {
237                self.futex.fetch_or(READ_WAITER_BIT, Ordering::Relaxed);
238            }
239
240            r.unparked_threads
241        }
242        pub(crate) fn wake_one_writer(&self) -> bool {
243            self.fairness.fetch_add(1, Ordering::Relaxed);
244            let r = unsafe { unpark_one(self.writer_key(), |_| DEFAULT_UNPARK_TOKEN) };
245            r.unparked_threads == 1
246        }
247
248        fn reader_key(&self) -> usize {
249            &self.futex as *const _ as usize
250        }
251        fn writer_key(&self) -> usize {
252            (&self.futex as *const _ as usize) + 1
253        }
254    }
255
256    impl Deref for Futex {
257        type Target = AtomicU32;
258        fn deref(&self) -> &Self::Target {
259            &self.futex
260        }
261    }
262    impl DerefMut for Futex {
263        fn deref_mut(&mut self) -> &mut Self::Target {
264            &mut self.futex
265        }
266    }
267}
268#[cfg(any(
269    feature = "parking_lot_core",
270    not(any(target_os = "linux", target_os = "android"))
271))]
272pub(crate) use other::Futex;