referrerpolicy=no-referrer-when-downgrade

sc_transaction_pool/common/
sliding_stat.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Logging helper. Sliding window statistics with retention-based pruning.
20//!
21//! `SlidingStats<T>` tracks timestamped values and computes statistical summaries
22//! (min, max, average, percentiles, count) over a rolling time window.
23//!
24//! Old entries are automatically pruned based on a configurable retention `Duration`.
25//! Values can be logged periodically using `insert_with_log` or the `insert_and_log_throttled!`
26//! macro.
27
28use std::{
29	collections::{BTreeSet, HashMap, VecDeque},
30	fmt::Display,
31	sync::Arc,
32	time::{Duration, Instant},
33};
34use tokio::sync::RwLock;
35
36mod sealed {
37	pub trait HasDefaultStatFormatter {}
38}
39
40impl sealed::HasDefaultStatFormatter for u32 {}
41impl sealed::HasDefaultStatFormatter for i64 {}
42
43pub trait StatFormatter {
44	fn format_stat(value: f64) -> String;
45}
46
47impl<T> StatFormatter for T
48where
49	T: Display + sealed::HasDefaultStatFormatter,
50{
51	fn format_stat(value: f64) -> String {
52		format!("{value:.2}")
53	}
54}
55
56#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
57pub struct StatDuration(pub std::time::Duration);
58
59impl Into<f64> for StatDuration {
60	fn into(self) -> f64 {
61		self.0.as_secs_f64()
62	}
63}
64
65impl Into<StatDuration> for Duration {
66	fn into(self) -> StatDuration {
67		StatDuration(self)
68	}
69}
70
71impl std::fmt::Display for StatDuration {
72	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73		write!(f, "{:?}", self.0)
74	}
75}
76
77impl StatFormatter for StatDuration {
78	fn format_stat(value: f64) -> String {
79		format!("{:?}", Duration::from_secs_f64(value))
80	}
81}
82
83/// Sliding window statistics collector.
84///
85/// `SlidingStats<T>` maintains a rolling buffer of values with timestamps,
86/// automatically pruning values older than the configured `retention` period.
87/// It provides percentile queries (e.g., p50, p95), min/max, average, and count.
88pub struct SlidingStats<T> {
89	inner: Arc<RwLock<Inner<T>>>,
90}
91
92/// Sync version of `SlidingStats`
93pub struct SyncSlidingStats<T> {
94	inner: Arc<parking_lot::RwLock<Inner<T>>>,
95}
96
97/// A type alias for `SlidingStats` specialized for durations with human-readable formatting.
98///
99/// Wraps `std::time::Duration` values using `StatDuration`, allowing for statistical summaries
100/// (e.g. p50, p95, average) to be displayed in units like nanoseconds, milliseconds, or seconds.
101pub type DurationSlidingStats = SlidingStats<StatDuration>;
102
103/// Sync version of `DurationSlidingStats`
104pub type SyncDurationSlidingStats = SyncSlidingStats<StatDuration>;
105
106/// Internal state of the statistics buffer.
107pub struct Inner<T> {
108	/// How long to retain items after insertion.
109	retention: Duration,
110
111	/// Counter to assign unique ids to each entry.
112	next_id: usize,
113
114	/// Maps id to actual value + timestamp.
115	entries: HashMap<usize, Entry<T>>,
116
117	/// Queue of IDs in insertion order for expiration.
118	by_time: VecDeque<usize>,
119
120	/// Set of values with ids, ordered by value.
121	by_value: BTreeSet<(T, usize)>,
122
123	/// The time stamp of most recent insertion with log.
124	///
125	/// Used to throttle debug messages.
126	last_log: Option<Instant>,
127}
128
129impl<T> Default for Inner<T> {
130	fn default() -> Self {
131		Self {
132			retention: Default::default(),
133			next_id: Default::default(),
134			entries: Default::default(),
135			by_time: Default::default(),
136			by_value: Default::default(),
137			last_log: None,
138		}
139	}
140}
141
142impl<T> Display for Inner<T>
143where
144	T: Ord + Copy + Into<f64> + std::fmt::Display + StatFormatter,
145{
146	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
147		let mut parts = Vec::new();
148
149		parts.push(format!("count={}", self.count()));
150		if let Some(min) = self.min() {
151			parts.push(format!("min={}", min));
152		}
153		if let Some(max) = self.max() {
154			parts.push(format!("max={}", max));
155		}
156		if let Some(avg) = self.avg() {
157			parts.push(format!("avg={}", <T as StatFormatter>::format_stat(avg)));
158		}
159
160		for p in [50, 90, 95, 99] {
161			let val = self.percentile(p);
162			if val.is_finite() {
163				parts.push(format!("p{}={}", p, <T as StatFormatter>::format_stat(val)));
164			}
165		}
166		parts.push(format!("span={:?}", self.retention));
167		write!(f, "{}", parts.join(", "))
168	}
169}
170
171/// A value inserted into the buffer, along with its insertion time.
172#[derive(Clone, Copy)]
173struct Entry<T> {
174	timestamp: Instant,
175	value: T,
176}
177
178impl<T> SlidingStats<T>
179where
180	T: Ord + Copy,
181{
182	/// Creates a new `SlidingStats` with the given retention duration.
183	pub fn new(retention: Duration) -> Self {
184		Self { inner: Arc::new(RwLock::new(Inner { retention, ..Default::default() })) }
185	}
186
187	/// Inserts a value into the buffer, timestamped with `Instant::now()`.
188	///
189	/// May trigger pruning of old items.
190	#[cfg(test)]
191	pub async fn insert(&self, value: T) {
192		self.inner.write().await.insert(value)
193	}
194
195	/// Inserts a value into the buffer with provided timestamp.
196	///
197	/// May trigger pruning of old items.
198	#[cfg(test)]
199	pub async fn insert_using_timestamp(&self, value: T, now: Instant) {
200		self.inner.write().await.insert_using_timestamp(value, now)
201	}
202
203	#[cfg(test)]
204	pub async fn len(&self) -> usize {
205		self.inner.read().await.len()
206	}
207
208	/// Grants temporary read-only access to the locked inner structure,
209	/// passing it into the provided closure.
210	///
211	/// Intended to dump stats and prune inner based on current timestamp.
212	#[cfg(test)]
213	pub async fn with_inner<R>(&self, f: impl FnOnce(&mut Inner<T>) -> R) -> R {
214		let mut guard = self.inner.write().await;
215		f(&mut *guard)
216	}
217}
218
219impl<T> SyncSlidingStats<T>
220where
221	T: Ord + Copy,
222{
223	/// Creates a new `SlidingStats` with the given retention duration.
224	pub fn new(retention: Duration) -> Self {
225		Self {
226			inner: Arc::new(parking_lot::RwLock::new(Inner { retention, ..Default::default() })),
227		}
228	}
229}
230
231impl<T> SlidingStats<T>
232where
233	T: Ord + Copy + Into<f64> + std::fmt::Display + StatFormatter,
234{
235	/// Inserts a value and optionally returns a formatted log string of the current stats.
236	///
237	/// If enough time has passed since the last log (determined by `log_interval` or retention),
238	/// this method returns `Some(log_string)`, otherwise it returns `None`.
239	///
240	/// This method performs:
241	/// - Automatic pruning of expired entries
242	/// - Throttling via `last_log` timestamp
243	///
244	///  Note: The newly inserted value may not  be included in the returned summary.
245	pub async fn insert_with_log(
246		&self,
247		value: T,
248		log_interval: Option<Duration>,
249		now: Instant,
250	) -> Option<String> {
251		let mut inner = self.inner.write().await;
252		inner.insert_with_log(value, log_interval, now)
253	}
254}
255
256impl<T> SyncSlidingStats<T>
257where
258	T: Ord + Copy + Into<f64> + std::fmt::Display + StatFormatter,
259{
260	pub fn insert_with_log(
261		&self,
262		value: T,
263		log_interval: Option<Duration>,
264		now: Instant,
265	) -> Option<String> {
266		let mut inner = self.inner.write();
267		inner.insert_with_log(value, log_interval, now)
268	}
269}
270
271impl<T> Inner<T>
272where
273	T: Ord + Copy,
274{
275	#[cfg(test)]
276	fn insert(&mut self, value: T) {
277		self.insert_using_timestamp(value, Instant::now())
278	}
279
280	/// Refer to [`SlidingStats::insert_using_timestamp`]
281	fn insert_using_timestamp(&mut self, value: T, now: Instant) {
282		let id = self.next_id;
283		self.next_id += 1;
284
285		let entry = Entry { timestamp: now, value };
286
287		self.entries.insert(id, entry);
288		self.by_time.push_back(id);
289		self.by_value.insert((value, id));
290
291		self.prune(now);
292	}
293
294	/// Returns the minimum value in the current window.
295	pub fn min(&self) -> Option<T> {
296		self.by_value.first().map(|(v, _)| *v)
297	}
298
299	/// Returns the maximum value in the current window.
300	pub fn max(&self) -> Option<T> {
301		self.by_value.last().map(|(v, _)| *v)
302	}
303
304	/// Returns the number of items currently retained.
305	pub fn count(&self) -> usize {
306		self.len()
307	}
308
309	/// Explicitly prunes expired items from the buffer.
310	///
311	/// This is also called automatically during insertions.
312	pub fn prune(&mut self, now: Instant) {
313		let cutoff = now - self.retention;
314
315		while let Some(&oldest_id) = self.by_time.front() {
316			let expired = match self.entries.get(&oldest_id) {
317				Some(entry) => entry.timestamp < cutoff,
318				None => {
319					debug_assert!(false);
320					true
321				},
322			};
323
324			if !expired {
325				break;
326			}
327
328			if let Some(entry) = self.entries.remove(&oldest_id) {
329				self.by_value.remove(&(entry.value, oldest_id));
330			} else {
331				debug_assert!(false);
332			}
333			self.by_time.pop_front();
334		}
335	}
336
337	pub fn len(&self) -> usize {
338		debug_assert_eq!(self.entries.len(), self.by_time.len());
339		debug_assert_eq!(self.entries.len(), self.by_value.len());
340		self.entries.len()
341	}
342}
343
344impl<T> Inner<T>
345where
346	T: Ord + Copy + Into<f64>,
347{
348	/// Returns the average (mean) of values in the current window.
349	pub fn avg(&self) -> Option<f64> {
350		let len = self.len();
351		if len == 0 {
352			None
353		} else {
354			Some(self.entries.values().map(|e| e.value.into()).sum::<f64>() / len as f64)
355		}
356	}
357
358	/// Returns the value at the given percentile (e.g., 0.5 for p50).
359	///
360	/// Returns `None` if the buffer is empty.
361	// note: copied from: https://docs.rs/statrs/0.18.0/src/statrs/statistics/slice_statistics.rs.html#164-182
362	pub fn percentile(&self, percentile: usize) -> f64 {
363		if self.len() == 0 || percentile > 100 {
364			return f64::NAN;
365		}
366
367		let tau = percentile as f64 / 100.0;
368		let len = self.len();
369
370		let h = (len as f64 + 1.0 / 3.0) * tau + 1.0 / 3.0;
371		let hf = h as i64;
372
373		if hf <= 0 || percentile == 0 {
374			return self.min().map(|v| v.into()).unwrap_or(f64::NAN);
375		}
376
377		if hf >= len as i64 || percentile == 100 {
378			return self.max().map(|v| v.into()).unwrap_or(f64::NAN);
379		}
380
381		let mut iter = self.by_value.iter().map(|(v, _)| (*v).into());
382
383		let a = iter.nth((hf as usize).saturating_sub(1)).unwrap_or(f64::NAN);
384		let b = iter.next().unwrap_or(f64::NAN);
385
386		a + (h - hf as f64) * (b - a)
387	}
388}
389
390impl<T> Inner<T>
391where
392	T: Ord + Copy + Into<f64> + std::fmt::Display + StatFormatter,
393{
394	/// Refer to [`SlidingStats::insert_with_log`]
395	pub fn insert_with_log(
396		&mut self,
397		value: T,
398		log_interval: Option<Duration>,
399		now: Instant,
400	) -> Option<String> {
401		let Some(last_log) = self.last_log else {
402			self.last_log = Some(now);
403			self.insert_using_timestamp(value, now);
404			return None;
405		};
406
407		let log_interval = log_interval.unwrap_or(self.retention);
408		let should_log = now.duration_since(last_log) >= log_interval;
409		let result = should_log.then(|| {
410			self.last_log = Some(now);
411			format!("{self}")
412		});
413		self.insert_using_timestamp(value, now);
414		result
415	}
416}
417
418impl<T> Clone for SlidingStats<T> {
419	fn clone(&self) -> Self {
420		Self { inner: Arc::clone(&self.inner) }
421	}
422}
423
424impl<T> Clone for SyncSlidingStats<T> {
425	fn clone(&self) -> Self {
426		Self { inner: Arc::clone(&self.inner) }
427	}
428}
429
430/// Inserts a value into a `SlidingStats` and conditionally logs the current stats using `tracing`.
431///
432/// This macro inserts the given `$value` into the `$stats` collector only if tracing is enabled
433/// for the given `$target` and `$level`. The log will be emiited only if enough time has passed
434/// since the last logged output (as tracked by the internal last_log timestamp).
435///
436/// The macro respects throttling: stats will not be logged more frequently than either the
437/// explicitly provided `log_interval` or the stats' retention period (if no interval is given).
438///
439/// Note that:
440/// - Logging is skipped unless `tracing::enabled!` returns true for the target and level.
441/// - All entries older than the retention period will be logged and pruned,
442/// - The newly inserted value may not be included in the logged statistics output (it is inserted
443///   *after* the log decision).
444#[macro_export]
445macro_rules! insert_and_log_throttled {
446    (
447        $level:expr,
448        target: $target:expr,
449        log_interval: $log_interval:expr,
450        prefix: $prefix:expr,
451        $stats:expr,
452        $value:expr
453    ) => {{
454        if tracing::enabled!(target: $target, $level) {
455            let now = Instant::now();
456            if let Some(msg) = $stats.insert_with_log($value, Some($log_interval), now).await {
457                tracing::event!(target: $target, $level, "{}: {}", $prefix, msg);
458            }
459        }
460    }};
461
462    (
463        $level:expr,
464        target: $target:expr,
465        prefix: $prefix:expr,
466        $stats:expr,
467        $value:expr
468    ) => {{
469        if tracing::enabled!(target: $target, $level) {
470            let now = std::time::Instant::now();
471            if let Some(msg) = $stats.insert_with_log($value, None, now).await {
472                tracing::event!(target: $target, $level, "{}: {}", $prefix, msg);
473            }
474        }
475    }};
476}
477
478/// Sync version of `insert_and_log_throttled`
479#[macro_export]
480macro_rules! insert_and_log_throttled_sync {
481    (
482        $level:expr,
483        target: $target:literal,
484        prefix: $prefix:expr,
485        $stats:expr,
486        $value:expr
487    ) => {{
488        if tracing::enabled!(target: $target, $level) {
489            let now = std::time::Instant::now();
490            if let Some(msg) = $stats.insert_with_log($value, None, now){
491                tracing::event!(target: $target, $level, "{}: {}", $prefix, msg);
492            }
493        }
494    }};
495}
496
497#[cfg(test)]
498mod test {
499	use super::*;
500	use std::time::{Duration, Instant};
501
502	#[tokio::test]
503	async fn retention_prunes_old_items() {
504		let stats = SlidingStats::<u64>::new(Duration::from_secs(10));
505
506		let base = Instant::now();
507		for i in 0..5 {
508			stats.insert_using_timestamp(i * 10, base + Duration::from_secs(i * 5)).await;
509		}
510		assert_eq!(stats.len().await, 3);
511
512		stats.insert_using_timestamp(999, base + Duration::from_secs(26)).await;
513
514		assert_eq!(stats.len().await, 2);
515	}
516
517	#[tokio::test]
518	async fn retention_prunes_old_items2() {
519		let stats = SlidingStats::<u64>::new(Duration::from_secs(10));
520
521		let base = Instant::now();
522		for i in 0..100 {
523			stats.insert_using_timestamp(i * 10, base + Duration::from_secs(5)).await;
524		}
525		assert_eq!(stats.len().await, 100);
526
527		stats.insert_using_timestamp(999, base + Duration::from_secs(16)).await;
528
529		let len = stats.len().await;
530		assert_eq!(len, 1);
531	}
532
533	#[tokio::test]
534	async fn insert_with_log_message_contains_all_old_items() {
535		let stats = SlidingStats::<u32>::new(Duration::from_secs(100));
536
537		let base = Instant::now();
538		for _ in 0..10 {
539			stats.insert_with_log(1, None, base + Duration::from_secs(5)).await;
540		}
541		assert_eq!(stats.len().await, 10);
542
543		let output = stats.insert_with_log(1, None, base + Duration::from_secs(200)).await.unwrap();
544		assert!(output.contains("count=10"));
545
546		let len = stats.len().await;
547		assert_eq!(len, 1);
548	}
549
550	#[tokio::test]
551	async fn insert_with_log_message_prunes_all_old_items() {
552		let stats = SlidingStats::<u32>::new(Duration::from_secs(25));
553
554		let base = Instant::now();
555		for i in 0..10 {
556			stats.insert_with_log(1, None, base + Duration::from_secs(i * 5)).await;
557		}
558		assert_eq!(stats.len().await, 6);
559
560		let output = stats.insert_with_log(1, None, base + Duration::from_secs(200)).await.unwrap();
561		assert!(output.contains("count=6"));
562
563		let len = stats.len().await;
564		assert_eq!(len, 1);
565	}
566
567	#[tokio::test]
568	async fn test_avg_min_max() {
569		let stats = SlidingStats::<u32>::new(Duration::from_secs(100));
570		let base = Instant::now();
571
572		stats.insert_using_timestamp(10, base).await;
573		stats.insert_using_timestamp(20, base + Duration::from_secs(1)).await;
574		stats.insert_using_timestamp(30, base + Duration::from_secs(2)).await;
575
576		stats
577			.with_inner(|inner| {
578				assert_eq!(inner.count(), 3);
579				assert_eq!(inner.avg(), Some(20.0));
580				assert_eq!(inner.min(), Some(10));
581				assert_eq!(inner.max(), Some(30));
582			})
583			.await;
584	}
585
586	#[tokio::test]
587	async fn duration_format() {
588		let stats = SlidingStats::<StatDuration>::new(Duration::from_secs(100));
589		stats.insert(Duration::from_nanos(100).into()).await;
590		let output = stats.with_inner(|i| format!("{i}")).await;
591		assert!(output.contains("max=100ns"));
592
593		let stats = SlidingStats::<StatDuration>::new(Duration::from_secs(100));
594		stats.insert(Duration::from_micros(100).into()).await;
595		let output = stats.with_inner(|i| format!("{i}")).await;
596		assert!(output.contains("max=100µs"));
597
598		let stats = SlidingStats::<StatDuration>::new(Duration::from_secs(100));
599		stats.insert(Duration::from_millis(100).into()).await;
600		let output = stats.with_inner(|i| format!("{i}")).await;
601		assert!(output.contains("max=100ms"));
602
603		let stats = SlidingStats::<StatDuration>::new(Duration::from_secs(100));
604		stats.insert(Duration::from_secs(100).into()).await;
605		let output = stats.with_inner(|i| format!("{i}")).await;
606		assert!(output.contains("max=100s"));
607
608		let stats = SlidingStats::<StatDuration>::new(Duration::from_secs(100));
609		stats.insert(Duration::from_nanos(100).into()).await;
610		stats.insert(Duration::from_micros(100).into()).await;
611		stats.insert(Duration::from_millis(100).into()).await;
612		stats.insert(Duration::from_secs(100).into()).await;
613		let output = stats.with_inner(|i| format!("{i}")).await;
614		println!("{output}");
615		assert_eq!(output, "count=4, min=100ns, max=100s, avg=25.025025025s, p50=50.05ms, p90=100s, p95=100s, p99=100s, span=100s");
616	}
617}