1use std::{
18 collections::{HashMap, HashSet},
19 time::{Duration, Instant},
20};
21
22use polkadot_node_subsystem::prometheus::prometheus::HistogramTimer;
23use polkadot_node_subsystem_util::metrics::{self, prometheus};
24use polkadot_primitives::{BlockNumber, CandidateReceiptV2 as CandidateReceipt, Hash};
25use sp_core::H256;
26
27use super::collation::CollationStatus;
28
/// Cloneable handle to the collation-related Prometheus metrics.
///
/// Wraps an optional [`MetricsInner`]. When the inner value is `None` (which is what the derived
/// `Default` produces) every `on_*` method does nothing and every `time_*` method returns `None`.
29#[derive(Clone, Default)]
30pub struct Metrics(Option<MetricsInner>);
31
32impl Metrics {
33 pub fn on_collation_backed(&self, latency: f64) {
35 if let Some(metrics) = &self.0 {
36 metrics.collation_backing_latency.observe(latency);
37 }
38 }
39
40 pub fn on_collation_included(&self, latency: f64) {
42 if let Some(metrics) = &self.0 {
43 metrics.collation_inclusion_latency.observe(latency);
44 }
45 }
46
47 pub fn on_advertisement_made(&self) {
48 if let Some(metrics) = &self.0 {
49 metrics.advertisements_made.inc();
50 }
51 }
52
53 pub fn on_collation_sent_requested(&self) {
54 if let Some(metrics) = &self.0 {
55 metrics.collations_send_requested.inc();
56 }
57 }
58
59 pub fn on_collation_sent(&self) {
60 if let Some(metrics) = &self.0 {
61 metrics.collations_sent.inc();
62 }
63 }
64
65 pub fn time_process_msg(&self) -> Option<prometheus::prometheus::HistogramTimer> {
67 self.0.as_ref().map(|metrics| metrics.process_msg.start_timer())
68 }
69
70 pub fn time_collation_distribution(
72 &self,
73 label: &'static str,
74 ) -> Option<prometheus::prometheus::HistogramTimer> {
75 self.0.as_ref().map(|metrics| {
76 metrics.collation_distribution_time.with_label_values(&[label]).start_timer()
77 })
78 }
79
80 pub fn time_collation_fetch_latency(&self) -> Option<prometheus::prometheus::HistogramTimer> {
82 self.0.as_ref().map(|metrics| metrics.collation_fetch_latency.start_timer())
83 }
84
85 pub fn time_collation_backing_latency(&self) -> Option<prometheus::prometheus::HistogramTimer> {
87 self.0
88 .as_ref()
89 .map(|metrics| metrics.collation_backing_latency_time.start_timer())
90 }
91
92 pub fn on_collation_expired(&self, latency: f64, state: &'static str) {
95 if let Some(metrics) = &self.0 {
96 metrics.collation_expired_total.with_label_values(&[state]).observe(latency);
97 }
98 }
99}
100
/// The registered Prometheus collectors that back [`Metrics`].
101#[derive(Clone)]
102struct MetricsInner {
    /// Counter incremented by `Metrics::on_advertisement_made`.
103 advertisements_made: prometheus::Counter<prometheus::U64>,
    /// Counter incremented by `Metrics::on_collation_sent`.
104 collations_sent: prometheus::Counter<prometheus::U64>,
    /// Counter incremented by `Metrics::on_collation_sent_requested`.
105 collations_send_requested: prometheus::Counter<prometheus::U64>,
    /// Histogram timed via `Metrics::time_process_msg`.
106 process_msg: prometheus::Histogram,
    /// Histogram family (one label, registered as `state`) timed via
    /// `Metrics::time_collation_distribution`.
107 collation_distribution_time: prometheus::HistogramVec,
    /// Histogram timed via `Metrics::time_collation_fetch_latency`.
108 collation_fetch_latency: prometheus::Histogram,
    /// Histogram timed via `Metrics::time_collation_backing_latency`.
109 collation_backing_latency_time: prometheus::Histogram,
    /// Histogram fed by `Metrics::on_collation_backed` (latency given in blocks by the tracker).
110 collation_backing_latency: prometheus::Histogram,
    /// Histogram fed by `Metrics::on_collation_included` (latency given in blocks by the tracker).
111 collation_inclusion_latency: prometheus::Histogram,
    /// Histogram family (one label, registered as `state`) fed by `Metrics::on_collation_expired`.
    /// Despite the `_total` suffix of the field name this is a histogram, not a counter.
112 collation_expired_total: prometheus::HistogramVec,
113}
114
115impl metrics::Metrics for Metrics {
116 fn try_register(
117 registry: &prometheus::Registry,
118 ) -> std::result::Result<Self, prometheus::PrometheusError> {
119 let metrics = MetricsInner {
120 advertisements_made: prometheus::register(
121 prometheus::Counter::new(
122 "polkadot_parachain_collation_advertisements_made_total",
123 "A number of collation advertisements sent to validators.",
124 )?,
125 registry,
126 )?,
127 collations_send_requested: prometheus::register(
128 prometheus::Counter::new(
129 "polkadot_parachain_collations_sent_requested_total",
130 "A number of collations requested to be sent to validators.",
131 )?,
132 registry,
133 )?,
134 collations_sent: prometheus::register(
135 prometheus::Counter::new(
136 "polkadot_parachain_collations_sent_total",
137 "A number of collations sent to validators.",
138 )?,
139 registry,
140 )?,
141 process_msg: prometheus::register(
142 prometheus::Histogram::with_opts(
143 prometheus::HistogramOpts::new(
144 "polkadot_parachain_collator_protocol_collator_process_msg",
145 "Time spent within `collator_protocol_collator::process_msg`",
146 )
147 .buckets(vec![
148 0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.15, 0.25, 0.35, 0.5, 0.75,
149 1.0,
150 ]),
151 )?,
152 registry,
153 )?,
154 collation_distribution_time: prometheus::register(
155 prometheus::HistogramVec::new(
156 prometheus::HistogramOpts::new(
157 "polkadot_parachain_collator_protocol_collator_distribution_time",
158 "Time spent within `collator_protocol_collator::distribute_collation`",
159 )
160 .buckets(vec![
161 0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.15, 0.25, 0.35, 0.5, 0.75,
162 1.0,
163 ]),
164 &["state"],
165 )?,
166 registry,
167 )?,
168 collation_fetch_latency: prometheus::register(
169 prometheus::Histogram::with_opts(
170 prometheus::HistogramOpts::new(
171 "polkadot_parachain_collation_fetch_latency",
172 "How much time collations spend waiting to be fetched",
173 )
174 .buckets(vec![
175 0.001, 0.01, 0.025, 0.05, 0.1, 0.15, 0.25, 0.35, 0.5, 0.75, 1.0, 2.0, 5.0,
176 ]),
177 )?,
178 registry,
179 )?,
180 collation_backing_latency_time: prometheus::register(
181 prometheus::Histogram::with_opts(
182 prometheus::HistogramOpts::new(
183 "polkadot_parachain_collation_backing_latency_time",
184 "How much time it takes for a fetched collation to be backed",
185 )
186 .buckets(vec![
187 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 8.0, 10.0, 12.0, 15.0, 18.0, 24.0, 30.0,
188 ]),
189 )?,
190 registry,
191 )?,
192 collation_backing_latency: prometheus::register(
193 prometheus::Histogram::with_opts(
194 prometheus::HistogramOpts::new(
195 "polkadot_parachain_collation_backing_latency",
196 "How many blocks away from the relay parent are collations backed",
197 )
198 .buckets(vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]),
199 )?,
200 registry,
201 )?,
202 collation_inclusion_latency: prometheus::register(
203 prometheus::Histogram::with_opts(
204 prometheus::HistogramOpts::new(
205 "polkadot_parachain_collation_inclusion_latency",
206 "How many blocks it takes for a backed collation to be included",
207 )
208 .buckets(vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]),
209 )?,
210 registry,
211 )?,
212 collation_expired_total: prometheus::register(
213 prometheus::HistogramVec::new(
214 prometheus::HistogramOpts::new(
215 "polkadot_parachain_collation_expired",
216 "How many collations expired (not backed or not included)",
217 )
218 .buckets(vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]),
219 &["state"],
220 )?,
221 registry,
222 )?,
223 };
224
225 Ok(Metrics(Some(metrics)))
226 }
227}
228
/// TTL, in blocks counted from the relay parent, that `CollationTracker::track` gives a
/// collation which was fetched but has not been backed yet.
229pub(crate) const MAX_BACKING_DELAY: BlockNumber = 3;
/// Number of blocks `CollationTracker::track` adds on top of the backing latency to compute the
/// TTL of a collation which was backed but has not been included yet.
231pub(crate) const MAX_AVAILABILITY_DELAY: BlockNumber = 10;
233
/// Follows collations from the moment they are handed to `track` until they are reported as
/// backed, included, or until they expire.
234#[derive(Default)]
236pub(crate) struct CollationTracker {
    /// Heads of tracked collations, grouped by the block number at which they expire.
237 expire: HashMap<BlockNumber, HashSet<Hash>>,
    /// Statistics of every tracked collation, keyed by its head (`CollationStats::head`).
239 entries: HashMap<Hash, CollationStats>,
241}
242
243impl CollationTracker {
244 pub fn collation_backed(
251 &mut self,
252 block_number: BlockNumber,
253 leaf: H256,
254 receipt: CandidateReceipt,
255 metrics: &Metrics,
256 ) -> Option<CollationStats> {
257 let head = receipt.descriptor.para_head();
258
259 self.entries.remove(&head).map(|mut entry| {
260 let para_id = receipt.descriptor.para_id();
261 let relay_parent = receipt.descriptor.relay_parent();
262
263 entry.backed_at = Some(block_number);
264
265 let maybe_latency =
267 entry.backed_latency_metric.take().map(|metric| metric.stop_and_record());
268
269 gum::debug!(
270 target: crate::LOG_TARGET_STATS,
271 latency_blocks = ?entry.backed(),
272 latency_time = ?maybe_latency,
273 relay_block = ?leaf,
274 ?relay_parent,
275 ?para_id,
276 ?head,
277 "A fetched collation was backed on relay chain",
278 );
279
280 metrics.on_collation_backed(
281 (block_number.saturating_sub(entry.relay_parent_number)) as f64,
282 );
283
284 entry
285 })
286 }
287
288 pub fn collation_included(
294 &mut self,
295 block_number: BlockNumber,
296 leaf: H256,
297 receipt: CandidateReceipt,
298 metrics: &Metrics,
299 ) -> Option<CollationStats> {
300 let head = receipt.descriptor.para_head();
301
302 self.entries.remove(&head).map(|mut entry| {
303 entry.included_at = Some(block_number);
304
305 if let Some(latency) = entry.included() {
306 metrics.on_collation_included(latency as f64);
307
308 let para_id = receipt.descriptor.para_id();
309 let relay_parent = receipt.descriptor.relay_parent();
310
311 gum::debug!(
312 target: crate::LOG_TARGET_STATS,
313 ?latency,
314 relay_block = ?leaf,
315 ?relay_parent,
316 ?para_id,
317 head = ?receipt.descriptor.para_head(),
318 "Collation included on relay chain",
319 );
320 }
321
322 entry
323 })
324 }
325
326 pub fn drain_expired(&mut self, block_number: BlockNumber) -> Vec<CollationStats> {
328 let Some(expired) = self.expire.remove(&block_number) else {
329 return Vec::new()
331 };
332
333 expired
334 .iter()
335 .filter_map(|head| self.entries.remove(head))
336 .map(|mut entry| {
337 entry.expired_at = Some(block_number);
338 entry
339 })
340 .collect::<Vec<_>>()
341 }
342
343 pub fn track(&mut self, mut stats: CollationStats) {
347 let ttl = if stats.fetch_latency().is_none() {
349 if let Some(fetch_latency_metric) = stats.fetch_latency_metric.take() {
351 fetch_latency_metric.stop_and_discard();
352 }
353 0
355 } else if stats.backed().is_none() {
356 MAX_BACKING_DELAY
357 } else if stats.included().is_none() {
358 stats.backed().unwrap_or_default() + MAX_AVAILABILITY_DELAY
360 } else {
361 return
363 };
364
365 self.expire
366 .entry(stats.relay_parent_number + ttl)
367 .and_modify(|heads| {
368 heads.insert(stats.head);
369 })
370 .or_insert_with(|| HashSet::from_iter(vec![stats.head].into_iter()));
371 self.entries.insert(stats.head, stats);
372 }
373}
374
/// Lifecycle data of a single collation, as recorded by `CollationTracker`.
375pub(crate) struct CollationStats {
    /// Status of the collation before backing; starts as `CollationStatus::Created` and is
    /// updated through `set_pre_backing_status`.
377 pre_backing_status: CollationStatus,
    /// Head hash identifying the collation; used as the key inside `CollationTracker`.
379 head: Hash,
    /// Block number of the relay parent; the base for the block-based latencies and expiry.
381 relay_parent_number: BlockNumber,
    /// Block number at which the collation expired (set by `CollationTracker::drain_expired`).
383 expired_at: Option<BlockNumber>,
    /// Block number at which the collation was backed (set by `CollationTracker::collation_backed`).
385 backed_at: Option<BlockNumber>,
    /// Block number at which the collation was included (set by
    /// `CollationTracker::collation_included`).
387 included_at: Option<BlockNumber>,
    /// Instant at which the collation was fetched (set through `set_fetched_at`).
389 fetched_at: Option<Instant>,
    /// Instant captured when these stats were created; start of the fetch latency measurement.
391 advertised_at: Instant,
    /// Fetch latency timer, started in `new` when metrics are registered.
393 fetch_latency_metric: Option<HistogramTimer>,
    /// Backing latency timer, attached through `set_backed_latency_metric`.
395 backed_latency_metric: Option<HistogramTimer>,
398}
399
400impl CollationStats {
401 pub fn new(head: Hash, relay_parent_number: BlockNumber, metrics: &Metrics) -> Self {
403 Self {
404 pre_backing_status: CollationStatus::Created,
405 head,
406 relay_parent_number,
407 advertised_at: std::time::Instant::now(),
408 backed_at: None,
409 expired_at: None,
410 fetched_at: None,
411 included_at: None,
412 fetch_latency_metric: metrics.time_collation_fetch_latency(),
413 backed_latency_metric: None,
414 }
415 }
416
417 pub fn expired(&self) -> Option<BlockNumber> {
419 let expired_at = self.expired_at?;
420 Some(expired_at.saturating_sub(self.relay_parent_number))
421 }
422
423 pub fn backed(&self) -> Option<BlockNumber> {
425 let backed_at = self.backed_at?;
426 Some(backed_at.saturating_sub(self.relay_parent_number))
427 }
428
429 pub fn included(&self) -> Option<BlockNumber> {
431 let included_at = self.included_at?;
432 let backed_at = self.backed_at?;
433 Some(included_at.saturating_sub(backed_at))
434 }
435
436 pub fn fetch_latency(&self) -> Option<Duration> {
438 let fetched_at = self.fetched_at?;
439 Some(fetched_at - self.advertised_at)
440 }
441
442 pub fn head(&self) -> H256 {
444 self.head
445 }
446
447 pub fn set_fetched_at(&mut self, fetched_at: Instant) {
449 self.fetched_at = Some(fetched_at);
450 }
451
452 pub fn set_pre_backing_status(&mut self, status: CollationStatus) {
454 self.pre_backing_status = status;
455 }
456
457 pub fn pre_backing_status(&self) -> &CollationStatus {
459 &self.pre_backing_status
460 }
461
462 pub fn take_fetch_latency_metric(&mut self) -> Option<HistogramTimer> {
464 self.fetch_latency_metric.take()
465 }
466
467 pub fn set_backed_latency_metric(&mut self, timer: Option<HistogramTimer>) {
469 self.backed_latency_metric = timer;
470 }
471}
472
473impl Drop for CollationStats {
474 fn drop(&mut self) {
475 if let Some(fetch_latency_metric) = self.fetch_latency_metric.take() {
476 fetch_latency_metric.stop_and_discard();
481 }
482 if let Some(backed_latency_metric) = self.backed_latency_metric.take() {
484 backed_latency_metric.stop_and_discard();
485 }
486 }
487}