1use super::tx_mem_pool::InsertionInfo;
22use crate::{
23 common::metrics::{GenericMetricsLink, MetricsRegistrant},
24 graph::{self, BlockHash, ExtrinsicHash},
25 LOG_TARGET,
26};
27use futures::{FutureExt, StreamExt};
28use prometheus_endpoint::{
29 exponential_buckets, histogram_opts, linear_buckets, register, Counter, Gauge, Histogram,
30 PrometheusError, Registry, U64,
31};
32#[cfg(doc)]
33use sc_transaction_pool_api::TransactionPool;
34use sc_transaction_pool_api::TransactionStatus;
35use sc_utils::mpsc;
36use std::{
37 collections::{hash_map::Entry, HashMap},
38 future::Future,
39 pin::Pin,
40 time::{Duration, Instant},
41};
42use tracing::trace;
43
44pub type MetricsLink = GenericMetricsLink<Metrics>;
46
47pub struct Metrics {
49 pub submitted_transactions: Counter<U64>,
51 pub active_views: Gauge<U64>,
53 pub inactive_views: Gauge<U64>,
55 pub watched_txs: Gauge<U64>,
57 pub unwatched_txs: Gauge<U64>,
59 pub reported_invalid_txs: Counter<U64>,
64 pub removed_invalid_txs: Counter<U64>,
66 pub unknown_from_block_import_txs: Counter<U64>,
68 pub finalized_txs: Counter<U64>,
70 pub maintain_duration: Histogram,
72 pub resubmitted_retracted_txs: Counter<U64>,
74 pub submitted_from_mempool_txs: Counter<U64>,
76 pub mempool_revalidation_invalid_txs: Counter<U64>,
78 pub view_revalidation_invalid_txs: Counter<U64>,
80 pub view_revalidation_resubmitted_txs: Counter<U64>,
82 pub view_revalidation_duration: Histogram,
84 pub non_cloned_views: Counter<U64>,
86 pub events_histograms: EventsHistograms,
88}
89
90pub struct EventsHistograms {
92 pub future: Histogram,
94 pub ready: Histogram,
96 pub broadcast: Histogram,
98 pub in_block: Histogram,
100 pub retracted: Histogram,
102 pub finality_timeout: Histogram,
104 pub finalized: Histogram,
106 pub usurped: Histogram,
108 pub dropped: Histogram,
110 pub invalid: Histogram,
112}
113
114impl EventsHistograms {
115 fn register(registry: &Registry) -> Result<Self, PrometheusError> {
116 Ok(Self {
117 future: register(
118 Histogram::with_opts(histogram_opts!(
119 "substrate_sub_txpool_timing_event_future",
120 "Histogram of timings for reporting Future event",
121 exponential_buckets(0.01, 2.0, 16).unwrap()
122 ))?,
123 registry,
124 )?,
125 ready: register(
126 Histogram::with_opts(histogram_opts!(
127 "substrate_sub_txpool_timing_event_ready",
128 "Histogram of timings for reporting Ready event",
129 exponential_buckets(0.01, 2.0, 16).unwrap()
130 ))?,
131 registry,
132 )?,
133 broadcast: register(
134 Histogram::with_opts(histogram_opts!(
135 "substrate_sub_txpool_timing_event_broadcast",
136 "Histogram of timings for reporting Broadcast event",
137 linear_buckets(0.01, 0.25, 16).unwrap()
138 ))?,
139 registry,
140 )?,
141 in_block: register(
142 Histogram::with_opts(
143 histogram_opts!(
144 "substrate_sub_txpool_timing_event_in_block",
145 "Histogram of timings for reporting InBlock event"
146 )
147 .buckets(
148 [
149 linear_buckets(0.0, 3.0, 20).unwrap(),
150 vec![60.0, 75.0, 90.0, 120.0, 180.0],
152 ]
153 .concat(),
154 ),
155 )?,
156 registry,
157 )?,
158 retracted: register(
159 Histogram::with_opts(histogram_opts!(
160 "substrate_sub_txpool_timing_event_retracted",
161 "Histogram of timings for reporting Retracted event",
162 linear_buckets(0.0, 3.0, 20).unwrap()
163 ))?,
164 registry,
165 )?,
166 finality_timeout: register(
167 Histogram::with_opts(histogram_opts!(
168 "substrate_sub_txpool_timing_event_finality_timeout",
169 "Histogram of timings for reporting FinalityTimeout event",
170 linear_buckets(0.0, 40.0, 20).unwrap()
171 ))?,
172 registry,
173 )?,
174 finalized: register(
175 Histogram::with_opts(
176 histogram_opts!(
177 "substrate_sub_txpool_timing_event_finalized",
178 "Histogram of timings for reporting Finalized event"
179 )
180 .buckets(
181 [
182 linear_buckets(0.0, 5.0, 8).unwrap(),
184 linear_buckets(40.0, 40.0, 19).unwrap(),
185 ]
186 .concat(),
187 ),
188 )?,
189 registry,
190 )?,
191 usurped: register(
192 Histogram::with_opts(
193 histogram_opts!(
194 "substrate_sub_txpool_timing_event_usurped",
195 "Histogram of timings for reporting Usurped event"
196 )
197 .buckets(
198 [
199 linear_buckets(0.0, 3.0, 20).unwrap(),
200 vec![60.0, 75.0, 90.0, 120.0, 180.0],
202 ]
203 .concat(),
204 ),
205 )?,
206 registry,
207 )?,
208 dropped: register(
209 Histogram::with_opts(
210 histogram_opts!(
211 "substrate_sub_txpool_timing_event_dropped",
212 "Histogram of timings for reporting Dropped event"
213 )
214 .buckets(
215 [
216 linear_buckets(0.0, 3.0, 20).unwrap(),
217 vec![60.0, 75.0, 90.0, 120.0, 180.0],
219 ]
220 .concat(),
221 ),
222 )?,
223 registry,
224 )?,
225 invalid: register(
226 Histogram::with_opts(
227 histogram_opts!(
228 "substrate_sub_txpool_timing_event_invalid",
229 "Histogram of timings for reporting Invalid event"
230 )
231 .buckets(
232 [
233 linear_buckets(0.0, 3.0, 20).unwrap(),
234 vec![60.0, 75.0, 90.0, 120.0, 180.0],
236 ]
237 .concat(),
238 ),
239 )?,
240 registry,
241 )?,
242 })
243 }
244
245 pub fn observe<Hash, BlockHash>(
251 &self,
252 status: TransactionStatus<Hash, BlockHash>,
253 duration: Duration,
254 ) {
255 let duration = duration.as_secs_f64();
256 let histogram = match status {
257 TransactionStatus::Future => &self.future,
258 TransactionStatus::Ready => &self.ready,
259 TransactionStatus::Broadcast(..) => &self.broadcast,
260 TransactionStatus::InBlock(..) => &self.in_block,
261 TransactionStatus::Retracted(..) => &self.retracted,
262 TransactionStatus::FinalityTimeout(..) => &self.finality_timeout,
263 TransactionStatus::Finalized(..) => &self.finalized,
264 TransactionStatus::Usurped(..) => &self.usurped,
265 TransactionStatus::Dropped => &self.dropped,
266 TransactionStatus::Invalid => &self.invalid,
267 };
268 histogram.observe(duration);
269 }
270}
271
272impl MetricsRegistrant for Metrics {
273 fn register(registry: &Registry) -> Result<Box<Self>, PrometheusError> {
274 Ok(Box::from(Self {
275 submitted_transactions: register(
276 Counter::new(
277 "substrate_sub_txpool_submitted_txs_total",
278 "Total number of transactions submitted",
279 )?,
280 registry,
281 )?,
282 active_views: register(
283 Gauge::new(
284 "substrate_sub_txpool_active_views",
285 "Total number of currently maintained views.",
286 )?,
287 registry,
288 )?,
289 inactive_views: register(
290 Gauge::new(
291 "substrate_sub_txpool_inactive_views",
292 "Total number of current inactive views.",
293 )?,
294 registry,
295 )?,
296 watched_txs: register(
297 Gauge::new(
298 "substrate_sub_txpool_watched_txs",
299 "Total number of watched transactions in txpool.",
300 )?,
301 registry,
302 )?,
303 unwatched_txs: register(
304 Gauge::new(
305 "substrate_sub_txpool_unwatched_txs",
306 "Total number of unwatched transactions in txpool.",
307 )?,
308 registry,
309 )?,
310 reported_invalid_txs: register(
311 Counter::new(
312 "substrate_sub_txpool_reported_invalid_txs_total",
313 "Total number of transactions reported as invalid by external entities using TxPool API.",
314 )?,
315 registry,
316 )?,
317 removed_invalid_txs: register(
318 Counter::new(
319 "substrate_sub_txpool_removed_invalid_txs_total",
320 "Total number of transactions removed as invalid.",
321 )?,
322 registry,
323 )?,
324 unknown_from_block_import_txs: register(
325 Counter::new(
326 "substrate_sub_txpool_unknown_from_block_import_txs_total",
327 "Total number of transactions from imported blocks that are unknown to the pool.",
328 )?,
329 registry,
330 )?,
331 finalized_txs: register(
332 Counter::new(
333 "substrate_sub_txpool_finalized_txs_total",
334 "Total number of finalized transactions.",
335 )?,
336 registry,
337 )?,
338 maintain_duration: register(
339 Histogram::with_opts(histogram_opts!(
340 "substrate_sub_txpool_maintain_duration_seconds",
341 "Histogram of maintain durations.",
342 linear_buckets(0.0, 0.25, 13).unwrap()
343 ))?,
344 registry,
345 )?,
346 resubmitted_retracted_txs: register(
347 Counter::new(
348 "substrate_sub_txpool_resubmitted_retracted_txs_total",
349 "Total number of transactions resubmitted from retracted forks.",
350 )?,
351 registry,
352 )?,
353 submitted_from_mempool_txs: register(
354 Counter::new(
355 "substrate_sub_txpool_submitted_from_mempool_txs_total",
356 "Total number of transactions submitted from mempool to views.",
357 )?,
358 registry,
359 )?,
360 mempool_revalidation_invalid_txs: register(
361 Counter::new(
362 "substrate_sub_txpool_mempool_revalidation_invalid_txs_total",
363 "Total number of transactions found as invalid during mempool revalidation.",
364 )?,
365 registry,
366 )?,
367 view_revalidation_invalid_txs: register(
368 Counter::new(
369 "substrate_sub_txpool_view_revalidation_invalid_txs_total",
370 "Total number of transactions found as invalid during view revalidation.",
371 )?,
372 registry,
373 )?,
374 view_revalidation_resubmitted_txs: register(
375 Counter::new(
376 "substrate_sub_txpool_view_revalidation_resubmitted_txs_total",
377 "Total number of valid transactions processed during view revalidation.",
378 )?,
379 registry,
380 )?,
381 view_revalidation_duration: register(
382 Histogram::with_opts(histogram_opts!(
383 "substrate_sub_txpool_view_revalidation_duration_seconds",
384 "Histogram of view revalidation durations.",
385 linear_buckets(0.0, 0.25, 13).unwrap()
386 ))?,
387 registry,
388 )?,
389 non_cloned_views: register(
390 Counter::new(
391 "substrate_sub_txpool_non_cloned_views_total",
392 "Total number of the views created w/o cloning existing view.",
393 )?,
394 registry,
395 )?,
396 events_histograms: EventsHistograms::register(registry)?,
397 }))
398 }
399}
400
401enum EventMetricsMessage<Hash, BlockHash> {
403 Submitted(Instant, Hash),
406 Status(Instant, Hash, TransactionStatus<Hash, BlockHash>),
409}
410
411pub struct EventsMetricsCollector<ChainApi: graph::ChainApi> {
413 metrics_message_sink: Option<MessageSink<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>>,
417}
418
419impl<ChainApi: graph::ChainApi> Default for EventsMetricsCollector<ChainApi> {
420 fn default() -> Self {
421 Self { metrics_message_sink: None }
422 }
423}
424
425impl<ChainApi: graph::ChainApi> Clone for EventsMetricsCollector<ChainApi> {
426 fn clone(&self) -> Self {
427 Self { metrics_message_sink: self.metrics_message_sink.clone() }
428 }
429}
430
431impl<ChainApi: graph::ChainApi> EventsMetricsCollector<ChainApi> {
432 pub fn report_status(
437 &self,
438 tx_hash: ExtrinsicHash<ChainApi>,
439 status: TransactionStatus<BlockHash<ChainApi>, ExtrinsicHash<ChainApi>>,
440 ) {
441 self.metrics_message_sink.as_ref().map(|sink| {
442 if let Err(error) =
443 sink.unbounded_send(EventMetricsMessage::Status(Instant::now(), tx_hash, status))
444 {
445 trace!(target: LOG_TARGET, %error, "tx status metrics message send failed")
446 }
447 });
448 }
449
450 pub fn report_submitted(&self, insertion_info: &InsertionInfo<ExtrinsicHash<ChainApi>>) {
455 self.metrics_message_sink.as_ref().map(|sink| {
456 if let Err(error) = sink.unbounded_send(EventMetricsMessage::Submitted(
457 insertion_info
458 .source
459 .timestamp
460 .expect("timestamp is set in fork-aware pool. qed"),
461 insertion_info.hash,
462 )) {
463 trace!(target: LOG_TARGET, %error, "tx status metrics message send failed")
464 }
465 });
466 }
467}
468
469pub type EventsMetricsCollectorTask = Pin<Box<dyn Future<Output = ()> + Send>>;
471
472type MessageSink<Hash, BlockHash> =
474 mpsc::TracingUnboundedSender<EventMetricsMessage<Hash, BlockHash>>;
475
476type MessageReceiver<Hash, BlockHash> =
478 mpsc::TracingUnboundedReceiver<EventMetricsMessage<Hash, BlockHash>>;
479
480struct TransactionEventMetricsData {
483 ready_seen: bool,
485 broadcast_seen: bool,
487 future_seen: bool,
489 in_block_seen: bool,
491 retracted_seen: bool,
493 submit_timestamp: Instant,
497}
498
499impl TransactionEventMetricsData {
500 fn new(submit_timestamp: Instant) -> Self {
502 Self {
503 submit_timestamp,
504 future_seen: false,
505 ready_seen: false,
506 broadcast_seen: false,
507 in_block_seen: false,
508 retracted_seen: false,
509 }
510 }
511
512 fn set_true_once(flag: &mut bool) -> bool {
516 if *flag {
517 false
518 } else {
519 *flag = true;
520 true
521 }
522 }
523
524 fn update<Hash, BlockHash>(
528 &mut self,
529 status: &TransactionStatus<Hash, BlockHash>,
530 ) -> Option<Instant> {
531 let flag = match *status {
532 TransactionStatus::Ready => &mut self.ready_seen,
533 TransactionStatus::Future => &mut self.future_seen,
534 TransactionStatus::Broadcast(..) => &mut self.broadcast_seen,
535 TransactionStatus::InBlock(..) => &mut self.in_block_seen,
536 TransactionStatus::Retracted(..) => &mut self.retracted_seen,
537 _ => return Some(self.submit_timestamp),
538 };
539 Self::set_true_once(flag).then_some(self.submit_timestamp)
540 }
541}
542
543impl<ChainApi> EventsMetricsCollector<ChainApi>
544where
545 ChainApi: graph::ChainApi + 'static,
546{
547 fn handle_status(
552 hash: ExtrinsicHash<ChainApi>,
553 status: TransactionStatus<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>,
554 timestamp: Instant,
555 submitted_timestamp_map: &mut HashMap<ExtrinsicHash<ChainApi>, TransactionEventMetricsData>,
556 metrics: &MetricsLink,
557 ) {
558 let Entry::Occupied(mut entry) = submitted_timestamp_map.entry(hash) else { return };
559 let remove = status.is_final();
560 if let Some(submit_timestamp) = entry.get_mut().update(&status) {
561 metrics.report(|metrics| {
562 metrics
563 .events_histograms
564 .observe(status, timestamp.duration_since(submit_timestamp))
565 });
566 }
567 remove.then(|| entry.remove());
568 }
569
570 async fn task(
575 mut rx: MessageReceiver<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>,
576 metrics: MetricsLink,
577 ) {
578 let mut submitted_timestamp_map =
579 HashMap::<ExtrinsicHash<ChainApi>, TransactionEventMetricsData>::default();
580
581 loop {
582 match rx.next().await {
583 Some(EventMetricsMessage::Submitted(timestamp, hash)) => {
584 submitted_timestamp_map
585 .insert(hash, TransactionEventMetricsData::new(timestamp));
586 },
587 Some(EventMetricsMessage::Status(timestamp, hash, status)) => {
588 Self::handle_status(
589 hash,
590 status,
591 timestamp,
592 &mut submitted_timestamp_map,
593 &metrics,
594 );
595 },
596 None => {
597 return },
599 };
600 }
601 }
602
603 pub fn new_with_worker(metrics: MetricsLink) -> (Self, EventsMetricsCollectorTask) {
607 const QUEUE_WARN_SIZE: usize = 100_000;
608 let (metrics_message_sink, rx) =
609 mpsc::tracing_unbounded("txpool-event-metrics-collector", QUEUE_WARN_SIZE);
610 let task = Self::task(rx, metrics);
611
612 (Self { metrics_message_sink: Some(metrics_message_sink) }, task.boxed())
613 }
614}