referrerpolicy=no-referrer-when-downgrade

sc_transaction_pool/fork_aware_txpool/metrics.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Prometheus's metrics for a fork-aware transaction pool.
20
21use super::tx_mem_pool::InsertionInfo;
22use crate::{
23	common::metrics::{GenericMetricsLink, MetricsRegistrant},
24	graph::{self, BlockHash, ExtrinsicHash},
25	LOG_TARGET,
26};
27use futures::{FutureExt, StreamExt};
28use prometheus_endpoint::{
29	exponential_buckets, histogram_opts, linear_buckets, register, Counter, Gauge, Histogram,
30	PrometheusError, Registry, U64,
31};
32#[cfg(doc)]
33use sc_transaction_pool_api::TransactionPool;
34use sc_transaction_pool_api::TransactionStatus;
35use sc_utils::mpsc;
36use std::{
37	collections::{hash_map::Entry, HashMap},
38	future::Future,
39	pin::Pin,
40	time::{Duration, Instant},
41};
42use tracing::trace;
43
44/// A helper alias for the Prometheus's metrics endpoint.
45pub type MetricsLink = GenericMetricsLink<Metrics>;
46
47/// Transaction pool Prometheus metrics.
48pub struct Metrics {
49	/// Total number of transactions submitted.
50	pub submitted_transactions: Counter<U64>,
51	/// Total number of currently maintained views.
52	pub active_views: Gauge<U64>,
53	/// Total number of current inactive views.
54	pub inactive_views: Gauge<U64>,
55	/// Total number of watched transactions in txpool.
56	pub watched_txs: Gauge<U64>,
57	/// Total number of unwatched transactions in txpool.
58	pub unwatched_txs: Gauge<U64>,
59	/// Total number of transactions reported as invalid.
60	///
61	/// This only includes transaction reported as invalid by the
62	/// [`TransactionPool::report_invalid`] method.
63	pub reported_invalid_txs: Counter<U64>,
64	/// Total number of transactions removed as invalid.
65	pub removed_invalid_txs: Counter<U64>,
66	/// Total number of transactions from imported blocks that are unknown to the pool.
67	pub unknown_from_block_import_txs: Counter<U64>,
68	/// Total number of finalized transactions.
69	pub finalized_txs: Counter<U64>,
70	/// Histogram of maintain durations.
71	pub maintain_duration: Histogram,
72	/// Total number of transactions resubmitted from retracted forks.
73	pub resubmitted_retracted_txs: Counter<U64>,
74	/// Total number of transactions submitted from mempool to views.
75	pub submitted_from_mempool_txs: Counter<U64>,
76	/// Total number of transactions found as invalid during mempool revalidation.
77	pub mempool_revalidation_invalid_txs: Counter<U64>,
78	/// Total number of transactions found as invalid during view revalidation.
79	pub view_revalidation_invalid_txs: Counter<U64>,
80	/// Total number of valid transactions processed during view revalidation.
81	pub view_revalidation_resubmitted_txs: Counter<U64>,
82	/// Histogram of view revalidation durations.
83	pub view_revalidation_duration: Histogram,
84	/// Total number of the views created w/o cloning existing view.
85	pub non_cloned_views: Counter<U64>,
86	/// Histograms to track the timing distribution of individual transaction pool events.
87	pub events_histograms: EventsHistograms,
88}
89
90/// Represents a collection of histogram timings for different transaction statuses.
91pub struct EventsHistograms {
92	/// Histogram of timings for reporting `TransactionStatus::Future` event
93	pub future: Histogram,
94	/// Histogram of timings for reporting `TransactionStatus::Ready` event
95	pub ready: Histogram,
96	/// Histogram of timings for reporting `TransactionStatus::Broadcast` event
97	pub broadcast: Histogram,
98	/// Histogram of timings for reporting `TransactionStatus::InBlock` event
99	pub in_block: Histogram,
100	/// Histogram of timings for reporting `TransactionStatus::Retracted` event
101	pub retracted: Histogram,
102	/// Histogram of timings for reporting `TransactionStatus::FinalityTimeout` event
103	pub finality_timeout: Histogram,
104	/// Histogram of timings for reporting `TransactionStatus::Finalized` event
105	pub finalized: Histogram,
106	/// Histogram of timings for reporting `TransactionStatus::Usurped(Hash)` event
107	pub usurped: Histogram,
108	/// Histogram of timings for reporting `TransactionStatus::Dropped` event
109	pub dropped: Histogram,
110	/// Histogram of timings for reporting `TransactionStatus::Invalid` event
111	pub invalid: Histogram,
112}
113
114impl EventsHistograms {
115	fn register(registry: &Registry) -> Result<Self, PrometheusError> {
116		Ok(Self {
117			future: register(
118				Histogram::with_opts(histogram_opts!(
119					"substrate_sub_txpool_timing_event_future",
120					"Histogram of timings for reporting Future event",
121					exponential_buckets(0.01, 2.0, 16).unwrap()
122				))?,
123				registry,
124			)?,
125			ready: register(
126				Histogram::with_opts(histogram_opts!(
127					"substrate_sub_txpool_timing_event_ready",
128					"Histogram of timings for reporting Ready event",
129					exponential_buckets(0.01, 2.0, 16).unwrap()
130				))?,
131				registry,
132			)?,
133			broadcast: register(
134				Histogram::with_opts(histogram_opts!(
135					"substrate_sub_txpool_timing_event_broadcast",
136					"Histogram of timings for reporting Broadcast event",
137					linear_buckets(0.01, 0.25, 16).unwrap()
138				))?,
139				registry,
140			)?,
141			in_block: register(
142				Histogram::with_opts(
143					histogram_opts!(
144						"substrate_sub_txpool_timing_event_in_block",
145						"Histogram of timings for reporting InBlock event"
146					)
147					.buckets(
148						[
149							linear_buckets(0.0, 3.0, 20).unwrap(),
150							// requested in #9158
151							vec![60.0, 75.0, 90.0, 120.0, 180.0],
152						]
153						.concat(),
154					),
155				)?,
156				registry,
157			)?,
158			retracted: register(
159				Histogram::with_opts(histogram_opts!(
160					"substrate_sub_txpool_timing_event_retracted",
161					"Histogram of timings for reporting Retracted event",
162					linear_buckets(0.0, 3.0, 20).unwrap()
163				))?,
164				registry,
165			)?,
166			finality_timeout: register(
167				Histogram::with_opts(histogram_opts!(
168					"substrate_sub_txpool_timing_event_finality_timeout",
169					"Histogram of timings for reporting FinalityTimeout event",
170					linear_buckets(0.0, 40.0, 20).unwrap()
171				))?,
172				registry,
173			)?,
174			finalized: register(
175				Histogram::with_opts(
176					histogram_opts!(
177						"substrate_sub_txpool_timing_event_finalized",
178						"Histogram of timings for reporting Finalized event"
179					)
180					.buckets(
181						[
182							// requested in #9158
183							linear_buckets(0.0, 5.0, 8).unwrap(),
184							linear_buckets(40.0, 40.0, 19).unwrap(),
185						]
186						.concat(),
187					),
188				)?,
189				registry,
190			)?,
191			usurped: register(
192				Histogram::with_opts(
193					histogram_opts!(
194						"substrate_sub_txpool_timing_event_usurped",
195						"Histogram of timings for reporting Usurped event"
196					)
197					.buckets(
198						[
199							linear_buckets(0.0, 3.0, 20).unwrap(),
200							// requested in #9158
201							vec![60.0, 75.0, 90.0, 120.0, 180.0],
202						]
203						.concat(),
204					),
205				)?,
206				registry,
207			)?,
208			dropped: register(
209				Histogram::with_opts(
210					histogram_opts!(
211						"substrate_sub_txpool_timing_event_dropped",
212						"Histogram of timings for reporting Dropped event"
213					)
214					.buckets(
215						[
216							linear_buckets(0.0, 3.0, 20).unwrap(),
217							// requested in #9158
218							vec![60.0, 75.0, 90.0, 120.0, 180.0],
219						]
220						.concat(),
221					),
222				)?,
223				registry,
224			)?,
225			invalid: register(
226				Histogram::with_opts(
227					histogram_opts!(
228						"substrate_sub_txpool_timing_event_invalid",
229						"Histogram of timings for reporting Invalid event"
230					)
231					.buckets(
232						[
233							linear_buckets(0.0, 3.0, 20).unwrap(),
234							// requested in #9158
235							vec![60.0, 75.0, 90.0, 120.0, 180.0],
236						]
237						.concat(),
238					),
239				)?,
240				registry,
241			)?,
242		})
243	}
244
245	/// Records the timing for a given transaction status.
246	///
247	/// This method records the duration, representing the time elapsed since the
248	/// transaction was submitted until the event was reported. Based on the
249	/// transaction status, it utilizes the appropriate histogram to log this duration.
250	pub fn observe<Hash, BlockHash>(
251		&self,
252		status: TransactionStatus<Hash, BlockHash>,
253		duration: Duration,
254	) {
255		let duration = duration.as_secs_f64();
256		let histogram = match status {
257			TransactionStatus::Future => &self.future,
258			TransactionStatus::Ready => &self.ready,
259			TransactionStatus::Broadcast(..) => &self.broadcast,
260			TransactionStatus::InBlock(..) => &self.in_block,
261			TransactionStatus::Retracted(..) => &self.retracted,
262			TransactionStatus::FinalityTimeout(..) => &self.finality_timeout,
263			TransactionStatus::Finalized(..) => &self.finalized,
264			TransactionStatus::Usurped(..) => &self.usurped,
265			TransactionStatus::Dropped => &self.dropped,
266			TransactionStatus::Invalid => &self.invalid,
267		};
268		histogram.observe(duration);
269	}
270}
271
272impl MetricsRegistrant for Metrics {
273	fn register(registry: &Registry) -> Result<Box<Self>, PrometheusError> {
274		Ok(Box::from(Self {
275			submitted_transactions: register(
276				Counter::new(
277					"substrate_sub_txpool_submitted_txs_total",
278					"Total number of transactions submitted",
279				)?,
280				registry,
281			)?,
282			active_views: register(
283				Gauge::new(
284					"substrate_sub_txpool_active_views",
285					"Total number of currently maintained views.",
286				)?,
287				registry,
288			)?,
289			inactive_views: register(
290				Gauge::new(
291					"substrate_sub_txpool_inactive_views",
292					"Total number of current inactive views.",
293				)?,
294				registry,
295			)?,
296			watched_txs: register(
297				Gauge::new(
298					"substrate_sub_txpool_watched_txs",
299					"Total number of watched transactions in txpool.",
300				)?,
301				registry,
302			)?,
303			unwatched_txs: register(
304				Gauge::new(
305					"substrate_sub_txpool_unwatched_txs",
306					"Total number of unwatched transactions in txpool.",
307				)?,
308				registry,
309			)?,
310			reported_invalid_txs: register(
311				Counter::new(
312					"substrate_sub_txpool_reported_invalid_txs_total",
313					"Total number of transactions reported as invalid by external entities using TxPool API.",
314				)?,
315				registry,
316			)?,
317			removed_invalid_txs: register(
318				Counter::new(
319					"substrate_sub_txpool_removed_invalid_txs_total",
320					"Total number of transactions removed as invalid.",
321				)?,
322				registry,
323			)?,
324			unknown_from_block_import_txs: register(
325				Counter::new(
326					"substrate_sub_txpool_unknown_from_block_import_txs_total",
327					"Total number of transactions from imported blocks that are unknown to the pool.",
328				)?,
329				registry,
330			)?,
331			finalized_txs: register(
332				Counter::new(
333					"substrate_sub_txpool_finalized_txs_total",
334					"Total number of finalized transactions.",
335				)?,
336				registry,
337			)?,
338			maintain_duration: register(
339				Histogram::with_opts(histogram_opts!(
340					"substrate_sub_txpool_maintain_duration_seconds",
341					"Histogram of maintain durations.",
342					linear_buckets(0.0, 0.25, 13).unwrap()
343				))?,
344				registry,
345			)?,
346			resubmitted_retracted_txs: register(
347				Counter::new(
348					"substrate_sub_txpool_resubmitted_retracted_txs_total",
349					"Total number of transactions resubmitted from retracted forks.",
350				)?,
351				registry,
352			)?,
353			submitted_from_mempool_txs: register(
354				Counter::new(
355					"substrate_sub_txpool_submitted_from_mempool_txs_total",
356					"Total number of transactions submitted from mempool to views.",
357				)?,
358				registry,
359			)?,
360			mempool_revalidation_invalid_txs: register(
361				Counter::new(
362					"substrate_sub_txpool_mempool_revalidation_invalid_txs_total",
363					"Total number of transactions found as invalid during mempool revalidation.",
364				)?,
365				registry,
366			)?,
367			view_revalidation_invalid_txs: register(
368				Counter::new(
369					"substrate_sub_txpool_view_revalidation_invalid_txs_total",
370					"Total number of transactions found as invalid during view revalidation.",
371				)?,
372				registry,
373			)?,
374			view_revalidation_resubmitted_txs: register(
375				Counter::new(
376					"substrate_sub_txpool_view_revalidation_resubmitted_txs_total",
377					"Total number of valid transactions processed during view revalidation.",
378				)?,
379				registry,
380			)?,
381			view_revalidation_duration: register(
382				Histogram::with_opts(histogram_opts!(
383					"substrate_sub_txpool_view_revalidation_duration_seconds",
384					"Histogram of view revalidation durations.",
385					linear_buckets(0.0, 0.25, 13).unwrap()
386				))?,
387				registry,
388			)?,
389			non_cloned_views: register(
390				Counter::new(
391					"substrate_sub_txpool_non_cloned_views_total",
392					"Total number of the views created w/o cloning existing view.",
393				)?,
394				registry,
395			)?,
396			events_histograms: EventsHistograms::register(registry)?,
397		}))
398	}
399}
400
401/// Messages used to report and compute event metrics.
402enum EventMetricsMessage<Hash, BlockHash> {
403	/// Message indicating a transaction has been submitted, including the timestamp
404	/// and its hash.
405	Submitted(Instant, Hash),
406	/// Message indicating the new status of a transaction, including the timestamp and transaction
407	/// hash.
408	Status(Instant, Hash, TransactionStatus<Hash, BlockHash>),
409}
410
411/// Collects metrics related to transaction events.
412pub struct EventsMetricsCollector<ChainApi: graph::ChainApi> {
413	/// Optional channel for sending event metrics messages.
414	///
415	/// If `None` no event metrics are collected (e.g. in tests).
416	metrics_message_sink: Option<MessageSink<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>>,
417}
418
419impl<ChainApi: graph::ChainApi> Default for EventsMetricsCollector<ChainApi> {
420	fn default() -> Self {
421		Self { metrics_message_sink: None }
422	}
423}
424
425impl<ChainApi: graph::ChainApi> Clone for EventsMetricsCollector<ChainApi> {
426	fn clone(&self) -> Self {
427		Self { metrics_message_sink: self.metrics_message_sink.clone() }
428	}
429}
430
431impl<ChainApi: graph::ChainApi> EventsMetricsCollector<ChainApi> {
432	/// Reports the status of a transaction.
433	///
434	/// Takes a transaction hash and status, and attempts to send a status
435	/// message to the metrics messages processing task.
436	pub fn report_status(
437		&self,
438		tx_hash: ExtrinsicHash<ChainApi>,
439		status: TransactionStatus<BlockHash<ChainApi>, ExtrinsicHash<ChainApi>>,
440	) {
441		self.metrics_message_sink.as_ref().map(|sink| {
442			if let Err(error) =
443				sink.unbounded_send(EventMetricsMessage::Status(Instant::now(), tx_hash, status))
444			{
445				trace!(target: LOG_TARGET, %error, "tx status metrics message send failed")
446			}
447		});
448	}
449
450	/// Reports that a transaction has been submitted.
451	///
452	/// Takes a transaction hash and its submission timestamp, and attempts to
453	/// send a submission message to the metrics messages processing task.
454	pub fn report_submitted(&self, insertion_info: &InsertionInfo<ExtrinsicHash<ChainApi>>) {
455		self.metrics_message_sink.as_ref().map(|sink| {
456			if let Err(error) = sink.unbounded_send(EventMetricsMessage::Submitted(
457				insertion_info
458					.source
459					.timestamp
460					.expect("timestamp is set in fork-aware pool. qed"),
461				insertion_info.hash,
462			)) {
463				trace!(target: LOG_TARGET, %error, "tx status metrics message send failed")
464			}
465		});
466	}
467}
468
469/// A type alias for a asynchronous task that collects metrics related to events.
470pub type EventsMetricsCollectorTask = Pin<Box<dyn Future<Output = ()> + Send>>;
471
472/// Sink type for sending event metrics messages.
473type MessageSink<Hash, BlockHash> =
474	mpsc::TracingUnboundedSender<EventMetricsMessage<Hash, BlockHash>>;
475
476/// Receiver type for receiving event metrics messages.
477type MessageReceiver<Hash, BlockHash> =
478	mpsc::TracingUnboundedReceiver<EventMetricsMessage<Hash, BlockHash>>;
479
/// Per-transaction bookkeeping kept by the metrics worker.
///
/// Remembers when the transaction was submitted, so event timings can be computed. It also
/// tracks which non-terminal statuses were already observed, so each of them is timed at
/// most once.
struct TransactionEventMetricsData {
	/// Set once a `Ready` status has been timed.
	ready_seen: bool,
	/// Set once a `Broadcast` status has been timed.
	broadcast_seen: bool,
	/// Set once a `Future` status has been timed.
	future_seen: bool,
	/// Set once an `InBlock` status has been timed.
	in_block_seen: bool,
	/// Set once a `Retracted` status has been timed.
	retracted_seen: bool,
	/// Instant the transaction was submitted; all event timings are measured from here.
	submit_timestamp: Instant,
}
498
499impl TransactionEventMetricsData {
500	/// Creates a new `TransactionEventMetricsData` with the given timestamp.
501	fn new(submit_timestamp: Instant) -> Self {
502		Self {
503			submit_timestamp,
504			future_seen: false,
505			ready_seen: false,
506			broadcast_seen: false,
507			in_block_seen: false,
508			retracted_seen: false,
509		}
510	}
511
512	/// Sets flag to true once.
513	///
514	/// Return true if flag was toggled.
515	fn set_true_once(flag: &mut bool) -> bool {
516		if *flag {
517			false
518		} else {
519			*flag = true;
520			true
521		}
522	}
523
524	/// Updates the status flags based on the given transaction status.
525	///
526	/// Returns the submit timestamp if given status was not seen yet, `None` otherwise.
527	fn update<Hash, BlockHash>(
528		&mut self,
529		status: &TransactionStatus<Hash, BlockHash>,
530	) -> Option<Instant> {
531		let flag = match *status {
532			TransactionStatus::Ready => &mut self.ready_seen,
533			TransactionStatus::Future => &mut self.future_seen,
534			TransactionStatus::Broadcast(..) => &mut self.broadcast_seen,
535			TransactionStatus::InBlock(..) => &mut self.in_block_seen,
536			TransactionStatus::Retracted(..) => &mut self.retracted_seen,
537			_ => return Some(self.submit_timestamp),
538		};
539		Self::set_true_once(flag).then_some(self.submit_timestamp)
540	}
541}
542
543impl<ChainApi> EventsMetricsCollector<ChainApi>
544where
545	ChainApi: graph::ChainApi + 'static,
546{
547	/// Handles the status event.
548	///
549	/// Updates the metrics by observing the time taken for a transaction's status update
550	/// from its submission time.
551	fn handle_status(
552		hash: ExtrinsicHash<ChainApi>,
553		status: TransactionStatus<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>,
554		timestamp: Instant,
555		submitted_timestamp_map: &mut HashMap<ExtrinsicHash<ChainApi>, TransactionEventMetricsData>,
556		metrics: &MetricsLink,
557	) {
558		let Entry::Occupied(mut entry) = submitted_timestamp_map.entry(hash) else { return };
559		let remove = status.is_final();
560		if let Some(submit_timestamp) = entry.get_mut().update(&status) {
561			metrics.report(|metrics| {
562				metrics
563					.events_histograms
564					.observe(status, timestamp.duration_since(submit_timestamp))
565			});
566		}
567		remove.then(|| entry.remove());
568	}
569
570	/// Asynchronous task to process received messages and compute relevant event metrics.
571	///
572	/// Runs indefinitely, handling arriving messages and updating metrics
573	/// based on the recorded submission times and timestamps of current event statuses.
574	async fn task(
575		mut rx: MessageReceiver<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>,
576		metrics: MetricsLink,
577	) {
578		let mut submitted_timestamp_map =
579			HashMap::<ExtrinsicHash<ChainApi>, TransactionEventMetricsData>::default();
580
581		loop {
582			match rx.next().await {
583				Some(EventMetricsMessage::Submitted(timestamp, hash)) => {
584					submitted_timestamp_map
585						.insert(hash, TransactionEventMetricsData::new(timestamp));
586				},
587				Some(EventMetricsMessage::Status(timestamp, hash, status)) => {
588					Self::handle_status(
589						hash,
590						status,
591						timestamp,
592						&mut submitted_timestamp_map,
593						&metrics,
594					);
595				},
596				None => {
597					return /* ? */
598				},
599			};
600		}
601	}
602
603	/// Constructs a new `EventsMetricsCollector` and its associated worker task.
604	///
605	/// Returns the collector alongside an asynchronous task. The task shall be polled by caller.
606	pub fn new_with_worker(metrics: MetricsLink) -> (Self, EventsMetricsCollectorTask) {
607		const QUEUE_WARN_SIZE: usize = 100_000;
608		let (metrics_message_sink, rx) =
609			mpsc::tracing_unbounded("txpool-event-metrics-collector", QUEUE_WARN_SIZE);
610		let task = Self::task(rx, metrics);
611
612		(Self { metrics_message_sink: Some(metrics_message_sink) }, task.boxed())
613	}
614}