referrerpolicy=no-referrer-when-downgrade

cumulus_client_service/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Cumulus.
3// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5// Cumulus is free software: you can redistribute it and/or modify
6// it under the terms of the GNU General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9
10// Cumulus is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14
15// You should have received a copy of the GNU General Public License
16// along with Cumulus. If not, see <https://www.gnu.org/licenses/>.
17
18//! Cumulus service
19//!
20//! Provides functions for starting a collator node or a normal full node.
21
22use cumulus_client_cli::CollatorOptions;
23use cumulus_client_consensus_common::ParachainConsensus;
24use cumulus_client_network::{AssumeSybilResistance, RequireSecondedInBlockAnnounce};
25use cumulus_client_pov_recovery::{PoVRecovery, RecoveryDelayRange, RecoveryHandle};
26use cumulus_primitives_core::{CollectCollationInfo, ParaId};
27pub use cumulus_primitives_proof_size_hostfunction::storage_proof_size;
28use cumulus_relay_chain_inprocess_interface::build_inprocess_relay_chain;
29use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult};
30use cumulus_relay_chain_minimal_node::build_minimal_relay_chain_node_with_rpc;
31use futures::{channel::mpsc, StreamExt};
32use polkadot_primitives::{CandidateEvent, CollatorPair, OccupiedCoreAssumption};
33use prometheus::{Histogram, HistogramOpts, Registry};
34use sc_client_api::{
35	Backend as BackendT, BlockBackend, BlockchainEvents, Finalizer, ProofProvider, UsageProvider,
36};
37use sc_consensus::{
38	import_queue::{ImportQueue, ImportQueueService},
39	BlockImport,
40};
41use sc_network::{
42	config::SyncMode, request_responses::IncomingRequest, service::traits::NetworkService,
43	NetworkBackend,
44};
45use sc_network_sync::SyncingService;
46use sc_network_transactions::TransactionsHandlerController;
47use sc_service::{Configuration, SpawnTaskHandle, TaskManager, WarpSyncConfig};
48use sc_telemetry::{log, TelemetryWorkerHandle};
49use sc_utils::mpsc::TracingUnboundedSender;
50use sp_api::ProvideRuntimeApi;
51use sp_blockchain::{HeaderBackend, HeaderMetadata};
52use sp_core::{traits::SpawnNamed, Decode};
53use sp_runtime::{
54	traits::{Block as BlockT, BlockIdTo, Header},
55	SaturatedConversion, Saturating,
56};
57use std::{
58	sync::Arc,
59	time::{Duration, Instant},
60};
61
62/// Host functions that should be used in parachain nodes.
63///
64/// Contains the standard substrate host functions, as well as a
65/// host function to enable PoV-reclaim on parachain nodes.
66pub type ParachainHostFunctions = (
67	cumulus_primitives_proof_size_hostfunction::storage_proof_size::HostFunctions,
68	sp_io::SubstrateHostFunctions,
69);
70
71// Given the sporadic nature of the explicit recovery operation and the
72// possibility to retry infinite times this value is more than enough.
73// In practice here we expect no more than one queued messages.
74const RECOVERY_CHAN_SIZE: usize = 8;
75const LOG_TARGET_SYNC: &str = "sync::cumulus";
76
77/// A hint about how long the node should wait before attempting to recover missing block data
78/// from the data availability layer.
79pub enum DARecoveryProfile {
80	/// Collators use an aggressive recovery profile by default.
81	Collator,
82	/// Full nodes use a passive recovery profile by default, as they are not direct
83	/// victims of withholding attacks.
84	FullNode,
85	/// Provide an explicit recovery profile.
86	Other(RecoveryDelayRange),
87}
88
89pub struct StartCollatorParams<'a, Block: BlockT, BS, Client, RCInterface, Spawner> {
90	pub block_status: Arc<BS>,
91	pub client: Arc<Client>,
92	pub announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
93	pub spawner: Spawner,
94	pub para_id: ParaId,
95	pub relay_chain_interface: RCInterface,
96	pub task_manager: &'a mut TaskManager,
97	pub parachain_consensus: Box<dyn ParachainConsensus<Block>>,
98	pub import_queue: Box<dyn ImportQueueService<Block>>,
99	pub collator_key: CollatorPair,
100	pub relay_chain_slot_duration: Duration,
101	pub recovery_handle: Box<dyn RecoveryHandle>,
102	pub sync_service: Arc<SyncingService<Block>>,
103	pub prometheus_registry: Option<&'a Registry>,
104}
105
106/// Parameters given to [`start_relay_chain_tasks`].
107pub struct StartRelayChainTasksParams<'a, Block: BlockT, Client, RCInterface> {
108	pub client: Arc<Client>,
109	pub announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
110	pub para_id: ParaId,
111	pub relay_chain_interface: RCInterface,
112	pub task_manager: &'a mut TaskManager,
113	pub da_recovery_profile: DARecoveryProfile,
114	pub import_queue: Box<dyn ImportQueueService<Block>>,
115	pub relay_chain_slot_duration: Duration,
116	pub recovery_handle: Box<dyn RecoveryHandle>,
117	pub sync_service: Arc<SyncingService<Block>>,
118	pub prometheus_registry: Option<&'a Registry>,
119}
120
121/// Parameters given to [`start_full_node`].
122pub struct StartFullNodeParams<'a, Block: BlockT, Client, RCInterface> {
123	pub para_id: ParaId,
124	pub client: Arc<Client>,
125	pub relay_chain_interface: RCInterface,
126	pub task_manager: &'a mut TaskManager,
127	pub announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
128	pub relay_chain_slot_duration: Duration,
129	pub import_queue: Box<dyn ImportQueueService<Block>>,
130	pub recovery_handle: Box<dyn RecoveryHandle>,
131	pub sync_service: Arc<SyncingService<Block>>,
132	pub prometheus_registry: Option<&'a Registry>,
133}
134
135/// Start a collator node for a parachain.
136///
137/// A collator is similar to a validator in a normal blockchain.
138/// It is responsible for producing blocks and sending the blocks to a
139/// parachain validator for validation and inclusion into the relay chain.
140#[deprecated = "use start_relay_chain_tasks instead"]
141pub async fn start_collator<'a, Block, BS, Client, Backend, RCInterface, Spawner>(
142	StartCollatorParams {
143		block_status,
144		client,
145		announce_block,
146		spawner,
147		para_id,
148		task_manager,
149		relay_chain_interface,
150		parachain_consensus,
151		import_queue,
152		collator_key,
153		relay_chain_slot_duration,
154		recovery_handle,
155		sync_service,
156		prometheus_registry,
157	}: StartCollatorParams<'a, Block, BS, Client, RCInterface, Spawner>,
158) -> sc_service::error::Result<()>
159where
160	Block: BlockT,
161	BS: BlockBackend<Block> + Send + Sync + 'static,
162	Client: Finalizer<Block, Backend>
163		+ UsageProvider<Block>
164		+ HeaderBackend<Block>
165		+ Send
166		+ Sync
167		+ BlockBackend<Block>
168		+ BlockchainEvents<Block>
169		+ ProvideRuntimeApi<Block>
170		+ 'static,
171	Client::Api: CollectCollationInfo<Block>,
172	for<'b> &'b Client: BlockImport<Block>,
173	Spawner: SpawnNamed + Clone + Send + Sync + 'static,
174	RCInterface: RelayChainInterface + Clone + 'static,
175	Backend: BackendT<Block> + 'static,
176{
177	let overseer_handle = relay_chain_interface
178		.overseer_handle()
179		.map_err(|e| sc_service::Error::Application(Box::new(e)))?;
180
181	start_relay_chain_tasks(StartRelayChainTasksParams {
182		client: client.clone(),
183		announce_block: announce_block.clone(),
184		para_id,
185		task_manager,
186		da_recovery_profile: DARecoveryProfile::Collator,
187		relay_chain_interface,
188		import_queue,
189		relay_chain_slot_duration,
190		recovery_handle,
191		sync_service,
192		prometheus_registry,
193	})?;
194
195	#[allow(deprecated)]
196	cumulus_client_collator::start_collator(cumulus_client_collator::StartCollatorParams {
197		runtime_api: client,
198		block_status,
199		announce_block,
200		overseer_handle,
201		spawner,
202		para_id,
203		key: collator_key,
204		parachain_consensus,
205	})
206	.await;
207
208	Ok(())
209}
210
211/// Start necessary consensus tasks related to the relay chain.
212///
213/// Parachain nodes need to track the state of the relay chain and use the
214/// relay chain's data availability service to fetch blocks if they don't
215/// arrive via the normal p2p layer (i.e. when authors withhold their blocks deliberately).
216///
217/// This function spawns work for those side tasks.
218///
219/// It also spawns a parachain informant task that will log the relay chain state and some metrics.
220pub fn start_relay_chain_tasks<Block, Client, Backend, RCInterface>(
221	StartRelayChainTasksParams {
222		client,
223		announce_block,
224		para_id,
225		task_manager,
226		da_recovery_profile,
227		relay_chain_interface,
228		import_queue,
229		relay_chain_slot_duration,
230		recovery_handle,
231		sync_service,
232		prometheus_registry,
233	}: StartRelayChainTasksParams<Block, Client, RCInterface>,
234) -> sc_service::error::Result<()>
235where
236	Block: BlockT,
237	Client: Finalizer<Block, Backend>
238		+ UsageProvider<Block>
239		+ HeaderBackend<Block>
240		+ Send
241		+ Sync
242		+ BlockBackend<Block>
243		+ BlockchainEvents<Block>
244		+ 'static,
245	for<'a> &'a Client: BlockImport<Block>,
246	Backend: BackendT<Block> + 'static,
247	RCInterface: RelayChainInterface + Clone + 'static,
248{
249	let (recovery_chan_tx, recovery_chan_rx) = mpsc::channel(RECOVERY_CHAN_SIZE);
250
251	let consensus = cumulus_client_consensus_common::run_parachain_consensus(
252		para_id,
253		client.clone(),
254		relay_chain_interface.clone(),
255		announce_block.clone(),
256		Some(recovery_chan_tx),
257	);
258
259	task_manager
260		.spawn_essential_handle()
261		.spawn_blocking("cumulus-consensus", None, consensus);
262
263	let da_recovery_profile = match da_recovery_profile {
264		DARecoveryProfile::Collator => {
265			// We want that collators wait at maximum the relay chain slot duration before starting
266			// to recover blocks. Additionally, we wait at least half the slot time to give the
267			// relay chain the chance to increase availability.
268			RecoveryDelayRange {
269				min: relay_chain_slot_duration / 2,
270				max: relay_chain_slot_duration,
271			}
272		},
273		DARecoveryProfile::FullNode => {
274			// Full nodes should at least wait 2.5 minutes (assuming 6 seconds slot duration) and
275			// in maximum 5 minutes before starting to recover blocks. Collators should already
276			// start the recovery way before full nodes try to recover a certain block and then
277			// share the block with the network using "the normal way". Full nodes are just the
278			// "last resort" for block recovery.
279			RecoveryDelayRange {
280				min: relay_chain_slot_duration * 25,
281				max: relay_chain_slot_duration * 50,
282			}
283		},
284		DARecoveryProfile::Other(profile) => profile,
285	};
286
287	let pov_recovery = PoVRecovery::new(
288		recovery_handle,
289		da_recovery_profile,
290		client.clone(),
291		import_queue,
292		relay_chain_interface.clone(),
293		para_id,
294		recovery_chan_rx,
295		sync_service.clone(),
296	);
297
298	task_manager
299		.spawn_essential_handle()
300		.spawn("cumulus-pov-recovery", None, pov_recovery.run());
301
302	let parachain_informant = parachain_informant::<Block, _>(
303		para_id,
304		relay_chain_interface.clone(),
305		client.clone(),
306		prometheus_registry.map(ParachainInformantMetrics::new).transpose()?,
307	);
308	task_manager
309		.spawn_handle()
310		.spawn("parachain-informant", None, parachain_informant);
311
312	Ok(())
313}
314
315/// Start a full node for a parachain.
316///
317/// A full node will only sync the given parachain and will follow the
318/// tip of the chain.
319#[deprecated = "use start_relay_chain_tasks instead"]
320pub fn start_full_node<Block, Client, Backend, RCInterface>(
321	StartFullNodeParams {
322		client,
323		announce_block,
324		task_manager,
325		relay_chain_interface,
326		para_id,
327		relay_chain_slot_duration,
328		import_queue,
329		recovery_handle,
330		sync_service,
331		prometheus_registry,
332	}: StartFullNodeParams<Block, Client, RCInterface>,
333) -> sc_service::error::Result<()>
334where
335	Block: BlockT,
336	Client: Finalizer<Block, Backend>
337		+ UsageProvider<Block>
338		+ HeaderBackend<Block>
339		+ Send
340		+ Sync
341		+ BlockBackend<Block>
342		+ BlockchainEvents<Block>
343		+ 'static,
344	for<'a> &'a Client: BlockImport<Block>,
345	Backend: BackendT<Block> + 'static,
346	RCInterface: RelayChainInterface + Clone + 'static,
347{
348	start_relay_chain_tasks(StartRelayChainTasksParams {
349		client,
350		announce_block,
351		task_manager,
352		relay_chain_interface,
353		para_id,
354		relay_chain_slot_duration,
355		import_queue,
356		recovery_handle,
357		sync_service,
358		da_recovery_profile: DARecoveryProfile::FullNode,
359		prometheus_registry,
360	})
361}
362
363/// Re-exports of old parachain consensus loop start logic.
364#[deprecated = "This is old consensus architecture only for backwards compatibility \
365	and will be removed in the future"]
366pub mod old_consensus {
367	#[allow(deprecated)]
368	pub use cumulus_client_collator::{start_collator, start_collator_sync, StartCollatorParams};
369}
370
371/// Prepare the parachain's node configuration
372///
373/// This function will:
374/// * Disable the default announcement of Substrate for the parachain in favor of the one of
375///   Cumulus.
376/// * Set peers needed to start warp sync to 1.
377pub fn prepare_node_config(mut parachain_config: Configuration) -> Configuration {
378	parachain_config.announce_block = false;
379	// Parachains only need 1 peer to start warp sync, because the target block is fetched from the
380	// relay chain.
381	parachain_config.network.min_peers_to_start_warp_sync = Some(1);
382
383	parachain_config
384}
385
386/// Build a relay chain interface.
387/// Will return a minimal relay chain node with RPC
388/// client or an inprocess node, based on the [`CollatorOptions`] passed in.
389pub async fn build_relay_chain_interface(
390	relay_chain_config: Configuration,
391	parachain_config: &Configuration,
392	telemetry_worker_handle: Option<TelemetryWorkerHandle>,
393	task_manager: &mut TaskManager,
394	collator_options: CollatorOptions,
395	hwbench: Option<sc_sysinfo::HwBench>,
396) -> RelayChainResult<(
397	Arc<(dyn RelayChainInterface + 'static)>,
398	Option<CollatorPair>,
399	Arc<dyn NetworkService>,
400	async_channel::Receiver<IncomingRequest>,
401)> {
402	match collator_options.relay_chain_mode {
403		cumulus_client_cli::RelayChainMode::Embedded => build_inprocess_relay_chain(
404			relay_chain_config,
405			parachain_config,
406			telemetry_worker_handle,
407			task_manager,
408			hwbench,
409		),
410		cumulus_client_cli::RelayChainMode::ExternalRpc(rpc_target_urls) =>
411			build_minimal_relay_chain_node_with_rpc(
412				relay_chain_config,
413				parachain_config.prometheus_registry(),
414				task_manager,
415				rpc_target_urls,
416			)
417			.await,
418	}
419}
420
421/// The expected level of collator sybil-resistance on the network. This is used to
422/// configure the type of metadata passed alongside block announcements on the network.
423pub enum CollatorSybilResistance {
424	/// There is a collator-selection protocol which provides sybil-resistance,
425	/// such as Aura. Sybil-resistant collator-selection protocols are able to
426	/// operate more efficiently.
427	Resistant,
428	/// There is no collator-selection protocol providing sybil-resistance.
429	/// In situations such as "free-for-all" collators, the network is unresistant
430	/// and needs to attach more metadata to block announcements, relying on relay-chain
431	/// validators to avoid handling unbounded numbers of blocks.
432	Unresistant,
433}
434
435/// Parameters given to [`build_network`].
436pub struct BuildNetworkParams<
437	'a,
438	Block: BlockT,
439	Client: ProvideRuntimeApi<Block>
440		+ BlockBackend<Block>
441		+ HeaderMetadata<Block, Error = sp_blockchain::Error>
442		+ HeaderBackend<Block>
443		+ BlockIdTo<Block>
444		+ 'static,
445	Network: NetworkBackend<Block, <Block as BlockT>::Hash>,
446	RCInterface,
447	IQ,
448> where
449	Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
450{
451	pub parachain_config: &'a Configuration,
452	pub net_config:
453		sc_network::config::FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Network>,
454	pub client: Arc<Client>,
455	pub transaction_pool: Arc<sc_transaction_pool::TransactionPoolHandle<Block, Client>>,
456	pub para_id: ParaId,
457	pub relay_chain_interface: RCInterface,
458	pub spawn_handle: SpawnTaskHandle,
459	pub import_queue: IQ,
460	pub sybil_resistance_level: CollatorSybilResistance,
461	pub metrics: sc_network::NotificationMetrics,
462}
463
464/// Build the network service, the network status sinks and an RPC sender.
465pub async fn build_network<'a, Block, Client, RCInterface, IQ, Network>(
466	BuildNetworkParams {
467		parachain_config,
468		net_config,
469		client,
470		transaction_pool,
471		para_id,
472		spawn_handle,
473		relay_chain_interface,
474		import_queue,
475		sybil_resistance_level,
476		metrics,
477	}: BuildNetworkParams<'a, Block, Client, Network, RCInterface, IQ>,
478) -> sc_service::error::Result<(
479	Arc<dyn NetworkService>,
480	TracingUnboundedSender<sc_rpc::system::Request<Block>>,
481	TransactionsHandlerController<Block::Hash>,
482	Arc<SyncingService<Block>>,
483)>
484where
485	Block: BlockT,
486	Client: UsageProvider<Block>
487		+ HeaderBackend<Block>
488		+ sp_consensus::block_validation::Chain<Block>
489		+ Send
490		+ Sync
491		+ BlockBackend<Block>
492		+ BlockchainEvents<Block>
493		+ ProvideRuntimeApi<Block>
494		+ HeaderMetadata<Block, Error = sp_blockchain::Error>
495		+ BlockIdTo<Block, Error = sp_blockchain::Error>
496		+ ProofProvider<Block>
497		+ 'static,
498	Client::Api: CollectCollationInfo<Block>
499		+ sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
500	for<'b> &'b Client: BlockImport<Block>,
501	RCInterface: RelayChainInterface + Clone + 'static,
502	IQ: ImportQueue<Block> + 'static,
503	Network: NetworkBackend<Block, <Block as BlockT>::Hash>,
504{
505	let warp_sync_config = match parachain_config.network.sync_mode {
506		SyncMode::Warp => {
507			log::debug!(target: LOG_TARGET_SYNC, "waiting for announce block...");
508
509			let target_block =
510				wait_for_finalized_para_head::<Block, _>(para_id, relay_chain_interface.clone())
511					.await
512					.inspect_err(|e| {
513						log::error!(
514							target: LOG_TARGET_SYNC,
515							"Unable to determine parachain target block {:?}",
516							e
517						);
518					})?;
519			Some(WarpSyncConfig::WithTarget(target_block))
520		},
521		_ => None,
522	};
523
524	let block_announce_validator = match sybil_resistance_level {
525		CollatorSybilResistance::Resistant => {
526			let block_announce_validator = AssumeSybilResistance::allow_seconded_messages();
527			Box::new(block_announce_validator) as Box<_>
528		},
529		CollatorSybilResistance::Unresistant => {
530			let block_announce_validator =
531				RequireSecondedInBlockAnnounce::new(relay_chain_interface, para_id);
532			Box::new(block_announce_validator) as Box<_>
533		},
534	};
535
536	sc_service::build_network(sc_service::BuildNetworkParams {
537		config: parachain_config,
538		net_config,
539		client,
540		transaction_pool,
541		spawn_handle,
542		import_queue,
543		block_announce_validator_builder: Some(Box::new(move |_| block_announce_validator)),
544		warp_sync_config,
545		block_relay: None,
546		metrics,
547	})
548}
549
550/// Waits for the relay chain to have finished syncing and then gets the parachain header that
551/// corresponds to the last finalized relay chain block.
552async fn wait_for_finalized_para_head<B, RCInterface>(
553	para_id: ParaId,
554	relay_chain_interface: RCInterface,
555) -> sc_service::error::Result<<B as BlockT>::Header>
556where
557	B: BlockT + 'static,
558	RCInterface: RelayChainInterface + Send + 'static,
559{
560	let mut imported_blocks = relay_chain_interface
561		.import_notification_stream()
562		.await
563		.map_err(|error| {
564			sc_service::Error::Other(format!(
565				"Relay chain import notification stream error when waiting for parachain head: \
566				{error}"
567			))
568		})?
569		.fuse();
570	while imported_blocks.next().await.is_some() {
571		let is_syncing = relay_chain_interface
572			.is_major_syncing()
573			.await
574			.map_err(|e| format!("Unable to determine sync status: {e}"))?;
575
576		if !is_syncing {
577			let relay_chain_best_hash = relay_chain_interface
578				.finalized_block_hash()
579				.await
580				.map_err(|e| Box::new(e) as Box<_>)?;
581
582			let validation_data = relay_chain_interface
583				.persisted_validation_data(
584					relay_chain_best_hash,
585					para_id,
586					OccupiedCoreAssumption::TimedOut,
587				)
588				.await
589				.map_err(|e| format!("{e:?}"))?
590				.ok_or("Could not find parachain head in relay chain")?;
591
592			let finalized_header = B::Header::decode(&mut &validation_data.parent_head.0[..])
593				.map_err(|e| format!("Failed to decode parachain head: {e}"))?;
594
595			log::info!(
596				"๐ŸŽ‰ Received target parachain header #{} ({}) from the relay chain.",
597				finalized_header.number(),
598				finalized_header.hash()
599			);
600			return Ok(finalized_header)
601		}
602	}
603
604	Err("Stopping following imported blocks. Could not determine parachain target block".into())
605}
606
607/// Task for logging candidate events and some related metrics.
608async fn parachain_informant<Block: BlockT, Client>(
609	para_id: ParaId,
610	relay_chain_interface: impl RelayChainInterface + Clone,
611	client: Arc<Client>,
612	metrics: Option<ParachainInformantMetrics>,
613) where
614	Client: HeaderBackend<Block> + Send + Sync + 'static,
615{
616	let mut import_notifications = match relay_chain_interface.import_notification_stream().await {
617		Ok(import_notifications) => import_notifications,
618		Err(e) => {
619			log::error!("Failed to get import notification stream: {e:?}. Parachain informant will not run!");
620			return
621		},
622	};
623	let mut last_backed_block_time: Option<Instant> = None;
624	while let Some(n) = import_notifications.next().await {
625		let candidate_events = match relay_chain_interface.candidate_events(n.hash()).await {
626			Ok(candidate_events) => candidate_events,
627			Err(e) => {
628				log::warn!("Failed to get candidate events for block {}: {e:?}", n.hash());
629				continue
630			},
631		};
632		let mut backed_candidates = Vec::new();
633		let mut included_candidates = Vec::new();
634		let mut timed_out_candidates = Vec::new();
635		for event in candidate_events {
636			match event {
637				CandidateEvent::CandidateBacked(receipt, head, _, _) => {
638					if receipt.descriptor.para_id() != para_id {
639						continue;
640					}
641					let backed_block = match Block::Header::decode(&mut &head.0[..]) {
642						Ok(header) => header,
643						Err(e) => {
644							log::warn!(
645								"Failed to decode parachain header from backed block: {e:?}"
646							);
647							continue
648						},
649					};
650					let backed_block_time = Instant::now();
651					if let Some(last_backed_block_time) = &last_backed_block_time {
652						let duration = backed_block_time.duration_since(*last_backed_block_time);
653						if let Some(metrics) = &metrics {
654							metrics.parachain_block_backed_duration.observe(duration.as_secs_f64());
655						}
656					}
657					last_backed_block_time = Some(backed_block_time);
658					backed_candidates.push(backed_block);
659				},
660				CandidateEvent::CandidateIncluded(receipt, head, _, _) => {
661					if receipt.descriptor.para_id() != para_id {
662						continue;
663					}
664					let included_block = match Block::Header::decode(&mut &head.0[..]) {
665						Ok(header) => header,
666						Err(e) => {
667							log::warn!(
668								"Failed to decode parachain header from included block: {e:?}"
669							);
670							continue
671						},
672					};
673					let unincluded_segment_size =
674						client.info().best_number.saturating_sub(*included_block.number());
675					let unincluded_segment_size: u32 = unincluded_segment_size.saturated_into();
676					if let Some(metrics) = &metrics {
677						metrics.unincluded_segment_size.observe(unincluded_segment_size.into());
678					}
679					included_candidates.push(included_block);
680				},
681				CandidateEvent::CandidateTimedOut(receipt, head, _) => {
682					if receipt.descriptor.para_id() != para_id {
683						continue;
684					}
685					let timed_out_block = match Block::Header::decode(&mut &head.0[..]) {
686						Ok(header) => header,
687						Err(e) => {
688							log::warn!(
689								"Failed to decode parachain header from timed out block: {e:?}"
690							);
691							continue
692						},
693					};
694					timed_out_candidates.push(timed_out_block);
695				},
696			}
697		}
698		let mut log_parts = Vec::new();
699		if !backed_candidates.is_empty() {
700			let backed_candidates = backed_candidates
701				.into_iter()
702				.map(|c| format!("#{} ({})", c.number(), c.hash()))
703				.collect::<Vec<_>>()
704				.join(", ");
705			log_parts.push(format!("backed: {}", backed_candidates));
706		};
707		if !included_candidates.is_empty() {
708			let included_candidates = included_candidates
709				.into_iter()
710				.map(|c| format!("#{} ({})", c.number(), c.hash()))
711				.collect::<Vec<_>>()
712				.join(", ");
713			log_parts.push(format!("included: {}", included_candidates));
714		};
715		if !timed_out_candidates.is_empty() {
716			let timed_out_candidates = timed_out_candidates
717				.into_iter()
718				.map(|c| format!("#{} ({})", c.number(), c.hash()))
719				.collect::<Vec<_>>()
720				.join(", ");
721			log_parts.push(format!("timed out: {}", timed_out_candidates));
722		};
723		if !log_parts.is_empty() {
724			log::info!(
725				"Update at relay chain block #{} ({}) - {}",
726				n.number(),
727				n.hash(),
728				log_parts.join(", ")
729			);
730		}
731	}
732}
733
734struct ParachainInformantMetrics {
735	/// Time between parachain blocks getting backed by the relaychain.
736	parachain_block_backed_duration: Histogram,
737	/// Number of blocks between best block and last included block.
738	unincluded_segment_size: Histogram,
739}
740
741impl ParachainInformantMetrics {
742	fn new(prometheus_registry: &Registry) -> prometheus::Result<Self> {
743		let parachain_block_authorship_duration = Histogram::with_opts(HistogramOpts::new(
744			"parachain_block_backed_duration",
745			"Time between parachain blocks getting backed by the relaychain",
746		))?;
747		prometheus_registry.register(Box::new(parachain_block_authorship_duration.clone()))?;
748
749		let unincluded_segment_size = Histogram::with_opts(
750			HistogramOpts::new(
751				"parachain_unincluded_segment_size",
752				"Number of blocks between best block and last included block",
753			)
754			.buckets((0..=24).into_iter().map(|i| i as f64).collect()),
755		)?;
756		prometheus_registry.register(Box::new(unincluded_segment_size.clone()))?;
757
758		Ok(Self {
759			parachain_block_backed_duration: parachain_block_authorship_duration,
760			unincluded_segment_size,
761		})
762	}
763}