polkadot_node_core_approval_voting_parallel/
metrics.rs1use std::collections::HashMap;
20
21use polkadot_node_metrics::{metered::Meter, metrics};
22use polkadot_overseer::prometheus;
23
24#[derive(Default, Clone)]
25pub struct Metrics(Option<MetricsInner>);
26
27#[derive(Clone)]
29pub struct MetricsInner {
30	approval_distribution: polkadot_approval_distribution::metrics::Metrics,
32	approval_voting: polkadot_node_core_approval_voting::Metrics,
34
35	to_worker_bounded_tof: prometheus::HistogramVec,
37	to_worker_bounded_sent: prometheus::GaugeVec<prometheus::U64>,
39	to_worker_bounded_received: prometheus::GaugeVec<prometheus::U64>,
41	to_worker_bounded_blocked: prometheus::GaugeVec<prometheus::U64>,
43	to_worker_unbounded_tof: prometheus::HistogramVec,
45	to_worker_unbounded_sent: prometheus::GaugeVec<prometheus::U64>,
47	to_worker_unbounded_received: prometheus::GaugeVec<prometheus::U64>,
49}
50
51impl Metrics {
52	pub fn approval_distribution_metrics(
54		&self,
55	) -> polkadot_approval_distribution::metrics::Metrics {
56		self.0
57			.as_ref()
58			.map(|metrics_inner| metrics_inner.approval_distribution.clone())
59			.unwrap_or_default()
60	}
61
62	pub fn approval_voting_metrics(&self) -> polkadot_node_core_approval_voting::Metrics {
64		self.0
65			.as_ref()
66			.map(|metrics_inner| metrics_inner.approval_voting.clone())
67			.unwrap_or_default()
68	}
69}
70
71impl metrics::Metrics for Metrics {
72	fn try_register(
74		registry: &prometheus::Registry,
75	) -> std::result::Result<Self, prometheus::PrometheusError> {
76		Ok(Metrics(Some(MetricsInner {
77			approval_distribution: polkadot_approval_distribution::metrics::Metrics::try_register(
78				registry,
79			)?,
80			approval_voting: polkadot_node_core_approval_voting::Metrics::try_register(registry)?,
81			to_worker_bounded_tof: prometheus::register(
82				prometheus::HistogramVec::new(
83					prometheus::HistogramOpts::new(
84						"polkadot_approval_voting_parallel_worker_bounded_tof",
85						"Duration spent in a particular approval voting worker channel from entrance to removal",
86					)
87					.buckets(vec![
88						0.0001, 0.0004, 0.0016, 0.0064, 0.0256, 0.1024, 0.4096, 1.6384, 3.2768,
89						4.9152, 6.5536,
90					]),
91					&["worker_name"],
92				)?,
93				registry,
94			)?,
95			to_worker_bounded_sent: prometheus::register(
96				prometheus::GaugeVec::<prometheus::U64>::new(
97					prometheus::Opts::new(
98						"polkadot_approval_voting_parallel_worker_bounded_sent",
99						"Number of elements sent to approval voting workers' bounded queues",
100					),
101					&["worker_name"],
102				)?,
103				registry,
104			)?,
105			to_worker_bounded_received: prometheus::register(
106				prometheus::GaugeVec::<prometheus::U64>::new(
107					prometheus::Opts::new(
108						"polkadot_approval_voting_parallel_worker_bounded_received",
109						"Number of elements received by approval voting workers' bounded queues",
110					),
111					&["worker_name"],
112				)?,
113				registry,
114			)?,
115			to_worker_bounded_blocked: prometheus::register(
116				prometheus::GaugeVec::<prometheus::U64>::new(
117					prometheus::Opts::new(
118						"polkadot_approval_voting_parallel_worker_bounded_blocked",
119						"Number of times approval voting workers blocked while sending messages to a subsystem",
120					),
121					&["worker_name"],
122				)?,
123				registry,
124			)?,
125			to_worker_unbounded_tof: prometheus::register(
126				prometheus::HistogramVec::new(
127					prometheus::HistogramOpts::new(
128						"polkadot_approval_voting_parallel_worker_unbounded_tof",
129						"Duration spent in a particular approval voting worker channel from entrance to removal",
130					)
131					.buckets(vec![
132						0.0001, 0.0004, 0.0016, 0.0064, 0.0256, 0.1024, 0.4096, 1.6384, 3.2768,
133						4.9152, 6.5536,
134					]),
135					&["worker_name"],
136				)?,
137				registry,
138			)?,
139			to_worker_unbounded_sent: prometheus::register(
140				prometheus::GaugeVec::<prometheus::U64>::new(
141					prometheus::Opts::new(
142						"polkadot_approval_voting_parallel_worker_unbounded_sent",
143						"Number of elements sent to approval voting workers' unbounded queues",
144					),
145					&["worker_name"],
146				)?,
147				registry,
148			)?,
149			to_worker_unbounded_received: prometheus::register(
150				prometheus::GaugeVec::<prometheus::U64>::new(
151					prometheus::Opts::new(
152						"polkadot_approval_voting_parallel_worker_unbounded_received",
153						"Number of elements received by approval voting workers' unbounded queues",
154					),
155					&["worker_name"],
156				)?,
157				registry,
158			)?,
159		})))
160	}
161}
162
163#[derive(Clone)]
165pub struct Meters {
166	bounded: Meter,
167	unbounded: Meter,
168}
169
170impl Meters {
171	pub fn new(bounded: &Meter, unbounded: &Meter) -> Self {
172		Self { bounded: bounded.clone(), unbounded: unbounded.clone() }
173	}
174}
175
176pub struct MetricsWatcher {
178	to_watch: HashMap<String, Meters>,
179	metrics: Metrics,
180}
181
182impl MetricsWatcher {
183	pub fn new(metrics: Metrics) -> Self {
185		Self { to_watch: HashMap::new(), metrics }
186	}
187
188	pub fn watch(&mut self, worker_name: String, meters: Meters) {
190		self.to_watch.insert(worker_name, meters);
191	}
192
193	pub fn collect_metrics(&self) {
195		for (name, meter) in &self.to_watch {
196			let bounded_readouts = meter.bounded.read();
197			let unbounded_readouts = meter.unbounded.read();
198			if let Some(metrics) = self.metrics.0.as_ref() {
199				metrics
200					.to_worker_bounded_sent
201					.with_label_values(&[name])
202					.set(bounded_readouts.sent as u64);
203
204				metrics
205					.to_worker_bounded_received
206					.with_label_values(&[name])
207					.set(bounded_readouts.received as u64);
208
209				metrics
210					.to_worker_bounded_blocked
211					.with_label_values(&[name])
212					.set(bounded_readouts.blocked as u64);
213
214				metrics
215					.to_worker_unbounded_sent
216					.with_label_values(&[name])
217					.set(unbounded_readouts.sent as u64);
218
219				metrics
220					.to_worker_unbounded_received
221					.with_label_values(&[name])
222					.set(unbounded_readouts.received as u64);
223
224				let hist_bounded = metrics.to_worker_bounded_tof.with_label_values(&[name]);
225				for tof in bounded_readouts.tof {
226					hist_bounded.observe(tof.as_f64());
227				}
228
229				let hist_unbounded = metrics.to_worker_unbounded_tof.with_label_values(&[name]);
230				for tof in unbounded_readouts.tof {
231					hist_unbounded.observe(tof.as_f64());
232				}
233			}
234		}
235	}
236}