referrerpolicy=no-referrer-when-downgrade

sc_network/service/
out_events.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Registering events streams.
20//!
21//! This code holds the logic that is used for the network service to inform other parts of
22//! Substrate about what is happening.
23//!
24//! # Usage
25//!
26//! - Create an instance of [`OutChannels`].
27//! - Create channels using the [`channel`] function. The receiving side implements the `Stream`
28//! trait.
29//! - You cannot directly send an event on a sender. Instead, you have to call
30//! [`OutChannels::push`] to put the sender within a [`OutChannels`].
31//! - Send events by calling [`OutChannels::send`]. Events are cloned for each sender in the
32//! collection.
33
34use crate::event::Event;
35
36use futures::{prelude::*, ready, stream::FusedStream};
37use log::{debug, error};
38use prometheus_endpoint::{register, CounterVec, GaugeVec, Opts, PrometheusError, Registry, U64};
39use std::{
40	backtrace::Backtrace,
41	cell::RefCell,
42	fmt,
43	pin::Pin,
44	task::{Context, Poll},
45};
46
/// Log target for this file.
///
/// Used as the `target:` of log messages emitted by [`OutChannels::send`].
pub const LOG_TARGET: &str = "sub-libp2p::out_events";
49
50/// Creates a new channel that can be associated to a [`OutChannels`].
51///
52/// The name is used in Prometheus reports, the queue size threshold is used
53/// to warn if there are too many unprocessed events in the channel.
54pub fn channel(name: &'static str, queue_size_warning: usize) -> (Sender, Receiver) {
55	let (tx, rx) = async_channel::unbounded();
56	let tx = Sender {
57		inner: tx,
58		name,
59		queue_size_warning,
60		warning_fired: SenderWarningState::NotFired,
61		creation_backtrace: Backtrace::force_capture(),
62		metrics: None,
63	};
64	let rx = Receiver { inner: rx, name, metrics: None };
65	(tx, rx)
66}
67
/// A state of a sender warning that is used to avoid spamming the logs.
///
/// The state is advanced by [`OutChannels::send`]: reaching the queue size threshold moves a
/// sender to `FiredFull`, and dropping below half of the threshold while in `FiredFull` moves it
/// to `FiredFree`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SenderWarningState {
	/// The warning has not been fired yet.
	NotFired,
	/// The warning has been fired, and the channel is full.
	FiredFull,
	/// The warning has been fired and the channel is not full anymore.
	FiredFree,
}
78
/// Sending side of a channel.
///
/// Must be associated with an [`OutChannels`] before anything can be sent on it.
///
/// > **Note**: Contrary to regular channels, this `Sender` is purposefully designed to not
/// implement the `Clone` trait, e.g. in order to not complicate the logic keeping the metrics in
/// sync on drop. If someone adds a `#[derive(Clone)]` below, it is **wrong**.
pub struct Sender {
	/// Underlying channel the events are pushed into.
	inner: async_channel::Sender<Event>,
	/// Name to identify the channel (e.g., in Prometheus and logs).
	name: &'static str,
	/// Threshold queue size to generate an error message in the logs.
	queue_size_warning: usize,
	/// We generate the error message only once to not spam the logs after the first error.
	/// Subsequently we indicate channel fullness on debug level.
	warning_fired: SenderWarningState,
	/// Backtrace of a place where the channel was created.
	creation_backtrace: Backtrace,
	/// Clone of the metrics held by the [`OutChannels`] this sender was added to. Starts as
	/// `None` and is assigned by `OutChannels::push()`.
	metrics: Option<Metrics>,
}
101
102impl fmt::Debug for Sender {
103	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
104		f.debug_tuple("Sender").finish()
105	}
106}
107
108impl Drop for Sender {
109	fn drop(&mut self) {
110		if let Some(metrics) = self.metrics.as_ref() {
111			metrics.num_channels.with_label_values(&[self.name]).dec();
112		}
113	}
114}
115
/// Receiving side of a channel.
///
/// Implements `Stream`, yielding the events sent through the [`OutChannels`] that holds the
/// matching [`Sender`].
pub struct Receiver {
	/// Underlying channel the events are read from.
	inner: async_channel::Receiver<Event>,
	/// Name of the channel, used as the `name` label when reporting received events.
	name: &'static str,
	/// Initially contains `None`, and will be set to a value once the corresponding [`Sender`]
	/// is assigned to an instance of [`OutChannels`].
	///
	/// NOTE(review): no code visible in this file assigns this field after construction, so
	/// received events may never be reported to the metrics; confirm against the rest of the
	/// crate.
	metrics: Option<Metrics>,
}
124
125impl Stream for Receiver {
126	type Item = Event;
127
128	fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Event>> {
129		if let Some(ev) = ready!(Pin::new(&mut self.inner).poll_next(cx)) {
130			if let Some(metrics) = &self.metrics {
131				metrics.event_out(&ev, self.name);
132			}
133			Poll::Ready(Some(ev))
134		} else {
135			Poll::Ready(None)
136		}
137	}
138}
139
140impl fmt::Debug for Receiver {
141	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
142		f.debug_tuple("Receiver").finish()
143	}
144}
145
146impl Drop for Receiver {
147	fn drop(&mut self) {
148		if !self.inner.is_terminated() {
149			// Empty the list to properly decrease the metrics.
150			while let Some(Some(_)) = self.next().now_or_never() {}
151		}
152	}
153}
154
/// Collection of senders.
///
/// Events passed to [`OutChannels::send`] are cloned into every sender of the collection.
pub struct OutChannels {
	/// Senders added with [`OutChannels::push`]. A sender is removed when sending to it fails.
	event_streams: Vec<Sender>,
	/// The metrics we collect, if a Prometheus registry was provided. A clone of this is handed
	/// to each [`Sender`] added to this object.
	metrics: Option<Metrics>,
}
162
163impl OutChannels {
164	/// Creates a new empty collection of senders.
165	pub fn new(registry: Option<&Registry>) -> Result<Self, PrometheusError> {
166		let metrics =
167			if let Some(registry) = registry { Some(Metrics::register(registry)?) } else { None };
168
169		Ok(Self { event_streams: Vec::new(), metrics })
170	}
171
172	/// Adds a new [`Sender`] to the collection.
173	pub fn push(&mut self, mut sender: Sender) {
174		debug_assert!(sender.metrics.is_none());
175		sender.metrics = self.metrics.clone();
176
177		if let Some(metrics) = &self.metrics {
178			metrics.num_channels.with_label_values(&[sender.name]).inc();
179		}
180
181		self.event_streams.push(sender);
182	}
183
184	/// Sends an event.
185	pub fn send(&mut self, event: Event) {
186		self.event_streams.retain_mut(|sender| {
187			let current_pending = sender.inner.len();
188			if current_pending >= sender.queue_size_warning {
189				if sender.warning_fired == SenderWarningState::NotFired {
190					error!(
191						"The number of unprocessed events in channel `{}` exceeded {}.\n\
192						 The channel was created at:\n{:}\n
193						 The last event was sent from:\n{:}",
194						sender.name,
195						sender.queue_size_warning,
196						sender.creation_backtrace,
197						Backtrace::force_capture(),
198					);
199				} else if sender.warning_fired == SenderWarningState::FiredFree {
200					// We don't want to spam the logs, so we only log on debug level
201					debug!(
202						target: LOG_TARGET,
203						"Channel `{}` is overflowed again. Number of events: {}",
204						sender.name, current_pending
205					);
206				}
207				sender.warning_fired = SenderWarningState::FiredFull;
208			} else if sender.warning_fired == SenderWarningState::FiredFull &&
209				current_pending < sender.queue_size_warning.wrapping_div(2)
210			{
211				sender.warning_fired = SenderWarningState::FiredFree;
212				debug!(
213					target: LOG_TARGET,
214					"Channel `{}` is no longer overflowed. Number of events: {}",
215					sender.name, current_pending
216				);
217			}
218
219			sender.inner.try_send(event.clone()).is_ok()
220		});
221
222		if let Some(metrics) = &self.metrics {
223			for ev in &self.event_streams {
224				metrics.event_in(&event, ev.name);
225			}
226		}
227	}
228}
229
230impl fmt::Debug for OutChannels {
231	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
232		f.debug_struct("OutChannels")
233			.field("num_channels", &self.event_streams.len())
234			.finish()
235	}
236}
237
/// Prometheus metrics about the events going through the channels of an [`OutChannels`].
#[derive(Clone)]
struct Metrics {
	// This list is ordered alphabetically
	/// Number of events sent or received, labelled by `event_name`, `action` and channel `name`.
	events_total: CounterVec<U64>,
	/// Accumulated size of the notifications sent or received, labelled by `protocol`, `action`
	/// and channel `name`.
	notifications_sizes: CounterVec<U64>,
	/// Number of channels, labelled by channel `name`.
	num_channels: GaugeVec<U64>,
}
245
thread_local! {
	/// Per-thread scratch buffer that `format_label` builds its labels in.
	static LABEL_BUFFER: RefCell<String> = RefCell::new(String::new());
}

/// Builds the label `<prefix>"<protocol>"` and passes it to `callback`.
///
/// The label is assembled in a thread-local buffer that is cleared and reused on every call, so
/// the string handed to `callback` is only valid for the duration of that call.
///
/// The buffer stays mutably borrowed while `callback` runs; calling `format_label` again from
/// within `callback` therefore panics.
fn format_label(prefix: &str, protocol: &str, callback: impl FnOnce(&str)) {
	LABEL_BUFFER.with(|cell| {
		let mut label = cell.borrow_mut();
		label.clear();
		// Two extra bytes for the quotes surrounding the protocol name.
		label.reserve(prefix.len() + protocol.len() + 2);
		label.extend([prefix, "\"", protocol, "\""]);
		callback(label.as_str());
	});
}
262
263impl Metrics {
264	fn register(registry: &Registry) -> Result<Self, PrometheusError> {
265		Ok(Self {
266			events_total: register(CounterVec::new(
267				Opts::new(
268					"substrate_sub_libp2p_out_events_events_total",
269					"Number of broadcast network events that have been sent or received across all \
270					 channels"
271				),
272				&["event_name", "action", "name"]
273			)?, registry)?,
274			notifications_sizes: register(CounterVec::new(
275				Opts::new(
276					"substrate_sub_libp2p_out_events_notifications_sizes",
277					"Size of notification events that have been sent or received across all \
278					 channels"
279				),
280				&["protocol", "action", "name"]
281			)?, registry)?,
282			num_channels: register(GaugeVec::new(
283				Opts::new(
284					"substrate_sub_libp2p_out_events_num_channels",
285					"Number of internal active channels that broadcast network events",
286				),
287				&["name"]
288			)?, registry)?,
289		})
290	}
291
292	fn event_in(&self, event: &Event, name: &str) {
293		match event {
294			Event::Dht(_) => {
295				self.events_total.with_label_values(&["dht", "sent", name]).inc();
296			},
297			Event::NotificationStreamOpened { protocol, .. } => {
298				format_label("notif-open-", protocol, |protocol_label| {
299					self.events_total.with_label_values(&[protocol_label, "sent", name]).inc();
300				});
301			},
302			Event::NotificationStreamClosed { protocol, .. } => {
303				format_label("notif-closed-", protocol, |protocol_label| {
304					self.events_total.with_label_values(&[protocol_label, "sent", name]).inc();
305				});
306			},
307			Event::NotificationsReceived { messages, .. } =>
308				for (protocol, message) in messages {
309					format_label("notif-", protocol, |protocol_label| {
310						self.events_total.with_label_values(&[protocol_label, "sent", name]).inc();
311					});
312					self.notifications_sizes
313						.with_label_values(&[protocol, "sent", name])
314						.inc_by(u64::try_from(message.len()).unwrap_or(u64::MAX));
315				},
316		}
317	}
318
319	fn event_out(&self, event: &Event, name: &str) {
320		match event {
321			Event::Dht(_) => {
322				self.events_total.with_label_values(&["dht", "received", name]).inc();
323			},
324			Event::NotificationStreamOpened { protocol, .. } => {
325				format_label("notif-open-", protocol, |protocol_label| {
326					self.events_total.with_label_values(&[protocol_label, "received", name]).inc();
327				});
328			},
329			Event::NotificationStreamClosed { protocol, .. } => {
330				format_label("notif-closed-", protocol, |protocol_label| {
331					self.events_total.with_label_values(&[protocol_label, "received", name]).inc();
332				});
333			},
334			Event::NotificationsReceived { messages, .. } =>
335				for (protocol, message) in messages {
336					format_label("notif-", protocol, |protocol_label| {
337						self.events_total
338							.with_label_values(&[protocol_label, "received", name])
339							.inc();
340					});
341					self.notifications_sizes
342						.with_label_values(&[protocol, "received", name])
343						.inc_by(u64::try_from(message.len()).unwrap_or(u64::MAX));
344				},
345		}
346	}
347}