referrerpolicy=no-referrer-when-downgrade

sc_telemetry/
lib.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
//! Substrate's client telemetry is the part of Substrate that reports telemetry data to remote
//! telemetry servers, for example [Polkadot telemetry](https://github.com/paritytech/substrate-telemetry).
//!
//! Telemetry messages are reported through a [`TelemetryHandle`], usually with the
//! [`telemetry!`] macro. The handle sends them through an asynchronous channel to a background
//! task called [`TelemetryWorker`], which forwards them to the configured remote telemetry
//! servers.
27//!
//! If multiple Substrate nodes are running in the same process, each [`Telemetry`] instance is
//! assigned its own id by [`TelemetryWorkerHandle::new_telemetry`]. This id identifies which
//! node is reporting a given telemetry message.
31//!
//! Substrate nodes register with the [`TelemetryWorker`] using a [`TelemetryWorkerHandle`]. This
//! handle can be cloned and passed around. It communicates with the running [`TelemetryWorker`]
//! through an asynchronous channel dedicated to registration. Registering can happen at any
//! point in time during the process execution.
36
37#![warn(missing_docs)]
38
39use futures::{channel::mpsc, prelude::*};
40use libp2p::Multiaddr;
41use log::{error, warn};
42use parking_lot::Mutex;
43use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
44use serde::Serialize;
45use std::{
46	collections::{
47		hash_map::Entry::{Occupied, Vacant},
48		HashMap,
49	},
50	sync::{atomic, Arc},
51};
52
53pub use log;
54pub use serde_json;
55
56mod endpoints;
57mod error;
58mod node;
59mod transport;
60
61pub use endpoints::*;
62pub use error::*;
63use node::*;
64use transport::*;
65
/// Substrate DEBUG log level.
pub const SUBSTRATE_DEBUG: VerbosityLevel = 9;
/// Substrate INFO log level.
pub const SUBSTRATE_INFO: VerbosityLevel = 0;

/// Consensus TRACE log level.
pub const CONSENSUS_TRACE: VerbosityLevel = 9;
/// Consensus DEBUG log level.
pub const CONSENSUS_DEBUG: VerbosityLevel = 5;
/// Consensus WARN log level.
pub const CONSENSUS_WARN: VerbosityLevel = 4;
/// Consensus INFO log level.
pub const CONSENSUS_INFO: VerbosityLevel = 1;

/// Telemetry message verbosity.
///
/// A message is delivered to an endpoint only when its verbosity is less than or equal to the
/// maximum verbosity configured for that endpoint, so lower values reach more endpoints.
pub type VerbosityLevel = u8;

/// Identifier of a [`Telemetry`] instance, used by the worker to route messages to the
/// endpoints registered for it.
pub(crate) type Id = u64;
/// JSON object carrying the fields of a single telemetry message.
pub(crate) type TelemetryPayload = serde_json::Map<String, serde_json::Value>;
/// Item sent over the worker's message channel: `(telemetry id, verbosity, payload)`.
pub(crate) type TelemetryMessage = (Id, VerbosityLevel, TelemetryPayload);
86
/// Message sent when the connection (re-)establishes.
///
/// When a telemetry is started, the worker serializes this struct to a JSON object (the field
/// names below become its keys), inserts `"msg": "system.connected"` into it, and sends it as the
/// `payload` of an object that also carries the telemetry `id`.
#[derive(Debug, Serialize)]
pub struct ConnectionMessage {
	/// Node's name.
	pub name: String,
	/// Node's implementation.
	pub implementation: String,
	/// Node's version.
	pub version: String,
	/// Node's configuration.
	pub config: String,
	/// Node's chain.
	pub chain: String,
	/// Node's genesis hash.
	pub genesis_hash: String,
	/// Whether the node is an authority.
	pub authority: bool,
	/// Node's startup time.
	pub startup_time: String,
	/// Node's network ID.
	pub network_id: String,

	/// Node's OS.
	pub target_os: String,

	/// Node's ISA.
	pub target_arch: String,

	/// Node's target platform ABI or libc.
	pub target_env: String,

	/// Node's software and hardware information, if it was gathered.
	pub sysinfo: Option<SysInfo>,
}
121
/// Hardware and software information for the node.
///
/// Sent as the `sysinfo` field of [`ConnectionMessage`].
///
/// Gathering most of this information is highly OS-specific,
/// so most of the fields here are optional. A `None` field presumably means the value could not
/// be determined on the current platform (TODO confirm against the code that fills it in).
#[derive(Debug, Serialize)]
pub struct SysInfo {
	/// The exact CPU model.
	pub cpu: Option<String>,
	/// The total amount of memory, in bytes.
	pub memory: Option<u64>,
	/// The number of physical CPU cores.
	pub core_count: Option<u32>,
	/// The Linux kernel version.
	pub linux_kernel: Option<String>,
	/// The exact Linux distribution used.
	pub linux_distro: Option<String>,
	/// Whether the node is running under a virtual machine.
	pub is_virtual_machine: Option<bool>,
}
141
/// Telemetry worker.
///
/// It should run as a background task using the [`TelemetryWorker::run`] method, which consumes
/// the worker. Handles obtained beforehand through [`TelemetryWorker::handle`] remain usable while
/// it runs. Once the worker has been dropped, initializing a new telemetry through a handle fails
/// with an error (without being fatal).
#[derive(Debug)]
pub struct TelemetryWorker {
	// Receiving end of the bounded channel carrying telemetry messages from handles.
	message_receiver: mpsc::Receiver<TelemetryMessage>,
	// Cloned into handles; also keeps the message stream from ever ending.
	message_sender: mpsc::Sender<TelemetryMessage>,
	// Receiving end of the channel carrying `Register` requests.
	register_receiver: TracingUnboundedReceiver<Register>,
	// Cloned into handles; also keeps the register stream from ever ending.
	register_sender: TracingUnboundedSender<Register>,
	// Counter shared with every handle, used to allocate telemetry ids.
	id_counter: Arc<atomic::AtomicU64>,
}
155
156impl TelemetryWorker {
157	/// Instantiate a new [`TelemetryWorker`] which can run in background.
158	///
159	/// Only one is needed per process.
160	pub fn new(buffer_size: usize) -> Result<Self> {
161		// Let's try to initialize a transport to get an early return.
162		// Later transport will be initialized multiple times in
163		// `::process_register`, so it's a convenient way to get an
164		// error as early as possible.
165		let _transport = initialize_transport()?;
166		let (message_sender, message_receiver) = mpsc::channel(buffer_size);
167		let (register_sender, register_receiver) =
168			tracing_unbounded("mpsc_telemetry_register", 10_000);
169
170		Ok(Self {
171			message_receiver,
172			message_sender,
173			register_receiver,
174			register_sender,
175			id_counter: Arc::new(atomic::AtomicU64::new(1)),
176		})
177	}
178
179	/// Get a new [`TelemetryWorkerHandle`].
180	///
181	/// This is used when you want to register with the [`TelemetryWorker`].
182	pub fn handle(&self) -> TelemetryWorkerHandle {
183		TelemetryWorkerHandle {
184			message_sender: self.message_sender.clone(),
185			register_sender: self.register_sender.clone(),
186			id_counter: self.id_counter.clone(),
187		}
188	}
189
190	/// Run the telemetry worker.
191	///
192	/// This should be run in a background task.
193	pub async fn run(mut self) {
194		let mut node_map: HashMap<Id, Vec<(VerbosityLevel, Multiaddr)>> = HashMap::new();
195		let mut node_pool: HashMap<Multiaddr, _> = HashMap::new();
196		let mut pending_connection_notifications: Vec<_> = Vec::new();
197
198		loop {
199			futures::select! {
200				message = self.message_receiver.next() => Self::process_message(
201					message,
202					&mut node_pool,
203					&node_map,
204				).await,
205				init_payload = self.register_receiver.next() => Self::process_register(
206					init_payload,
207					&mut node_pool,
208					&mut node_map,
209					&mut pending_connection_notifications,
210				).await,
211			}
212		}
213	}
214
215	async fn process_register(
216		input: Option<Register>,
217		node_pool: &mut HashMap<Multiaddr, Node<WsTrans>>,
218		node_map: &mut HashMap<Id, Vec<(VerbosityLevel, Multiaddr)>>,
219		pending_connection_notifications: &mut Vec<(Multiaddr, ConnectionNotifierSender)>,
220	) {
221		let input = input.expect("the stream is never closed; qed");
222
223		match input {
224			Register::Telemetry { id, endpoints, connection_message } => {
225				let endpoints = endpoints.0;
226
227				let connection_message = match serde_json::to_value(&connection_message) {
228					Ok(serde_json::Value::Object(mut value)) => {
229						value.insert("msg".into(), "system.connected".into());
230						let mut obj = serde_json::Map::new();
231						obj.insert("id".to_string(), id.into());
232						obj.insert("payload".to_string(), value.into());
233						Some(obj)
234					},
235					Ok(_) => {
236						unreachable!("ConnectionMessage always serialize to an object; qed")
237					},
238					Err(err) => {
239						log::error!(
240							target: "telemetry",
241							"Could not serialize connection message: {}",
242							err,
243						);
244						None
245					},
246				};
247
248				for (addr, verbosity) in endpoints {
249					log::trace!(
250						target: "telemetry",
251						"Initializing telemetry for: {:?}",
252						addr,
253					);
254					node_map.entry(id).or_default().push((verbosity, addr.clone()));
255
256					let node = match node_pool.entry(addr.clone()) {
257						Occupied(entry) => entry.into_mut(),
258						Vacant(entry) => {
259							let transport = initialize_transport();
260							let transport = match transport {
261								Ok(t) => t,
262								Err(err) => {
263									log::error!(
264										target: "telemetry",
265										"Could not initialise transport: {}",
266										err,
267									);
268									continue
269								},
270							};
271							entry.insert(Node::new(transport, addr.clone(), Vec::new(), Vec::new()))
272						},
273					};
274
275					node.connection_messages.extend(connection_message.clone());
276
277					pending_connection_notifications.retain(|(addr_b, connection_message)| {
278						if *addr_b == addr {
279							node.telemetry_connection_notifier.push(connection_message.clone());
280							false
281						} else {
282							true
283						}
284					});
285				}
286			},
287			Register::Notifier { addresses, connection_notifier } => {
288				for addr in addresses {
289					// If the Node has been initialized, we directly push the connection_notifier.
290					// Otherwise we push it to a queue that will be consumed when the connection
291					// initializes, thus ensuring that the connection notifier will be sent to the
292					// Node when it becomes available.
293					if let Some(node) = node_pool.get_mut(&addr) {
294						node.telemetry_connection_notifier.push(connection_notifier.clone());
295					} else {
296						pending_connection_notifications.push((addr, connection_notifier.clone()));
297					}
298				}
299			},
300		}
301	}
302
303	// dispatch messages to the telemetry nodes
304	async fn process_message(
305		input: Option<TelemetryMessage>,
306		node_pool: &mut HashMap<Multiaddr, Node<WsTrans>>,
307		node_map: &HashMap<Id, Vec<(VerbosityLevel, Multiaddr)>>,
308	) {
309		let (id, verbosity, payload) = input.expect("the stream is never closed; qed");
310
311		let ts = chrono::Local::now().to_rfc3339();
312		let mut message = serde_json::Map::new();
313		message.insert("id".into(), id.into());
314		message.insert("ts".into(), ts.into());
315		message.insert("payload".into(), payload.into());
316
317		let nodes = if let Some(nodes) = node_map.get(&id) {
318			nodes
319		} else {
320			// This is a normal error because the telemetry ID exists before the telemetry is
321			// initialized.
322			log::trace!(
323				target: "telemetry",
324				"Received telemetry log for unknown id ({:?}): {}",
325				id,
326				serde_json::to_string(&message)
327					.unwrap_or_else(|err| format!(
328						"could not be serialized ({}): {:?}",
329						err,
330						message,
331					)),
332			);
333			return
334		};
335
336		for (node_max_verbosity, addr) in nodes {
337			if verbosity > *node_max_verbosity {
338				continue
339			}
340
341			if let Some(node) = node_pool.get_mut(addr) {
342				let _ = node.send(message.clone()).await;
343			} else {
344				log::debug!(
345					target: "telemetry",
346					"Received message for unknown node ({}). This is a bug. \
347					Message sent: {}",
348					addr,
349					serde_json::to_string(&message)
350						.unwrap_or_else(|err| format!(
351							"could not be serialized ({}): {:?}",
352							err,
353							message,
354						)),
355				);
356			}
357		}
358	}
359}
360
361/// Handle to the [`TelemetryWorker`] thats allows initializing the telemetry for a Substrate node.
362#[derive(Debug, Clone)]
363pub struct TelemetryWorkerHandle {
364	message_sender: mpsc::Sender<TelemetryMessage>,
365	register_sender: TracingUnboundedSender<Register>,
366	id_counter: Arc<atomic::AtomicU64>,
367}
368
369impl TelemetryWorkerHandle {
370	/// Instantiate a new [`Telemetry`] object.
371	pub fn new_telemetry(&mut self, endpoints: TelemetryEndpoints) -> Telemetry {
372		let addresses = endpoints.0.iter().map(|(addr, _)| addr.clone()).collect();
373
374		Telemetry {
375			message_sender: self.message_sender.clone(),
376			register_sender: self.register_sender.clone(),
377			id: self.id_counter.fetch_add(1, atomic::Ordering::Relaxed),
378			connection_notifier: TelemetryConnectionNotifier {
379				register_sender: self.register_sender.clone(),
380				addresses,
381			},
382			endpoints: Some(endpoints),
383		}
384	}
385}
386
387/// A telemetry instance that can be used to send telemetry messages.
388#[derive(Debug)]
389pub struct Telemetry {
390	message_sender: mpsc::Sender<TelemetryMessage>,
391	register_sender: TracingUnboundedSender<Register>,
392	id: Id,
393	connection_notifier: TelemetryConnectionNotifier,
394	endpoints: Option<TelemetryEndpoints>,
395}
396
397impl Telemetry {
398	/// Initialize the telemetry with the endpoints provided in argument for the current substrate
399	/// node.
400	///
401	/// This method must be called during the substrate node initialization.
402	///
403	/// The `endpoints` argument is a collection of telemetry WebSocket servers with a corresponding
404	/// verbosity level.
405	///
406	/// The `connection_message` argument is a JSON object that is sent every time the connection
407	/// (re-)establishes.
408	pub fn start_telemetry(&mut self, connection_message: ConnectionMessage) -> Result<()> {
409		let endpoints = self.endpoints.take().ok_or(Error::TelemetryAlreadyInitialized)?;
410
411		self.register_sender
412			.unbounded_send(Register::Telemetry { id: self.id, endpoints, connection_message })
413			.map_err(|_| Error::TelemetryWorkerDropped)
414	}
415
416	/// Make a new clonable handle to this [`Telemetry`]. This is used for reporting telemetries.
417	pub fn handle(&self) -> TelemetryHandle {
418		TelemetryHandle {
419			message_sender: Arc::new(Mutex::new(self.message_sender.clone())),
420			id: self.id,
421			connection_notifier: self.connection_notifier.clone(),
422		}
423	}
424}
425
426/// Handle to a [`Telemetry`].
427///
428/// Used to report telemetry messages.
429#[derive(Debug, Clone)]
430pub struct TelemetryHandle {
431	message_sender: Arc<Mutex<mpsc::Sender<TelemetryMessage>>>,
432	id: Id,
433	connection_notifier: TelemetryConnectionNotifier,
434}
435
436impl TelemetryHandle {
437	/// Send telemetry messages.
438	pub fn send_telemetry(&self, verbosity: VerbosityLevel, payload: TelemetryPayload) {
439		match self.message_sender.lock().try_send((self.id, verbosity, payload)) {
440			Ok(()) => {},
441			Err(err) if err.is_full() => log::trace!(
442				target: "telemetry",
443				"Telemetry channel full.",
444			),
445			Err(_) => log::trace!(
446				target: "telemetry",
447				"Telemetry channel closed.",
448			),
449		}
450	}
451
452	/// Get event stream for telemetry connection established events.
453	///
454	/// This function will return an error if the telemetry has already been started by
455	/// [`Telemetry::start_telemetry`].
456	pub fn on_connect_stream(&self) -> ConnectionNotifierReceiver {
457		self.connection_notifier.on_connect_stream()
458	}
459}
460
461/// Used to create a stream of events with only one event: when a telemetry connection
462/// (re-)establishes.
463#[derive(Clone, Debug)]
464pub struct TelemetryConnectionNotifier {
465	register_sender: TracingUnboundedSender<Register>,
466	addresses: Vec<Multiaddr>,
467}
468
469impl TelemetryConnectionNotifier {
470	fn on_connect_stream(&self) -> ConnectionNotifierReceiver {
471		let (message_sender, message_receiver) = connection_notifier_channel();
472		if let Err(err) = self.register_sender.unbounded_send(Register::Notifier {
473			addresses: self.addresses.clone(),
474			connection_notifier: message_sender,
475		}) {
476			error!(
477				target: "telemetry",
478				"Could not create a telemetry connection notifier: \
479				the telemetry is probably already running: {}",
480				err,
481			);
482		}
483		message_receiver
484	}
485}
486
/// Registration request sent from handles to the [`TelemetryWorker`] over its unbounded register
/// channel.
#[derive(Debug)]
enum Register {
	/// Register the endpoints of telemetry `id`. `connection_message` is queued on the node of
	/// every endpoint so it can be sent when the connection (re-)establishes.
	Telemetry { id: Id, endpoints: TelemetryEndpoints, connection_message: ConnectionMessage },
	/// Subscribe `connection_notifier` to connection events of the given endpoint `addresses`.
	Notifier { addresses: Vec<Multiaddr>, connection_notifier: ConnectionNotifierSender },
}
492
/// Report a telemetry.
///
/// `$telemetry` is expected to be an `Option<TelemetryHandle>` (see the example). When it is
/// `Some`, the key/value fields are converted into a JSON object and a `"msg"` key set to `$msg`
/// is inserted into it. This overwrites any field also named `"msg"`. The object is then sent
/// with [`TelemetryHandle::send_telemetry`] at the given verbosity. When it is `None`, nothing
/// is evaluated beyond `$telemetry` itself.
///
/// A field written as `"key" => value` is serialized with `serde_json`. A field written as
/// `"key" => ?value` is recorded as the value's `Debug` string. If a value cannot be serialized,
/// nothing is sent and the error is logged at debug level.
///
/// # Example
///
/// ```no_run
/// # use sc_telemetry::*;
/// # let authority_id = 42_u64;
/// # let set_id = (43_u64, 44_u64);
/// # let authorities = vec![45_u64];
/// # let telemetry: Option<TelemetryHandle> = None;
/// telemetry!(
///     telemetry;      // an `Option<TelemetryHandle>`
///     CONSENSUS_INFO;
///     "afg.authority_set";
///     "authority_id" => authority_id.to_string(),
///     "authority_set_id" => ?set_id,
///     "authorities" => authorities,
/// );
/// ```
#[macro_export(local_inner_macros)]
macro_rules! telemetry {
	( $telemetry:expr; $verbosity:expr; $msg:expr; $( $t:tt )* ) => {{
		if let Some(telemetry) = $telemetry.as_ref() {
			// Type ascription so any integer literal is accepted as a verbosity level.
			let verbosity: $crate::VerbosityLevel = $verbosity;
			match format_fields_to_json!($($t)*) {
				Err(err) => {
					$crate::log::debug!(
						target: "telemetry",
						"Could not serialize value for telemetry: {}",
						err,
					);
				},
				Ok(mut json) => {
					json.insert("msg".into(), $msg.into());
					telemetry.send_telemetry(verbosity, json);
				},
			}
		}
	}};
}
537
/// Internal helper of [`telemetry!`]: converts a list of `"key" => value` / `"key" => ?value`
/// fields into a `serde_json::Result<serde_json::Map<String, serde_json::Value>>`.
///
/// Plain values are serialized with `serde_json::to_value`; `?value` is stored as its `Debug`
/// string. Fields are converted left to right, stopping at the first serialization error. An
/// empty field list produces an empty map, so `telemetry!(t; v; "msg";)` also compiles.
#[macro_export(local_inner_macros)]
#[doc(hidden)]
macro_rules! format_fields_to_json {
	// No fields at all: an empty JSON object.
	() => {{
		$crate::serde_json::Result::Ok($crate::serde_json::Map::new())
	}};
	// `"key" => value`: serialize `value` with serde, then merge in the remaining fields.
	( $k:literal => $v:expr $(,)? $(, $($t:tt)+ )? ) => {{
		$crate::serde_json::to_value(&$v)
			.map(|value| {
				let mut map = $crate::serde_json::Map::new();
				map.insert($k.into(), value);
				map
			})
			$(
				.and_then(|mut prev_map| {
					format_fields_to_json!($($t)*)
						.map(move |mut other_map| {
							prev_map.append(&mut other_map);
							prev_map
						})
				})
			)*
	}};
	// `"key" => ?value`: store the `Debug` representation, then merge in the remaining fields.
	// `std::format!` is spelled out so `local_inner_macros` does not resolve it via `$crate`.
	( $k:literal => ? $v:expr $(,)? $(, $($t:tt)+ )? ) => {{
		let mut map = $crate::serde_json::Map::new();
		map.insert($k.into(), std::format!("{:?}", &$v).into());
		$crate::serde_json::Result::Ok(map)
		$(
			.and_then(|mut prev_map| {
				format_fields_to_json!($($t)*)
					.map(move |mut other_map| {
						prev_map.append(&mut other_map);
						prev_map
					})
			})
		)*
	}};
}