1use crate::{service::traits::BandwidthSink, ProtocolName};
20
21use prometheus_endpoint::{
22 self as prometheus, Counter, CounterVec, Gauge, GaugeVec, HistogramOpts, MetricSource, Opts,
23 PrometheusError, Registry, SourcedCounter, SourcedGauge, U64,
24};
25
26use std::{
27 str,
28 sync::{
29 atomic::{AtomicUsize, Ordering},
30 Arc,
31 },
32};
33
34pub use prometheus_endpoint::{Histogram, HistogramVec};
35
36pub fn register(registry: &Registry, sources: MetricSources) -> Result<Metrics, PrometheusError> {
38 BandwidthCounters::register(registry, sources.bandwidth)?;
39 NumConnectedGauge::register(registry, sources.connected_peers)?;
40 Metrics::register(registry)
41}
42
43pub fn register_without_sources(registry: &Registry) -> Result<Metrics, PrometheusError> {
45 Metrics::register(registry)
46}
47
48pub struct MetricSources {
50 pub bandwidth: Arc<dyn BandwidthSink>,
51 pub connected_peers: Arc<AtomicUsize>,
52}
53
54impl MetricSources {
55 pub fn register(
56 registry: &Registry,
57 bandwidth: Arc<dyn BandwidthSink>,
58 connected_peers: Arc<AtomicUsize>,
59 ) -> Result<(), PrometheusError> {
60 BandwidthCounters::register(registry, bandwidth)?;
61 NumConnectedGauge::register(registry, connected_peers)
62 }
63}
64
65#[derive(Clone)]
67pub struct Metrics {
68 pub connections_closed_total: CounterVec<U64>,
70 pub connections_opened_total: CounterVec<U64>,
71 pub distinct_peers_connections_closed_total: Counter<U64>,
72 pub distinct_peers_connections_opened_total: Counter<U64>,
73 pub incoming_connections_errors_total: CounterVec<U64>,
74 pub incoming_connections_total: Counter<U64>,
75 pub kademlia_query_duration: HistogramVec,
76 pub kademlia_random_queries_total: Counter<U64>,
77 pub kademlia_records_count: Gauge<U64>,
78 pub kademlia_records_sizes_total: Gauge<U64>,
79 pub kbuckets_num_nodes: GaugeVec<U64>,
80 pub listeners_local_addresses: Gauge<U64>,
81 pub listeners_errors_total: Counter<U64>,
82 pub pending_connections: Gauge<U64>,
83 pub pending_connections_errors_total: CounterVec<U64>,
84 pub requests_in_failure_total: CounterVec<U64>,
85 pub requests_in_success_total: HistogramVec,
86 pub requests_out_failure_total: CounterVec<U64>,
87 pub requests_out_success_total: HistogramVec,
88 pub requests_response_bytes_total: CounterVec<U64>,
89}
90
91impl Metrics {
92 fn register(registry: &Registry) -> Result<Self, PrometheusError> {
93 Ok(Self {
94 connections_closed_total: prometheus::register(CounterVec::new(
96 Opts::new(
97 "substrate_sub_libp2p_connections_closed_total",
98 "Total number of connections closed, by direction and reason"
99 ),
100 &["direction", "reason"]
101 )?, registry)?,
102 connections_opened_total: prometheus::register(CounterVec::new(
103 Opts::new(
104 "substrate_sub_libp2p_connections_opened_total",
105 "Total number of connections opened by direction"
106 ),
107 &["direction"]
108 )?, registry)?,
109 distinct_peers_connections_closed_total: prometheus::register(Counter::new(
110 "substrate_sub_libp2p_distinct_peers_connections_closed_total",
111 "Total number of connections closed with distinct peers"
112 )?, registry)?,
113 distinct_peers_connections_opened_total: prometheus::register(Counter::new(
114 "substrate_sub_libp2p_distinct_peers_connections_opened_total",
115 "Total number of connections opened with distinct peers"
116 )?, registry)?,
117 incoming_connections_errors_total: prometheus::register(CounterVec::new(
118 Opts::new(
119 "substrate_sub_libp2p_incoming_connections_handshake_errors_total",
120 "Total number of incoming connections that have failed during the \
121 initial handshake"
122 ),
123 &["reason"]
124 )?, registry)?,
125 incoming_connections_total: prometheus::register(Counter::new(
126 "substrate_sub_libp2p_incoming_connections_total",
127 "Total number of incoming connections on the listening sockets"
128 )?, registry)?,
129 kademlia_query_duration: prometheus::register(HistogramVec::new(
130 HistogramOpts {
131 common_opts: Opts::new(
132 "substrate_sub_libp2p_kademlia_query_duration",
133 "Duration of Kademlia queries per query type"
134 ),
135 buckets: prometheus::exponential_buckets(0.5, 2.0, 10)
136 .expect("parameters are always valid values; qed"),
137 },
138 &["type"]
139 )?, registry)?,
140 kademlia_random_queries_total: prometheus::register(Counter::new(
141 "substrate_sub_libp2p_kademlia_random_queries_total",
142 "Number of random Kademlia queries started",
143 )?, registry)?,
144 kademlia_records_count: prometheus::register(Gauge::new(
145 "substrate_sub_libp2p_kademlia_records_count",
146 "Number of records in the Kademlia records store",
147 )?, registry)?,
148 kademlia_records_sizes_total: prometheus::register(Gauge::new(
149 "substrate_sub_libp2p_kademlia_records_sizes_total",
150 "Total size of all the records in the Kademlia records store",
151 )?, registry)?,
152 kbuckets_num_nodes: prometheus::register(GaugeVec::new(
153 Opts::new(
154 "substrate_sub_libp2p_kbuckets_num_nodes",
155 "Number of nodes per kbucket per Kademlia instance"
156 ),
157 &["lower_ilog2_bucket_bound"]
158 )?, registry)?,
159 listeners_local_addresses: prometheus::register(Gauge::new(
160 "substrate_sub_libp2p_listeners_local_addresses",
161 "Number of local addresses we're listening on"
162 )?, registry)?,
163 listeners_errors_total: prometheus::register(Counter::new(
164 "substrate_sub_libp2p_listeners_errors_total",
165 "Total number of non-fatal errors reported by a listener"
166 )?, registry)?,
167 pending_connections: prometheus::register(Gauge::new(
168 "substrate_sub_libp2p_pending_connections",
169 "Number of connections in the process of being established",
170 )?, registry)?,
171 pending_connections_errors_total: prometheus::register(CounterVec::new(
172 Opts::new(
173 "substrate_sub_libp2p_pending_connections_errors_total",
174 "Total number of pending connection errors"
175 ),
176 &["reason"]
177 )?, registry)?,
178 requests_in_failure_total: prometheus::register(CounterVec::new(
179 Opts::new(
180 "substrate_sub_libp2p_requests_in_failure_total",
181 "Total number of incoming requests that the node has failed to answer"
182 ),
183 &["protocol", "reason"]
184 )?, registry)?,
185 requests_in_success_total: prometheus::register(HistogramVec::new(
186 HistogramOpts {
187 common_opts: Opts::new(
188 "substrate_sub_libp2p_requests_in_success_total",
189 "For successful incoming requests, time between receiving the request and \
190 starting to send the response"
191 ),
192 buckets: prometheus::exponential_buckets(0.001, 2.0, 16)
193 .expect("parameters are always valid values; qed"),
194 },
195 &["protocol"]
196 )?, registry)?,
197 requests_out_failure_total: prometheus::register(CounterVec::new(
198 Opts::new(
199 "substrate_sub_libp2p_requests_out_failure_total",
200 "Total number of requests that have failed"
201 ),
202 &["protocol", "reason"]
203 )?, registry)?,
204 requests_out_success_total: prometheus::register(HistogramVec::new(
205 HistogramOpts {
206 common_opts: Opts::new(
207 "substrate_sub_libp2p_requests_out_success_total",
208 "For successful outgoing requests, time between a request's start and finish"
209 ),
210 buckets: prometheus::exponential_buckets(0.001, 2.0, 16)
211 .expect("parameters are always valid values; qed"),
212 },
213 &["protocol"]
214 )?, registry)?,
215 requests_response_bytes_total: prometheus::register(CounterVec::new(
216 Opts::new(
217 "substrate_sub_libp2p_requests_response_bytes_total",
218 "Total bytes sent and received by request-response protocols"
219 ),
220 &["direction", "protocol"]
221 )?, registry)?,
222 })
223 }
224}
225
226#[derive(Clone, Debug)]
228pub struct PeerStoreMetrics {
229 pub num_banned_peers: Gauge<U64>,
230 pub num_discovered: Gauge<U64>,
231}
232
233impl PeerStoreMetrics {
234 pub fn register(registry: &Registry) -> Result<Self, PrometheusError> {
235 Ok(Self {
236 num_banned_peers: prometheus::register(
237 Gauge::new(
238 "substrate_sub_libp2p_peerset_num_banned_peers",
239 "Number of banned peers stored in the peerset manager",
240 )?,
241 registry,
242 )?,
243 num_discovered: prometheus::register(
244 Gauge::new(
245 "substrate_sub_libp2p_peerset_num_discovered",
246 "Number of nodes stored in the peerset manager",
247 )?,
248 registry,
249 )?,
250 })
251 }
252}
253
254#[derive(Clone)]
256pub struct BandwidthCounters(Arc<dyn BandwidthSink>);
257
258impl BandwidthCounters {
259 fn register(registry: &Registry, sinks: Arc<dyn BandwidthSink>) -> Result<(), PrometheusError> {
262 prometheus::register(
263 SourcedCounter::new(
264 &Opts::new("substrate_sub_libp2p_network_bytes_total", "Total bandwidth usage")
265 .variable_label("direction"),
266 BandwidthCounters(sinks),
267 )?,
268 registry,
269 )?;
270
271 Ok(())
272 }
273}
274
275impl MetricSource for BandwidthCounters {
276 type N = u64;
277
278 fn collect(&self, mut set: impl FnMut(&[&str], Self::N)) {
279 set(&["in"], self.0.total_inbound());
280 set(&["out"], self.0.total_outbound());
281 }
282}
283
/// Metric source reporting the value of a shared connected-peers counter.
///
/// Clones share the same underlying counter.
#[derive(Clone)]
pub struct NumConnectedGauge(Arc<AtomicUsize>);
287
288impl NumConnectedGauge {
289 fn register(registry: &Registry, value: Arc<AtomicUsize>) -> Result<(), PrometheusError> {
292 prometheus::register(
293 SourcedGauge::new(
294 &Opts::new("substrate_sub_libp2p_peers_count", "Number of connected peers"),
295 NumConnectedGauge(value),
296 )?,
297 registry,
298 )?;
299
300 Ok(())
301 }
302}
303
304impl MetricSource for NumConnectedGauge {
305 type N = u64;
306
307 fn collect(&self, mut set: impl FnMut(&[&str], Self::N)) {
308 set(&[], self.0.load(Ordering::Relaxed) as u64);
309 }
310}
311
312#[derive(Debug, Clone)]
316pub struct NotificationMetrics {
317 metrics: Option<InnerNotificationMetrics>,
319}
320
321impl NotificationMetrics {
322 pub fn new(registry: Option<&Registry>) -> NotificationMetrics {
324 let metrics = match registry {
325 Some(registry) => InnerNotificationMetrics::register(registry).ok(),
326 None => None,
327 };
328
329 Self { metrics }
330 }
331
332 pub fn register_substream_opened(&self, protocol: &ProtocolName) {
334 if let Some(metrics) = &self.metrics {
335 metrics.notifications_streams_opened_total.with_label_values(&[&protocol]).inc();
336 }
337 }
338
339 pub fn register_substream_closed(&self, protocol: &ProtocolName) {
341 if let Some(metrics) = &self.metrics {
342 metrics
343 .notifications_streams_closed_total
344 .with_label_values(&[&protocol[..]])
345 .inc();
346 }
347 }
348
349 pub fn register_notification_sent(&self, protocol: &ProtocolName, size: usize) {
351 if let Some(metrics) = &self.metrics {
352 metrics
353 .notifications_sizes
354 .with_label_values(&["out", protocol])
355 .observe(size as f64);
356 }
357 }
358
359 pub fn register_notification_received(&self, protocol: &ProtocolName, size: usize) {
361 if let Some(metrics) = &self.metrics {
362 metrics
363 .notifications_sizes
364 .with_label_values(&["in", protocol])
365 .observe(size as f64);
366 }
367 }
368
369 pub fn set_peerset_num_connected(
371 &self,
372 protocol: &ProtocolName,
373 in_reserved: usize,
374 in_non_reserved: usize,
375 out_reserved: usize,
376 out_non_reserved: usize,
377 num_disconnected: usize,
378 num_backoff: usize,
379 ) {
380 if let Some(metrics) = &self.metrics {
381 metrics
382 .peerset_num_connected
383 .with_label_values(&["in", "reserved", protocol])
384 .set(in_reserved as u64);
385 metrics
386 .peerset_num_connected
387 .with_label_values(&["in", "non-reserved", protocol])
388 .set(in_non_reserved as u64);
389 metrics
390 .peerset_num_connected
391 .with_label_values(&["out", "reserved", protocol])
392 .set(out_reserved as u64);
393 metrics
394 .peerset_num_connected
395 .with_label_values(&["out", "non-reserved", protocol])
396 .set(out_non_reserved as u64);
397
398 metrics
399 .peerset_num_state
400 .with_label_values(&["disconnected", protocol])
401 .set(num_disconnected as u64);
402 metrics
403 .peerset_num_state
404 .with_label_values(&["backoff", protocol])
405 .set(num_backoff as u64);
406 }
407 }
408}
409
410#[derive(Debug, Clone)]
412struct InnerNotificationMetrics {
413 pub notifications_streams_opened_total: CounterVec<U64>,
415
416 pub notifications_streams_closed_total: CounterVec<U64>,
418
419 pub notifications_sizes: HistogramVec,
421
422 pub peerset_num_connected: GaugeVec<U64>,
424
425 pub peerset_num_state: GaugeVec<U64>,
427}
428
429impl InnerNotificationMetrics {
430 fn register(registry: &Registry) -> Result<Self, PrometheusError> {
431 Ok(Self {
432 notifications_sizes: prometheus::register(
433 HistogramVec::new(
434 HistogramOpts {
435 common_opts: Opts::new(
436 "substrate_sub_libp2p_notifications_sizes",
437 "Sizes of the notifications send to and received from all nodes",
438 ),
439 buckets: prometheus::exponential_buckets(64.0, 4.0, 8)
440 .expect("parameters are always valid values; qed"),
441 },
442 &["direction", "protocol"],
443 )?,
444 registry,
445 )?,
446 notifications_streams_closed_total: prometheus::register(
447 CounterVec::new(
448 Opts::new(
449 "substrate_sub_libp2p_notifications_streams_closed_total",
450 "Total number of notification substreams that have been closed",
451 ),
452 &["protocol"],
453 )?,
454 registry,
455 )?,
456 notifications_streams_opened_total: prometheus::register(
457 CounterVec::new(
458 Opts::new(
459 "substrate_sub_libp2p_notifications_streams_opened_total",
460 "Total number of notification substreams that have been opened",
461 ),
462 &["protocol"],
463 )?,
464 registry,
465 )?,
466 peerset_num_connected: prometheus::register(
467 GaugeVec::new(
468 Opts::new(
469 "substrate_sub_libp2p_peerset_num_connected",
470 "Number of connected peers per direction, reservation status and protocol",
471 ),
472 &["direction", "kind", "protocol"],
473 )?,
474 registry,
475 )?,
476 peerset_num_state: prometheus::register(
477 GaugeVec::new(
478 Opts::new(
479 "substrate_sub_libp2p_peerset_num_state",
480 "Number of peers per state in the peerset manager",
481 ),
482 &["state", "protocol"],
483 )?,
484 registry,
485 )?,
486 })
487 }
488}