#![warn(missing_docs)]
use futures::{channel::mpsc, prelude::*};
use libp2p::Multiaddr;
use log::{error, warn};
use parking_lot::Mutex;
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use serde::Serialize;
use std::{
collections::{
hash_map::Entry::{Occupied, Vacant},
HashMap,
},
sync::{atomic, Arc},
};
pub use log;
pub use serde_json;
mod endpoints;
mod error;
mod node;
mod transport;
pub use endpoints::*;
pub use error::*;
use node::*;
use transport::*;
// Predefined verbosity levels. A larger value means a more verbose message: the
// worker skips an endpoint whenever the message verbosity is greater than the
// maximum verbosity configured for that endpoint.

/// Verbosity level for Substrate debug-level telemetry messages.
pub const SUBSTRATE_DEBUG: VerbosityLevel = 9;
/// Verbosity level for Substrate info-level telemetry messages.
pub const SUBSTRATE_INFO: VerbosityLevel = 0;
/// Verbosity level for consensus trace-level telemetry messages.
pub const CONSENSUS_TRACE: VerbosityLevel = 9;
/// Verbosity level for consensus debug-level telemetry messages.
pub const CONSENSUS_DEBUG: VerbosityLevel = 5;
/// Verbosity level for consensus warn-level telemetry messages.
pub const CONSENSUS_WARN: VerbosityLevel = 4;
/// Verbosity level for consensus info-level telemetry messages.
pub const CONSENSUS_INFO: VerbosityLevel = 1;
/// Verbosity of a telemetry message, and also the maximum verbosity accepted by an endpoint.
///
/// A message is only forwarded to an endpoint when its verbosity is not greater than the
/// endpoint's maximum.
pub type VerbosityLevel = u8;
/// Identifier of one `Telemetry` instance; handed out from the worker's shared atomic counter.
pub(crate) type Id = u64;
/// JSON object holding the fields of a telemetry message.
pub(crate) type TelemetryPayload = serde_json::Map<String, serde_json::Value>;
/// Message sent from a handle to the worker: `(telemetry id, verbosity, payload)`.
pub(crate) type TelemetryMessage = (Id, VerbosityLevel, TelemetryPayload);
/// Message describing the node, supplied when telemetry is started.
///
/// The worker serializes it to a JSON object, adds `"msg": "system.connected"` and stores it
/// as a connection message on every node (endpoint) registered for the telemetry instance.
#[derive(Debug, Serialize)]
pub struct ConnectionMessage {
/// Name of the node.
pub name: String,
/// Implementation of the node.
pub implementation: String,
/// Version of the node.
pub version: String,
/// Configuration of the node, as a string.
pub config: String,
/// Chain the node is running.
pub chain: String,
/// Genesis hash of the chain, as a string.
pub genesis_hash: String,
/// Whether the node is an authority.
pub authority: bool,
/// Startup time of the node, as a string (the format is chosen by the caller).
pub startup_time: String,
/// Network identifier of the node.
pub network_id: String,
/// Target operating system (presumably the one the node was built for).
pub target_os: String,
/// Target CPU architecture.
pub target_arch: String,
/// Target environment (presumably the ABI / libc of the build target).
pub target_env: String,
/// Optional information about the system the node runs on.
pub sysinfo: Option<SysInfo>,
}
/// Optional hardware and software details reported as part of a [`ConnectionMessage`].
///
/// Every field is optional so that callers can leave out whatever they cannot determine.
#[derive(Debug, Serialize)]
pub struct SysInfo {
/// CPU description.
pub cpu: Option<String>,
/// Amount of memory (NOTE(review): unit is not visible here, presumably bytes -- confirm).
pub memory: Option<u64>,
/// Number of CPU cores.
pub core_count: Option<u32>,
/// Linux kernel version, where applicable.
pub linux_kernel: Option<String>,
/// Linux distribution, where applicable.
pub linux_distro: Option<String>,
/// Whether the node runs inside a virtual machine.
pub is_virtual_machine: Option<bool>,
}
/// Background worker that receives registrations and telemetry messages over channels and
/// forwards the messages to the registered endpoints.
///
/// The worker keeps both ends of each channel: the receivers are consumed by `run`, the
/// senders are cloned into every handle created with `handle`.
#[derive(Debug)]
pub struct TelemetryWorker {
/// Receiving end of the bounded channel carrying telemetry messages.
message_receiver: mpsc::Receiver<TelemetryMessage>,
/// Sending end of the message channel, cloned into handles.
message_sender: mpsc::Sender<TelemetryMessage>,
/// Receiving end of the unbounded channel carrying `Register` requests.
register_receiver: TracingUnboundedReceiver<Register>,
/// Sending end of the register channel, cloned into handles.
register_sender: TracingUnboundedSender<Register>,
/// Shared counter from which telemetry ids are allocated; starts at 1.
id_counter: Arc<atomic::AtomicU64>,
}
impl TelemetryWorker {
pub fn new(buffer_size: usize) -> Result<Self> {
let _transport = initialize_transport()?;
let (message_sender, message_receiver) = mpsc::channel(buffer_size);
let (register_sender, register_receiver) =
tracing_unbounded("mpsc_telemetry_register", 10_000);
Ok(Self {
message_receiver,
message_sender,
register_receiver,
register_sender,
id_counter: Arc::new(atomic::AtomicU64::new(1)),
})
}
pub fn handle(&self) -> TelemetryWorkerHandle {
TelemetryWorkerHandle {
message_sender: self.message_sender.clone(),
register_sender: self.register_sender.clone(),
id_counter: self.id_counter.clone(),
}
}
pub async fn run(mut self) {
let mut node_map: HashMap<Id, Vec<(VerbosityLevel, Multiaddr)>> = HashMap::new();
let mut node_pool: HashMap<Multiaddr, _> = HashMap::new();
let mut pending_connection_notifications: Vec<_> = Vec::new();
loop {
futures::select! {
message = self.message_receiver.next() => Self::process_message(
message,
&mut node_pool,
&node_map,
).await,
init_payload = self.register_receiver.next() => Self::process_register(
init_payload,
&mut node_pool,
&mut node_map,
&mut pending_connection_notifications,
).await,
}
}
}
async fn process_register(
input: Option<Register>,
node_pool: &mut HashMap<Multiaddr, Node<WsTrans>>,
node_map: &mut HashMap<Id, Vec<(VerbosityLevel, Multiaddr)>>,
pending_connection_notifications: &mut Vec<(Multiaddr, ConnectionNotifierSender)>,
) {
let input = input.expect("the stream is never closed; qed");
match input {
Register::Telemetry { id, endpoints, connection_message } => {
let endpoints = endpoints.0;
let connection_message = match serde_json::to_value(&connection_message) {
Ok(serde_json::Value::Object(mut value)) => {
value.insert("msg".into(), "system.connected".into());
let mut obj = serde_json::Map::new();
obj.insert("id".to_string(), id.into());
obj.insert("payload".to_string(), value.into());
Some(obj)
},
Ok(_) => {
unreachable!("ConnectionMessage always serialize to an object; qed")
},
Err(err) => {
log::error!(
target: "telemetry",
"Could not serialize connection message: {}",
err,
);
None
},
};
for (addr, verbosity) in endpoints {
log::trace!(
target: "telemetry",
"Initializing telemetry for: {:?}",
addr,
);
node_map.entry(id).or_default().push((verbosity, addr.clone()));
let node = match node_pool.entry(addr.clone()) {
Occupied(entry) => entry.into_mut(),
Vacant(entry) => {
let transport = initialize_transport();
let transport = match transport {
Ok(t) => t,
Err(err) => {
log::error!(
target: "telemetry",
"Could not initialise transport: {}",
err,
);
continue
},
};
entry.insert(Node::new(transport, addr.clone(), Vec::new(), Vec::new()))
},
};
node.connection_messages.extend(connection_message.clone());
pending_connection_notifications.retain(|(addr_b, connection_message)| {
if *addr_b == addr {
node.telemetry_connection_notifier.push(connection_message.clone());
false
} else {
true
}
});
}
},
Register::Notifier { addresses, connection_notifier } => {
for addr in addresses {
if let Some(node) = node_pool.get_mut(&addr) {
node.telemetry_connection_notifier.push(connection_notifier.clone());
} else {
pending_connection_notifications.push((addr, connection_notifier.clone()));
}
}
},
}
}
async fn process_message(
input: Option<TelemetryMessage>,
node_pool: &mut HashMap<Multiaddr, Node<WsTrans>>,
node_map: &HashMap<Id, Vec<(VerbosityLevel, Multiaddr)>>,
) {
let (id, verbosity, payload) = input.expect("the stream is never closed; qed");
let ts = chrono::Local::now().to_rfc3339();
let mut message = serde_json::Map::new();
message.insert("id".into(), id.into());
message.insert("ts".into(), ts.into());
message.insert("payload".into(), payload.into());
let nodes = if let Some(nodes) = node_map.get(&id) {
nodes
} else {
log::trace!(
target: "telemetry",
"Received telemetry log for unknown id ({:?}): {}",
id,
serde_json::to_string(&message)
.unwrap_or_else(|err| format!(
"could not be serialized ({}): {:?}",
err,
message,
)),
);
return
};
for (node_max_verbosity, addr) in nodes {
if verbosity > *node_max_verbosity {
continue
}
if let Some(node) = node_pool.get_mut(addr) {
let _ = node.send(message.clone()).await;
} else {
log::debug!(
target: "telemetry",
"Received message for unknown node ({}). This is a bug. \
Message sent: {}",
addr,
serde_json::to_string(&message)
.unwrap_or_else(|err| format!(
"could not be serialized ({}): {:?}",
err,
message,
)),
);
}
}
}
}
/// Cloneable handle to a `TelemetryWorker`, used to create `Telemetry` instances.
#[derive(Debug, Clone)]
pub struct TelemetryWorkerHandle {
/// Sender for telemetry messages, cloned from the worker.
message_sender: mpsc::Sender<TelemetryMessage>,
/// Sender for `Register` requests, cloned from the worker.
register_sender: TracingUnboundedSender<Register>,
/// Counter shared with the worker from which telemetry ids are allocated.
id_counter: Arc<atomic::AtomicU64>,
}
impl TelemetryWorkerHandle {
    /// Builds a new [`Telemetry`] instance reporting to `endpoints`.
    ///
    /// The instance gets the next id from the shared counter and a connection notifier that
    /// knows the address of every endpoint. Nothing is registered with the worker until
    /// [`Telemetry::start_telemetry`] is called.
    pub fn new_telemetry(&mut self, endpoints: TelemetryEndpoints) -> Telemetry {
        // Only the address half of each endpoint entry is needed by the notifier.
        let addresses = endpoints.0.iter().map(|endpoint| endpoint.0.clone()).collect();
        let connection_notifier = TelemetryConnectionNotifier {
            register_sender: self.register_sender.clone(),
            addresses,
        };
        let id = self.id_counter.fetch_add(1, atomic::Ordering::Relaxed);
        Telemetry {
            message_sender: self.message_sender.clone(),
            register_sender: self.register_sender.clone(),
            id,
            connection_notifier,
            endpoints: Some(endpoints),
        }
    }
}
/// One telemetry instance, identified by `id`, reporting to a fixed set of endpoints.
#[derive(Debug)]
pub struct Telemetry {
/// Sender for telemetry messages, cloned into every `TelemetryHandle`.
message_sender: mpsc::Sender<TelemetryMessage>,
/// Sender used to register this instance with the worker.
register_sender: TracingUnboundedSender<Register>,
/// Identifier attached to every message of this instance.
id: Id,
/// Notifier shared with the handles created from this instance.
connection_notifier: TelemetryConnectionNotifier,
/// Endpoints to register; `None` once `start_telemetry` has taken them.
endpoints: Option<TelemetryEndpoints>,
}
impl Telemetry {
pub fn start_telemetry(&mut self, connection_message: ConnectionMessage) -> Result<()> {
let endpoints = self.endpoints.take().ok_or(Error::TelemetryAlreadyInitialized)?;
self.register_sender
.unbounded_send(Register::Telemetry { id: self.id, endpoints, connection_message })
.map_err(|_| Error::TelemetryWorkerDropped)
}
pub fn handle(&self) -> TelemetryHandle {
TelemetryHandle {
message_sender: Arc::new(Mutex::new(self.message_sender.clone())),
id: self.id,
connection_notifier: self.connection_notifier.clone(),
}
}
}
/// Cloneable handle used to send telemetry messages for one `Telemetry` instance.
#[derive(Debug, Clone)]
pub struct TelemetryHandle {
/// Message sender; all clones of the handle share it behind a mutex.
message_sender: Arc<Mutex<mpsc::Sender<TelemetryMessage>>>,
/// Identifier of the `Telemetry` instance this handle belongs to.
id: Id,
/// Notifier used by `on_connect_stream`.
connection_notifier: TelemetryConnectionNotifier,
}
impl TelemetryHandle {
    /// Sends a telemetry message without blocking.
    ///
    /// The message is dropped (with a trace log) if the channel is full or closed; no error
    /// is reported to the caller.
    pub fn send_telemetry(&self, verbosity: VerbosityLevel, payload: TelemetryPayload) {
        let message = (self.id, verbosity, payload);
        if let Err(err) = self.message_sender.lock().try_send(message) {
            if err.is_full() {
                log::trace!(
                    target: "telemetry",
                    "Telemetry channel full.",
                );
            } else {
                log::trace!(
                    target: "telemetry",
                    "Telemetry channel closed.",
                );
            }
        }
    }

    /// Returns a receiver obtained from this handle's connection notifier.
    ///
    /// See [`TelemetryConnectionNotifier`] for how the notifier is registered.
    pub fn on_connect_stream(&self) -> ConnectionNotifierReceiver {
        self.connection_notifier.on_connect_stream()
    }
}
/// Registers connection notifiers with the worker for a fixed list of endpoint addresses.
#[derive(Clone, Debug)]
pub struct TelemetryConnectionNotifier {
/// Sender used to submit `Register::Notifier` requests to the worker.
register_sender: TracingUnboundedSender<Register>,
/// Addresses of the endpoints the notifier is registered for.
addresses: Vec<Multiaddr>,
}
impl TelemetryConnectionNotifier {
    /// Creates a notifier channel and asks the worker to attach its sending half to every
    /// address of this notifier.
    ///
    /// The receiving half is returned even when the request could not be sent; in that case
    /// an error is logged.
    fn on_connect_stream(&self) -> ConnectionNotifierReceiver {
        let (notifier_tx, notifier_rx) = connection_notifier_channel();
        let request = Register::Notifier {
            addresses: self.addresses.clone(),
            connection_notifier: notifier_tx,
        };
        match self.register_sender.unbounded_send(request) {
            Ok(_) => {},
            Err(err) => {
                error!(
                    target: "telemetry",
                    "Could not create a telemetry connection notifier: \
                    the telemetry is probably already running: {}",
                    err,
                );
            },
        }
        notifier_rx
    }
}
/// Requests sent to the worker over the register channel.
#[derive(Debug)]
enum Register {
// Register a telemetry instance: its id, its endpoints and the message queued on each
// endpoint's node as the `system.connected` message.
Telemetry { id: Id, endpoints: TelemetryEndpoints, connection_message: ConnectionMessage },
// Attach a connection notifier to the nodes of the given addresses; it is kept pending
// for addresses that have no node yet.
Notifier { addresses: Vec<Multiaddr>, connection_notifier: ConnectionNotifierSender },
}
/// Sends a telemetry message.
///
/// Arguments, separated by `;`:
///
/// 1. an expression on which `.as_ref()` yields an `Option` of a telemetry handle (for
///    example an `Option<TelemetryHandle>`); nothing happens when it is `None`,
/// 2. the verbosity of the message (a [`VerbosityLevel`]),
/// 3. the message name, stored under the `"msg"` key,
/// 4. the fields, as accepted by `format_fields_to_json!`: `"key" => value` serializes the
///    value with `serde_json`, `"key" => ?value` stores its `Debug` representation.
///
/// If a value cannot be serialized, the failure is logged at debug level and no message is
/// sent.
///
/// ```ignore
/// telemetry!(
///     telemetry;
///     CONSENSUS_INFO;
///     "afg.authority_set";
///     "authority_id" => authority_id.to_string(),
///     "authorities" => ?authorities,
/// );
/// ```
#[macro_export(local_inner_macros)]
macro_rules! telemetry {
( $telemetry:expr; $verbosity:expr; $msg:expr; $( $t:tt )* ) => {{
if let Some(telemetry) = $telemetry.as_ref() {
// Type-check the verbosity before doing any serialization work.
let verbosity: $crate::VerbosityLevel = $verbosity;
match format_fields_to_json!($($t)*) {
Err(err) => {
$crate::log::debug!(
target: "telemetry",
"Could not serialize value for telemetry: {}",
err,
);
},
Ok(mut json) => {
// The message name is inserted after the user-provided fields.
json.insert("msg".into(), $msg.into());
telemetry.send_telemetry(verbosity, json);
},
}
}
}};
}
/// Turns a list of `"key" => value` / `"key" => ?value` pairs into a
/// `serde_json::Result<serde_json::Map<..>>`.
///
/// Implementation detail of `telemetry!`. The first pair is converted into a one-entry map;
/// the remaining pairs are processed by a recursive invocation and merged into that map.
#[macro_export(local_inner_macros)]
#[doc(hidden)]
macro_rules! format_fields_to_json {
// `"key" => value`: the value is serialized with `serde_json::to_value`, which may fail.
( $k:literal => $v:expr $(,)? $(, $($t:tt)+ )? ) => {{
$crate::serde_json::to_value(&$v)
.map(|value| {
let mut map = $crate::serde_json::Map::new();
map.insert($k.into(), value);
map
})
$(
// Recurse on the remaining pairs and merge their map into this one.
.and_then(|mut prev_map| {
format_fields_to_json!($($t)*)
.map(move |mut other_map| {
prev_map.append(&mut other_map);
prev_map
})
})
)*
}};
// `"key" => ?value`: the value is stored as its `{:?}` (Debug) string; this step itself
// cannot fail, so the map is wrapped in `Ok` to keep the same result type.
( $k:literal => ? $v:expr $(,)? $(, $($t:tt)+ )? ) => {{
let mut map = $crate::serde_json::Map::new();
map.insert($k.into(), std::format!("{:?}", &$v).into());
$crate::serde_json::Result::Ok(map)
$(
// Recurse on the remaining pairs and merge their map into this one.
.and_then(|mut prev_map| {
format_fields_to_json!($($t)*)
.map(move |mut other_map| {
prev_map.append(&mut other_map);
prev_map
})
})
)*
}};
}