1use cumulus_client_cli::CollatorOptions;
23use cumulus_client_consensus_common::ParachainConsensus;
24use cumulus_client_network::{AssumeSybilResistance, RequireSecondedInBlockAnnounce};
25use cumulus_client_pov_recovery::{PoVRecovery, RecoveryDelayRange, RecoveryHandle};
26use cumulus_primitives_core::{CollectCollationInfo, ParaId};
27pub use cumulus_primitives_proof_size_hostfunction::storage_proof_size;
28use cumulus_relay_chain_inprocess_interface::build_inprocess_relay_chain;
29use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult};
30use cumulus_relay_chain_minimal_node::build_minimal_relay_chain_node_with_rpc;
31use futures::{channel::mpsc, StreamExt};
32use polkadot_primitives::{CandidateEvent, CollatorPair, OccupiedCoreAssumption};
33use prometheus::{Histogram, HistogramOpts, Registry};
34use sc_client_api::{
35 Backend as BackendT, BlockBackend, BlockchainEvents, Finalizer, ProofProvider, UsageProvider,
36};
37use sc_consensus::{
38 import_queue::{ImportQueue, ImportQueueService},
39 BlockImport,
40};
41use sc_network::{
42 config::SyncMode, request_responses::IncomingRequest, service::traits::NetworkService,
43 NetworkBackend,
44};
45use sc_network_sync::SyncingService;
46use sc_network_transactions::TransactionsHandlerController;
47use sc_service::{Configuration, SpawnTaskHandle, TaskManager, WarpSyncConfig};
48use sc_telemetry::{log, TelemetryWorkerHandle};
49use sc_utils::mpsc::TracingUnboundedSender;
50use sp_api::ProvideRuntimeApi;
51use sp_blockchain::{HeaderBackend, HeaderMetadata};
52use sp_core::Decode;
53use sp_runtime::{
54 traits::{Block as BlockT, BlockIdTo, Header},
55 SaturatedConversion, Saturating,
56};
57use std::{
58 sync::Arc,
59 time::{Duration, Instant},
60};
61
/// Host functions bundled for parachain nodes.
///
/// The tuple combines the `storage_proof_size` host functions from
/// `cumulus_primitives_proof_size_hostfunction` with the Substrate host functions from `sp_io`.
pub type ParachainHostFunctions = (
    cumulus_primitives_proof_size_hostfunction::storage_proof_size::HostFunctions,
    sp_io::SubstrateHostFunctions,
);
70
/// Capacity of the bounded channel created in `start_relay_chain_tasks`. The sending half is
/// handed to the parachain consensus task and the receiving half to `PoVRecovery`.
const RECOVERY_CHAN_SIZE: usize = 8;
/// Log target used for the warp-sync related messages emitted by `build_network`.
const LOG_TARGET_SYNC: &str = "sync::cumulus";
76
/// Selects the `RecoveryDelayRange` that `start_relay_chain_tasks` hands to `PoVRecovery`.
pub enum DARecoveryProfile {
    /// Delay range from half a relay chain slot duration up to one full slot duration.
    Collator,
    /// Delay range from 25 up to 50 relay chain slot durations.
    FullNode,
    /// Use the supplied delay range unchanged.
    Other(RecoveryDelayRange),
}
88
/// Bundle of handles and settings for starting a collator.
///
/// NOTE(review): no function consuming this struct is visible in this part of the file, so the
/// per-field semantics below are limited to what the types state — confirm against the caller.
pub struct StartCollatorParams<'a, Block: BlockT, BS, Client, RCInterface, Spawner> {
    /// Shared handle of type `BS`; presumably used to query block status — TODO confirm.
    pub block_status: Arc<BS>,
    /// Shared parachain client.
    pub client: Arc<Client>,
    /// Callback taking a block hash and optional opaque bytes.
    pub announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
    /// Task spawner of type `Spawner`.
    pub spawner: Spawner,
    /// Id of the parachain.
    pub para_id: ParaId,
    /// Handle used to talk to the relay chain.
    pub relay_chain_interface: RCInterface,
    /// Task manager borrowed for the duration of the call.
    pub task_manager: &'a mut TaskManager,
    /// Boxed parachain consensus implementation.
    pub parachain_consensus: Box<dyn ParachainConsensus<Block>>,
    /// Handle to the import queue service.
    pub import_queue: Box<dyn ImportQueueService<Block>>,
    /// Key pair of the collator.
    pub collator_key: CollatorPair,
    /// Slot duration of the relay chain.
    pub relay_chain_slot_duration: Duration,
    /// Boxed `RecoveryHandle`.
    pub recovery_handle: Box<dyn RecoveryHandle>,
    /// Shared syncing service.
    pub sync_service: Arc<SyncingService<Block>>,
    /// Optional Prometheus registry.
    pub prometheus_registry: Option<&'a Registry>,
}
105
/// Parameters consumed by `start_relay_chain_tasks`.
pub struct StartRelayChainTasksParams<'a, Block: BlockT, Client, RCInterface> {
    /// Parachain client; cloned into the consensus, PoV recovery and informant tasks.
    pub client: Arc<Client>,
    /// Callback handed to the parachain consensus task.
    pub announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
    /// Id of the parachain the tasks are started for.
    pub para_id: ParaId,
    /// Relay chain handle; cloned into each spawned task.
    pub relay_chain_interface: RCInterface,
    /// Task manager on which the tasks are spawned.
    pub task_manager: &'a mut TaskManager,
    /// Chooses the delay range passed to `PoVRecovery`.
    pub da_recovery_profile: DARecoveryProfile,
    /// Import queue service handed to `PoVRecovery`.
    pub import_queue: Box<dyn ImportQueueService<Block>>,
    /// Relay chain slot duration; base unit of the built-in recovery delay ranges.
    pub relay_chain_slot_duration: Duration,
    /// Recovery handle handed to `PoVRecovery`.
    pub recovery_handle: Box<dyn RecoveryHandle>,
    /// Syncing service handed to `PoVRecovery`.
    pub sync_service: Arc<SyncingService<Block>>,
    /// When present, the parachain informant metrics are registered on this registry.
    pub prometheus_registry: Option<&'a Registry>,
}
120
121pub fn start_relay_chain_tasks<Block, Client, Backend, RCInterface>(
131 StartRelayChainTasksParams {
132 client,
133 announce_block,
134 para_id,
135 task_manager,
136 da_recovery_profile,
137 relay_chain_interface,
138 import_queue,
139 relay_chain_slot_duration,
140 recovery_handle,
141 sync_service,
142 prometheus_registry,
143 }: StartRelayChainTasksParams<Block, Client, RCInterface>,
144) -> sc_service::error::Result<()>
145where
146 Block: BlockT,
147 Client: Finalizer<Block, Backend>
148 + UsageProvider<Block>
149 + HeaderBackend<Block>
150 + Send
151 + Sync
152 + BlockBackend<Block>
153 + BlockchainEvents<Block>
154 + 'static,
155 for<'a> &'a Client: BlockImport<Block>,
156 Backend: BackendT<Block> + 'static,
157 RCInterface: RelayChainInterface + Clone + 'static,
158{
159 let (recovery_chan_tx, recovery_chan_rx) = mpsc::channel(RECOVERY_CHAN_SIZE);
160
161 let consensus = cumulus_client_consensus_common::run_parachain_consensus(
162 para_id,
163 client.clone(),
164 relay_chain_interface.clone(),
165 announce_block.clone(),
166 Some(recovery_chan_tx),
167 );
168
169 task_manager
170 .spawn_essential_handle()
171 .spawn_blocking("cumulus-consensus", None, consensus);
172
173 let da_recovery_profile = match da_recovery_profile {
174 DARecoveryProfile::Collator => {
175 RecoveryDelayRange {
179 min: relay_chain_slot_duration / 2,
180 max: relay_chain_slot_duration,
181 }
182 },
183 DARecoveryProfile::FullNode => {
184 RecoveryDelayRange {
190 min: relay_chain_slot_duration * 25,
191 max: relay_chain_slot_duration * 50,
192 }
193 },
194 DARecoveryProfile::Other(profile) => profile,
195 };
196
197 let pov_recovery = PoVRecovery::new(
198 recovery_handle,
199 da_recovery_profile,
200 client.clone(),
201 import_queue,
202 relay_chain_interface.clone(),
203 para_id,
204 recovery_chan_rx,
205 sync_service.clone(),
206 );
207
208 task_manager
209 .spawn_essential_handle()
210 .spawn("cumulus-pov-recovery", None, pov_recovery.run());
211
212 let parachain_informant = parachain_informant::<Block, _>(
213 para_id,
214 relay_chain_interface.clone(),
215 client.clone(),
216 prometheus_registry.map(ParachainInformantMetrics::new).transpose()?,
217 );
218 task_manager
219 .spawn_handle()
220 .spawn("parachain-informant", None, parachain_informant);
221
222 Ok(())
223}
224
225pub fn prepare_node_config(mut parachain_config: Configuration) -> Configuration {
232 parachain_config.announce_block = false;
233 parachain_config.network.min_peers_to_start_warp_sync = Some(1);
236
237 parachain_config
238}
239
240pub async fn build_relay_chain_interface(
244 relay_chain_config: Configuration,
245 parachain_config: &Configuration,
246 telemetry_worker_handle: Option<TelemetryWorkerHandle>,
247 task_manager: &mut TaskManager,
248 collator_options: CollatorOptions,
249 hwbench: Option<sc_sysinfo::HwBench>,
250) -> RelayChainResult<(
251 Arc<(dyn RelayChainInterface + 'static)>,
252 Option<CollatorPair>,
253 Arc<dyn NetworkService>,
254 async_channel::Receiver<IncomingRequest>,
255)> {
256 match collator_options.relay_chain_mode {
257 cumulus_client_cli::RelayChainMode::Embedded => build_inprocess_relay_chain(
258 relay_chain_config,
259 parachain_config,
260 telemetry_worker_handle,
261 task_manager,
262 hwbench,
263 ),
264 cumulus_client_cli::RelayChainMode::ExternalRpc(rpc_target_urls) =>
265 build_minimal_relay_chain_node_with_rpc(
266 relay_chain_config,
267 parachain_config.prometheus_registry(),
268 task_manager,
269 rpc_target_urls,
270 )
271 .await,
272 }
273}
274
/// Selects the block announce validator that `build_network` installs.
pub enum CollatorSybilResistance {
    /// `build_network` installs `AssumeSybilResistance::allow_seconded_messages()`.
    Resistant,
    /// `build_network` installs a `RequireSecondedInBlockAnnounce` validator built from the
    /// relay chain interface and the para id.
    Unresistant,
}
288
/// Parameters consumed by `build_network`.
pub struct BuildNetworkParams<
    'a,
    Block: BlockT,
    Client: ProvideRuntimeApi<Block>
        + BlockBackend<Block>
        + HeaderMetadata<Block, Error = sp_blockchain::Error>
        + HeaderBackend<Block>
        + BlockIdTo<Block>
        + 'static,
    Network: NetworkBackend<Block, <Block as BlockT>::Hash>,
    RCInterface,
    IQ,
> where
    Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
{
    /// Parachain node configuration; its network sync mode decides whether a warp sync target
    /// is fetched, and it is forwarded to `sc_service::build_network`.
    pub parachain_config: &'a Configuration,
    /// Full network configuration forwarded to `sc_service::build_network`.
    pub net_config:
        sc_network::config::FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Network>,
    /// Parachain client.
    pub client: Arc<Client>,
    /// Transaction pool handle forwarded to `sc_service::build_network`.
    pub transaction_pool: Arc<sc_transaction_pool::TransactionPoolHandle<Block, Client>>,
    /// Id of the parachain.
    pub para_id: ParaId,
    /// Relay chain handle; used to find the warp sync target and, for
    /// `CollatorSybilResistance::Unresistant`, to build the block announce validator.
    pub relay_chain_interface: RCInterface,
    /// Spawn handle forwarded to `sc_service::build_network`.
    pub spawn_handle: SpawnTaskHandle,
    /// Import queue forwarded to `sc_service::build_network`.
    pub import_queue: IQ,
    /// Selects which block announce validator is installed.
    pub sybil_resistance_level: CollatorSybilResistance,
    /// Notification metrics forwarded to `sc_service::build_network`.
    pub metrics: sc_network::NotificationMetrics,
}
317
318pub async fn build_network<'a, Block, Client, RCInterface, IQ, Network>(
320 BuildNetworkParams {
321 parachain_config,
322 net_config,
323 client,
324 transaction_pool,
325 para_id,
326 spawn_handle,
327 relay_chain_interface,
328 import_queue,
329 sybil_resistance_level,
330 metrics,
331 }: BuildNetworkParams<'a, Block, Client, Network, RCInterface, IQ>,
332) -> sc_service::error::Result<(
333 Arc<dyn NetworkService>,
334 TracingUnboundedSender<sc_rpc::system::Request<Block>>,
335 TransactionsHandlerController<Block::Hash>,
336 Arc<SyncingService<Block>>,
337)>
338where
339 Block: BlockT,
340 Client: UsageProvider<Block>
341 + HeaderBackend<Block>
342 + sp_consensus::block_validation::Chain<Block>
343 + Send
344 + Sync
345 + BlockBackend<Block>
346 + BlockchainEvents<Block>
347 + ProvideRuntimeApi<Block>
348 + HeaderMetadata<Block, Error = sp_blockchain::Error>
349 + BlockIdTo<Block, Error = sp_blockchain::Error>
350 + ProofProvider<Block>
351 + 'static,
352 Client::Api: CollectCollationInfo<Block>
353 + sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
354 for<'b> &'b Client: BlockImport<Block>,
355 RCInterface: RelayChainInterface + Clone + 'static,
356 IQ: ImportQueue<Block> + 'static,
357 Network: NetworkBackend<Block, <Block as BlockT>::Hash>,
358{
359 let warp_sync_config = match parachain_config.network.sync_mode {
360 SyncMode::Warp => {
361 log::debug!(target: LOG_TARGET_SYNC, "waiting for announce block...");
362
363 let target_block =
364 wait_for_finalized_para_head::<Block, _>(para_id, relay_chain_interface.clone())
365 .await
366 .inspect_err(|e| {
367 log::error!(
368 target: LOG_TARGET_SYNC,
369 "Unable to determine parachain target block {:?}",
370 e
371 );
372 })?;
373 Some(WarpSyncConfig::WithTarget(target_block))
374 },
375 _ => None,
376 };
377
378 let block_announce_validator = match sybil_resistance_level {
379 CollatorSybilResistance::Resistant => {
380 let block_announce_validator = AssumeSybilResistance::allow_seconded_messages();
381 Box::new(block_announce_validator) as Box<_>
382 },
383 CollatorSybilResistance::Unresistant => {
384 let block_announce_validator =
385 RequireSecondedInBlockAnnounce::new(relay_chain_interface, para_id);
386 Box::new(block_announce_validator) as Box<_>
387 },
388 };
389
390 sc_service::build_network(sc_service::BuildNetworkParams {
391 config: parachain_config,
392 net_config,
393 client,
394 transaction_pool,
395 spawn_handle,
396 import_queue,
397 block_announce_validator_builder: Some(Box::new(move |_| block_announce_validator)),
398 warp_sync_config,
399 block_relay: None,
400 metrics,
401 })
402}
403
404async fn wait_for_finalized_para_head<B, RCInterface>(
407 para_id: ParaId,
408 relay_chain_interface: RCInterface,
409) -> sc_service::error::Result<<B as BlockT>::Header>
410where
411 B: BlockT + 'static,
412 RCInterface: RelayChainInterface + Send + 'static,
413{
414 let mut imported_blocks = relay_chain_interface
415 .import_notification_stream()
416 .await
417 .map_err(|error| {
418 sc_service::Error::Other(format!(
419 "Relay chain import notification stream error when waiting for parachain head: \
420 {error}"
421 ))
422 })?
423 .fuse();
424 while imported_blocks.next().await.is_some() {
425 let is_syncing = relay_chain_interface
426 .is_major_syncing()
427 .await
428 .map_err(|e| format!("Unable to determine sync status: {e}"))?;
429
430 if !is_syncing {
431 let relay_chain_best_hash = relay_chain_interface
432 .finalized_block_hash()
433 .await
434 .map_err(|e| Box::new(e) as Box<_>)?;
435
436 let validation_data = relay_chain_interface
437 .persisted_validation_data(
438 relay_chain_best_hash,
439 para_id,
440 OccupiedCoreAssumption::TimedOut,
441 )
442 .await
443 .map_err(|e| format!("{e:?}"))?
444 .ok_or("Could not find parachain head in relay chain")?;
445
446 let finalized_header = B::Header::decode(&mut &validation_data.parent_head.0[..])
447 .map_err(|e| format!("Failed to decode parachain head: {e}"))?;
448
449 log::info!(
450 "๐ Received target parachain header #{} ({}) from the relay chain.",
451 finalized_header.number(),
452 finalized_header.hash()
453 );
454 return Ok(finalized_header)
455 }
456 }
457
458 Err("Stopping following imported blocks. Could not determine parachain target block".into())
459}
460
461async fn parachain_informant<Block: BlockT, Client>(
463 para_id: ParaId,
464 relay_chain_interface: impl RelayChainInterface + Clone,
465 client: Arc<Client>,
466 metrics: Option<ParachainInformantMetrics>,
467) where
468 Client: HeaderBackend<Block> + Send + Sync + 'static,
469{
470 let mut import_notifications = match relay_chain_interface.import_notification_stream().await {
471 Ok(import_notifications) => import_notifications,
472 Err(e) => {
473 log::error!("Failed to get import notification stream: {e:?}. Parachain informant will not run!");
474 return
475 },
476 };
477 let mut last_backed_block_time: Option<Instant> = None;
478 while let Some(n) = import_notifications.next().await {
479 let candidate_events = match relay_chain_interface.candidate_events(n.hash()).await {
480 Ok(candidate_events) => candidate_events,
481 Err(e) => {
482 log::warn!("Failed to get candidate events for block {}: {e:?}", n.hash());
483 continue
484 },
485 };
486 let mut backed_candidates = Vec::new();
487 let mut included_candidates = Vec::new();
488 let mut timed_out_candidates = Vec::new();
489 for event in candidate_events {
490 match event {
491 CandidateEvent::CandidateBacked(receipt, head, _, _) => {
492 if receipt.descriptor.para_id() != para_id {
493 continue;
494 }
495 let backed_block = match Block::Header::decode(&mut &head.0[..]) {
496 Ok(header) => header,
497 Err(e) => {
498 log::warn!(
499 "Failed to decode parachain header from backed block: {e:?}"
500 );
501 continue
502 },
503 };
504 let backed_block_time = Instant::now();
505 if let Some(last_backed_block_time) = &last_backed_block_time {
506 let duration = backed_block_time.duration_since(*last_backed_block_time);
507 if let Some(metrics) = &metrics {
508 metrics.parachain_block_backed_duration.observe(duration.as_secs_f64());
509 }
510 }
511 last_backed_block_time = Some(backed_block_time);
512 backed_candidates.push(backed_block);
513 },
514 CandidateEvent::CandidateIncluded(receipt, head, _, _) => {
515 if receipt.descriptor.para_id() != para_id {
516 continue;
517 }
518 let included_block = match Block::Header::decode(&mut &head.0[..]) {
519 Ok(header) => header,
520 Err(e) => {
521 log::warn!(
522 "Failed to decode parachain header from included block: {e:?}"
523 );
524 continue
525 },
526 };
527 let unincluded_segment_size =
528 client.info().best_number.saturating_sub(*included_block.number());
529 let unincluded_segment_size: u32 = unincluded_segment_size.saturated_into();
530 if let Some(metrics) = &metrics {
531 metrics.unincluded_segment_size.observe(unincluded_segment_size.into());
532 }
533 included_candidates.push(included_block);
534 },
535 CandidateEvent::CandidateTimedOut(receipt, head, _) => {
536 if receipt.descriptor.para_id() != para_id {
537 continue;
538 }
539 let timed_out_block = match Block::Header::decode(&mut &head.0[..]) {
540 Ok(header) => header,
541 Err(e) => {
542 log::warn!(
543 "Failed to decode parachain header from timed out block: {e:?}"
544 );
545 continue
546 },
547 };
548 timed_out_candidates.push(timed_out_block);
549 },
550 }
551 }
552 let mut log_parts = Vec::new();
553 if !backed_candidates.is_empty() {
554 let backed_candidates = backed_candidates
555 .into_iter()
556 .map(|c| format!("#{} ({})", c.number(), c.hash()))
557 .collect::<Vec<_>>()
558 .join(", ");
559 log_parts.push(format!("backed: {}", backed_candidates));
560 };
561 if !included_candidates.is_empty() {
562 let included_candidates = included_candidates
563 .into_iter()
564 .map(|c| format!("#{} ({})", c.number(), c.hash()))
565 .collect::<Vec<_>>()
566 .join(", ");
567 log_parts.push(format!("included: {}", included_candidates));
568 };
569 if !timed_out_candidates.is_empty() {
570 let timed_out_candidates = timed_out_candidates
571 .into_iter()
572 .map(|c| format!("#{} ({})", c.number(), c.hash()))
573 .collect::<Vec<_>>()
574 .join(", ");
575 log_parts.push(format!("timed out: {}", timed_out_candidates));
576 };
577 if !log_parts.is_empty() {
578 log::info!(
579 "Update at relay chain block #{} ({}) - {}",
580 n.number(),
581 n.hash(),
582 log_parts.join(", ")
583 );
584 }
585 }
586}
587
588struct ParachainInformantMetrics {
589 parachain_block_backed_duration: Histogram,
591 unincluded_segment_size: Histogram,
593}
594
595impl ParachainInformantMetrics {
596 fn new(prometheus_registry: &Registry) -> prometheus::Result<Self> {
597 let parachain_block_authorship_duration = Histogram::with_opts(HistogramOpts::new(
598 "parachain_block_backed_duration",
599 "Time between parachain blocks getting backed by the relaychain",
600 ))?;
601 prometheus_registry.register(Box::new(parachain_block_authorship_duration.clone()))?;
602
603 let unincluded_segment_size = Histogram::with_opts(
604 HistogramOpts::new(
605 "parachain_unincluded_segment_size",
606 "Number of blocks between best block and last included block",
607 )
608 .buckets((0..=24).into_iter().map(|i| i as f64).collect()),
609 )?;
610 prometheus_registry.register(Box::new(unincluded_segment_size.clone()))?;
611
612 Ok(Self {
613 parachain_block_backed_duration: parachain_block_authorship_duration,
614 unincluded_segment_size,
615 })
616 }
617}