1use cumulus_client_cli::CollatorOptions;
23use cumulus_client_consensus_common::ParachainConsensus;
24use cumulus_client_network::{AssumeSybilResistance, RequireSecondedInBlockAnnounce};
25use cumulus_client_pov_recovery::{PoVRecovery, RecoveryDelayRange, RecoveryHandle};
26use cumulus_primitives_core::{CollectCollationInfo, ParaId};
27pub use cumulus_primitives_proof_size_hostfunction::storage_proof_size;
28use cumulus_relay_chain_inprocess_interface::build_inprocess_relay_chain;
29use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult};
30use cumulus_relay_chain_minimal_node::build_minimal_relay_chain_node_with_rpc;
31use futures::{channel::mpsc, StreamExt};
32use polkadot_primitives::{CandidateEvent, CollatorPair, OccupiedCoreAssumption};
33use prometheus::{Histogram, HistogramOpts, Registry};
34use sc_client_api::{
35 Backend as BackendT, BlockBackend, BlockchainEvents, Finalizer, ProofProvider, UsageProvider,
36};
37use sc_consensus::{
38 import_queue::{ImportQueue, ImportQueueService},
39 BlockImport,
40};
41use sc_network::{
42 config::SyncMode, request_responses::IncomingRequest, service::traits::NetworkService,
43 NetworkBackend,
44};
45use sc_network_sync::SyncingService;
46use sc_network_transactions::TransactionsHandlerController;
47use sc_service::{Configuration, SpawnTaskHandle, TaskManager, WarpSyncConfig};
48use sc_telemetry::{log, TelemetryWorkerHandle};
49use sc_utils::mpsc::TracingUnboundedSender;
50use sp_api::ProvideRuntimeApi;
51use sp_blockchain::{HeaderBackend, HeaderMetadata};
52use sp_core::{traits::SpawnNamed, Decode};
53use sp_runtime::{
54 traits::{Block as BlockT, BlockIdTo, Header},
55 SaturatedConversion, Saturating,
56};
57use std::{
58 sync::Arc,
59 time::{Duration, Instant},
60};
61
/// Host function set for parachain nodes.
///
/// A tuple combining the `storage_proof_size` host functions from
/// `cumulus_primitives_proof_size_hostfunction` with the standard
/// `sp_io::SubstrateHostFunctions`.
pub type ParachainHostFunctions = (
    cumulus_primitives_proof_size_hostfunction::storage_proof_size::HostFunctions,
    sp_io::SubstrateHostFunctions,
);
70
/// Capacity of the bounded channel created in `start_relay_chain_tasks` that connects the
/// parachain consensus task (sender) with the PoV recovery task (receiver).
const RECOVERY_CHAN_SIZE: usize = 8;
/// Log target used for the sync-related messages emitted by `build_network`.
const LOG_TARGET_SYNC: &str = "sync::cumulus";
76
/// Selects the delay range the PoV recovery task is configured with in
/// `start_relay_chain_tasks`.
pub enum DARecoveryProfile {
    /// Delay between half a relay chain slot duration and one full relay chain slot duration.
    Collator,
    /// Delay between 25 and 50 relay chain slot durations.
    FullNode,
    /// Use the given delay range as-is.
    Other(RecoveryDelayRange),
}
88
/// Parameters consumed by [`start_collator`].
pub struct StartCollatorParams<'a, Block: BlockT, BS, Client, RCInterface, Spawner> {
    /// Forwarded unchanged as the collator's `block_status`.
    pub block_status: Arc<BS>,
    /// Parachain client; shared with the relay chain tasks and handed to the collator as its
    /// `runtime_api`.
    pub client: Arc<Client>,
    /// Callback taking a block hash and optional opaque bytes; handed to both the relay chain
    /// tasks and the collator.
    pub announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
    /// Task spawner forwarded to the collator.
    pub spawner: Spawner,
    /// Id of the parachain this node serves.
    pub para_id: ParaId,
    /// Relay chain access; also the source of the overseer handle given to the collator.
    pub relay_chain_interface: RCInterface,
    /// Task manager on which the relay chain driven tasks are spawned.
    pub task_manager: &'a mut TaskManager,
    /// Consensus implementation forwarded to the collator.
    pub parachain_consensus: Box<dyn ParachainConsensus<Block>>,
    /// Import queue service handed to the PoV recovery task.
    pub import_queue: Box<dyn ImportQueueService<Block>>,
    /// Forwarded to the collator as its `key`.
    pub collator_key: CollatorPair,
    /// Relay chain slot duration; the recovery delay range is derived from it.
    pub relay_chain_slot_duration: Duration,
    /// Recovery handle handed to the PoV recovery task.
    pub recovery_handle: Box<dyn RecoveryHandle>,
    /// Syncing service handed to the PoV recovery task.
    pub sync_service: Arc<SyncingService<Block>>,
    /// When present, the parachain informant metrics are registered on this registry.
    pub prometheus_registry: Option<&'a Registry>,
}
105
/// Parameters consumed by [`start_relay_chain_tasks`].
pub struct StartRelayChainTasksParams<'a, Block: BlockT, Client, RCInterface> {
    /// Parachain client; shared by the consensus, PoV recovery and informant tasks.
    pub client: Arc<Client>,
    /// Callback taking a block hash and optional opaque bytes; handed to the consensus task.
    pub announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
    /// Id of the parachain this node serves.
    pub para_id: ParaId,
    /// Relay chain access; cloned into each spawned task.
    pub relay_chain_interface: RCInterface,
    /// Task manager on which the tasks are spawned.
    pub task_manager: &'a mut TaskManager,
    /// Selects the delay range the PoV recovery task is created with.
    pub da_recovery_profile: DARecoveryProfile,
    /// Import queue service handed to the PoV recovery task.
    pub import_queue: Box<dyn ImportQueueService<Block>>,
    /// Relay chain slot duration; basis of the `Collator` and `FullNode` delay ranges.
    pub relay_chain_slot_duration: Duration,
    /// Recovery handle handed to the PoV recovery task.
    pub recovery_handle: Box<dyn RecoveryHandle>,
    /// Syncing service handed to the PoV recovery task.
    pub sync_service: Arc<SyncingService<Block>>,
    /// When present, the parachain informant metrics are registered on this registry.
    pub prometheus_registry: Option<&'a Registry>,
}
120
/// Parameters consumed by [`start_full_node`].
///
/// Every field is forwarded unchanged into a [`StartRelayChainTasksParams`].
pub struct StartFullNodeParams<'a, Block: BlockT, Client, RCInterface> {
    /// Id of the parachain this node serves.
    pub para_id: ParaId,
    /// Parachain client.
    pub client: Arc<Client>,
    /// Relay chain access.
    pub relay_chain_interface: RCInterface,
    /// Task manager on which the tasks are spawned.
    pub task_manager: &'a mut TaskManager,
    /// Callback taking a block hash and optional opaque bytes.
    pub announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
    /// Relay chain slot duration; basis of the full node recovery delay range.
    pub relay_chain_slot_duration: Duration,
    /// Import queue service handed to the PoV recovery task.
    pub import_queue: Box<dyn ImportQueueService<Block>>,
    /// Recovery handle handed to the PoV recovery task.
    pub recovery_handle: Box<dyn RecoveryHandle>,
    /// Syncing service handed to the PoV recovery task.
    pub sync_service: Arc<SyncingService<Block>>,
    /// When present, the parachain informant metrics are registered on this registry.
    pub prometheus_registry: Option<&'a Registry>,
}
134
135#[deprecated = "use start_relay_chain_tasks instead"]
141pub async fn start_collator<'a, Block, BS, Client, Backend, RCInterface, Spawner>(
142 StartCollatorParams {
143 block_status,
144 client,
145 announce_block,
146 spawner,
147 para_id,
148 task_manager,
149 relay_chain_interface,
150 parachain_consensus,
151 import_queue,
152 collator_key,
153 relay_chain_slot_duration,
154 recovery_handle,
155 sync_service,
156 prometheus_registry,
157 }: StartCollatorParams<'a, Block, BS, Client, RCInterface, Spawner>,
158) -> sc_service::error::Result<()>
159where
160 Block: BlockT,
161 BS: BlockBackend<Block> + Send + Sync + 'static,
162 Client: Finalizer<Block, Backend>
163 + UsageProvider<Block>
164 + HeaderBackend<Block>
165 + Send
166 + Sync
167 + BlockBackend<Block>
168 + BlockchainEvents<Block>
169 + ProvideRuntimeApi<Block>
170 + 'static,
171 Client::Api: CollectCollationInfo<Block>,
172 for<'b> &'b Client: BlockImport<Block>,
173 Spawner: SpawnNamed + Clone + Send + Sync + 'static,
174 RCInterface: RelayChainInterface + Clone + 'static,
175 Backend: BackendT<Block> + 'static,
176{
177 let overseer_handle = relay_chain_interface
178 .overseer_handle()
179 .map_err(|e| sc_service::Error::Application(Box::new(e)))?;
180
181 start_relay_chain_tasks(StartRelayChainTasksParams {
182 client: client.clone(),
183 announce_block: announce_block.clone(),
184 para_id,
185 task_manager,
186 da_recovery_profile: DARecoveryProfile::Collator,
187 relay_chain_interface,
188 import_queue,
189 relay_chain_slot_duration,
190 recovery_handle,
191 sync_service,
192 prometheus_registry,
193 })?;
194
195 #[allow(deprecated)]
196 cumulus_client_collator::start_collator(cumulus_client_collator::StartCollatorParams {
197 runtime_api: client,
198 block_status,
199 announce_block,
200 overseer_handle,
201 spawner,
202 para_id,
203 key: collator_key,
204 parachain_consensus,
205 })
206 .await;
207
208 Ok(())
209}
210
211pub fn start_relay_chain_tasks<Block, Client, Backend, RCInterface>(
221 StartRelayChainTasksParams {
222 client,
223 announce_block,
224 para_id,
225 task_manager,
226 da_recovery_profile,
227 relay_chain_interface,
228 import_queue,
229 relay_chain_slot_duration,
230 recovery_handle,
231 sync_service,
232 prometheus_registry,
233 }: StartRelayChainTasksParams<Block, Client, RCInterface>,
234) -> sc_service::error::Result<()>
235where
236 Block: BlockT,
237 Client: Finalizer<Block, Backend>
238 + UsageProvider<Block>
239 + HeaderBackend<Block>
240 + Send
241 + Sync
242 + BlockBackend<Block>
243 + BlockchainEvents<Block>
244 + 'static,
245 for<'a> &'a Client: BlockImport<Block>,
246 Backend: BackendT<Block> + 'static,
247 RCInterface: RelayChainInterface + Clone + 'static,
248{
249 let (recovery_chan_tx, recovery_chan_rx) = mpsc::channel(RECOVERY_CHAN_SIZE);
250
251 let consensus = cumulus_client_consensus_common::run_parachain_consensus(
252 para_id,
253 client.clone(),
254 relay_chain_interface.clone(),
255 announce_block.clone(),
256 Some(recovery_chan_tx),
257 );
258
259 task_manager
260 .spawn_essential_handle()
261 .spawn_blocking("cumulus-consensus", None, consensus);
262
263 let da_recovery_profile = match da_recovery_profile {
264 DARecoveryProfile::Collator => {
265 RecoveryDelayRange {
269 min: relay_chain_slot_duration / 2,
270 max: relay_chain_slot_duration,
271 }
272 },
273 DARecoveryProfile::FullNode => {
274 RecoveryDelayRange {
280 min: relay_chain_slot_duration * 25,
281 max: relay_chain_slot_duration * 50,
282 }
283 },
284 DARecoveryProfile::Other(profile) => profile,
285 };
286
287 let pov_recovery = PoVRecovery::new(
288 recovery_handle,
289 da_recovery_profile,
290 client.clone(),
291 import_queue,
292 relay_chain_interface.clone(),
293 para_id,
294 recovery_chan_rx,
295 sync_service.clone(),
296 );
297
298 task_manager
299 .spawn_essential_handle()
300 .spawn("cumulus-pov-recovery", None, pov_recovery.run());
301
302 let parachain_informant = parachain_informant::<Block, _>(
303 para_id,
304 relay_chain_interface.clone(),
305 client.clone(),
306 prometheus_registry.map(ParachainInformantMetrics::new).transpose()?,
307 );
308 task_manager
309 .spawn_handle()
310 .spawn("parachain-informant", None, parachain_informant);
311
312 Ok(())
313}
314
315#[deprecated = "use start_relay_chain_tasks instead"]
320pub fn start_full_node<Block, Client, Backend, RCInterface>(
321 StartFullNodeParams {
322 client,
323 announce_block,
324 task_manager,
325 relay_chain_interface,
326 para_id,
327 relay_chain_slot_duration,
328 import_queue,
329 recovery_handle,
330 sync_service,
331 prometheus_registry,
332 }: StartFullNodeParams<Block, Client, RCInterface>,
333) -> sc_service::error::Result<()>
334where
335 Block: BlockT,
336 Client: Finalizer<Block, Backend>
337 + UsageProvider<Block>
338 + HeaderBackend<Block>
339 + Send
340 + Sync
341 + BlockBackend<Block>
342 + BlockchainEvents<Block>
343 + 'static,
344 for<'a> &'a Client: BlockImport<Block>,
345 Backend: BackendT<Block> + 'static,
346 RCInterface: RelayChainInterface + Clone + 'static,
347{
348 start_relay_chain_tasks(StartRelayChainTasksParams {
349 client,
350 announce_block,
351 task_manager,
352 relay_chain_interface,
353 para_id,
354 relay_chain_slot_duration,
355 import_queue,
356 recovery_handle,
357 sync_service,
358 da_recovery_profile: DARecoveryProfile::FullNode,
359 prometheus_registry,
360 })
361}
362
/// Re-exports of the deprecated collator entry points and parameter struct from
/// `cumulus_client_collator`.
#[deprecated = "This is old consensus architecture only for backwards compatibility \
    and will be removed in the future"]
pub mod old_consensus {
    // The re-exported items are themselves deprecated; silence the lint for the re-export.
    #[allow(deprecated)]
    pub use cumulus_client_collator::{start_collator, start_collator_sync, StartCollatorParams};
}
370
371pub fn prepare_node_config(mut parachain_config: Configuration) -> Configuration {
378 parachain_config.announce_block = false;
379 parachain_config.network.min_peers_to_start_warp_sync = Some(1);
382
383 parachain_config
384}
385
386pub async fn build_relay_chain_interface(
390 relay_chain_config: Configuration,
391 parachain_config: &Configuration,
392 telemetry_worker_handle: Option<TelemetryWorkerHandle>,
393 task_manager: &mut TaskManager,
394 collator_options: CollatorOptions,
395 hwbench: Option<sc_sysinfo::HwBench>,
396) -> RelayChainResult<(
397 Arc<(dyn RelayChainInterface + 'static)>,
398 Option<CollatorPair>,
399 Arc<dyn NetworkService>,
400 async_channel::Receiver<IncomingRequest>,
401)> {
402 match collator_options.relay_chain_mode {
403 cumulus_client_cli::RelayChainMode::Embedded => build_inprocess_relay_chain(
404 relay_chain_config,
405 parachain_config,
406 telemetry_worker_handle,
407 task_manager,
408 hwbench,
409 ),
410 cumulus_client_cli::RelayChainMode::ExternalRpc(rpc_target_urls) =>
411 build_minimal_relay_chain_node_with_rpc(
412 relay_chain_config,
413 parachain_config.prometheus_registry(),
414 task_manager,
415 rpc_target_urls,
416 )
417 .await,
418 }
419}
420
/// Selects which block announce validator `build_network` installs.
pub enum CollatorSybilResistance {
    /// Use the `AssumeSybilResistance::allow_seconded_messages()` validator.
    Resistant,
    /// Use a `RequireSecondedInBlockAnnounce` validator built from the relay chain interface
    /// and the parachain id.
    Unresistant,
}
434
/// Parameters consumed by [`build_network`].
pub struct BuildNetworkParams<
    'a,
    Block: BlockT,
    Client: ProvideRuntimeApi<Block>
        + BlockBackend<Block>
        + HeaderMetadata<Block, Error = sp_blockchain::Error>
        + HeaderBackend<Block>
        + BlockIdTo<Block>
        + 'static,
    Network: NetworkBackend<Block, <Block as BlockT>::Hash>,
    RCInterface,
    IQ,
> where
    Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
{
    /// Parachain node configuration; its `network.sync_mode` decides whether a warp sync target
    /// is determined, and it is forwarded as the network builder's `config`.
    pub parachain_config: &'a Configuration,
    /// Network configuration forwarded to `sc_service::build_network`.
    pub net_config:
        sc_network::config::FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Network>,
    /// Parachain client forwarded to `sc_service::build_network`.
    pub client: Arc<Client>,
    /// Transaction pool forwarded to `sc_service::build_network`.
    pub transaction_pool: Arc<sc_transaction_pool::TransactionPoolHandle<Block, Client>>,
    /// Id of the parachain this node serves.
    pub para_id: ParaId,
    /// Relay chain access, used to find the warp sync target and by the
    /// `RequireSecondedInBlockAnnounce` validator.
    pub relay_chain_interface: RCInterface,
    /// Spawn handle forwarded to `sc_service::build_network`.
    pub spawn_handle: SpawnTaskHandle,
    /// Import queue forwarded to `sc_service::build_network`.
    pub import_queue: IQ,
    /// Selects the block announce validator.
    pub sybil_resistance_level: CollatorSybilResistance,
    /// Notification metrics forwarded to `sc_service::build_network`.
    pub metrics: sc_network::NotificationMetrics,
}
463
464pub async fn build_network<'a, Block, Client, RCInterface, IQ, Network>(
466 BuildNetworkParams {
467 parachain_config,
468 net_config,
469 client,
470 transaction_pool,
471 para_id,
472 spawn_handle,
473 relay_chain_interface,
474 import_queue,
475 sybil_resistance_level,
476 metrics,
477 }: BuildNetworkParams<'a, Block, Client, Network, RCInterface, IQ>,
478) -> sc_service::error::Result<(
479 Arc<dyn NetworkService>,
480 TracingUnboundedSender<sc_rpc::system::Request<Block>>,
481 TransactionsHandlerController<Block::Hash>,
482 Arc<SyncingService<Block>>,
483)>
484where
485 Block: BlockT,
486 Client: UsageProvider<Block>
487 + HeaderBackend<Block>
488 + sp_consensus::block_validation::Chain<Block>
489 + Send
490 + Sync
491 + BlockBackend<Block>
492 + BlockchainEvents<Block>
493 + ProvideRuntimeApi<Block>
494 + HeaderMetadata<Block, Error = sp_blockchain::Error>
495 + BlockIdTo<Block, Error = sp_blockchain::Error>
496 + ProofProvider<Block>
497 + 'static,
498 Client::Api: CollectCollationInfo<Block>
499 + sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
500 for<'b> &'b Client: BlockImport<Block>,
501 RCInterface: RelayChainInterface + Clone + 'static,
502 IQ: ImportQueue<Block> + 'static,
503 Network: NetworkBackend<Block, <Block as BlockT>::Hash>,
504{
505 let warp_sync_config = match parachain_config.network.sync_mode {
506 SyncMode::Warp => {
507 log::debug!(target: LOG_TARGET_SYNC, "waiting for announce block...");
508
509 let target_block =
510 wait_for_finalized_para_head::<Block, _>(para_id, relay_chain_interface.clone())
511 .await
512 .inspect_err(|e| {
513 log::error!(
514 target: LOG_TARGET_SYNC,
515 "Unable to determine parachain target block {:?}",
516 e
517 );
518 })?;
519 Some(WarpSyncConfig::WithTarget(target_block))
520 },
521 _ => None,
522 };
523
524 let block_announce_validator = match sybil_resistance_level {
525 CollatorSybilResistance::Resistant => {
526 let block_announce_validator = AssumeSybilResistance::allow_seconded_messages();
527 Box::new(block_announce_validator) as Box<_>
528 },
529 CollatorSybilResistance::Unresistant => {
530 let block_announce_validator =
531 RequireSecondedInBlockAnnounce::new(relay_chain_interface, para_id);
532 Box::new(block_announce_validator) as Box<_>
533 },
534 };
535
536 sc_service::build_network(sc_service::BuildNetworkParams {
537 config: parachain_config,
538 net_config,
539 client,
540 transaction_pool,
541 spawn_handle,
542 import_queue,
543 block_announce_validator_builder: Some(Box::new(move |_| block_announce_validator)),
544 warp_sync_config,
545 block_relay: None,
546 metrics,
547 })
548}
549
550async fn wait_for_finalized_para_head<B, RCInterface>(
553 para_id: ParaId,
554 relay_chain_interface: RCInterface,
555) -> sc_service::error::Result<<B as BlockT>::Header>
556where
557 B: BlockT + 'static,
558 RCInterface: RelayChainInterface + Send + 'static,
559{
560 let mut imported_blocks = relay_chain_interface
561 .import_notification_stream()
562 .await
563 .map_err(|error| {
564 sc_service::Error::Other(format!(
565 "Relay chain import notification stream error when waiting for parachain head: \
566 {error}"
567 ))
568 })?
569 .fuse();
570 while imported_blocks.next().await.is_some() {
571 let is_syncing = relay_chain_interface
572 .is_major_syncing()
573 .await
574 .map_err(|e| format!("Unable to determine sync status: {e}"))?;
575
576 if !is_syncing {
577 let relay_chain_best_hash = relay_chain_interface
578 .finalized_block_hash()
579 .await
580 .map_err(|e| Box::new(e) as Box<_>)?;
581
582 let validation_data = relay_chain_interface
583 .persisted_validation_data(
584 relay_chain_best_hash,
585 para_id,
586 OccupiedCoreAssumption::TimedOut,
587 )
588 .await
589 .map_err(|e| format!("{e:?}"))?
590 .ok_or("Could not find parachain head in relay chain")?;
591
592 let finalized_header = B::Header::decode(&mut &validation_data.parent_head.0[..])
593 .map_err(|e| format!("Failed to decode parachain head: {e}"))?;
594
595 log::info!(
596 "๐ Received target parachain header #{} ({}) from the relay chain.",
597 finalized_header.number(),
598 finalized_header.hash()
599 );
600 return Ok(finalized_header)
601 }
602 }
603
604 Err("Stopping following imported blocks. Could not determine parachain target block".into())
605}
606
607async fn parachain_informant<Block: BlockT, Client>(
609 para_id: ParaId,
610 relay_chain_interface: impl RelayChainInterface + Clone,
611 client: Arc<Client>,
612 metrics: Option<ParachainInformantMetrics>,
613) where
614 Client: HeaderBackend<Block> + Send + Sync + 'static,
615{
616 let mut import_notifications = match relay_chain_interface.import_notification_stream().await {
617 Ok(import_notifications) => import_notifications,
618 Err(e) => {
619 log::error!("Failed to get import notification stream: {e:?}. Parachain informant will not run!");
620 return
621 },
622 };
623 let mut last_backed_block_time: Option<Instant> = None;
624 while let Some(n) = import_notifications.next().await {
625 let candidate_events = match relay_chain_interface.candidate_events(n.hash()).await {
626 Ok(candidate_events) => candidate_events,
627 Err(e) => {
628 log::warn!("Failed to get candidate events for block {}: {e:?}", n.hash());
629 continue
630 },
631 };
632 let mut backed_candidates = Vec::new();
633 let mut included_candidates = Vec::new();
634 let mut timed_out_candidates = Vec::new();
635 for event in candidate_events {
636 match event {
637 CandidateEvent::CandidateBacked(receipt, head, _, _) => {
638 if receipt.descriptor.para_id() != para_id {
639 continue;
640 }
641 let backed_block = match Block::Header::decode(&mut &head.0[..]) {
642 Ok(header) => header,
643 Err(e) => {
644 log::warn!(
645 "Failed to decode parachain header from backed block: {e:?}"
646 );
647 continue
648 },
649 };
650 let backed_block_time = Instant::now();
651 if let Some(last_backed_block_time) = &last_backed_block_time {
652 let duration = backed_block_time.duration_since(*last_backed_block_time);
653 if let Some(metrics) = &metrics {
654 metrics.parachain_block_backed_duration.observe(duration.as_secs_f64());
655 }
656 }
657 last_backed_block_time = Some(backed_block_time);
658 backed_candidates.push(backed_block);
659 },
660 CandidateEvent::CandidateIncluded(receipt, head, _, _) => {
661 if receipt.descriptor.para_id() != para_id {
662 continue;
663 }
664 let included_block = match Block::Header::decode(&mut &head.0[..]) {
665 Ok(header) => header,
666 Err(e) => {
667 log::warn!(
668 "Failed to decode parachain header from included block: {e:?}"
669 );
670 continue
671 },
672 };
673 let unincluded_segment_size =
674 client.info().best_number.saturating_sub(*included_block.number());
675 let unincluded_segment_size: u32 = unincluded_segment_size.saturated_into();
676 if let Some(metrics) = &metrics {
677 metrics.unincluded_segment_size.observe(unincluded_segment_size.into());
678 }
679 included_candidates.push(included_block);
680 },
681 CandidateEvent::CandidateTimedOut(receipt, head, _) => {
682 if receipt.descriptor.para_id() != para_id {
683 continue;
684 }
685 let timed_out_block = match Block::Header::decode(&mut &head.0[..]) {
686 Ok(header) => header,
687 Err(e) => {
688 log::warn!(
689 "Failed to decode parachain header from timed out block: {e:?}"
690 );
691 continue
692 },
693 };
694 timed_out_candidates.push(timed_out_block);
695 },
696 }
697 }
698 let mut log_parts = Vec::new();
699 if !backed_candidates.is_empty() {
700 let backed_candidates = backed_candidates
701 .into_iter()
702 .map(|c| format!("#{} ({})", c.number(), c.hash()))
703 .collect::<Vec<_>>()
704 .join(", ");
705 log_parts.push(format!("backed: {}", backed_candidates));
706 };
707 if !included_candidates.is_empty() {
708 let included_candidates = included_candidates
709 .into_iter()
710 .map(|c| format!("#{} ({})", c.number(), c.hash()))
711 .collect::<Vec<_>>()
712 .join(", ");
713 log_parts.push(format!("included: {}", included_candidates));
714 };
715 if !timed_out_candidates.is_empty() {
716 let timed_out_candidates = timed_out_candidates
717 .into_iter()
718 .map(|c| format!("#{} ({})", c.number(), c.hash()))
719 .collect::<Vec<_>>()
720 .join(", ");
721 log_parts.push(format!("timed out: {}", timed_out_candidates));
722 };
723 if !log_parts.is_empty() {
724 log::info!(
725 "Update at relay chain block #{} ({}) - {}",
726 n.number(),
727 n.hash(),
728 log_parts.join(", ")
729 );
730 }
731 }
732}
733
/// Prometheus histograms fed by the parachain informant task.
struct ParachainInformantMetrics {
    /// Seconds elapsed between two consecutive backed blocks of the parachain.
    parachain_block_backed_duration: Histogram,
    /// Difference between the client's best block number and the number of an included block.
    unincluded_segment_size: Histogram,
}
740
741impl ParachainInformantMetrics {
742 fn new(prometheus_registry: &Registry) -> prometheus::Result<Self> {
743 let parachain_block_authorship_duration = Histogram::with_opts(HistogramOpts::new(
744 "parachain_block_backed_duration",
745 "Time between parachain blocks getting backed by the relaychain",
746 ))?;
747 prometheus_registry.register(Box::new(parachain_block_authorship_duration.clone()))?;
748
749 let unincluded_segment_size = Histogram::with_opts(
750 HistogramOpts::new(
751 "parachain_unincluded_segment_size",
752 "Number of blocks between best block and last included block",
753 )
754 .buckets((0..=24).into_iter().map(|i| i as f64).collect()),
755 )?;
756 prometheus_registry.register(Box::new(unincluded_segment_size.clone()))?;
757
758 Ok(Self {
759 parachain_block_backed_duration: parachain_block_authorship_duration,
760 unincluded_segment_size,
761 })
762 }
763}