1use std::{
29 collections::{BTreeSet, HashMap, VecDeque},
30 fmt::Display,
31 sync::Arc,
32 time::{Duration, Instant},
33};
34use tokio::sync::RwLock;
35
/// Private module holding the marker trait that gates the blanket `StatFormatter` impl.
///
/// The module itself is not `pub`, so only code that can name `sealed` is able to opt a
/// type into the default formatter.
mod sealed {
    /// Marker for types that should use the default two-decimal `StatFormatter` impl.
    pub trait HasDefaultStatFormatter {}
}
39
// Opt the plain integer sample types into the default (two-decimal) stat formatter.
impl sealed::HasDefaultStatFormatter for u32 {}
impl sealed::HasDefaultStatFormatter for i64 {}
42
/// Formats an aggregated statistic (for example an average or a percentile) for display.
///
/// Aggregates are computed as `f64`, so the formatter is an associated function that
/// receives the raw `f64` rather than a value of the implementing type.
pub trait StatFormatter {
    /// Renders `value` as a human-readable string.
    fn format_stat(value: f64) -> String;
}
46
47impl<T> StatFormatter for T
48where
49 T: Display + sealed::HasDefaultStatFormatter,
50{
51 fn format_stat(value: f64) -> String {
52 format!("{value:.2}")
53 }
54}
55
/// A `Duration` newtype that can be used as a sample type in the sliding statistics.
///
/// The wrapper exists so the duration can carry its own `Display`, `StatFormatter` and
/// `f64` conversion implementations.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct StatDuration(pub std::time::Duration);

/// Converts the wrapped duration into fractional seconds.
///
/// Implemented as `From` (rather than `Into`) so that both `f64::from(d)` and `d.into()`
/// are available; the standard library derives `Into` from `From` automatically.
impl From<StatDuration> for f64 {
    fn from(value: StatDuration) -> Self {
        value.0.as_secs_f64()
    }
}

/// Wraps a plain `Duration`; `duration.into()` keeps working through the blanket `Into` impl.
impl From<Duration> for StatDuration {
    fn from(value: Duration) -> Self {
        Self(value)
    }
}
70
71impl std::fmt::Display for StatDuration {
72 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73 write!(f, "{:?}", self.0)
74 }
75}
76
77impl StatFormatter for StatDuration {
78 fn format_stat(value: f64) -> String {
79 format!("{:?}", Duration::from_secs_f64(value))
80 }
81}
82
/// Handle to sliding-window statistics guarded by an async (`tokio`) read-write lock.
///
/// Cloning the handle yields another reference to the same underlying window.
pub struct SlidingStats<T> {
    /// Shared window state; every clone of the handle points at the same `Inner`.
    inner: Arc<RwLock<Inner<T>>>,
}
91
/// Handle to sliding-window statistics guarded by a blocking `parking_lot` read-write lock.
///
/// Cloning the handle yields another reference to the same underlying window.
pub struct SyncSlidingStats<T> {
    /// Shared window state; every clone of the handle points at the same `Inner`.
    inner: Arc<parking_lot::RwLock<Inner<T>>>,
}
96
/// Async sliding-window statistics over `StatDuration` samples.
pub type DurationSlidingStats = SlidingStats<StatDuration>;

/// Blocking sliding-window statistics over `StatDuration` samples.
pub type SyncDurationSlidingStats = SyncSlidingStats<StatDuration>;
105
/// Lock-free core of the sliding window: the samples plus the indexes used to query them.
///
/// The three collections are kept in sync on every insert and prune; `len` debug-asserts
/// that they all hold the same number of elements.
pub struct Inner<T> {
    /// Maximum age of a sample; `prune` removes samples older than `now - retention`.
    retention: Duration,

    /// Identifier assigned to the next inserted sample (incremented on every insert).
    next_id: usize,

    /// All live samples, keyed by their identifier.
    entries: HashMap<usize, Entry<T>>,

    /// Sample identifiers in insertion order; the front is the oldest sample.
    by_time: VecDeque<usize>,

    /// `(value, id)` pairs ordered by value; the id keeps equal values distinct.
    by_value: BTreeSet<(T, usize)>,

    /// Timestamp of the first `insert_with_log` call or of the last emitted summary.
    last_log: Option<Instant>,
}
128
129impl<T> Default for Inner<T> {
130 fn default() -> Self {
131 Self {
132 retention: Default::default(),
133 next_id: Default::default(),
134 entries: Default::default(),
135 by_time: Default::default(),
136 by_value: Default::default(),
137 last_log: None,
138 }
139 }
140}
141
142impl<T> Display for Inner<T>
143where
144 T: Ord + Copy + Into<f64> + std::fmt::Display + StatFormatter,
145{
146 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
147 let mut parts = Vec::new();
148
149 parts.push(format!("count={}", self.count()));
150 if let Some(min) = self.min() {
151 parts.push(format!("min={}", min));
152 }
153 if let Some(max) = self.max() {
154 parts.push(format!("max={}", max));
155 }
156 if let Some(avg) = self.avg() {
157 parts.push(format!("avg={}", <T as StatFormatter>::format_stat(avg)));
158 }
159
160 for p in [50, 90, 95, 99] {
161 let val = self.percentile(p);
162 if val.is_finite() {
163 parts.push(format!("p{}={}", p, <T as StatFormatter>::format_stat(val)));
164 }
165 }
166 parts.push(format!("span={:?}", self.retention));
167 write!(f, "{}", parts.join(", "))
168 }
169}
170
/// A single recorded sample.
#[derive(Clone, Copy)]
struct Entry<T> {
    /// Instant supplied when the sample was inserted; compared against the prune cutoff.
    timestamp: Instant,
    /// The recorded value.
    value: T,
}
177
178impl<T> SlidingStats<T>
179where
180 T: Ord + Copy,
181{
182 pub fn new(retention: Duration) -> Self {
184 Self { inner: Arc::new(RwLock::new(Inner { retention, ..Default::default() })) }
185 }
186
187 #[cfg(test)]
191 pub async fn insert(&self, value: T) {
192 self.inner.write().await.insert(value)
193 }
194
195 #[cfg(test)]
199 pub async fn insert_using_timestamp(&self, value: T, now: Instant) {
200 self.inner.write().await.insert_using_timestamp(value, now)
201 }
202
203 #[cfg(test)]
204 pub async fn len(&self) -> usize {
205 self.inner.read().await.len()
206 }
207
208 #[cfg(test)]
213 pub async fn with_inner<R>(&self, f: impl FnOnce(&mut Inner<T>) -> R) -> R {
214 let mut guard = self.inner.write().await;
215 f(&mut *guard)
216 }
217}
218
219impl<T> SyncSlidingStats<T>
220where
221 T: Ord + Copy,
222{
223 pub fn new(retention: Duration) -> Self {
225 Self {
226 inner: Arc::new(parking_lot::RwLock::new(Inner { retention, ..Default::default() })),
227 }
228 }
229}
230
231impl<T> SlidingStats<T>
232where
233 T: Ord + Copy + Into<f64> + std::fmt::Display + StatFormatter,
234{
235 pub async fn insert_with_log(
246 &self,
247 value: T,
248 log_interval: Option<Duration>,
249 now: Instant,
250 ) -> Option<String> {
251 let mut inner = self.inner.write().await;
252 inner.insert_with_log(value, log_interval, now)
253 }
254}
255
256impl<T> SyncSlidingStats<T>
257where
258 T: Ord + Copy + Into<f64> + std::fmt::Display + StatFormatter,
259{
260 pub fn insert_with_log(
261 &self,
262 value: T,
263 log_interval: Option<Duration>,
264 now: Instant,
265 ) -> Option<String> {
266 let mut inner = self.inner.write();
267 inner.insert_with_log(value, log_interval, now)
268 }
269}
270
271impl<T> Inner<T>
272where
273 T: Ord + Copy,
274{
275 #[cfg(test)]
276 fn insert(&mut self, value: T) {
277 self.insert_using_timestamp(value, Instant::now())
278 }
279
280 fn insert_using_timestamp(&mut self, value: T, now: Instant) {
282 let id = self.next_id;
283 self.next_id += 1;
284
285 let entry = Entry { timestamp: now, value };
286
287 self.entries.insert(id, entry);
288 self.by_time.push_back(id);
289 self.by_value.insert((value, id));
290
291 self.prune(now);
292 }
293
294 pub fn min(&self) -> Option<T> {
296 self.by_value.first().map(|(v, _)| *v)
297 }
298
299 pub fn max(&self) -> Option<T> {
301 self.by_value.last().map(|(v, _)| *v)
302 }
303
304 pub fn count(&self) -> usize {
306 self.len()
307 }
308
309 pub fn prune(&mut self, now: Instant) {
313 let cutoff = now - self.retention;
314
315 while let Some(&oldest_id) = self.by_time.front() {
316 let expired = match self.entries.get(&oldest_id) {
317 Some(entry) => entry.timestamp < cutoff,
318 None => {
319 debug_assert!(false);
320 true
321 },
322 };
323
324 if !expired {
325 break;
326 }
327
328 if let Some(entry) = self.entries.remove(&oldest_id) {
329 self.by_value.remove(&(entry.value, oldest_id));
330 } else {
331 debug_assert!(false);
332 }
333 self.by_time.pop_front();
334 }
335 }
336
337 pub fn len(&self) -> usize {
338 debug_assert_eq!(self.entries.len(), self.by_time.len());
339 debug_assert_eq!(self.entries.len(), self.by_value.len());
340 self.entries.len()
341 }
342}
343
344impl<T> Inner<T>
345where
346 T: Ord + Copy + Into<f64>,
347{
348 pub fn avg(&self) -> Option<f64> {
350 let len = self.len();
351 if len == 0 {
352 None
353 } else {
354 Some(self.entries.values().map(|e| e.value.into()).sum::<f64>() / len as f64)
355 }
356 }
357
358 pub fn percentile(&self, percentile: usize) -> f64 {
363 if self.len() == 0 || percentile > 100 {
364 return f64::NAN;
365 }
366
367 let tau = percentile as f64 / 100.0;
368 let len = self.len();
369
370 let h = (len as f64 + 1.0 / 3.0) * tau + 1.0 / 3.0;
371 let hf = h as i64;
372
373 if hf <= 0 || percentile == 0 {
374 return self.min().map(|v| v.into()).unwrap_or(f64::NAN);
375 }
376
377 if hf >= len as i64 || percentile == 100 {
378 return self.max().map(|v| v.into()).unwrap_or(f64::NAN);
379 }
380
381 let mut iter = self.by_value.iter().map(|(v, _)| (*v).into());
382
383 let a = iter.nth((hf as usize).saturating_sub(1)).unwrap_or(f64::NAN);
384 let b = iter.next().unwrap_or(f64::NAN);
385
386 a + (h - hf as f64) * (b - a)
387 }
388}
389
390impl<T> Inner<T>
391where
392 T: Ord + Copy + Into<f64> + std::fmt::Display + StatFormatter,
393{
394 pub fn insert_with_log(
396 &mut self,
397 value: T,
398 log_interval: Option<Duration>,
399 now: Instant,
400 ) -> Option<String> {
401 let Some(last_log) = self.last_log else {
402 self.last_log = Some(now);
403 self.insert_using_timestamp(value, now);
404 return None;
405 };
406
407 let log_interval = log_interval.unwrap_or(self.retention);
408 let should_log = now.duration_since(last_log) >= log_interval;
409 let result = should_log.then(|| {
410 self.last_log = Some(now);
411 format!("{self}")
412 });
413 self.insert_using_timestamp(value, now);
414 result
415 }
416}
417
418impl<T> Clone for SlidingStats<T> {
419 fn clone(&self) -> Self {
420 Self { inner: Arc::clone(&self.inner) }
421 }
422}
423
424impl<T> Clone for SyncSlidingStats<T> {
425 fn clone(&self) -> Self {
426 Self { inner: Arc::clone(&self.inner) }
427 }
428}
429
/// Records a value in an async stats handle and emits a throttled `tracing` event.
///
/// The value is only recorded when the given level/target is enabled; otherwise the macro
/// does nothing. When `insert_with_log` returns a summary, it is logged as
/// `"<prefix>: <summary>"`.
///
/// Two forms are accepted: with `log_interval: <Duration>` to set the minimum time
/// between summaries, and without it, in which case `None` is passed and the stats
/// object's own default applies.
///
/// The expansion uses `.await`, so the macro must be invoked from an async context, and it
/// refers to `tracing::…` paths, which therefore must resolve at the call site.
#[macro_export]
macro_rules! insert_and_log_throttled {
    (
        $level:expr,
        target: $target:expr,
        log_interval: $log_interval:expr,
        prefix: $prefix:expr,
        $stats:expr,
        $value:expr
    ) => {{
        if tracing::enabled!(target: $target, $level) {
            // Fully qualified: an unqualified `Instant` would be resolved at the call
            // site and fail to compile wherever the caller has not imported it.
            let now = std::time::Instant::now();
            if let Some(msg) = $stats.insert_with_log($value, Some($log_interval), now).await {
                tracing::event!(target: $target, $level, "{}: {}", $prefix, msg);
            }
        }
    }};

    (
        $level:expr,
        target: $target:expr,
        prefix: $prefix:expr,
        $stats:expr,
        $value:expr
    ) => {{
        if tracing::enabled!(target: $target, $level) {
            let now = std::time::Instant::now();
            if let Some(msg) = $stats.insert_with_log($value, None, now).await {
                tracing::event!(target: $target, $level, "{}: {}", $prefix, msg);
            }
        }
    }};
}
477
/// Records a value in a blocking stats handle and emits a throttled `tracing` event.
///
/// The value is only recorded when the given level/target is enabled; otherwise the macro
/// does nothing. When `insert_with_log` returns a summary, it is logged as
/// `"<prefix>: <summary>"`. `None` is passed as the log interval, so the stats object's own
/// default applies.
///
/// `target` accepts any expression the `tracing` macros accept (a string literal or a
/// constant), matching the async `insert_and_log_throttled!` macro. The expansion refers
/// to `tracing::…` paths, which therefore must resolve at the call site.
#[macro_export]
macro_rules! insert_and_log_throttled_sync {
    (
        $level:expr,
        target: $target:expr,
        prefix: $prefix:expr,
        $stats:expr,
        $value:expr
    ) => {{
        if tracing::enabled!(target: $target, $level) {
            let now = std::time::Instant::now();
            if let Some(msg) = $stats.insert_with_log($value, None, now) {
                tracing::event!(target: $target, $level, "{}: {}", $prefix, msg);
            }
        }
    }};
}
496
// Unit tests for the sliding window: pruning, throttled logging and summary formatting.
// All timestamps are synthetic offsets from a single `Instant::now()` so the tests do not
// depend on wall-clock progress.
#[cfg(test)]
mod test {
    use super::*;
    use std::time::{Duration, Instant};

    // Samples spread over time: only those within the 10s retention survive each insert.
    #[tokio::test]
    async fn retention_prunes_old_items() {
        let stats = SlidingStats::<u64>::new(Duration::from_secs(10));

        let base = Instant::now();
        // Inserts at t = 0, 5, 10, 15, 20 seconds.
        for i in 0..5 {
            stats.insert_using_timestamp(i * 10, base + Duration::from_secs(i * 5)).await;
        }
        // After the insert at t=20 the cutoff is t=10, so t=10, 15, 20 remain.
        assert_eq!(stats.len().await, 3);

        // Cutoff becomes t=16: only t=20 and the new sample at t=26 remain.
        stats.insert_using_timestamp(999, base + Duration::from_secs(26)).await;

        assert_eq!(stats.len().await, 2);
    }

    // Many samples sharing one timestamp expire together.
    #[tokio::test]
    async fn retention_prunes_old_items2() {
        let stats = SlidingStats::<u64>::new(Duration::from_secs(10));

        let base = Instant::now();
        for i in 0..100 {
            stats.insert_using_timestamp(i * 10, base + Duration::from_secs(5)).await;
        }
        assert_eq!(stats.len().await, 100);

        // Cutoff is t=6, so all 100 samples stamped t=5 are dropped.
        stats.insert_using_timestamp(999, base + Duration::from_secs(16)).await;

        let len = stats.len().await;
        assert_eq!(len, 1);
    }

    // The summary is rendered before the new sample is inserted and before pruning.
    #[tokio::test]
    async fn insert_with_log_message_contains_all_old_items() {
        let stats = SlidingStats::<u32>::new(Duration::from_secs(100));

        let base = Instant::now();
        // Same timestamp every time, so no log interval elapses and nothing is returned.
        for _ in 0..10 {
            stats.insert_with_log(1, None, base + Duration::from_secs(5)).await;
        }
        assert_eq!(stats.len().await, 10);

        // 195s since the logging clock started >= 100s (retention is the default interval).
        let output = stats.insert_with_log(1, None, base + Duration::from_secs(200)).await.unwrap();
        assert!(output.contains("count=10"));

        // The insert that followed the summary pruned everything older than t=100.
        let len = stats.len().await;
        assert_eq!(len, 1);
    }

    // Same as above, but pruning already happened while the window was being filled.
    #[tokio::test]
    async fn insert_with_log_message_prunes_all_old_items() {
        let stats = SlidingStats::<u32>::new(Duration::from_secs(25));

        let base = Instant::now();
        // Inserts at t = 0, 5, ..., 45 seconds; any summaries produced here are ignored.
        for i in 0..10 {
            stats.insert_with_log(1, None, base + Duration::from_secs(i * 5)).await;
        }
        // After the insert at t=45 the cutoff is t=20, leaving t = 20, 25, ..., 45.
        assert_eq!(stats.len().await, 6);

        let output = stats.insert_with_log(1, None, base + Duration::from_secs(200)).await.unwrap();
        assert!(output.contains("count=6"));

        let len = stats.len().await;
        assert_eq!(len, 1);
    }

    // Basic aggregates over three samples that all stay inside the window.
    #[tokio::test]
    async fn test_avg_min_max() {
        let stats = SlidingStats::<u32>::new(Duration::from_secs(100));
        let base = Instant::now();

        stats.insert_using_timestamp(10, base).await;
        stats.insert_using_timestamp(20, base + Duration::from_secs(1)).await;
        stats.insert_using_timestamp(30, base + Duration::from_secs(2)).await;

        stats
            .with_inner(|inner| {
                assert_eq!(inner.count(), 3);
                assert_eq!(inner.avg(), Some(20.0));
                assert_eq!(inner.min(), Some(10));
                assert_eq!(inner.max(), Some(30));
            })
            .await;
    }

    // Durations are rendered with a unit suffix chosen by `Duration`'s `Debug` output.
    #[tokio::test]
    async fn duration_format() {
        let stats = SlidingStats::<StatDuration>::new(Duration::from_secs(100));
        stats.insert(Duration::from_nanos(100).into()).await;
        let output = stats.with_inner(|i| format!("{i}")).await;
        assert!(output.contains("max=100ns"));

        let stats = SlidingStats::<StatDuration>::new(Duration::from_secs(100));
        stats.insert(Duration::from_micros(100).into()).await;
        let output = stats.with_inner(|i| format!("{i}")).await;
        assert!(output.contains("max=100µs"));

        let stats = SlidingStats::<StatDuration>::new(Duration::from_secs(100));
        stats.insert(Duration::from_millis(100).into()).await;
        let output = stats.with_inner(|i| format!("{i}")).await;
        assert!(output.contains("max=100ms"));

        let stats = SlidingStats::<StatDuration>::new(Duration::from_secs(100));
        stats.insert(Duration::from_secs(100).into()).await;
        let output = stats.with_inner(|i| format!("{i}")).await;
        assert!(output.contains("max=100s"));

        // Full summary over four samples. With N=4, p50 sits at position 2.5, i.e. halfway
        // between 100µs and 100ms; p90 and above clamp to the maximum.
        let stats = SlidingStats::<StatDuration>::new(Duration::from_secs(100));
        stats.insert(Duration::from_nanos(100).into()).await;
        stats.insert(Duration::from_micros(100).into()).await;
        stats.insert(Duration::from_millis(100).into()).await;
        stats.insert(Duration::from_secs(100).into()).await;
        let output = stats.with_inner(|i| format!("{i}")).await;
        println!("{output}");
        assert_eq!(output, "count=4, min=100ns, max=100s, avg=25.025025025s, p50=50.05ms, p90=100s, p95=100s, p99=100s, span=100s");
    }
}