use super::tx_mem_pool::InsertionInfo;
use crate::{
common::metrics::{GenericMetricsLink, MetricsRegistrant},
graph::{self, BlockHash, ExtrinsicHash},
LOG_TARGET,
};
use futures::{FutureExt, StreamExt};
use prometheus_endpoint::{
exponential_buckets, histogram_opts, linear_buckets, register, Counter, Gauge, Histogram,
PrometheusError, Registry, U64,
};
#[cfg(doc)]
use sc_transaction_pool_api::TransactionPool;
use sc_transaction_pool_api::TransactionStatus;
use sc_utils::mpsc;
use std::{
collections::{hash_map::Entry, HashMap},
future::Future,
pin::Pin,
time::{Duration, Instant},
};
use tracing::trace;
/// Alias of `GenericMetricsLink` specialised to this module's [`Metrics`].
pub type MetricsLink = GenericMetricsLink<Metrics>;
/// Prometheus instruments of the txpool (all exported under the `substrate_sub_txpool_` prefix).
///
/// Every instrument is created and registered by the `MetricsRegistrant` implementation for
/// this type.
pub struct Metrics {
/// Total number of transactions submitted.
pub submitted_transactions: Counter<U64>,
/// Number of currently maintained views.
pub active_views: Gauge<U64>,
/// Number of current inactive views.
pub inactive_views: Gauge<U64>,
/// Number of watched transactions in txpool.
pub watched_txs: Gauge<U64>,
/// Number of unwatched transactions in txpool.
pub unwatched_txs: Gauge<U64>,
/// Total number of transactions reported as invalid by external entities using TxPool API.
pub reported_invalid_txs: Counter<U64>,
/// Total number of transactions removed as invalid.
pub removed_invalid_txs: Counter<U64>,
/// Total number of transactions from imported blocks that are unknown to the pool.
pub unknown_from_block_import_txs: Counter<U64>,
/// Total number of finalized transactions.
pub finalized_txs: Counter<U64>,
/// Histogram of maintain durations (exported as `..._maintain_duration_seconds`).
pub maintain_duration: Histogram,
/// Total number of transactions resubmitted from retracted forks.
pub resubmitted_retracted_txs: Counter<U64>,
/// Total number of transactions submitted from mempool to views.
pub submitted_from_mempool_txs: Counter<U64>,
/// Total number of transactions found as invalid during mempool revalidation.
pub mempool_revalidation_invalid_txs: Counter<U64>,
/// Total number of transactions found as invalid during view revalidation.
pub view_revalidation_invalid_txs: Counter<U64>,
/// Total number of valid transactions processed during view revalidation.
pub view_revalidation_resubmitted_txs: Counter<U64>,
/// Histogram of view revalidation durations (exported as
/// `..._view_revalidation_duration_seconds`).
pub view_revalidation_duration: Histogram,
/// Total number of the views created without cloning an existing view.
pub non_cloned_views: Counter<U64>,
/// Histograms of timings for reporting the individual transaction status events.
pub events_histograms: EventsHistograms,
}
/// One timing histogram per `TransactionStatus` variant.
///
/// Values are recorded through `EventsHistograms::observe`, which stores the given duration in
/// seconds in the histogram matching the reported status.
pub struct EventsHistograms {
/// Timings for reporting the `Future` event.
pub future: Histogram,
/// Timings for reporting the `Ready` event.
pub ready: Histogram,
/// Timings for reporting the `Broadcast` event.
pub broadcast: Histogram,
/// Timings for reporting the `InBlock` event.
pub in_block: Histogram,
/// Timings for reporting the `Retracted` event.
pub retracted: Histogram,
/// Timings for reporting the `FinalityTimeout` event.
pub finality_timeout: Histogram,
/// Timings for reporting the `Finalized` event.
pub finalized: Histogram,
/// Timings for reporting the `Usurped` event.
pub usurped: Histogram,
/// Timings for reporting the `Dropped` event.
pub dropped: Histogram,
/// Timings for reporting the `Invalid` event.
pub invalid: Histogram,
}
impl EventsHistograms {
fn register(registry: &Registry) -> Result<Self, PrometheusError> {
Ok(Self {
future: register(
Histogram::with_opts(histogram_opts!(
"substrate_sub_txpool_timing_event_future",
"Histogram of timings for reporting Future event",
exponential_buckets(0.01, 2.0, 16).unwrap()
))?,
registry,
)?,
ready: register(
Histogram::with_opts(histogram_opts!(
"substrate_sub_txpool_timing_event_ready",
"Histogram of timings for reporting Ready event",
exponential_buckets(0.01, 2.0, 16).unwrap()
))?,
registry,
)?,
broadcast: register(
Histogram::with_opts(histogram_opts!(
"substrate_sub_txpool_timing_event_broadcast",
"Histogram of timings for reporting Broadcast event",
linear_buckets(0.01, 0.25, 16).unwrap()
))?,
registry,
)?,
in_block: register(
Histogram::with_opts(histogram_opts!(
"substrate_sub_txpool_timing_event_in_block",
"Histogram of timings for reporting InBlock event",
linear_buckets(0.0, 3.0, 20).unwrap()
))?,
registry,
)?,
retracted: register(
Histogram::with_opts(histogram_opts!(
"substrate_sub_txpool_timing_event_retracted",
"Histogram of timings for reporting Retracted event",
linear_buckets(0.0, 3.0, 20).unwrap()
))?,
registry,
)?,
finality_timeout: register(
Histogram::with_opts(histogram_opts!(
"substrate_sub_txpool_timing_event_finality_timeout",
"Histogram of timings for reporting FinalityTimeout event",
linear_buckets(0.0, 40.0, 20).unwrap()
))?,
registry,
)?,
finalized: register(
Histogram::with_opts(histogram_opts!(
"substrate_sub_txpool_timing_event_finalized",
"Histogram of timings for reporting Finalized event",
linear_buckets(0.0, 40.0, 20).unwrap()
))?,
registry,
)?,
usurped: register(
Histogram::with_opts(histogram_opts!(
"substrate_sub_txpool_timing_event_usurped",
"Histogram of timings for reporting Usurped event",
linear_buckets(0.0, 3.0, 20).unwrap()
))?,
registry,
)?,
dropped: register(
Histogram::with_opts(histogram_opts!(
"substrate_sub_txpool_timing_event_dropped",
"Histogram of timings for reporting Dropped event",
linear_buckets(0.0, 3.0, 20).unwrap()
))?,
registry,
)?,
invalid: register(
Histogram::with_opts(histogram_opts!(
"substrate_sub_txpool_timing_event_invalid",
"Histogram of timings for reporting Invalid event",
linear_buckets(0.0, 3.0, 20).unwrap()
))?,
registry,
)?,
})
}
pub fn observe<Hash, BlockHash>(
&self,
status: TransactionStatus<Hash, BlockHash>,
duration: Duration,
) {
let duration = duration.as_secs_f64();
let histogram = match status {
TransactionStatus::Future => &self.future,
TransactionStatus::Ready => &self.ready,
TransactionStatus::Broadcast(..) => &self.broadcast,
TransactionStatus::InBlock(..) => &self.in_block,
TransactionStatus::Retracted(..) => &self.retracted,
TransactionStatus::FinalityTimeout(..) => &self.finality_timeout,
TransactionStatus::Finalized(..) => &self.finalized,
TransactionStatus::Usurped(..) => &self.usurped,
TransactionStatus::Dropped => &self.dropped,
TransactionStatus::Invalid => &self.invalid,
};
histogram.observe(duration);
}
}
impl MetricsRegistrant for Metrics {
fn register(registry: &Registry) -> Result<Box<Self>, PrometheusError> {
Ok(Box::from(Self {
submitted_transactions: register(
Counter::new(
"substrate_sub_txpool_submitted_txs_total",
"Total number of transactions submitted",
)?,
registry,
)?,
active_views: register(
Gauge::new(
"substrate_sub_txpool_active_views",
"Total number of currently maintained views.",
)?,
registry,
)?,
inactive_views: register(
Gauge::new(
"substrate_sub_txpool_inactive_views",
"Total number of current inactive views.",
)?,
registry,
)?,
watched_txs: register(
Gauge::new(
"substrate_sub_txpool_watched_txs",
"Total number of watched transactions in txpool.",
)?,
registry,
)?,
unwatched_txs: register(
Gauge::new(
"substrate_sub_txpool_unwatched_txs",
"Total number of unwatched transactions in txpool.",
)?,
registry,
)?,
reported_invalid_txs: register(
Counter::new(
"substrate_sub_txpool_reported_invalid_txs_total",
"Total number of transactions reported as invalid by external entities using TxPool API.",
)?,
registry,
)?,
removed_invalid_txs: register(
Counter::new(
"substrate_sub_txpool_removed_invalid_txs_total",
"Total number of transactions removed as invalid.",
)?,
registry,
)?,
unknown_from_block_import_txs: register(
Counter::new(
"substrate_sub_txpool_unknown_from_block_import_txs_total",
"Total number of transactions from imported blocks that are unknown to the pool.",
)?,
registry,
)?,
finalized_txs: register(
Counter::new(
"substrate_sub_txpool_finalized_txs_total",
"Total number of finalized transactions.",
)?,
registry,
)?,
maintain_duration: register(
Histogram::with_opts(histogram_opts!(
"substrate_sub_txpool_maintain_duration_seconds",
"Histogram of maintain durations.",
linear_buckets(0.0, 0.25, 13).unwrap()
))?,
registry,
)?,
resubmitted_retracted_txs: register(
Counter::new(
"substrate_sub_txpool_resubmitted_retracted_txs_total",
"Total number of transactions resubmitted from retracted forks.",
)?,
registry,
)?,
submitted_from_mempool_txs: register(
Counter::new(
"substrate_sub_txpool_submitted_from_mempool_txs_total",
"Total number of transactions submitted from mempool to views.",
)?,
registry,
)?,
mempool_revalidation_invalid_txs: register(
Counter::new(
"substrate_sub_txpool_mempool_revalidation_invalid_txs_total",
"Total number of transactions found as invalid during mempool revalidation.",
)?,
registry,
)?,
view_revalidation_invalid_txs: register(
Counter::new(
"substrate_sub_txpool_view_revalidation_invalid_txs_total",
"Total number of transactions found as invalid during view revalidation.",
)?,
registry,
)?,
view_revalidation_resubmitted_txs: register(
Counter::new(
"substrate_sub_txpool_view_revalidation_resubmitted_txs_total",
"Total number of valid transactions processed during view revalidation.",
)?,
registry,
)?,
view_revalidation_duration: register(
Histogram::with_opts(histogram_opts!(
"substrate_sub_txpool_view_revalidation_duration_seconds",
"Histogram of view revalidation durations.",
linear_buckets(0.0, 0.25, 13).unwrap()
))?,
registry,
)?,
non_cloned_views: register(
Counter::new(
"substrate_sub_txpool_non_cloned_views_total",
"Total number of the views created w/o cloning existing view.",
)?,
registry,
)?,
events_histograms: EventsHistograms::register(registry)?,
}))
}
}
/// Message sent from an `EventsMetricsCollector` handle to its worker task.
enum EventMetricsMessage<Hash, BlockHash> {
/// A transaction was submitted: its submission timestamp and its hash.
Submitted(Instant, Hash),
/// A status was reported: the time of the report, the transaction hash and the status.
Status(Instant, Hash, TransactionStatus<Hash, BlockHash>),
}
/// Handle used to report transaction events to the task that turns them into timing metrics.
///
/// The handle only forwards messages over a channel; the histograms are updated by the worker
/// future returned from `new_with_worker`.
pub struct EventsMetricsCollector<ChainApi: graph::ChainApi> {
/// Sending side of the channel to the worker task. When `None` (the `Default` value) every
/// report is silently discarded.
metrics_message_sink: Option<MessageSink<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>>,
}
impl<ChainApi: graph::ChainApi> Default for EventsMetricsCollector<ChainApi> {
    /// Builds a collector without a message sink, i.e. one that is not connected to any
    /// worker task.
    fn default() -> Self {
        let metrics_message_sink = None;
        Self { metrics_message_sink }
    }
}
impl<ChainApi: graph::ChainApi> Clone for EventsMetricsCollector<ChainApi> {
    /// Returns a handle holding a clone of the same (optional) message sink.
    ///
    /// A derived implementation would additionally require `ChainApi: Clone`; this manual one
    /// only clones the sink.
    fn clone(&self) -> Self {
        let Self { metrics_message_sink } = self;
        Self { metrics_message_sink: metrics_message_sink.clone() }
    }
}
impl<ChainApi: graph::ChainApi> EventsMetricsCollector<ChainApi> {
pub fn report_status(
&self,
tx_hash: ExtrinsicHash<ChainApi>,
status: TransactionStatus<BlockHash<ChainApi>, ExtrinsicHash<ChainApi>>,
) {
self.metrics_message_sink.as_ref().map(|sink| {
if let Err(error) =
sink.unbounded_send(EventMetricsMessage::Status(Instant::now(), tx_hash, status))
{
trace!(target: LOG_TARGET, %error, "tx status metrics message send failed")
}
});
}
pub fn report_submitted(&self, insertion_info: &InsertionInfo<ExtrinsicHash<ChainApi>>) {
self.metrics_message_sink.as_ref().map(|sink| {
if let Err(error) = sink.unbounded_send(EventMetricsMessage::Submitted(
insertion_info
.source
.timestamp
.expect("timestamp is set in fork-aware pool. qed"),
insertion_info.hash,
)) {
trace!(target: LOG_TARGET, %error, "tx status metrics message send failed")
}
});
}
}
/// Boxed, sendable future type of the worker task returned by
/// [`EventsMetricsCollector::new_with_worker`].
pub type EventsMetricsCollectorTask = Pin<Box<dyn Future<Output = ()> + Send>>;
/// Sending half of the channel carrying `EventMetricsMessage`s to the worker task.
type MessageSink<Hash, BlockHash> =
mpsc::TracingUnboundedSender<EventMetricsMessage<Hash, BlockHash>>;
/// Receiving half of the channel carrying `EventMetricsMessage`s to the worker task.
type MessageReceiver<Hash, BlockHash> =
mpsc::TracingUnboundedReceiver<EventMetricsMessage<Hash, BlockHash>>;
/// Per-transaction bookkeeping kept by the worker task.
///
/// The `*_seen` flags record which statuses were already processed by `update`, so that each
/// of those statuses yields a timestamp (and hence a histogram sample) at most once.
struct TransactionEventMetricsData {
/// Set once a `Ready` status has been processed.
ready_seen: bool,
/// Set once a `Broadcast` status has been processed.
broadcast_seen: bool,
/// Set once a `Future` status has been processed.
future_seen: bool,
/// Set once an `InBlock` status has been processed.
in_block_seen: bool,
/// Set once a `Retracted` status has been processed.
retracted_seen: bool,
/// Submission time of the transaction; event timings are measured relative to it.
submit_timestamp: Instant,
}
impl TransactionEventMetricsData {
    /// Creates the bookkeeping entry for a transaction submitted at `submit_timestamp`, with
    /// no status seen yet.
    fn new(submit_timestamp: Instant) -> Self {
        Self {
            ready_seen: false,
            broadcast_seen: false,
            future_seen: false,
            in_block_seen: false,
            retracted_seen: false,
            submit_timestamp,
        }
    }

    /// Raises `flag` and returns `true` only if it was not raised before.
    fn set_true_once(flag: &mut bool) -> bool {
        let was_set = std::mem::replace(flag, true);
        !was_set
    }

    /// Decides whether `status` should be recorded for this transaction.
    ///
    /// Returns the submission timestamp when it should: on the first occurrence of `Ready`,
    /// `Future`, `Broadcast`, `InBlock` or `Retracted`, and on every occurrence of any other
    /// status. Returns `None` for a repeated occurrence of one of the five tracked statuses.
    fn update<Hash, BlockHash>(
        &mut self,
        status: &TransactionStatus<Hash, BlockHash>,
    ) -> Option<Instant> {
        let submitted_at = self.submit_timestamp;
        let seen = match status {
            TransactionStatus::Future => &mut self.future_seen,
            TransactionStatus::Ready => &mut self.ready_seen,
            TransactionStatus::Broadcast(..) => &mut self.broadcast_seen,
            TransactionStatus::InBlock(..) => &mut self.in_block_seen,
            TransactionStatus::Retracted(..) => &mut self.retracted_seen,
            // Statuses without a flag are always recorded.
            _ => return Some(submitted_at),
        };

        if Self::set_true_once(seen) {
            Some(submitted_at)
        } else {
            None
        }
    }
}
impl<ChainApi> EventsMetricsCollector<ChainApi>
where
    ChainApi: graph::ChainApi + 'static,
{
    /// Processes one reported `status` of the transaction `hash`.
    ///
    /// Statuses of transactions without an entry in `submitted_timestamp_map` are ignored.
    /// Otherwise, if the entry's `update` yields the submission timestamp, the time elapsed
    /// between submission and `timestamp` is recorded in the matching event histogram. The
    /// entry is removed once `status.is_final()` holds.
    fn handle_status(
        hash: ExtrinsicHash<ChainApi>,
        status: TransactionStatus<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>,
        timestamp: Instant,
        submitted_timestamp_map: &mut HashMap<ExtrinsicHash<ChainApi>, TransactionEventMetricsData>,
        metrics: &MetricsLink,
    ) {
        let Entry::Occupied(mut entry) = submitted_timestamp_map.entry(hash) else { return };

        // Must be evaluated before `status` is moved into the reporting closure.
        let is_final = status.is_final();

        if let Some(submit_timestamp) = entry.get_mut().update(&status) {
            // Saturate explicitly: a report stamped before the submission yields a zero
            // duration instead of depending on `duration_since`'s version-specific behaviour.
            let elapsed = timestamp.saturating_duration_since(submit_timestamp);
            metrics.report(|metrics| metrics.events_histograms.observe(status, elapsed));
        }

        if is_final {
            entry.remove();
        }
    }

    /// Worker loop: consumes messages from `rx` until the channel yields `None`.
    ///
    /// `Submitted` messages create (or replace) the bookkeeping entry of a transaction;
    /// `Status` messages are passed to [`Self::handle_status`].
    async fn task(
        mut rx: MessageReceiver<ExtrinsicHash<ChainApi>, BlockHash<ChainApi>>,
        metrics: MetricsLink,
    ) {
        let mut submitted_timestamp_map =
            HashMap::<ExtrinsicHash<ChainApi>, TransactionEventMetricsData>::default();

        while let Some(message) = rx.next().await {
            match message {
                EventMetricsMessage::Submitted(timestamp, hash) => {
                    submitted_timestamp_map
                        .insert(hash, TransactionEventMetricsData::new(timestamp));
                },
                EventMetricsMessage::Status(timestamp, hash, status) => {
                    Self::handle_status(
                        hash,
                        status,
                        timestamp,
                        &mut submitted_timestamp_map,
                        &metrics,
                    );
                },
            }
        }
    }

    /// Creates a collector together with the worker future that processes its reports.
    ///
    /// The returned future does nothing until it is polled, so the caller has to drive it
    /// (e.g. by spawning it) for the reports to reach `metrics`.
    pub fn new_with_worker(metrics: MetricsLink) -> (Self, EventsMetricsCollectorTask) {
        // Size handed to `tracing_unbounded`; presumably the queue length above which the
        // channel emits a warning - confirm against `sc_utils::mpsc`.
        const QUEUE_WARN_SIZE: usize = 100_000;

        let (metrics_message_sink, rx) =
            mpsc::tracing_unbounded("txpool-event-metrics-collector", QUEUE_WARN_SIZE);
        let task = Self::task(rx, metrics);

        (Self { metrics_message_sink: Some(metrics_message_sink) }, task.boxed())
    }
}