prioritized_metered_channel/
unbounded.rs

1// Copyright 2017-2021 Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17//! Metered variant of unbounded mpsc channels to be able to extract metrics.
18
19use futures::{
20	stream::Stream,
21	task::{Context, Poll},
22};
23
24#[cfg(feature = "async_channel")]
25use async_channel::{unbounded as unbounded_channel, Receiver, Sender, TryRecvError, TrySendError};
26
27#[cfg(feature = "futures_channel")]
28use futures::{
29	channel::mpsc::unbounded as unbounded_channel,
30	channel::mpsc::{
31		TryRecvError, TrySendError, UnboundedReceiver as Receiver, UnboundedSender as Sender,
32	},
33};
34
35use std::{pin::Pin, result};
36
37use super::{measure_tof_check, CoarseInstant, MaybeTimeOfFlight, Meter};
38
39/// Create a wrapped `mpsc::channel` pair of `MeteredSender` and `MeteredReceiver`.
40pub fn unbounded<T>() -> (UnboundedMeteredSender<T>, UnboundedMeteredReceiver<T>) {
41	let (tx, rx) = unbounded_channel::<MaybeTimeOfFlight<T>>();
42	let shared_meter = Meter::default();
43	let tx = UnboundedMeteredSender { meter: shared_meter.clone(), inner: tx };
44	let rx = UnboundedMeteredReceiver { meter: shared_meter, inner: rx };
45	(tx, rx)
46}
47
48/// A receiver tracking the messages consumed by itself.
49#[derive(Debug)]
50pub struct UnboundedMeteredReceiver<T> {
51	// count currently contained messages
52	meter: Meter,
53	inner: Receiver<MaybeTimeOfFlight<T>>,
54}
55
56impl<T> std::ops::Deref for UnboundedMeteredReceiver<T> {
57	type Target = Receiver<MaybeTimeOfFlight<T>>;
58	fn deref(&self) -> &Self::Target {
59		&self.inner
60	}
61}
62
63impl<T> std::ops::DerefMut for UnboundedMeteredReceiver<T> {
64	fn deref_mut(&mut self) -> &mut Self::Target {
65		&mut self.inner
66	}
67}
68
69impl<T> Stream for UnboundedMeteredReceiver<T> {
70	type Item = T;
71	fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
72		match Receiver::poll_next(Pin::new(&mut self.inner), cx) {
73			Poll::Ready(maybe_value) => Poll::Ready(self.maybe_meter_tof(maybe_value)),
74			Poll::Pending => Poll::Pending,
75		}
76	}
77
78	/// Don't rely on the unreliable size hint.
79	fn size_hint(&self) -> (usize, Option<usize>) {
80		self.inner.size_hint()
81	}
82}
83
84impl<T> UnboundedMeteredReceiver<T> {
85	fn maybe_meter_tof(&mut self, maybe_value: Option<MaybeTimeOfFlight<T>>) -> Option<T> {
86		self.meter.note_received();
87		maybe_value.map(|value| {
88			match value {
89				MaybeTimeOfFlight::<T>::WithTimeOfFlight(value, tof_start) => {
90					// do not use `.elapsed()` of `std::time`, it may panic
91					// `coarsetime` does a saturating substractio for all `CoarseInstant`s
92					let duration = tof_start.elapsed();
93					self.meter.note_time_of_flight(duration);
94					value
95				},
96				MaybeTimeOfFlight::<T>::Bare(value) => value,
97			}
98			.into()
99		})
100	}
101
102	/// Get an updated accessor object for all metrics collected.
103	pub fn meter(&self) -> &Meter {
104		&self.meter
105	}
106
107	/// Attempt to receive the next item.
108	#[cfg(feature = "futures_channel")]
109	pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
110		match self.inner.try_next()? {
111			Some(value) => Ok(self.maybe_meter_tof(Some(value))),
112			None => Ok(None),
113		}
114	}
115
116	/// Attempt to receive the next item.
117	#[cfg(feature = "async_channel")]
118	pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
119		let result = match self.inner.try_recv() {
120			Ok(value) => Ok(self.maybe_meter_tof(Some(value))),
121			Err(err) => Err(err),
122		};
123
124		self.meter.note_channel_len(self.len());
125		result
126	}
127
128	/// Returns the current number of messages in the channel
129	#[cfg(feature = "async_channel")]
130	pub fn len(&self) -> usize {
131		self.inner.len()
132	}
133}
134
135impl<T> futures::stream::FusedStream for UnboundedMeteredReceiver<T> {
136	fn is_terminated(&self) -> bool {
137		self.inner.is_terminated()
138	}
139}
140
141/// The sender component, tracking the number of items
142/// sent across it.
143#[derive(Debug)]
144pub struct UnboundedMeteredSender<T> {
145	meter: Meter,
146	inner: Sender<MaybeTimeOfFlight<T>>,
147}
148
149impl<T> Clone for UnboundedMeteredSender<T> {
150	fn clone(&self) -> Self {
151		Self { meter: self.meter.clone(), inner: self.inner.clone() }
152	}
153}
154
155impl<T> std::ops::Deref for UnboundedMeteredSender<T> {
156	type Target = Sender<MaybeTimeOfFlight<T>>;
157	fn deref(&self) -> &Self::Target {
158		&self.inner
159	}
160}
161
162impl<T> std::ops::DerefMut for UnboundedMeteredSender<T> {
163	fn deref_mut(&mut self) -> &mut Self::Target {
164		&mut self.inner
165	}
166}
167
168impl<T> UnboundedMeteredSender<T> {
169	fn prepare_with_tof(&self, item: T) -> MaybeTimeOfFlight<T> {
170		let previous = self.meter.note_sent();
171		let item = if measure_tof_check(previous) {
172			MaybeTimeOfFlight::WithTimeOfFlight(item, CoarseInstant::now())
173		} else {
174			MaybeTimeOfFlight::Bare(item)
175		};
176
177		item
178	}
179
180	/// Get an updated accessor object for all metrics collected.
181	pub fn meter(&self) -> &Meter {
182		&self.meter
183	}
184
185	/// Attempt to send message or fail immediately.
186	#[cfg(feature = "futures_channel")]
187	pub fn unbounded_send(&self, msg: T) -> result::Result<(), TrySendError<MaybeTimeOfFlight<T>>> {
188		let msg = self.prepare_with_tof(msg);
189		self.inner.unbounded_send(msg).map_err(|e| {
190			self.meter.retract_sent();
191			e
192		})
193	}
194
195	/// Attempt to send message or fail immediately.
196	#[cfg(feature = "async_channel")]
197	pub fn unbounded_send(&self, msg: T) -> result::Result<(), TrySendError<MaybeTimeOfFlight<T>>> {
198		let msg = self.prepare_with_tof(msg);
199		let result = self.inner.try_send(msg).map_err(|e| {
200			self.meter.retract_sent();
201			e
202		});
203
204		self.meter.note_channel_len(self.len());
205		result
206	}
207
208	/// Returns the current number of messages in the channel
209	#[cfg(feature = "async_channel")]
210	pub fn len(&self) -> usize {
211		self.inner.len()
212	}
213}