1#![warn(missing_docs)]
38
39use futures::{channel::mpsc, prelude::*};
40use libp2p::Multiaddr;
41use log::{error, warn};
42use parking_lot::Mutex;
43use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
44use serde::Serialize;
45use std::{
46	collections::{
47		hash_map::Entry::{Occupied, Vacant},
48		HashMap,
49	},
50	sync::{atomic, Arc},
51};
52
53pub use log;
54pub use serde_json;
55
56mod endpoints;
57mod error;
58mod node;
59mod transport;
60
61pub use endpoints::*;
62pub use error::*;
63use node::*;
64use transport::*;
65
/// Verbosity level for Substrate `DEBUG` telemetry messages.
///
/// The worker sends a message to an endpoint only when the message's level is less than or
/// equal to the level configured for that endpoint, so higher values are more verbose.
pub const SUBSTRATE_DEBUG: VerbosityLevel = 9;
/// Verbosity level for Substrate `INFO` telemetry messages.
pub const SUBSTRATE_INFO: VerbosityLevel = 0;

/// Verbosity level for consensus `TRACE` telemetry messages.
pub const CONSENSUS_TRACE: VerbosityLevel = 9;
/// Verbosity level for consensus `DEBUG` telemetry messages.
pub const CONSENSUS_DEBUG: VerbosityLevel = 5;
/// Verbosity level for consensus `WARN` telemetry messages.
pub const CONSENSUS_WARN: VerbosityLevel = 4;
/// Verbosity level for consensus `INFO` telemetry messages.
pub const CONSENSUS_INFO: VerbosityLevel = 1;
79
/// Verbosity level attached to a telemetry message and to each telemetry endpoint.
///
/// A message is forwarded to an endpoint only if its level does not exceed the endpoint's level.
pub type VerbosityLevel = u8;

/// Identifier of one [`Telemetry`] instance; allocated from the worker's shared atomic counter.
pub(crate) type Id = u64;
/// JSON object carrying the fields of one telemetry message.
pub(crate) type TelemetryPayload = serde_json::Map<String, serde_json::Value>;
/// Item sent over the telemetry message channel: telemetry id, verbosity and payload.
pub(crate) type TelemetryMessage = (Id, VerbosityLevel, TelemetryPayload);
86
/// Message registered together with a telemetry id through [`Telemetry::start_telemetry`].
///
/// The worker serializes it to a JSON object, adds `"msg": "system.connected"` to it and queues
/// the result on every endpoint node of that telemetry.
///
/// The field descriptions below follow the field names; this file neither validates nor
/// interprets the values.
#[derive(Debug, Serialize)]
pub struct ConnectionMessage {
	/// Name of the node.
	pub name: String,
	/// Implementation of the node.
	pub implementation: String,
	/// Version of the node.
	pub version: String,
	/// Configuration of the node.
	pub config: String,
	/// Chain the node runs.
	pub chain: String,
	/// Genesis hash of the chain.
	pub genesis_hash: String,
	/// Whether the node is an authority.
	pub authority: bool,
	/// Startup time of the node (format not established in this file).
	pub startup_time: String,
	/// Network id of the node.
	pub network_id: String,

	/// Operating system the node was built for.
	pub target_os: String,

	/// CPU architecture the node was built for.
	pub target_arch: String,

	/// Target environment the node was built for.
	pub target_env: String,

	/// Optional system information, see [`SysInfo`].
	pub sysinfo: Option<SysInfo>,
}
121
/// System information that can optionally be attached to a [`ConnectionMessage`].
///
/// Every field is optional. The descriptions follow the field names; how the values are
/// gathered is not visible in this file.
#[derive(Debug, Serialize)]
pub struct SysInfo {
	/// Name of the CPU.
	pub cpu: Option<String>,
	/// Amount of memory (presumably total memory in bytes; confirm with the producer).
	pub memory: Option<u64>,
	/// Number of CPU cores (whether physical or logical is not established here).
	pub core_count: Option<u32>,
	/// Linux kernel version.
	pub linux_kernel: Option<String>,
	/// Linux distribution.
	pub linux_distro: Option<String>,
	/// Whether the node runs inside a virtual machine.
	pub is_virtual_machine: Option<bool>,
}
141
/// Worker that receives telemetry messages and registration requests over channels and
/// dispatches them to the telemetry endpoints.
///
/// Create it with [`TelemetryWorker::new`], hand out handles with [`TelemetryWorker::handle`]
/// and drive it with [`TelemetryWorker::run`], which consumes the worker.
#[derive(Debug)]
pub struct TelemetryWorker {
	/// Receiving half of the bounded telemetry message channel.
	message_receiver: mpsc::Receiver<TelemetryMessage>,
	/// Sending half of the message channel; cloned into every handle.
	message_sender: mpsc::Sender<TelemetryMessage>,
	/// Receiving half of the unbounded registration channel.
	register_receiver: TracingUnboundedReceiver<Register>,
	/// Sending half of the registration channel; cloned into every handle.
	register_sender: TracingUnboundedSender<Register>,
	/// Counter shared with the handles, used to allocate telemetry ids. Starts at 1.
	id_counter: Arc<atomic::AtomicU64>,
}
155
156impl TelemetryWorker {
157	pub fn new(buffer_size: usize) -> Result<Self> {
161		let _transport = initialize_transport()?;
166		let (message_sender, message_receiver) = mpsc::channel(buffer_size);
167		let (register_sender, register_receiver) =
168			tracing_unbounded("mpsc_telemetry_register", 10_000);
169
170		Ok(Self {
171			message_receiver,
172			message_sender,
173			register_receiver,
174			register_sender,
175			id_counter: Arc::new(atomic::AtomicU64::new(1)),
176		})
177	}
178
179	pub fn handle(&self) -> TelemetryWorkerHandle {
183		TelemetryWorkerHandle {
184			message_sender: self.message_sender.clone(),
185			register_sender: self.register_sender.clone(),
186			id_counter: self.id_counter.clone(),
187		}
188	}
189
190	pub async fn run(mut self) {
194		let mut node_map: HashMap<Id, Vec<(VerbosityLevel, Multiaddr)>> = HashMap::new();
195		let mut node_pool: HashMap<Multiaddr, _> = HashMap::new();
196		let mut pending_connection_notifications: Vec<_> = Vec::new();
197
198		loop {
199			futures::select! {
200				message = self.message_receiver.next() => Self::process_message(
201					message,
202					&mut node_pool,
203					&node_map,
204				).await,
205				init_payload = self.register_receiver.next() => Self::process_register(
206					init_payload,
207					&mut node_pool,
208					&mut node_map,
209					&mut pending_connection_notifications,
210				).await,
211			}
212		}
213	}
214
215	async fn process_register(
216		input: Option<Register>,
217		node_pool: &mut HashMap<Multiaddr, Node<WsTrans>>,
218		node_map: &mut HashMap<Id, Vec<(VerbosityLevel, Multiaddr)>>,
219		pending_connection_notifications: &mut Vec<(Multiaddr, ConnectionNotifierSender)>,
220	) {
221		let input = input.expect("the stream is never closed; qed");
222
223		match input {
224			Register::Telemetry { id, endpoints, connection_message } => {
225				let endpoints = endpoints.0;
226
227				let connection_message = match serde_json::to_value(&connection_message) {
228					Ok(serde_json::Value::Object(mut value)) => {
229						value.insert("msg".into(), "system.connected".into());
230						let mut obj = serde_json::Map::new();
231						obj.insert("id".to_string(), id.into());
232						obj.insert("payload".to_string(), value.into());
233						Some(obj)
234					},
235					Ok(_) => {
236						unreachable!("ConnectionMessage always serialize to an object; qed")
237					},
238					Err(err) => {
239						log::error!(
240							target: "telemetry",
241							"Could not serialize connection message: {}",
242							err,
243						);
244						None
245					},
246				};
247
248				for (addr, verbosity) in endpoints {
249					log::trace!(
250						target: "telemetry",
251						"Initializing telemetry for: {:?}",
252						addr,
253					);
254					node_map.entry(id).or_default().push((verbosity, addr.clone()));
255
256					let node = match node_pool.entry(addr.clone()) {
257						Occupied(entry) => entry.into_mut(),
258						Vacant(entry) => {
259							let transport = initialize_transport();
260							let transport = match transport {
261								Ok(t) => t,
262								Err(err) => {
263									log::error!(
264										target: "telemetry",
265										"Could not initialise transport: {}",
266										err,
267									);
268									continue
269								},
270							};
271							entry.insert(Node::new(transport, addr.clone(), Vec::new(), Vec::new()))
272						},
273					};
274
275					node.connection_messages.extend(connection_message.clone());
276
277					pending_connection_notifications.retain(|(addr_b, connection_message)| {
278						if *addr_b == addr {
279							node.telemetry_connection_notifier.push(connection_message.clone());
280							false
281						} else {
282							true
283						}
284					});
285				}
286			},
287			Register::Notifier { addresses, connection_notifier } => {
288				for addr in addresses {
289					if let Some(node) = node_pool.get_mut(&addr) {
294						node.telemetry_connection_notifier.push(connection_notifier.clone());
295					} else {
296						pending_connection_notifications.push((addr, connection_notifier.clone()));
297					}
298				}
299			},
300		}
301	}
302
303	async fn process_message(
305		input: Option<TelemetryMessage>,
306		node_pool: &mut HashMap<Multiaddr, Node<WsTrans>>,
307		node_map: &HashMap<Id, Vec<(VerbosityLevel, Multiaddr)>>,
308	) {
309		let (id, verbosity, payload) = input.expect("the stream is never closed; qed");
310
311		let ts = chrono::Local::now().to_rfc3339();
312		let mut message = serde_json::Map::new();
313		message.insert("id".into(), id.into());
314		message.insert("ts".into(), ts.into());
315		message.insert("payload".into(), payload.into());
316
317		let nodes = if let Some(nodes) = node_map.get(&id) {
318			nodes
319		} else {
320			log::trace!(
323				target: "telemetry",
324				"Received telemetry log for unknown id ({:?}): {}",
325				id,
326				serde_json::to_string(&message)
327					.unwrap_or_else(|err| format!(
328						"could not be serialized ({}): {:?}",
329						err,
330						message,
331					)),
332			);
333			return
334		};
335
336		for (node_max_verbosity, addr) in nodes {
337			if verbosity > *node_max_verbosity {
338				continue
339			}
340
341			if let Some(node) = node_pool.get_mut(addr) {
342				let _ = node.send(message.clone()).await;
343			} else {
344				log::debug!(
345					target: "telemetry",
346					"Received message for unknown node ({}). This is a bug. \
347					Message sent: {}",
348					addr,
349					serde_json::to_string(&message)
350						.unwrap_or_else(|err| format!(
351							"could not be serialized ({}): {:?}",
352							err,
353							message,
354						)),
355				);
356			}
357		}
358	}
359}
360
/// Handle to a [`TelemetryWorker`], used to create [`Telemetry`] instances.
///
/// Obtained from [`TelemetryWorker::handle`]; it can be cloned.
#[derive(Debug, Clone)]
pub struct TelemetryWorkerHandle {
	/// Sender of the worker's telemetry message channel.
	message_sender: mpsc::Sender<TelemetryMessage>,
	/// Sender of the worker's registration channel.
	register_sender: TracingUnboundedSender<Register>,
	/// Id counter shared with the worker and all other handles.
	id_counter: Arc<atomic::AtomicU64>,
}
368
369impl TelemetryWorkerHandle {
370	pub fn new_telemetry(&mut self, endpoints: TelemetryEndpoints) -> Telemetry {
372		let addresses = endpoints.0.iter().map(|(addr, _)| addr.clone()).collect();
373
374		Telemetry {
375			message_sender: self.message_sender.clone(),
376			register_sender: self.register_sender.clone(),
377			id: self.id_counter.fetch_add(1, atomic::Ordering::Relaxed),
378			connection_notifier: TelemetryConnectionNotifier {
379				register_sender: self.register_sender.clone(),
380				addresses,
381			},
382			endpoints: Some(endpoints),
383		}
384	}
385}
386
/// A telemetry instance with its own id and set of endpoints.
///
/// Created by [`TelemetryWorkerHandle::new_telemetry`] and started once with
/// [`Telemetry::start_telemetry`].
#[derive(Debug)]
pub struct Telemetry {
	/// Sender of the worker's telemetry message channel.
	message_sender: mpsc::Sender<TelemetryMessage>,
	/// Sender of the worker's registration channel.
	register_sender: TracingUnboundedSender<Register>,
	/// Id of this telemetry instance.
	id: Id,
	/// Notifier bound to the addresses of this telemetry's endpoints.
	connection_notifier: TelemetryConnectionNotifier,
	/// Endpoints to register; `None` once `start_telemetry` has taken them.
	endpoints: Option<TelemetryEndpoints>,
}
396
397impl Telemetry {
398	pub fn start_telemetry(&mut self, connection_message: ConnectionMessage) -> Result<()> {
409		let endpoints = self.endpoints.take().ok_or(Error::TelemetryAlreadyInitialized)?;
410
411		self.register_sender
412			.unbounded_send(Register::Telemetry { id: self.id, endpoints, connection_message })
413			.map_err(|_| Error::TelemetryWorkerDropped)
414	}
415
416	pub fn handle(&self) -> TelemetryHandle {
418		TelemetryHandle {
419			message_sender: Arc::new(Mutex::new(self.message_sender.clone())),
420			id: self.id,
421			connection_notifier: self.connection_notifier.clone(),
422		}
423	}
424}
425
/// Handle for sending telemetry messages under one telemetry id.
///
/// Obtained from [`Telemetry::handle`]. Clones share the same sender behind an
/// `Arc<Mutex<..>>`.
#[derive(Debug, Clone)]
pub struct TelemetryHandle {
	/// Shared sender of the worker's telemetry message channel.
	message_sender: Arc<Mutex<mpsc::Sender<TelemetryMessage>>>,
	/// Id of the telemetry instance this handle belongs to.
	id: Id,
	/// Notifier bound to the addresses of the telemetry's endpoints.
	connection_notifier: TelemetryConnectionNotifier,
}
435
436impl TelemetryHandle {
437	pub fn send_telemetry(&self, verbosity: VerbosityLevel, payload: TelemetryPayload) {
439		match self.message_sender.lock().try_send((self.id, verbosity, payload)) {
440			Ok(()) => {},
441			Err(err) if err.is_full() => log::trace!(
442				target: "telemetry",
443				"Telemetry channel full.",
444			),
445			Err(_) => log::trace!(
446				target: "telemetry",
447				"Telemetry channel closed.",
448			),
449		}
450	}
451
452	pub fn on_connect_stream(&self) -> ConnectionNotifierReceiver {
457		self.connection_notifier.on_connect_stream()
458	}
459}
460
/// Creates connection notifiers for a fixed set of telemetry endpoint addresses.
#[derive(Clone, Debug)]
pub struct TelemetryConnectionNotifier {
	/// Sender of the worker's registration channel.
	register_sender: TracingUnboundedSender<Register>,
	/// Endpoint addresses each notifier is registered for.
	addresses: Vec<Multiaddr>,
}
468
469impl TelemetryConnectionNotifier {
470	fn on_connect_stream(&self) -> ConnectionNotifierReceiver {
471		let (message_sender, message_receiver) = connection_notifier_channel();
472		if let Err(err) = self.register_sender.unbounded_send(Register::Notifier {
473			addresses: self.addresses.clone(),
474			connection_notifier: message_sender,
475		}) {
476			error!(
477				target: "telemetry",
478				"Could not create a telemetry connection notifier: \
479				the telemetry is probably already running: {}",
480				err,
481			);
482		}
483		message_receiver
484	}
485}
486
/// Registration request sent to the worker over the registration channel.
#[derive(Debug)]
enum Register {
	/// Registers the endpoints and the connection message of the telemetry with the given id.
	Telemetry { id: Id, endpoints: TelemetryEndpoints, connection_message: ConnectionMessage },
	/// Registers a connection notifier for the given endpoint addresses.
	Notifier { addresses: Vec<Multiaddr>, connection_notifier: ConnectionNotifierSender },
}
492
/// Sends a telemetry message through an optional telemetry handle.
///
/// Syntax: `telemetry!(handle; verbosity; msg; "key" => value, "key" => ?value, ...)`.
///
/// * `handle`: an expression whose `.as_ref()` yields an `Option` of something with a
///   `send_telemetry` method (typically an `Option<TelemetryHandle>`). Nothing is done when it
///   is `None`.
/// * `verbosity`: a [`VerbosityLevel`].
/// * `msg`: stored in the payload under the `"msg"` key, after the fields have been added.
/// * fields: `"key" => value` serializes `value` with `serde_json::to_value`;
///   `"key" => ?value` stores its `{:?}` formatting as a string.
///
/// If a field fails to serialize, a debug log is emitted on the `telemetry` target and
/// nothing is sent.
///
/// # Example
///
/// ```ignore
/// telemetry!(
///     telemetry;
///     CONSENSUS_INFO;
///     "example.message";
///     "number" => 42_u64,
///     "debug_formatted" => ?(1, 2),
/// );
/// ```
#[macro_export(local_inner_macros)]
macro_rules! telemetry {
	( $telemetry:expr; $verbosity:expr; $msg:expr; $( $t:tt )* ) => {{
		if let Some(telemetry) = $telemetry.as_ref() {
			let verbosity: $crate::VerbosityLevel = $verbosity;
			match format_fields_to_json!($($t)*) {
				Err(err) => {
					$crate::log::debug!(
						target: "telemetry",
						"Could not serialize value for telemetry: {}",
						err,
					);
				},
				Ok(mut json) => {
					json.insert("msg".into(), $msg.into());
					telemetry.send_telemetry(verbosity, json);
				},
			}
		}
	}};
}
537
// Helper of `telemetry!`: turns a list of `"key" => value` / `"key" => ?value` fields into a
// `serde_json::Result` holding a `serde_json::Map`.
//
// Each arm handles the first field and recurses on the remaining tokens, appending the map
// built from the rest onto the map built so far. The first error short-circuits the chain.
#[macro_export(local_inner_macros)]
#[doc(hidden)]
macro_rules! format_fields_to_json {
	// `"key" => value`: the value is serialized with `serde_json::to_value`, which can fail.
	( $k:literal => $v:expr $(,)? $(, $($t:tt)+ )? ) => {{
		$crate::serde_json::to_value(&$v)
			.map(|value| {
				let mut map = $crate::serde_json::Map::new();
				map.insert($k.into(), value);
				map
			})
			$(
				.and_then(|mut prev_map| {
					format_fields_to_json!($($t)*)
						.map(move |mut other_map| {
							prev_map.append(&mut other_map);
							prev_map
						})
				})
			)*
	}};
	// `"key" => ?value`: the value is stored as its `{:?}` string; this field cannot fail.
	( $k:literal => ? $v:expr $(,)? $(, $($t:tt)+ )? ) => {{
		let mut map = $crate::serde_json::Map::new();
		map.insert($k.into(), std::format!("{:?}", &$v).into());
		$crate::serde_json::Result::Ok(map)
		$(
			.and_then(|mut prev_map| {
				format_fields_to_json!($($t)*)
					.map(move |mut other_map| {
						prev_map.append(&mut other_map);
						prev_map
					})
			})
		)*
	}};
}