prioritized_metered_channel/
lib.rs

// Copyright 2017-2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! Metered variant of mpsc channels to be able to extract metrics.
18#![allow(clippy::all)]
19
20#[cfg(all(feature = "async_channel", feature = "futures_channel",))]
21compile_error!("`async_channel` and `futures_channel` are mutually exclusive features");
22
23#[cfg(not(any(feature = "async_channel", feature = "futures_channel")))]
24compile_error!("Must build with either `async_channel` or `futures_channel` features");
25
26use std::sync::{
27	atomic::{AtomicUsize, Ordering},
28	Arc,
29};
30
31use derive_more::Display;
32
33mod bounded;
34pub mod oneshot;
35mod unbounded;
36
37pub use self::{bounded::*, unbounded::*};
38
39pub use coarsetime::Duration as CoarseDuration;
40use coarsetime::Instant as CoarseInstant;
41
42#[cfg(test)]
43mod tests;
44
/// Capacity of the time-of-flight queue, i.e. the maximum number of
/// time-of-flight values that are kept at once.
const TOF_QUEUE_SIZE: usize = 100;
47
48/// A peek into the inner state of a meter.
49#[derive(Debug, Clone)]
50pub struct Meter {
51	// Number of sends on this channel.
52	sent: Arc<AtomicUsize>,
53	// Number of receives on this channel.
54	received: Arc<AtomicUsize>,
55	#[cfg(feature = "async_channel")]
56	// Number of elements in the channel.
57	channel_len: Arc<AtomicUsize>,
58	// Number of times senders blocked while sending messages to a subsystem.
59	blocked: Arc<AtomicUsize>,
60	// Atomic ringbuffer of the last `TOF_QUEUE_SIZE` time of flight values
61	tof: Arc<crossbeam_queue::ArrayQueue<CoarseDuration>>,
62}
63
64impl std::default::Default for Meter {
65	fn default() -> Self {
66		Self {
67			sent: Arc::new(AtomicUsize::new(0)),
68			received: Arc::new(AtomicUsize::new(0)),
69			#[cfg(feature = "async_channel")]
70			channel_len: Arc::new(AtomicUsize::new(0)),
71			blocked: Arc::new(AtomicUsize::new(0)),
72			tof: Arc::new(crossbeam_queue::ArrayQueue::new(TOF_QUEUE_SIZE)),
73		}
74	}
75}
76
77/// A readout of sizes from the meter. Note that it is possible, due to asynchrony, for received
78/// to be slightly higher than sent.
79#[derive(Debug, Display, Clone, Default, PartialEq)]
80#[display(fmt = "(sent={} received={})", sent, received)]
81pub struct Readout {
82	/// The amount of messages sent on the channel, in aggregate.
83	pub sent: usize,
84	/// The amount of messages received on the channel, in aggregate.
85	pub received: usize,
86	/// An approximation of the queue size.
87	pub channel_len: usize,
88	/// How many times the caller blocked when sending messages.
89	pub blocked: usize,
90	/// Time of flight in micro seconds (us)
91	pub tof: Vec<CoarseDuration>,
92}
93
94impl Meter {
95	/// Count the number of items queued up inside the channel.
96	pub fn read(&self) -> Readout {
97		// when obtaining we don't care much about off by one
98		// accuracy
99		let sent = self.sent.load(Ordering::Relaxed);
100		let received = self.received.load(Ordering::Relaxed);
101
102		#[cfg(feature = "async_channel")]
103		let channel_len = self.channel_len.load(Ordering::Relaxed);
104		#[cfg(feature = "futures_channel")]
105		let channel_len = sent.saturating_sub(received);
106
107		Readout {
108			sent,
109			received,
110			channel_len,
111			blocked: self.blocked.load(Ordering::Relaxed),
112			tof: {
113				let mut acc = Vec::with_capacity(self.tof.len());
114				while let Some(value) = self.tof.pop() {
115					acc.push(value)
116				}
117				acc
118			},
119		}
120	}
121
122	fn note_sent(&self) -> usize {
123		self.sent.fetch_add(1, Ordering::Relaxed)
124	}
125
126	#[cfg(feature = "async_channel")]
127	fn note_channel_len(&self, len: usize) {
128		self.channel_len.store(len, Ordering::Relaxed)
129	}
130
131	fn retract_sent(&self) {
132		self.sent.fetch_sub(1, Ordering::Relaxed);
133	}
134
135	fn note_received(&self) {
136		self.received.fetch_add(1, Ordering::Relaxed);
137	}
138
139	fn note_blocked(&self) {
140		self.blocked.fetch_add(1, Ordering::Relaxed);
141	}
142
143	fn note_time_of_flight(&self, tof: CoarseDuration) {
144		let _ = self.tof.force_push(tof);
145	}
146
147	#[cfg(feature = "futures_channel")]
148	fn calculate_channel_len(&self) -> usize {
149		let sent = self.sent.load(Ordering::Relaxed);
150		let received = self.received.load(Ordering::Relaxed);
151		sent.saturating_sub(received) as usize
152	}
153}
154
155/// Determine if this instance shall be measured
156#[inline(always)]
157fn measure_tof_check(nth: usize) -> bool {
158	if cfg!(test) {
159		// for tests, be deterministic and pick every second
160		nth & 0x01 == 0
161	} else {
162		use nanorand::Rng;
163		let mut rng = nanorand::WyRand::new_seed(nth as u64);
164
165		// measure 5.3% (we ignore the fact that 2^64 cannot be represented as f64)
166		const PROB: u64 = (u64::MAX as f64 * 0.053_f64) as u64;
167		let coin = rng.generate::<u64>();
168
169		coin >= PROB
170	}
171}
172
173/// Measure the time of flight between insertion and removal
174/// of a single type `T`
175
176#[derive(Debug)]
177pub enum MaybeTimeOfFlight<T: Sized> {
178	Bare(T),
179	WithTimeOfFlight(T, CoarseInstant),
180}
181
182impl<T> From<T> for MaybeTimeOfFlight<T> {
183	fn from(value: T) -> Self {
184		Self::Bare(value)
185	}
186}
187
188// Has some unexplicable conflict with a wildcard impl of std
189impl<T> MaybeTimeOfFlight<T> {
190	/// Extract the inner `T` value.
191	pub fn into(self) -> T {
192		match self {
193			Self::Bare(value) => value,
194			Self::WithTimeOfFlight(value, _tof_start) => value,
195		}
196	}
197}
198
199pub fn prepare_with_tof<T>(meter: &Meter, item: T) -> MaybeTimeOfFlight<T> {
200	let previous = meter.note_sent();
201	let item = if measure_tof_check(previous) {
202		MaybeTimeOfFlight::WithTimeOfFlight(item, CoarseInstant::now())
203	} else {
204		MaybeTimeOfFlight::Bare(item)
205	};
206	item
207}
208
209impl<T> std::ops::Deref for MaybeTimeOfFlight<T> {
210	type Target = T;
211	fn deref(&self) -> &Self::Target {
212		match self {
213			Self::Bare(ref value) => value,
214			Self::WithTimeOfFlight(ref value, _tof_start) => value,
215		}
216	}
217}