referrerpolicy=no-referrer-when-downgrade

cumulus_client_service/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Cumulus.
3// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5// Cumulus is free software: you can redistribute it and/or modify
6// it under the terms of the GNU General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9
10// Cumulus is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14
15// You should have received a copy of the GNU General Public License
16// along with Cumulus. If not, see <https://www.gnu.org/licenses/>.
17
18//! Cumulus service
19//!
20//! Provides functions for starting a collator node or a normal full node.
21
22use cumulus_client_cli::CollatorOptions;
23use cumulus_client_consensus_common::ParachainConsensus;
24use cumulus_client_network::{AssumeSybilResistance, RequireSecondedInBlockAnnounce};
25use cumulus_client_pov_recovery::{PoVRecovery, RecoveryDelayRange, RecoveryHandle};
26use cumulus_primitives_core::{CollectCollationInfo, ParaId};
27pub use cumulus_primitives_proof_size_hostfunction::storage_proof_size;
28use cumulus_relay_chain_inprocess_interface::build_inprocess_relay_chain;
29use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult};
30use cumulus_relay_chain_minimal_node::build_minimal_relay_chain_node_with_rpc;
31use futures::{channel::mpsc, StreamExt};
32use polkadot_primitives::{CandidateEvent, CollatorPair, OccupiedCoreAssumption};
33use prometheus::{Histogram, HistogramOpts, Registry};
34use sc_client_api::{
35	Backend as BackendT, BlockBackend, BlockchainEvents, Finalizer, ProofProvider, UsageProvider,
36};
37use sc_consensus::{
38	import_queue::{ImportQueue, ImportQueueService},
39	BlockImport,
40};
41use sc_network::{
42	config::SyncMode, request_responses::IncomingRequest, service::traits::NetworkService,
43	NetworkBackend,
44};
45use sc_network_sync::SyncingService;
46use sc_network_transactions::TransactionsHandlerController;
47use sc_service::{Configuration, SpawnTaskHandle, TaskManager, WarpSyncConfig};
48use sc_telemetry::{log, TelemetryWorkerHandle};
49use sc_utils::mpsc::TracingUnboundedSender;
50use sp_api::ProvideRuntimeApi;
51use sp_blockchain::{HeaderBackend, HeaderMetadata};
52use sp_core::Decode;
53use sp_runtime::{
54	traits::{Block as BlockT, BlockIdTo, Header},
55	SaturatedConversion, Saturating,
56};
57use std::{
58	sync::Arc,
59	time::{Duration, Instant},
60};
61
/// Host functions that should be used in parachain nodes.
///
/// Contains the standard substrate host functions, as well as a
/// host function to enable PoV-reclaim on parachain nodes.
pub type ParachainHostFunctions = (
	// Storage-proof-size host function backing PoV-reclaim.
	cumulus_primitives_proof_size_hostfunction::storage_proof_size::HostFunctions,
	// The standard set of Substrate host functions.
	sp_io::SubstrateHostFunctions,
);
70
/// Capacity of the channel over which the consensus task passes explicit recovery requests to
/// PoV recovery.
///
/// Given the sporadic nature of the explicit recovery operation and the possibility to retry
/// indefinitely, this value is more than enough. In practice we expect no more than one queued
/// message.
const RECOVERY_CHAN_SIZE: usize = 8;
/// Log target used by the warp-sync setup in [`build_network`].
const LOG_TARGET_SYNC: &str = "sync::cumulus";
76
/// A hint about how long the node should wait before attempting to recover missing block data
/// from the data availability layer.
///
/// [`start_relay_chain_tasks`] turns this hint into the concrete [`RecoveryDelayRange`] handed
/// to PoV recovery.
pub enum DARecoveryProfile {
	/// Collators use an aggressive recovery profile by default.
	///
	/// Resolves to a delay between half a relay chain slot and one full relay chain slot.
	Collator,
	/// Full nodes use a passive recovery profile by default, as they are not direct
	/// victims of withholding attacks.
	///
	/// Resolves to a delay between 25 and 50 relay chain slots.
	FullNode,
	/// Provide an explicit recovery profile.
	///
	/// The given range is used unchanged.
	Other(RecoveryDelayRange),
}
88
/// Parameters for starting a collator.
///
/// NOTE(review): nothing in this file consumes this struct; the field descriptions below are
/// derived from field names and types only — confirm against the code that uses it.
pub struct StartCollatorParams<'a, Block: BlockT, BS, Client, RCInterface, Spawner> {
	/// Block status provider.
	pub block_status: Arc<BS>,
	/// The parachain client.
	pub client: Arc<Client>,
	/// Callback taking a block hash and optional extra data; presumably used to announce
	/// blocks — confirm.
	pub announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
	/// Spawner for background work.
	pub spawner: Spawner,
	/// Id of the parachain.
	pub para_id: ParaId,
	/// Interface to the relay chain.
	pub relay_chain_interface: RCInterface,
	/// Task manager of the node.
	pub task_manager: &'a mut TaskManager,
	/// The parachain consensus implementation.
	pub parachain_consensus: Box<dyn ParachainConsensus<Block>>,
	/// Service handle of the parachain import queue.
	pub import_queue: Box<dyn ImportQueueService<Block>>,
	/// Key pair of the collator.
	pub collator_key: CollatorPair,
	/// Duration of a relay chain slot.
	pub relay_chain_slot_duration: Duration,
	/// Handle used for PoV recovery.
	pub recovery_handle: Box<dyn RecoveryHandle>,
	/// The parachain syncing service.
	pub sync_service: Arc<SyncingService<Block>>,
	/// Optional Prometheus registry.
	pub prometheus_registry: Option<&'a Registry>,
}
105
/// Parameters given to [`start_relay_chain_tasks`].
pub struct StartRelayChainTasksParams<'a, Block: BlockT, Client, RCInterface> {
	/// The parachain client, shared by the consensus, PoV-recovery and informant tasks.
	pub client: Arc<Client>,
	/// Block announcement callback handed to the parachain consensus task.
	pub announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
	/// Id of the parachain whose blocks are followed on the relay chain.
	pub para_id: ParaId,
	/// Interface used by all spawned tasks to follow the relay chain.
	pub relay_chain_interface: RCInterface,
	/// Task manager on which the consensus, PoV-recovery and informant tasks are spawned.
	pub task_manager: &'a mut TaskManager,
	/// Selects how long PoV recovery waits before recovering missing block data.
	pub da_recovery_profile: DARecoveryProfile,
	/// Import queue service handed to PoV recovery.
	pub import_queue: Box<dyn ImportQueueService<Block>>,
	/// Relay chain slot duration; the `Collator` and `FullNode` recovery profiles derive their
	/// delays from it.
	pub relay_chain_slot_duration: Duration,
	/// Recovery handle handed to PoV recovery.
	pub recovery_handle: Box<dyn RecoveryHandle>,
	/// Syncing service handed to PoV recovery.
	pub sync_service: Arc<SyncingService<Block>>,
	/// If set, the parachain informant metrics are registered here.
	pub prometheus_registry: Option<&'a Registry>,
}
120
121/// Start necessary consensus tasks related to the relay chain.
122///
123/// Parachain nodes need to track the state of the relay chain and use the
124/// relay chain's data availability service to fetch blocks if they don't
125/// arrive via the normal p2p layer (i.e. when authors withhold their blocks deliberately).
126///
127/// This function spawns work for those side tasks.
128///
129/// It also spawns a parachain informant task that will log the relay chain state and some metrics.
130pub fn start_relay_chain_tasks<Block, Client, Backend, RCInterface>(
131	StartRelayChainTasksParams {
132		client,
133		announce_block,
134		para_id,
135		task_manager,
136		da_recovery_profile,
137		relay_chain_interface,
138		import_queue,
139		relay_chain_slot_duration,
140		recovery_handle,
141		sync_service,
142		prometheus_registry,
143	}: StartRelayChainTasksParams<Block, Client, RCInterface>,
144) -> sc_service::error::Result<()>
145where
146	Block: BlockT,
147	Client: Finalizer<Block, Backend>
148		+ UsageProvider<Block>
149		+ HeaderBackend<Block>
150		+ Send
151		+ Sync
152		+ BlockBackend<Block>
153		+ BlockchainEvents<Block>
154		+ 'static,
155	for<'a> &'a Client: BlockImport<Block>,
156	Backend: BackendT<Block> + 'static,
157	RCInterface: RelayChainInterface + Clone + 'static,
158{
159	let (recovery_chan_tx, recovery_chan_rx) = mpsc::channel(RECOVERY_CHAN_SIZE);
160
161	let consensus = cumulus_client_consensus_common::run_parachain_consensus(
162		para_id,
163		client.clone(),
164		relay_chain_interface.clone(),
165		announce_block.clone(),
166		Some(recovery_chan_tx),
167	);
168
169	task_manager
170		.spawn_essential_handle()
171		.spawn_blocking("cumulus-consensus", None, consensus);
172
173	let da_recovery_profile = match da_recovery_profile {
174		DARecoveryProfile::Collator => {
175			// We want that collators wait at maximum the relay chain slot duration before starting
176			// to recover blocks. Additionally, we wait at least half the slot time to give the
177			// relay chain the chance to increase availability.
178			RecoveryDelayRange {
179				min: relay_chain_slot_duration / 2,
180				max: relay_chain_slot_duration,
181			}
182		},
183		DARecoveryProfile::FullNode => {
184			// Full nodes should at least wait 2.5 minutes (assuming 6 seconds slot duration) and
185			// in maximum 5 minutes before starting to recover blocks. Collators should already
186			// start the recovery way before full nodes try to recover a certain block and then
187			// share the block with the network using "the normal way". Full nodes are just the
188			// "last resort" for block recovery.
189			RecoveryDelayRange {
190				min: relay_chain_slot_duration * 25,
191				max: relay_chain_slot_duration * 50,
192			}
193		},
194		DARecoveryProfile::Other(profile) => profile,
195	};
196
197	let pov_recovery = PoVRecovery::new(
198		recovery_handle,
199		da_recovery_profile,
200		client.clone(),
201		import_queue,
202		relay_chain_interface.clone(),
203		para_id,
204		recovery_chan_rx,
205		sync_service.clone(),
206	);
207
208	task_manager
209		.spawn_essential_handle()
210		.spawn("cumulus-pov-recovery", None, pov_recovery.run());
211
212	let parachain_informant = parachain_informant::<Block, _>(
213		para_id,
214		relay_chain_interface.clone(),
215		client.clone(),
216		prometheus_registry.map(ParachainInformantMetrics::new).transpose()?,
217	);
218	task_manager
219		.spawn_handle()
220		.spawn("parachain-informant", None, parachain_informant);
221
222	Ok(())
223}
224
225/// Prepare the parachain's node configuration
226///
227/// This function will:
228/// * Disable the default announcement of Substrate for the parachain in favor of the one of
229///   Cumulus.
230/// * Set peers needed to start warp sync to 1.
231pub fn prepare_node_config(mut parachain_config: Configuration) -> Configuration {
232	parachain_config.announce_block = false;
233	// Parachains only need 1 peer to start warp sync, because the target block is fetched from the
234	// relay chain.
235	parachain_config.network.min_peers_to_start_warp_sync = Some(1);
236
237	parachain_config
238}
239
240/// Build a relay chain interface.
241/// Will return a minimal relay chain node with RPC
242/// client or an inprocess node, based on the [`CollatorOptions`] passed in.
243pub async fn build_relay_chain_interface(
244	relay_chain_config: Configuration,
245	parachain_config: &Configuration,
246	telemetry_worker_handle: Option<TelemetryWorkerHandle>,
247	task_manager: &mut TaskManager,
248	collator_options: CollatorOptions,
249	hwbench: Option<sc_sysinfo::HwBench>,
250) -> RelayChainResult<(
251	Arc<(dyn RelayChainInterface + 'static)>,
252	Option<CollatorPair>,
253	Arc<dyn NetworkService>,
254	async_channel::Receiver<IncomingRequest>,
255)> {
256	match collator_options.relay_chain_mode {
257		cumulus_client_cli::RelayChainMode::Embedded => build_inprocess_relay_chain(
258			relay_chain_config,
259			parachain_config,
260			telemetry_worker_handle,
261			task_manager,
262			hwbench,
263		),
264		cumulus_client_cli::RelayChainMode::ExternalRpc(rpc_target_urls) =>
265			build_minimal_relay_chain_node_with_rpc(
266				relay_chain_config,
267				parachain_config.prometheus_registry(),
268				task_manager,
269				rpc_target_urls,
270			)
271			.await,
272	}
273}
274
/// The expected level of collator sybil-resistance on the network. This is used to
/// configure the type of metadata passed alongside block announcements on the network.
///
/// [`build_network`] installs a different block-announce validator for each variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CollatorSybilResistance {
	/// There is a collator-selection protocol which provides sybil-resistance,
	/// such as Aura. Sybil-resistant collator-selection protocols are able to
	/// operate more efficiently.
	Resistant,
	/// There is no collator-selection protocol providing sybil-resistance.
	/// In situations such as "free-for-all" collators, the network is unresistant
	/// and needs to attach more metadata to block announcements, relying on relay-chain
	/// validators to avoid handling unbounded numbers of blocks.
	Unresistant,
}
288
/// Parameters given to [`build_network`].
pub struct BuildNetworkParams<
	'a,
	Block: BlockT,
	Client: ProvideRuntimeApi<Block>
		+ BlockBackend<Block>
		+ HeaderMetadata<Block, Error = sp_blockchain::Error>
		+ HeaderBackend<Block>
		+ BlockIdTo<Block>
		+ 'static,
	Network: NetworkBackend<Block, <Block as BlockT>::Hash>,
	RCInterface,
	IQ,
> where
	Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
{
	/// Configuration of the parachain node. Its `network.sync_mode` decides whether a warp sync
	/// target is fetched from the relay chain.
	pub parachain_config: &'a Configuration,
	/// Network configuration passed on to `sc_service::build_network`.
	pub net_config:
		sc_network::config::FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Network>,
	/// The parachain client.
	pub client: Arc<Client>,
	/// Transaction pool passed on to `sc_service::build_network`.
	pub transaction_pool: Arc<sc_transaction_pool::TransactionPoolHandle<Block, Client>>,
	/// Id of the parachain. It is used to look up the parachain head on the relay chain for warp
	/// sync, and by the block-announce validator for `CollatorSybilResistance::Unresistant`.
	pub para_id: ParaId,
	/// Interface to the relay chain.
	pub relay_chain_interface: RCInterface,
	/// Spawn handle passed on to `sc_service::build_network`.
	pub spawn_handle: SpawnTaskHandle,
	/// Import queue passed on to `sc_service::build_network`.
	pub import_queue: IQ,
	/// Selects which block-announce validator is installed.
	pub sybil_resistance_level: CollatorSybilResistance,
	/// Notification metrics passed on to `sc_service::build_network`.
	pub metrics: sc_network::NotificationMetrics,
}
317
318/// Build the network service, the network status sinks and an RPC sender.
319pub async fn build_network<'a, Block, Client, RCInterface, IQ, Network>(
320	BuildNetworkParams {
321		parachain_config,
322		net_config,
323		client,
324		transaction_pool,
325		para_id,
326		spawn_handle,
327		relay_chain_interface,
328		import_queue,
329		sybil_resistance_level,
330		metrics,
331	}: BuildNetworkParams<'a, Block, Client, Network, RCInterface, IQ>,
332) -> sc_service::error::Result<(
333	Arc<dyn NetworkService>,
334	TracingUnboundedSender<sc_rpc::system::Request<Block>>,
335	TransactionsHandlerController<Block::Hash>,
336	Arc<SyncingService<Block>>,
337)>
338where
339	Block: BlockT,
340	Client: UsageProvider<Block>
341		+ HeaderBackend<Block>
342		+ sp_consensus::block_validation::Chain<Block>
343		+ Send
344		+ Sync
345		+ BlockBackend<Block>
346		+ BlockchainEvents<Block>
347		+ ProvideRuntimeApi<Block>
348		+ HeaderMetadata<Block, Error = sp_blockchain::Error>
349		+ BlockIdTo<Block, Error = sp_blockchain::Error>
350		+ ProofProvider<Block>
351		+ 'static,
352	Client::Api: CollectCollationInfo<Block>
353		+ sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
354	for<'b> &'b Client: BlockImport<Block>,
355	RCInterface: RelayChainInterface + Clone + 'static,
356	IQ: ImportQueue<Block> + 'static,
357	Network: NetworkBackend<Block, <Block as BlockT>::Hash>,
358{
359	let warp_sync_config = match parachain_config.network.sync_mode {
360		SyncMode::Warp => {
361			log::debug!(target: LOG_TARGET_SYNC, "waiting for announce block...");
362
363			let target_block =
364				wait_for_finalized_para_head::<Block, _>(para_id, relay_chain_interface.clone())
365					.await
366					.inspect_err(|e| {
367						log::error!(
368							target: LOG_TARGET_SYNC,
369							"Unable to determine parachain target block {:?}",
370							e
371						);
372					})?;
373			Some(WarpSyncConfig::WithTarget(target_block))
374		},
375		_ => None,
376	};
377
378	let block_announce_validator = match sybil_resistance_level {
379		CollatorSybilResistance::Resistant => {
380			let block_announce_validator = AssumeSybilResistance::allow_seconded_messages();
381			Box::new(block_announce_validator) as Box<_>
382		},
383		CollatorSybilResistance::Unresistant => {
384			let block_announce_validator =
385				RequireSecondedInBlockAnnounce::new(relay_chain_interface, para_id);
386			Box::new(block_announce_validator) as Box<_>
387		},
388	};
389
390	sc_service::build_network(sc_service::BuildNetworkParams {
391		config: parachain_config,
392		net_config,
393		client,
394		transaction_pool,
395		spawn_handle,
396		import_queue,
397		block_announce_validator_builder: Some(Box::new(move |_| block_announce_validator)),
398		warp_sync_config,
399		block_relay: None,
400		metrics,
401	})
402}
403
404/// Waits for the relay chain to have finished syncing and then gets the parachain header that
405/// corresponds to the last finalized relay chain block.
406async fn wait_for_finalized_para_head<B, RCInterface>(
407	para_id: ParaId,
408	relay_chain_interface: RCInterface,
409) -> sc_service::error::Result<<B as BlockT>::Header>
410where
411	B: BlockT + 'static,
412	RCInterface: RelayChainInterface + Send + 'static,
413{
414	let mut imported_blocks = relay_chain_interface
415		.import_notification_stream()
416		.await
417		.map_err(|error| {
418			sc_service::Error::Other(format!(
419				"Relay chain import notification stream error when waiting for parachain head: \
420				{error}"
421			))
422		})?
423		.fuse();
424	while imported_blocks.next().await.is_some() {
425		let is_syncing = relay_chain_interface
426			.is_major_syncing()
427			.await
428			.map_err(|e| format!("Unable to determine sync status: {e}"))?;
429
430		if !is_syncing {
431			let relay_chain_best_hash = relay_chain_interface
432				.finalized_block_hash()
433				.await
434				.map_err(|e| Box::new(e) as Box<_>)?;
435
436			let validation_data = relay_chain_interface
437				.persisted_validation_data(
438					relay_chain_best_hash,
439					para_id,
440					OccupiedCoreAssumption::TimedOut,
441				)
442				.await
443				.map_err(|e| format!("{e:?}"))?
444				.ok_or("Could not find parachain head in relay chain")?;
445
446			let finalized_header = B::Header::decode(&mut &validation_data.parent_head.0[..])
447				.map_err(|e| format!("Failed to decode parachain head: {e}"))?;
448
449			log::info!(
450				"๐ŸŽ‰ Received target parachain header #{} ({}) from the relay chain.",
451				finalized_header.number(),
452				finalized_header.hash()
453			);
454			return Ok(finalized_header)
455		}
456	}
457
458	Err("Stopping following imported blocks. Could not determine parachain target block".into())
459}
460
461/// Task for logging candidate events and some related metrics.
462async fn parachain_informant<Block: BlockT, Client>(
463	para_id: ParaId,
464	relay_chain_interface: impl RelayChainInterface + Clone,
465	client: Arc<Client>,
466	metrics: Option<ParachainInformantMetrics>,
467) where
468	Client: HeaderBackend<Block> + Send + Sync + 'static,
469{
470	let mut import_notifications = match relay_chain_interface.import_notification_stream().await {
471		Ok(import_notifications) => import_notifications,
472		Err(e) => {
473			log::error!("Failed to get import notification stream: {e:?}. Parachain informant will not run!");
474			return
475		},
476	};
477	let mut last_backed_block_time: Option<Instant> = None;
478	while let Some(n) = import_notifications.next().await {
479		let candidate_events = match relay_chain_interface.candidate_events(n.hash()).await {
480			Ok(candidate_events) => candidate_events,
481			Err(e) => {
482				log::warn!("Failed to get candidate events for block {}: {e:?}", n.hash());
483				continue
484			},
485		};
486		let mut backed_candidates = Vec::new();
487		let mut included_candidates = Vec::new();
488		let mut timed_out_candidates = Vec::new();
489		for event in candidate_events {
490			match event {
491				CandidateEvent::CandidateBacked(receipt, head, _, _) => {
492					if receipt.descriptor.para_id() != para_id {
493						continue;
494					}
495					let backed_block = match Block::Header::decode(&mut &head.0[..]) {
496						Ok(header) => header,
497						Err(e) => {
498							log::warn!(
499								"Failed to decode parachain header from backed block: {e:?}"
500							);
501							continue
502						},
503					};
504					let backed_block_time = Instant::now();
505					if let Some(last_backed_block_time) = &last_backed_block_time {
506						let duration = backed_block_time.duration_since(*last_backed_block_time);
507						if let Some(metrics) = &metrics {
508							metrics.parachain_block_backed_duration.observe(duration.as_secs_f64());
509						}
510					}
511					last_backed_block_time = Some(backed_block_time);
512					backed_candidates.push(backed_block);
513				},
514				CandidateEvent::CandidateIncluded(receipt, head, _, _) => {
515					if receipt.descriptor.para_id() != para_id {
516						continue;
517					}
518					let included_block = match Block::Header::decode(&mut &head.0[..]) {
519						Ok(header) => header,
520						Err(e) => {
521							log::warn!(
522								"Failed to decode parachain header from included block: {e:?}"
523							);
524							continue
525						},
526					};
527					let unincluded_segment_size =
528						client.info().best_number.saturating_sub(*included_block.number());
529					let unincluded_segment_size: u32 = unincluded_segment_size.saturated_into();
530					if let Some(metrics) = &metrics {
531						metrics.unincluded_segment_size.observe(unincluded_segment_size.into());
532					}
533					included_candidates.push(included_block);
534				},
535				CandidateEvent::CandidateTimedOut(receipt, head, _) => {
536					if receipt.descriptor.para_id() != para_id {
537						continue;
538					}
539					let timed_out_block = match Block::Header::decode(&mut &head.0[..]) {
540						Ok(header) => header,
541						Err(e) => {
542							log::warn!(
543								"Failed to decode parachain header from timed out block: {e:?}"
544							);
545							continue
546						},
547					};
548					timed_out_candidates.push(timed_out_block);
549				},
550			}
551		}
552		let mut log_parts = Vec::new();
553		if !backed_candidates.is_empty() {
554			let backed_candidates = backed_candidates
555				.into_iter()
556				.map(|c| format!("#{} ({})", c.number(), c.hash()))
557				.collect::<Vec<_>>()
558				.join(", ");
559			log_parts.push(format!("backed: {}", backed_candidates));
560		};
561		if !included_candidates.is_empty() {
562			let included_candidates = included_candidates
563				.into_iter()
564				.map(|c| format!("#{} ({})", c.number(), c.hash()))
565				.collect::<Vec<_>>()
566				.join(", ");
567			log_parts.push(format!("included: {}", included_candidates));
568		};
569		if !timed_out_candidates.is_empty() {
570			let timed_out_candidates = timed_out_candidates
571				.into_iter()
572				.map(|c| format!("#{} ({})", c.number(), c.hash()))
573				.collect::<Vec<_>>()
574				.join(", ");
575			log_parts.push(format!("timed out: {}", timed_out_candidates));
576		};
577		if !log_parts.is_empty() {
578			log::info!(
579				"Update at relay chain block #{} ({}) - {}",
580				n.number(),
581				n.hash(),
582				log_parts.join(", ")
583			);
584		}
585	}
586}
587
588struct ParachainInformantMetrics {
589	/// Time between parachain blocks getting backed by the relaychain.
590	parachain_block_backed_duration: Histogram,
591	/// Number of blocks between best block and last included block.
592	unincluded_segment_size: Histogram,
593}
594
595impl ParachainInformantMetrics {
596	fn new(prometheus_registry: &Registry) -> prometheus::Result<Self> {
597		let parachain_block_authorship_duration = Histogram::with_opts(HistogramOpts::new(
598			"parachain_block_backed_duration",
599			"Time between parachain blocks getting backed by the relaychain",
600		))?;
601		prometheus_registry.register(Box::new(parachain_block_authorship_duration.clone()))?;
602
603		let unincluded_segment_size = Histogram::with_opts(
604			HistogramOpts::new(
605				"parachain_unincluded_segment_size",
606				"Number of blocks between best block and last included block",
607			)
608			.buckets((0..=24).into_iter().map(|i| i as f64).collect()),
609		)?;
610		prometheus_registry.register(Box::new(unincluded_segment_size.clone()))?;
611
612		Ok(Self {
613			parachain_block_backed_duration: parachain_block_authorship_duration,
614			unincluded_segment_size,
615		})
616	}
617}