use std::collections::HashMap;
use polkadot_node_metrics::{metered::Meter, metrics};
use polkadot_overseer::prometheus;
/// Metrics handle for this module: an optional wrapper around [`MetricsInner`].
///
/// The derived `Default` yields the `None` variant, i.e. a handle with no
/// registered collectors. In that state the accessor methods fall back to the
/// default value of the requested metrics type and `MetricsWatcher::collect_metrics`
/// publishes nothing.
#[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>);
/// The registered collectors backing [`Metrics`].
///
/// All `to_worker_*` collectors are vectors labelled by `worker_name`.
#[derive(Clone)]
pub struct MetricsInner {
// Metrics handed out via `Metrics::approval_distribution_metrics`.
approval_distribution: polkadot_approval_distribution::metrics::Metrics,
// Metrics handed out via `Metrics::approval_voting_metrics`.
approval_voting: polkadot_node_core_approval_voting::Metrics,
// Histogram of the time-of-flight samples read from a worker's bounded meter.
to_worker_bounded_tof: prometheus::HistogramVec,
// Gauge set to the `sent` readout of a worker's bounded meter.
to_worker_bounded_sent: prometheus::GaugeVec<prometheus::U64>,
// Gauge set to the `received` readout of a worker's bounded meter.
to_worker_bounded_received: prometheus::GaugeVec<prometheus::U64>,
// Gauge set to the `blocked` readout of a worker's bounded meter.
to_worker_bounded_blocked: prometheus::GaugeVec<prometheus::U64>,
// Histogram of the time-of-flight samples read from a worker's unbounded meter.
to_worker_unbounded_tof: prometheus::HistogramVec,
// Gauge set to the `sent` readout of a worker's unbounded meter.
to_worker_unbounded_sent: prometheus::GaugeVec<prometheus::U64>,
// Gauge set to the `received` readout of a worker's unbounded meter.
to_worker_unbounded_received: prometheus::GaugeVec<prometheus::U64>,
}
impl Metrics {
    /// Returns a clone of the approval-distribution metrics.
    ///
    /// When no metrics have been registered (the inner value is `None`), the
    /// `Default` value of the approval-distribution metrics type is returned.
    pub fn approval_distribution_metrics(
        &self,
    ) -> polkadot_approval_distribution::metrics::Metrics {
        match &self.0 {
            Some(inner) => inner.approval_distribution.clone(),
            None => Default::default(),
        }
    }

    /// Returns a clone of the approval-voting metrics.
    ///
    /// When no metrics have been registered (the inner value is `None`), the
    /// `Default` value of the approval-voting metrics type is returned.
    pub fn approval_voting_metrics(&self) -> polkadot_node_core_approval_voting::Metrics {
        match &self.0 {
            Some(inner) => inner.approval_voting.clone(),
            None => Default::default(),
        }
    }
}
impl metrics::Metrics for Metrics {
fn try_register(
registry: &prometheus::Registry,
) -> std::result::Result<Self, prometheus::PrometheusError> {
Ok(Metrics(Some(MetricsInner {
approval_distribution: polkadot_approval_distribution::metrics::Metrics::try_register(
registry,
)?,
approval_voting: polkadot_node_core_approval_voting::Metrics::try_register(registry)?,
to_worker_bounded_tof: prometheus::register(
prometheus::HistogramVec::new(
prometheus::HistogramOpts::new(
"polkadot_approval_voting_parallel_worker_bounded_tof",
"Duration spent in a particular approval voting worker channel from entrance to removal",
)
.buckets(vec![
0.0001, 0.0004, 0.0016, 0.0064, 0.0256, 0.1024, 0.4096, 1.6384, 3.2768,
4.9152, 6.5536,
]),
&["worker_name"],
)?,
registry,
)?,
to_worker_bounded_sent: prometheus::register(
prometheus::GaugeVec::<prometheus::U64>::new(
prometheus::Opts::new(
"polkadot_approval_voting_parallel_worker_bounded_sent",
"Number of elements sent to approval voting workers' bounded queues",
),
&["worker_name"],
)?,
registry,
)?,
to_worker_bounded_received: prometheus::register(
prometheus::GaugeVec::<prometheus::U64>::new(
prometheus::Opts::new(
"polkadot_approval_voting_parallel_worker_bounded_received",
"Number of elements received by approval voting workers' bounded queues",
),
&["worker_name"],
)?,
registry,
)?,
to_worker_bounded_blocked: prometheus::register(
prometheus::GaugeVec::<prometheus::U64>::new(
prometheus::Opts::new(
"polkadot_approval_voting_parallel_worker_bounded_blocked",
"Number of times approval voting workers blocked while sending messages to a subsystem",
),
&["worker_name"],
)?,
registry,
)?,
to_worker_unbounded_tof: prometheus::register(
prometheus::HistogramVec::new(
prometheus::HistogramOpts::new(
"polkadot_approval_voting_parallel_worker_unbounded_tof",
"Duration spent in a particular approval voting worker channel from entrance to removal",
)
.buckets(vec![
0.0001, 0.0004, 0.0016, 0.0064, 0.0256, 0.1024, 0.4096, 1.6384, 3.2768,
4.9152, 6.5536,
]),
&["worker_name"],
)?,
registry,
)?,
to_worker_unbounded_sent: prometheus::register(
prometheus::GaugeVec::<prometheus::U64>::new(
prometheus::Opts::new(
"polkadot_approval_voting_parallel_worker_unbounded_sent",
"Number of elements sent to approval voting workers' unbounded queues",
),
&["worker_name"],
)?,
registry,
)?,
to_worker_unbounded_received: prometheus::register(
prometheus::GaugeVec::<prometheus::U64>::new(
prometheus::Opts::new(
"polkadot_approval_voting_parallel_worker_unbounded_received",
"Number of elements received by approval voting workers' unbounded queues",
),
&["worker_name"],
)?,
registry,
)?,
})))
}
}
/// A pair of [`Meter`]s watched together under one worker name.
///
/// NOTE(review): presumably these are the meters of a worker's bounded and
/// unbounded input channels — confirm against the call site of `Meters::new`.
#[derive(Clone)]
pub struct Meters {
// Meter whose readouts feed the `to_worker_bounded_*` collectors.
bounded: Meter,
// Meter whose readouts feed the `to_worker_unbounded_*` collectors.
unbounded: Meter,
}
impl Meters {
pub fn new(bounded: &Meter, unbounded: &Meter) -> Self {
Self { bounded: bounded.clone(), unbounded: unbounded.clone() }
}
}
/// Publishes the readouts of watched worker meters into [`Metrics`].
///
/// Workers are added with `watch`; each call to `collect_metrics` reads every
/// watched meter pair and updates the collectors labelled with the worker name.
pub struct MetricsWatcher {
// Watched meters, keyed by the worker name used as the `worker_name` label.
to_watch: HashMap<String, Meters>,
// Destination of the readouts; nothing is published when its inner value is `None`.
metrics: Metrics,
}
impl MetricsWatcher {
    /// Creates a watcher that publishes into `metrics` and watches no workers yet.
    pub fn new(metrics: Metrics) -> Self {
        let to_watch = HashMap::new();
        Self { to_watch, metrics }
    }

    /// Starts watching `meters` under `worker_name`.
    ///
    /// Meters previously watched under the same name are replaced.
    pub fn watch(&mut self, worker_name: String, meters: Meters) {
        let _replaced = self.to_watch.insert(worker_name, meters);
    }

    /// Reads every watched meter pair and publishes the readouts.
    ///
    /// For each worker the `sent`, `received` and `blocked` readouts are
    /// written to the matching gauges and every time-of-flight sample is
    /// observed in the matching histogram, all labelled with the worker name.
    /// If no metrics are registered, the meters are still read but nothing
    /// is published.
    pub fn collect_metrics(&self) {
        let registered = self.metrics.0.as_ref();

        for (worker, meters) in self.to_watch.iter() {
            // Read unconditionally, before checking for registered metrics.
            // NOTE(review): `read()` may have side effects on the meter (e.g.
            // consuming pending tof samples) — confirm before reordering.
            let bounded = meters.bounded.read();
            let unbounded = meters.unbounded.read();

            let inner = match registered {
                Some(inner) => inner,
                None => continue,
            };
            let labels = [worker.as_str()];

            // Gauge / value pairs, in the order they are published.
            let gauges = [
                (&inner.to_worker_bounded_sent, bounded.sent as u64),
                (&inner.to_worker_bounded_received, bounded.received as u64),
                (&inner.to_worker_bounded_blocked, bounded.blocked as u64),
                (&inner.to_worker_unbounded_sent, unbounded.sent as u64),
                (&inner.to_worker_unbounded_received, unbounded.received as u64),
            ];
            for (gauge, value) in gauges.iter() {
                gauge.with_label_values(&labels).set(*value);
            }

            let bounded_tof = inner.to_worker_bounded_tof.with_label_values(&labels);
            bounded.tof.into_iter().for_each(|tof| bounded_tof.observe(tof.as_f64()));

            let unbounded_tof = inner.to_worker_unbounded_tof.with_label_values(&labels);
            unbounded.tof.into_iter().for_each(|tof| unbounded_tof.observe(tof.as_f64()));
        }
    }
}