use crate::{service::traits::BandwidthSink, ProtocolName};
use prometheus_endpoint::{
self as prometheus, Counter, CounterVec, Gauge, GaugeVec, HistogramOpts, MetricSource, Opts,
PrometheusError, Registry, SourcedCounter, SourcedGauge, U64,
};
use std::{
str,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};
pub use prometheus_endpoint::{Histogram, HistogramVec};
pub fn register(registry: &Registry, sources: MetricSources) -> Result<Metrics, PrometheusError> {
BandwidthCounters::register(registry, sources.bandwidth)?;
NumConnectedGauge::register(registry, sources.connected_peers)?;
Metrics::register(registry)
}
/// Creates and registers the general networking [`Metrics`] on `registry` without
/// registering the bandwidth or connected-peers collectors.
///
/// # Errors
///
/// Returns the [`PrometheusError`] reported by `Metrics::register`.
pub fn register_without_sources(registry: &Registry) -> Result<Metrics, PrometheusError> {
    let metrics = Metrics::register(registry)?;
    Ok(metrics)
}
/// Shared handles whose current values are read when the source-backed metrics are
/// collected.
pub struct MetricSources {
/// Sink queried for the inbound/outbound byte totals reported as
/// `substrate_sub_libp2p_network_bytes_total`.
pub bandwidth: Arc<dyn BandwidthSink>,
/// Counter loaded to report `substrate_sub_libp2p_peers_count`.
pub connected_peers: Arc<AtomicUsize>,
}
impl MetricSources {
    /// Registers the bandwidth counters and the connected-peers gauge on `registry`.
    ///
    /// The bandwidth counters are registered first; the gauge is only registered when
    /// that step succeeded.
    ///
    /// # Errors
    ///
    /// Returns the [`PrometheusError`] of whichever registration failed first.
    pub fn register(
        registry: &Registry,
        bandwidth: Arc<dyn BandwidthSink>,
        connected_peers: Arc<AtomicUsize>,
    ) -> Result<(), PrometheusError> {
        BandwidthCounters::register(registry, bandwidth)
            .and_then(|()| NumConnectedGauge::register(registry, connected_peers))
    }
}
/// General networking metrics, all exported under the `substrate_sub_libp2p_` prefix.
///
/// Instances are built by `Metrics::register`, which creates each metric and registers
/// it on the supplied registry.
#[derive(Clone)]
pub struct Metrics {
/// Total number of connections closed, labelled by `direction` and `reason`.
pub connections_closed_total: CounterVec<U64>,
/// Total number of connections opened, labelled by `direction`.
pub connections_opened_total: CounterVec<U64>,
/// Total number of connections closed with distinct peers.
pub distinct_peers_connections_closed_total: Counter<U64>,
/// Total number of connections opened with distinct peers.
pub distinct_peers_connections_opened_total: Counter<U64>,
/// Total number of incoming connections that failed during the initial handshake,
/// labelled by `reason`.
pub incoming_connections_errors_total: CounterVec<U64>,
/// Total number of incoming connections on the listening sockets.
pub incoming_connections_total: Counter<U64>,
/// Duration of Kademlia queries, labelled by query `type`.
pub kademlia_query_duration: HistogramVec,
/// Number of random Kademlia queries started.
pub kademlia_random_queries_total: Counter<U64>,
/// Number of records in the Kademlia records store.
pub kademlia_records_count: Gauge<U64>,
/// Total size of all the records in the Kademlia records store.
pub kademlia_records_sizes_total: Gauge<U64>,
/// Number of nodes per k-bucket, labelled by `lower_ilog2_bucket_bound`.
pub kbuckets_num_nodes: GaugeVec<U64>,
/// Number of local addresses being listened on.
pub listeners_local_addresses: Gauge<U64>,
/// Total number of non-fatal errors reported by a listener.
pub listeners_errors_total: Counter<U64>,
/// Number of connections in the process of being established.
pub pending_connections: Gauge<U64>,
/// Total number of pending connection errors, labelled by `reason`.
pub pending_connections_errors_total: CounterVec<U64>,
/// Total number of incoming requests the node failed to answer, labelled by
/// `protocol` and `reason`.
pub requests_in_failure_total: CounterVec<U64>,
/// For successful incoming requests, time between receiving the request and starting
/// to send the response; labelled by `protocol`. Note this is a histogram despite the
/// `_total` suffix.
pub requests_in_success_total: HistogramVec,
/// Total number of outgoing requests that have failed, labelled by `protocol` and
/// `reason`.
pub requests_out_failure_total: CounterVec<U64>,
/// For successful outgoing requests, time between a request's start and finish;
/// labelled by `protocol`. Note this is a histogram despite the `_total` suffix.
pub requests_out_success_total: HistogramVec,
}
impl Metrics {
    /// Builds every general networking metric and registers each one on `registry`.
    ///
    /// The metrics are created and registered one after another in the order written
    /// below, so the first failure decides the returned error and nothing after it is
    /// attempted.
    ///
    /// # Errors
    ///
    /// Propagates the [`PrometheusError`] raised while constructing a metric or while
    /// registering it on `registry`.
    ///
    /// # Panics
    ///
    /// Panics if `exponential_buckets` rejects one of the hard-coded bucket layouts.
    fn register(registry: &Registry) -> Result<Self, PrometheusError> {
        // --- Connections -------------------------------------------------------------
        let connections_closed_total = prometheus::register(
            CounterVec::new(
                Opts::new(
                    "substrate_sub_libp2p_connections_closed_total",
                    "Total number of connections closed, by direction and reason",
                ),
                &["direction", "reason"],
            )?,
            registry,
        )?;
        let connections_opened_total = prometheus::register(
            CounterVec::new(
                Opts::new(
                    "substrate_sub_libp2p_connections_opened_total",
                    "Total number of connections opened by direction",
                ),
                &["direction"],
            )?,
            registry,
        )?;
        let distinct_peers_connections_closed_total = prometheus::register(
            Counter::new(
                "substrate_sub_libp2p_distinct_peers_connections_closed_total",
                "Total number of connections closed with distinct peers",
            )?,
            registry,
        )?;
        let distinct_peers_connections_opened_total = prometheus::register(
            Counter::new(
                "substrate_sub_libp2p_distinct_peers_connections_opened_total",
                "Total number of connections opened with distinct peers",
            )?,
            registry,
        )?;
        let incoming_connections_errors_total = prometheus::register(
            CounterVec::new(
                Opts::new(
                    "substrate_sub_libp2p_incoming_connections_handshake_errors_total",
                    "Total number of incoming connections that have failed during the \
                     initial handshake",
                ),
                &["reason"],
            )?,
            registry,
        )?;
        let incoming_connections_total = prometheus::register(
            Counter::new(
                "substrate_sub_libp2p_incoming_connections_total",
                "Total number of incoming connections on the listening sockets",
            )?,
            registry,
        )?;

        // --- Kademlia ----------------------------------------------------------------
        let kademlia_query_duration = prometheus::register(
            HistogramVec::new(
                HistogramOpts {
                    common_opts: Opts::new(
                        "substrate_sub_libp2p_kademlia_query_duration",
                        "Duration of Kademlia queries per query type",
                    ),
                    buckets: prometheus::exponential_buckets(0.5, 2.0, 10)
                        .expect("parameters are always valid values; qed"),
                },
                &["type"],
            )?,
            registry,
        )?;
        let kademlia_random_queries_total = prometheus::register(
            Counter::new(
                "substrate_sub_libp2p_kademlia_random_queries_total",
                "Number of random Kademlia queries started",
            )?,
            registry,
        )?;
        let kademlia_records_count = prometheus::register(
            Gauge::new(
                "substrate_sub_libp2p_kademlia_records_count",
                "Number of records in the Kademlia records store",
            )?,
            registry,
        )?;
        let kademlia_records_sizes_total = prometheus::register(
            Gauge::new(
                "substrate_sub_libp2p_kademlia_records_sizes_total",
                "Total size of all the records in the Kademlia records store",
            )?,
            registry,
        )?;
        let kbuckets_num_nodes = prometheus::register(
            GaugeVec::new(
                Opts::new(
                    "substrate_sub_libp2p_kbuckets_num_nodes",
                    "Number of nodes per kbucket per Kademlia instance",
                ),
                &["lower_ilog2_bucket_bound"],
            )?,
            registry,
        )?;

        // --- Listeners and pending connections ---------------------------------------
        let listeners_local_addresses = prometheus::register(
            Gauge::new(
                "substrate_sub_libp2p_listeners_local_addresses",
                "Number of local addresses we're listening on",
            )?,
            registry,
        )?;
        let listeners_errors_total = prometheus::register(
            Counter::new(
                "substrate_sub_libp2p_listeners_errors_total",
                "Total number of non-fatal errors reported by a listener",
            )?,
            registry,
        )?;
        let pending_connections = prometheus::register(
            Gauge::new(
                "substrate_sub_libp2p_pending_connections",
                "Number of connections in the process of being established",
            )?,
            registry,
        )?;
        let pending_connections_errors_total = prometheus::register(
            CounterVec::new(
                Opts::new(
                    "substrate_sub_libp2p_pending_connections_errors_total",
                    "Total number of pending connection errors",
                ),
                &["reason"],
            )?,
            registry,
        )?;

        // --- Request/response protocols ----------------------------------------------
        let requests_in_failure_total = prometheus::register(
            CounterVec::new(
                Opts::new(
                    "substrate_sub_libp2p_requests_in_failure_total",
                    "Total number of incoming requests that the node has failed to answer",
                ),
                &["protocol", "reason"],
            )?,
            registry,
        )?;
        let requests_in_success_total = prometheus::register(
            HistogramVec::new(
                HistogramOpts {
                    common_opts: Opts::new(
                        "substrate_sub_libp2p_requests_in_success_total",
                        "For successful incoming requests, time between receiving the request and \
                         starting to send the response",
                    ),
                    buckets: prometheus::exponential_buckets(0.001, 2.0, 16)
                        .expect("parameters are always valid values; qed"),
                },
                &["protocol"],
            )?,
            registry,
        )?;
        let requests_out_failure_total = prometheus::register(
            CounterVec::new(
                Opts::new(
                    "substrate_sub_libp2p_requests_out_failure_total",
                    "Total number of requests that have failed",
                ),
                &["protocol", "reason"],
            )?,
            registry,
        )?;
        let requests_out_success_total = prometheus::register(
            HistogramVec::new(
                HistogramOpts {
                    common_opts: Opts::new(
                        "substrate_sub_libp2p_requests_out_success_total",
                        "For successful outgoing requests, time between a request's start and finish",
                    ),
                    buckets: prometheus::exponential_buckets(0.001, 2.0, 16)
                        .expect("parameters are always valid values; qed"),
                },
                &["protocol"],
            )?,
            registry,
        )?;

        Ok(Self {
            connections_closed_total,
            connections_opened_total,
            distinct_peers_connections_closed_total,
            distinct_peers_connections_opened_total,
            incoming_connections_errors_total,
            incoming_connections_total,
            kademlia_query_duration,
            kademlia_random_queries_total,
            kademlia_records_count,
            kademlia_records_sizes_total,
            kbuckets_num_nodes,
            listeners_local_addresses,
            listeners_errors_total,
            pending_connections,
            pending_connections_errors_total,
            requests_in_failure_total,
            requests_in_success_total,
            requests_out_failure_total,
            requests_out_success_total,
        })
    }
}
/// Gauges describing the contents of the peer store, exported under the
/// `substrate_sub_libp2p_peerset_` prefix.
#[derive(Clone, Debug)]
pub struct PeerStoreMetrics {
/// Number of banned peers stored in the peerset manager.
pub num_banned_peers: Gauge<U64>,
/// Number of nodes stored in the peerset manager.
pub num_discovered: Gauge<U64>,
}
impl PeerStoreMetrics {
    /// Creates the peer store gauges and registers them on `registry`.
    ///
    /// The banned-peers gauge is handled first, then the discovered-nodes gauge.
    ///
    /// # Errors
    ///
    /// Returns the first [`PrometheusError`] raised while constructing or registering
    /// either gauge.
    pub fn register(registry: &Registry) -> Result<Self, PrometheusError> {
        let banned = Gauge::new(
            "substrate_sub_libp2p_peerset_num_banned_peers",
            "Number of banned peers stored in the peerset manager",
        )?;
        let num_banned_peers = prometheus::register(banned, registry)?;

        let discovered = Gauge::new(
            "substrate_sub_libp2p_peerset_num_discovered",
            "Number of nodes stored in the peerset manager",
        )?;
        let num_discovered = prometheus::register(discovered, registry)?;

        Ok(Self { num_banned_peers, num_discovered })
    }
}
/// Metric source wrapping a [`BandwidthSink`]; when collected it reports the sink's
/// inbound and outbound byte totals under the `"in"` and `"out"` label values.
#[derive(Clone)]
pub struct BandwidthCounters(Arc<dyn BandwidthSink>);
impl BandwidthCounters {
    /// Registers `substrate_sub_libp2p_network_bytes_total` on `registry`, a sourced
    /// counter with a `direction` label whose values are read from `sinks`.
    ///
    /// # Errors
    ///
    /// Returns the [`PrometheusError`] raised while building the sourced counter or
    /// while registering it.
    fn register(registry: &Registry, sinks: Arc<dyn BandwidthSink>) -> Result<(), PrometheusError> {
        let opts = Opts::new("substrate_sub_libp2p_network_bytes_total", "Total bandwidth usage")
            .variable_label("direction");
        let counter = SourcedCounter::new(&opts, BandwidthCounters(sinks))?;
        // Only the success/failure of the registration matters; the handle is discarded.
        prometheus::register(counter, registry).map(|_| ())
    }
}
impl MetricSource for BandwidthCounters {
    type N = u64;

    /// Reports the sink's inbound total under the label value `"in"`, then its outbound
    /// total under `"out"`.
    fn collect(&self, mut set: impl FnMut(&[&str], Self::N)) {
        let sink = &self.0;
        set(&["in"], sink.total_inbound());
        set(&["out"], sink.total_outbound());
    }
}
/// Metric source wrapping a shared `AtomicUsize`; when collected it reports the value
/// currently stored in the atomic.
#[derive(Clone)]
pub struct NumConnectedGauge(Arc<AtomicUsize>);
impl NumConnectedGauge {
    /// Registers `substrate_sub_libp2p_peers_count` on `registry`, a sourced gauge whose
    /// value is read from `value`.
    ///
    /// # Errors
    ///
    /// Returns the [`PrometheusError`] raised while building the sourced gauge or while
    /// registering it.
    fn register(registry: &Registry, value: Arc<AtomicUsize>) -> Result<(), PrometheusError> {
        let opts = Opts::new("substrate_sub_libp2p_peers_count", "Number of connected peers");
        let gauge = SourcedGauge::new(&opts, NumConnectedGauge(value))?;
        // Only the success/failure of the registration matters; the handle is discarded.
        prometheus::register(gauge, registry).map(|_| ())
    }
}
impl MetricSource for NumConnectedGauge {
    type N = u64;

    /// Reports the current value of the wrapped atomic, with no label values.
    fn collect(&self, mut set: impl FnMut(&[&str], Self::N)) {
        // A relaxed load is used: the value is only sampled for reporting.
        let connected = self.0.load(Ordering::Relaxed);
        set(&[], connected as u64);
    }
}
/// Optional handle to the notification metrics.
///
/// When no metrics are held, every `register_*` method is a no-op.
#[derive(Debug, Clone)]
pub struct NotificationMetrics {
/// `None` when no registry was supplied or when registering the metrics failed.
metrics: Option<InnerNotificationMetrics>,
}
impl NotificationMetrics {
    /// Creates the notification metrics handle.
    ///
    /// With `Some(registry)` the notification metrics are registered on it; if that
    /// registration fails the error is discarded and the handle holds no metrics, exactly
    /// as when `registry` is `None`.
    pub fn new(registry: Option<&Registry>) -> NotificationMetrics {
        let metrics =
            registry.and_then(|registry| InnerNotificationMetrics::register(registry).ok());
        Self { metrics }
    }

    /// Increments the opened-substreams counter for `protocol`.
    pub fn register_substream_opened(&self, protocol: &ProtocolName) {
        if let Some(inner) = self.metrics.as_ref() {
            inner.notifications_streams_opened_total.with_label_values(&[&protocol[..]]).inc();
        }
    }

    /// Increments the closed-substreams counter for `protocol`.
    pub fn register_substream_closed(&self, protocol: &ProtocolName) {
        if let Some(inner) = self.metrics.as_ref() {
            inner.notifications_streams_closed_total.with_label_values(&[&protocol[..]]).inc();
        }
    }

    /// Records the size of a notification sent on `protocol` (direction label `"out"`).
    pub fn register_notification_sent(&self, protocol: &ProtocolName, size: usize) {
        self.observe_notification_size("out", protocol, size);
    }

    /// Records the size of a notification received on `protocol` (direction label `"in"`).
    pub fn register_notification_received(&self, protocol: &ProtocolName, size: usize) {
        self.observe_notification_size("in", protocol, size);
    }

    /// Observes `size` in the notification sizes histogram under the given `direction`
    /// and `protocol` label values; does nothing when no metrics are held.
    fn observe_notification_size(&self, direction: &str, protocol: &ProtocolName, size: usize) {
        if let Some(inner) = self.metrics.as_ref() {
            inner
                .notifications_sizes
                .with_label_values(&[direction, &protocol[..]])
                .observe(size as f64);
        }
    }
}
/// Notification metrics, exported under the `substrate_sub_libp2p_notifications_`
/// prefix.
#[derive(Debug, Clone)]
struct InnerNotificationMetrics {
/// Total number of notification substreams that have been opened, labelled by
/// `protocol`.
pub notifications_streams_opened_total: CounterVec<U64>,
/// Total number of notification substreams that have been closed, labelled by
/// `protocol`.
pub notifications_streams_closed_total: CounterVec<U64>,
/// Sizes of the notifications sent and received, labelled by `direction` and
/// `protocol`.
pub notifications_sizes: HistogramVec,
}
impl InnerNotificationMetrics {
    /// Creates the notification metrics and registers each one on `registry`.
    ///
    /// The sizes histogram is handled first, then the closed-substreams counter, then
    /// the opened-substreams counter; the first failure aborts the sequence.
    ///
    /// # Errors
    ///
    /// Propagates the [`PrometheusError`] raised while constructing or registering a
    /// metric.
    ///
    /// # Panics
    ///
    /// Panics if `exponential_buckets` rejects the hard-coded bucket layout.
    fn register(registry: &Registry) -> Result<Self, PrometheusError> {
        let sizes_opts = HistogramOpts {
            common_opts: Opts::new(
                "substrate_sub_libp2p_notifications_sizes",
                "Sizes of the notifications send to and received from all nodes",
            ),
            buckets: prometheus::exponential_buckets(64.0, 4.0, 8)
                .expect("parameters are always valid values; qed"),
        };
        let notifications_sizes = prometheus::register(
            HistogramVec::new(sizes_opts, &["direction", "protocol"])?,
            registry,
        )?;

        let closed_opts = Opts::new(
            "substrate_sub_libp2p_notifications_streams_closed_total",
            "Total number of notification substreams that have been closed",
        );
        let notifications_streams_closed_total =
            prometheus::register(CounterVec::new(closed_opts, &["protocol"])?, registry)?;

        let opened_opts = Opts::new(
            "substrate_sub_libp2p_notifications_streams_opened_total",
            "Total number of notification substreams that have been opened",
        );
        let notifications_streams_opened_total =
            prometheus::register(CounterVec::new(opened_opts, &["protocol"])?, registry)?;

        Ok(Self {
            notifications_streams_opened_total,
            notifications_streams_closed_total,
            notifications_sizes,
        })
    }
}