prioritized_metered_channel/
lib.rs1#![allow(clippy::all)]
19
20#[cfg(all(feature = "async_channel", feature = "futures_channel",))]
21compile_error!("`async_channel` and `futures_channel` are mutually exclusive features");
22
23#[cfg(not(any(feature = "async_channel", feature = "futures_channel")))]
24compile_error!("Must build with either `async_channel` or `futures_channel` features");
25
26use std::sync::{
27 atomic::{AtomicUsize, Ordering},
28 Arc,
29};
30
31use derive_more::Display;
32
33mod bounded;
34pub mod oneshot;
35mod unbounded;
36
37pub use self::{bounded::*, unbounded::*};
38
39pub use coarsetime::Duration as CoarseDuration;
40use coarsetime::Instant as CoarseInstant;
41
42#[cfg(test)]
43mod tests;
44
/// Capacity of the bounded queue in which a [`Meter`] buffers time-of-flight samples.
const TOF_QUEUE_SIZE: usize = 100;
47
48#[derive(Debug, Clone)]
50pub struct Meter {
51 sent: Arc<AtomicUsize>,
53 received: Arc<AtomicUsize>,
55 #[cfg(feature = "async_channel")]
56 channel_len: Arc<AtomicUsize>,
58 blocked: Arc<AtomicUsize>,
60 tof: Arc<crossbeam_queue::ArrayQueue<CoarseDuration>>,
62}
63
64impl std::default::Default for Meter {
65 fn default() -> Self {
66 Self {
67 sent: Arc::new(AtomicUsize::new(0)),
68 received: Arc::new(AtomicUsize::new(0)),
69 #[cfg(feature = "async_channel")]
70 channel_len: Arc::new(AtomicUsize::new(0)),
71 blocked: Arc::new(AtomicUsize::new(0)),
72 tof: Arc::new(crossbeam_queue::ArrayQueue::new(TOF_QUEUE_SIZE)),
73 }
74 }
75}
76
77#[derive(Debug, Display, Clone, Default, PartialEq)]
80#[display(fmt = "(sent={} received={})", sent, received)]
81pub struct Readout {
82 pub sent: usize,
84 pub received: usize,
86 pub channel_len: usize,
88 pub blocked: usize,
90 pub tof: Vec<CoarseDuration>,
92}
93
94impl Meter {
95 pub fn read(&self) -> Readout {
97 let sent = self.sent.load(Ordering::Relaxed);
100 let received = self.received.load(Ordering::Relaxed);
101
102 #[cfg(feature = "async_channel")]
103 let channel_len = self.channel_len.load(Ordering::Relaxed);
104 #[cfg(feature = "futures_channel")]
105 let channel_len = sent.saturating_sub(received);
106
107 Readout {
108 sent,
109 received,
110 channel_len,
111 blocked: self.blocked.load(Ordering::Relaxed),
112 tof: {
113 let mut acc = Vec::with_capacity(self.tof.len());
114 while let Some(value) = self.tof.pop() {
115 acc.push(value)
116 }
117 acc
118 },
119 }
120 }
121
122 fn note_sent(&self) -> usize {
123 self.sent.fetch_add(1, Ordering::Relaxed)
124 }
125
126 #[cfg(feature = "async_channel")]
127 fn note_channel_len(&self, len: usize) {
128 self.channel_len.store(len, Ordering::Relaxed)
129 }
130
131 fn retract_sent(&self) {
132 self.sent.fetch_sub(1, Ordering::Relaxed);
133 }
134
135 fn note_received(&self) {
136 self.received.fetch_add(1, Ordering::Relaxed);
137 }
138
139 fn note_blocked(&self) {
140 self.blocked.fetch_add(1, Ordering::Relaxed);
141 }
142
143 fn note_time_of_flight(&self, tof: CoarseDuration) {
144 let _ = self.tof.force_push(tof);
145 }
146
147 #[cfg(feature = "futures_channel")]
148 fn calculate_channel_len(&self) -> usize {
149 let sent = self.sent.load(Ordering::Relaxed);
150 let received = self.received.load(Ordering::Relaxed);
151 sent.saturating_sub(received) as usize
152 }
153}
154
155#[inline(always)]
157fn measure_tof_check(nth: usize) -> bool {
158 if cfg!(test) {
159 nth & 0x01 == 0
161 } else {
162 use nanorand::Rng;
163 let mut rng = nanorand::WyRand::new_seed(nth as u64);
164
165 const PROB: u64 = (u64::MAX as f64 * 0.053_f64) as u64;
167 let coin = rng.generate::<u64>();
168
169 coin >= PROB
170 }
171}
172
173#[derive(Debug)]
177pub enum MaybeTimeOfFlight<T: Sized> {
178 Bare(T),
179 WithTimeOfFlight(T, CoarseInstant),
180}
181
182impl<T> From<T> for MaybeTimeOfFlight<T> {
183 fn from(value: T) -> Self {
184 Self::Bare(value)
185 }
186}
187
188impl<T> MaybeTimeOfFlight<T> {
190 pub fn into(self) -> T {
192 match self {
193 Self::Bare(value) => value,
194 Self::WithTimeOfFlight(value, _tof_start) => value,
195 }
196 }
197}
198
199pub fn prepare_with_tof<T>(meter: &Meter, item: T) -> MaybeTimeOfFlight<T> {
200 let previous = meter.note_sent();
201 let item = if measure_tof_check(previous) {
202 MaybeTimeOfFlight::WithTimeOfFlight(item, CoarseInstant::now())
203 } else {
204 MaybeTimeOfFlight::Bare(item)
205 };
206 item
207}
208
209impl<T> std::ops::Deref for MaybeTimeOfFlight<T> {
210 type Target = T;
211 fn deref(&self) -> &Self::Target {
212 match self {
213 Self::Bare(ref value) => value,
214 Self::WithTimeOfFlight(ref value, _tof_start) => value,
215 }
216 }
217}