1#![warn(missing_docs)]
38
39use futures::{channel::mpsc, prelude::*};
40use libp2p::Multiaddr;
41use log::{error, warn};
42use parking_lot::Mutex;
43use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
44use serde::Serialize;
45use std::{
46 collections::{
47 hash_map::Entry::{Occupied, Vacant},
48 HashMap,
49 },
50 sync::{atomic, Arc},
51};
52
53pub use log;
54pub use serde_json;
55
56mod endpoints;
57mod error;
58mod node;
59mod transport;
60
61pub use endpoints::*;
62pub use error::*;
63use node::*;
64use transport::*;
65
66pub const SUBSTRATE_DEBUG: VerbosityLevel = 9;
68pub const SUBSTRATE_INFO: VerbosityLevel = 0;
70
71pub const CONSENSUS_TRACE: VerbosityLevel = 9;
73pub const CONSENSUS_DEBUG: VerbosityLevel = 5;
75pub const CONSENSUS_WARN: VerbosityLevel = 4;
77pub const CONSENSUS_INFO: VerbosityLevel = 1;
79
80pub type VerbosityLevel = u8;
82
83pub(crate) type Id = u64;
84pub(crate) type TelemetryPayload = serde_json::Map<String, serde_json::Value>;
85pub(crate) type TelemetryMessage = (Id, VerbosityLevel, TelemetryPayload);
86
87#[derive(Debug, Serialize)]
89pub struct ConnectionMessage {
90 pub name: String,
92 pub implementation: String,
94 pub version: String,
96 pub config: String,
98 pub chain: String,
100 pub genesis_hash: String,
102 pub authority: bool,
104 pub startup_time: String,
106 pub network_id: String,
108
109 pub target_os: String,
111
112 pub target_arch: String,
114
115 pub target_env: String,
117
118 pub sysinfo: Option<SysInfo>,
120}
121
122#[derive(Debug, Serialize)]
127pub struct SysInfo {
128 pub cpu: Option<String>,
130 pub memory: Option<u64>,
132 pub core_count: Option<u32>,
134 pub linux_kernel: Option<String>,
136 pub linux_distro: Option<String>,
138 pub is_virtual_machine: Option<bool>,
140}
141
142#[derive(Debug)]
148pub struct TelemetryWorker {
149 message_receiver: mpsc::Receiver<TelemetryMessage>,
150 message_sender: mpsc::Sender<TelemetryMessage>,
151 register_receiver: TracingUnboundedReceiver<Register>,
152 register_sender: TracingUnboundedSender<Register>,
153 id_counter: Arc<atomic::AtomicU64>,
154}
155
156impl TelemetryWorker {
157 pub fn new(buffer_size: usize) -> Result<Self> {
161 let _transport = initialize_transport()?;
166 let (message_sender, message_receiver) = mpsc::channel(buffer_size);
167 let (register_sender, register_receiver) =
168 tracing_unbounded("mpsc_telemetry_register", 10_000);
169
170 Ok(Self {
171 message_receiver,
172 message_sender,
173 register_receiver,
174 register_sender,
175 id_counter: Arc::new(atomic::AtomicU64::new(1)),
176 })
177 }
178
179 pub fn handle(&self) -> TelemetryWorkerHandle {
183 TelemetryWorkerHandle {
184 message_sender: self.message_sender.clone(),
185 register_sender: self.register_sender.clone(),
186 id_counter: self.id_counter.clone(),
187 }
188 }
189
190 pub async fn run(mut self) {
194 let mut node_map: HashMap<Id, Vec<(VerbosityLevel, Multiaddr)>> = HashMap::new();
195 let mut node_pool: HashMap<Multiaddr, _> = HashMap::new();
196 let mut pending_connection_notifications: Vec<_> = Vec::new();
197
198 loop {
199 futures::select! {
200 message = self.message_receiver.next() => Self::process_message(
201 message,
202 &mut node_pool,
203 &node_map,
204 ).await,
205 init_payload = self.register_receiver.next() => Self::process_register(
206 init_payload,
207 &mut node_pool,
208 &mut node_map,
209 &mut pending_connection_notifications,
210 ).await,
211 }
212 }
213 }
214
215 async fn process_register(
216 input: Option<Register>,
217 node_pool: &mut HashMap<Multiaddr, Node<WsTrans>>,
218 node_map: &mut HashMap<Id, Vec<(VerbosityLevel, Multiaddr)>>,
219 pending_connection_notifications: &mut Vec<(Multiaddr, ConnectionNotifierSender)>,
220 ) {
221 let input = input.expect("the stream is never closed; qed");
222
223 match input {
224 Register::Telemetry { id, endpoints, connection_message } => {
225 let endpoints = endpoints.0;
226
227 let connection_message = match serde_json::to_value(&connection_message) {
228 Ok(serde_json::Value::Object(mut value)) => {
229 value.insert("msg".into(), "system.connected".into());
230 let mut obj = serde_json::Map::new();
231 obj.insert("id".to_string(), id.into());
232 obj.insert("payload".to_string(), value.into());
233 Some(obj)
234 },
235 Ok(_) => {
236 unreachable!("ConnectionMessage always serialize to an object; qed")
237 },
238 Err(err) => {
239 log::error!(
240 target: "telemetry",
241 "Could not serialize connection message: {}",
242 err,
243 );
244 None
245 },
246 };
247
248 for (addr, verbosity) in endpoints {
249 log::trace!(
250 target: "telemetry",
251 "Initializing telemetry for: {:?}",
252 addr,
253 );
254 node_map.entry(id).or_default().push((verbosity, addr.clone()));
255
256 let node = match node_pool.entry(addr.clone()) {
257 Occupied(entry) => entry.into_mut(),
258 Vacant(entry) => {
259 let transport = initialize_transport();
260 let transport = match transport {
261 Ok(t) => t,
262 Err(err) => {
263 log::error!(
264 target: "telemetry",
265 "Could not initialise transport: {}",
266 err,
267 );
268 continue
269 },
270 };
271 entry.insert(Node::new(transport, addr.clone(), Vec::new(), Vec::new()))
272 },
273 };
274
275 node.connection_messages.extend(connection_message.clone());
276
277 pending_connection_notifications.retain(|(addr_b, connection_message)| {
278 if *addr_b == addr {
279 node.telemetry_connection_notifier.push(connection_message.clone());
280 false
281 } else {
282 true
283 }
284 });
285 }
286 },
287 Register::Notifier { addresses, connection_notifier } => {
288 for addr in addresses {
289 if let Some(node) = node_pool.get_mut(&addr) {
294 node.telemetry_connection_notifier.push(connection_notifier.clone());
295 } else {
296 pending_connection_notifications.push((addr, connection_notifier.clone()));
297 }
298 }
299 },
300 }
301 }
302
303 async fn process_message(
305 input: Option<TelemetryMessage>,
306 node_pool: &mut HashMap<Multiaddr, Node<WsTrans>>,
307 node_map: &HashMap<Id, Vec<(VerbosityLevel, Multiaddr)>>,
308 ) {
309 let (id, verbosity, payload) = input.expect("the stream is never closed; qed");
310
311 let ts = chrono::Local::now().to_rfc3339();
312 let mut message = serde_json::Map::new();
313 message.insert("id".into(), id.into());
314 message.insert("ts".into(), ts.into());
315 message.insert("payload".into(), payload.into());
316
317 let nodes = if let Some(nodes) = node_map.get(&id) {
318 nodes
319 } else {
320 log::trace!(
323 target: "telemetry",
324 "Received telemetry log for unknown id ({:?}): {}",
325 id,
326 serde_json::to_string(&message)
327 .unwrap_or_else(|err| format!(
328 "could not be serialized ({}): {:?}",
329 err,
330 message,
331 )),
332 );
333 return
334 };
335
336 for (node_max_verbosity, addr) in nodes {
337 if verbosity > *node_max_verbosity {
338 continue
339 }
340
341 if let Some(node) = node_pool.get_mut(addr) {
342 let _ = node.send(message.clone()).await;
343 } else {
344 log::debug!(
345 target: "telemetry",
346 "Received message for unknown node ({}). This is a bug. \
347 Message sent: {}",
348 addr,
349 serde_json::to_string(&message)
350 .unwrap_or_else(|err| format!(
351 "could not be serialized ({}): {:?}",
352 err,
353 message,
354 )),
355 );
356 }
357 }
358 }
359}
360
361#[derive(Debug, Clone)]
363pub struct TelemetryWorkerHandle {
364 message_sender: mpsc::Sender<TelemetryMessage>,
365 register_sender: TracingUnboundedSender<Register>,
366 id_counter: Arc<atomic::AtomicU64>,
367}
368
369impl TelemetryWorkerHandle {
370 pub fn new_telemetry(&mut self, endpoints: TelemetryEndpoints) -> Telemetry {
372 let addresses = endpoints.0.iter().map(|(addr, _)| addr.clone()).collect();
373
374 Telemetry {
375 message_sender: self.message_sender.clone(),
376 register_sender: self.register_sender.clone(),
377 id: self.id_counter.fetch_add(1, atomic::Ordering::Relaxed),
378 connection_notifier: TelemetryConnectionNotifier {
379 register_sender: self.register_sender.clone(),
380 addresses,
381 },
382 endpoints: Some(endpoints),
383 }
384 }
385}
386
387#[derive(Debug)]
389pub struct Telemetry {
390 message_sender: mpsc::Sender<TelemetryMessage>,
391 register_sender: TracingUnboundedSender<Register>,
392 id: Id,
393 connection_notifier: TelemetryConnectionNotifier,
394 endpoints: Option<TelemetryEndpoints>,
395}
396
397impl Telemetry {
398 pub fn start_telemetry(&mut self, connection_message: ConnectionMessage) -> Result<()> {
409 let endpoints = self.endpoints.take().ok_or(Error::TelemetryAlreadyInitialized)?;
410
411 self.register_sender
412 .unbounded_send(Register::Telemetry { id: self.id, endpoints, connection_message })
413 .map_err(|_| Error::TelemetryWorkerDropped)
414 }
415
416 pub fn handle(&self) -> TelemetryHandle {
418 TelemetryHandle {
419 message_sender: Arc::new(Mutex::new(self.message_sender.clone())),
420 id: self.id,
421 connection_notifier: self.connection_notifier.clone(),
422 }
423 }
424}
425
426#[derive(Debug, Clone)]
430pub struct TelemetryHandle {
431 message_sender: Arc<Mutex<mpsc::Sender<TelemetryMessage>>>,
432 id: Id,
433 connection_notifier: TelemetryConnectionNotifier,
434}
435
436impl TelemetryHandle {
437 pub fn send_telemetry(&self, verbosity: VerbosityLevel, payload: TelemetryPayload) {
439 match self.message_sender.lock().try_send((self.id, verbosity, payload)) {
440 Ok(()) => {},
441 Err(err) if err.is_full() => log::trace!(
442 target: "telemetry",
443 "Telemetry channel full.",
444 ),
445 Err(_) => log::trace!(
446 target: "telemetry",
447 "Telemetry channel closed.",
448 ),
449 }
450 }
451
452 pub fn on_connect_stream(&self) -> ConnectionNotifierReceiver {
457 self.connection_notifier.on_connect_stream()
458 }
459}
460
461#[derive(Clone, Debug)]
464pub struct TelemetryConnectionNotifier {
465 register_sender: TracingUnboundedSender<Register>,
466 addresses: Vec<Multiaddr>,
467}
468
469impl TelemetryConnectionNotifier {
470 fn on_connect_stream(&self) -> ConnectionNotifierReceiver {
471 let (message_sender, message_receiver) = connection_notifier_channel();
472 if let Err(err) = self.register_sender.unbounded_send(Register::Notifier {
473 addresses: self.addresses.clone(),
474 connection_notifier: message_sender,
475 }) {
476 error!(
477 target: "telemetry",
478 "Could not create a telemetry connection notifier: \
479 the telemetry is probably already running: {}",
480 err,
481 );
482 }
483 message_receiver
484 }
485}
486
487#[derive(Debug)]
488enum Register {
489 Telemetry { id: Id, endpoints: TelemetryEndpoints, connection_message: ConnectionMessage },
490 Notifier { addresses: Vec<Multiaddr>, connection_notifier: ConnectionNotifierSender },
491}
492
493#[macro_export(local_inner_macros)]
517macro_rules! telemetry {
518 ( $telemetry:expr; $verbosity:expr; $msg:expr; $( $t:tt )* ) => {{
519 if let Some(telemetry) = $telemetry.as_ref() {
520 let verbosity: $crate::VerbosityLevel = $verbosity;
521 match format_fields_to_json!($($t)*) {
522 Err(err) => {
523 $crate::log::debug!(
524 target: "telemetry",
525 "Could not serialize value for telemetry: {}",
526 err,
527 );
528 },
529 Ok(mut json) => {
530 json.insert("msg".into(), $msg.into());
531 telemetry.send_telemetry(verbosity, json);
532 },
533 }
534 }
535 }};
536}
537
538#[macro_export(local_inner_macros)]
539#[doc(hidden)]
540macro_rules! format_fields_to_json {
541 ( $k:literal => $v:expr $(,)? $(, $($t:tt)+ )? ) => {{
542 $crate::serde_json::to_value(&$v)
543 .map(|value| {
544 let mut map = $crate::serde_json::Map::new();
545 map.insert($k.into(), value);
546 map
547 })
548 $(
549 .and_then(|mut prev_map| {
550 format_fields_to_json!($($t)*)
551 .map(move |mut other_map| {
552 prev_map.append(&mut other_map);
553 prev_map
554 })
555 })
556 )*
557 }};
558 ( $k:literal => ? $v:expr $(,)? $(, $($t:tt)+ )? ) => {{
559 let mut map = $crate::serde_json::Map::new();
560 map.insert($k.into(), std::format!("{:?}", &$v).into());
561 $crate::serde_json::Result::Ok(map)
562 $(
563 .and_then(|mut prev_map| {
564 format_fields_to_json!($($t)*)
565 .map(move |mut other_map| {
566 prev_map.append(&mut other_map);
567 prev_map
568 })
569 })
570 )*
571 }};
572}