1use crate::{
18 chain_spec::Extensions,
19 common::{
20 command::NodeCommandRunner,
21 rpc::BuildRpcExtensions,
22 statement_store::{build_statement_store, new_statement_handler_proto},
23 types::{
24 ParachainBackend, ParachainBlockImport, ParachainClient, ParachainHostFunctions,
25 ParachainService,
26 },
27 ConstructNodeRuntimeApi, NodeBlock, NodeExtraArgs,
28 },
29};
30use cumulus_client_bootnodes::{start_bootnode_tasks, StartBootnodeTasksParams};
31use cumulus_client_cli::CollatorOptions;
32use cumulus_client_service::{
33 build_network, build_relay_chain_interface, prepare_node_config, start_relay_chain_tasks,
34 BuildNetworkParams, CollatorSybilResistance, DARecoveryProfile, StartRelayChainTasksParams,
35};
36use cumulus_primitives_core::{BlockT, GetParachainInfo, ParaId};
37use cumulus_relay_chain_interface::{OverseerHandle, RelayChainInterface};
38use futures::FutureExt;
39use log::info;
40use parachains_common::Hash;
41use polkadot_primitives::CollatorPair;
42use prometheus_endpoint::Registry;
43use sc_client_api::Backend;
44use sc_consensus::DefaultImportQueue;
45use sc_executor::{HeapAllocStrategy, DEFAULT_HEAP_ALLOC_STRATEGY};
46use sc_network::{config::FullNetworkConfiguration, NetworkBackend, NetworkBlock};
47use sc_service::{Configuration, ImportQueue, PartialComponents, TaskManager};
48use sc_statement_store::Store;
49use sc_sysinfo::HwBench;
50use sc_telemetry::{TelemetryHandle, TelemetryWorker};
51use sc_tracing::tracing::Instrument;
52use sc_transaction_pool::TransactionPoolHandle;
53use sc_transaction_pool_api::OffchainTransactionPoolFactory;
54use sp_api::{ApiExt, ProvideRuntimeApi};
55use sp_keystore::KeystorePtr;
56use sp_runtime::traits::AccountIdConversion;
57use std::{future::Future, pin::Pin, sync::Arc, time::Duration};
58
/// Strategy for building the import queue of a parachain node.
///
/// The trait is parameterised over the block type, the runtime API and the inner block import
/// that gets wrapped in a [`ParachainBlockImport`].
pub(crate) trait BuildImportQueue<
    Block: BlockT,
    RuntimeApi,
    BlockImport: sc_consensus::BlockImport<Block>,
>
{
    /// Builds a [`DefaultImportQueue`] for `Block`.
    ///
    /// # Parameters
    /// - `client`: shared handle to the parachain client.
    /// - `block_import`: the parachain block import wrapping the inner `BlockImport`.
    /// - `config`: the node configuration.
    /// - `telemetry_handle`: optional telemetry handle; `None` when telemetry is not set up.
    /// - `task_manager`: the task manager of the node being assembled.
    ///
    /// # Errors
    /// Returns a `sc_service` error when the import queue cannot be constructed.
    fn build_import_queue(
        client: Arc<ParachainClient<Block, RuntimeApi>>,
        block_import: ParachainBlockImport<Block, BlockImport>,
        config: &Configuration,
        telemetry_handle: Option<TelemetryHandle>,
        task_manager: &TaskManager,
    ) -> sc_service::error::Result<DefaultImportQueue<Block>>;
}
73
/// Strategy for starting the consensus part of a parachain node.
///
/// `BI` is the inner block import type and `BIAuxiliaryData` is the extra value that was
/// returned together with the block import when it was initialised.
pub(crate) trait StartConsensus<Block: BlockT, RuntimeApi, BI, BIAuxiliaryData>
where
    RuntimeApi: ConstructNodeRuntimeApi<Block, ParachainClient<Block, RuntimeApi>>,
{
    /// Starts consensus for the node.
    ///
    /// The arguments are the already-built node components (client, block import, transaction
    /// pool, keystore, backend), the relay chain handles (`relay_chain_interface`,
    /// `overseer_handle`), the collator identity (`para_id`, `collator_key`), the
    /// `announce_block` callback, the optional metrics/telemetry handles, and the extra node
    /// arguments.
    ///
    /// `block_import_extra_return_value` is the auxiliary data that was produced alongside the
    /// block import.
    ///
    /// # Errors
    /// Returns a [`sc_service::Error`] when consensus cannot be started.
    fn start_consensus(
        client: Arc<ParachainClient<Block, RuntimeApi>>,
        block_import: ParachainBlockImport<Block, BI>,
        prometheus_registry: Option<&Registry>,
        telemetry: Option<TelemetryHandle>,
        task_manager: &TaskManager,
        relay_chain_interface: Arc<dyn RelayChainInterface>,
        transaction_pool: Arc<TransactionPoolHandle<Block, ParachainClient<Block, RuntimeApi>>>,
        keystore: KeystorePtr,
        relay_chain_slot_duration: Duration,
        para_id: ParaId,
        collator_key: CollatorPair,
        overseer_handle: OverseerHandle,
        announce_block: Arc<dyn Fn(Hash, Option<Vec<u8>>) + Send + Sync>,
        backend: Arc<ParachainBackend<Block>>,
        node_extra_args: NodeExtraArgs,
        block_import_extra_return_value: BIAuxiliaryData,
    ) -> Result<(), sc_service::Error>;
}
97
98fn warn_if_slow_hardware(hwbench: &sc_sysinfo::HwBench) {
100 if let Err(err) =
103 frame_benchmarking_cli::SUBSTRATE_REFERENCE_HARDWARE.check_hardware(hwbench, false)
104 {
105 log::warn!(
106 "⚠️ The hardware does not meet the minimal requirements {} for role 'Authority' find out more at:\n\
107 https://wiki.polkadot.network/docs/maintain-guides-how-to-validate-polkadot#reference-hardware",
108 err
109 );
110 }
111}
112
/// Factory for the inner block import of a parachain node.
pub(crate) trait InitBlockImport<Block: BlockT, RuntimeApi> {
    /// The block import produced by [`Self::init_block_import`].
    type BlockImport: sc_consensus::BlockImport<Block> + Clone + Send + Sync;
    /// Extra value returned together with the block import.
    type BlockImportAuxiliaryData;

    /// Creates the block import and its auxiliary data from the given client.
    ///
    /// # Errors
    /// Returns a `sc_service` error when the block import cannot be initialised.
    fn init_block_import(
        client: Arc<ParachainClient<Block, RuntimeApi>>,
    ) -> sc_service::error::Result<(Self::BlockImport, Self::BlockImportAuxiliaryData)>;
}
121
/// Unit type whose [`InitBlockImport`] implementation uses the client itself as the block
/// import and carries no auxiliary data.
pub(crate) struct ClientBlockImport;
123
124impl<Block: BlockT, RuntimeApi> InitBlockImport<Block, RuntimeApi> for ClientBlockImport
125where
126 RuntimeApi: Send + ConstructNodeRuntimeApi<Block, ParachainClient<Block, RuntimeApi>>,
127{
128 type BlockImport = Arc<ParachainClient<Block, RuntimeApi>>;
129 type BlockImportAuxiliaryData = ();
130
131 fn init_block_import(
132 client: Arc<ParachainClient<Block, RuntimeApi>>,
133 ) -> sc_service::error::Result<(Self::BlockImport, Self::BlockImportAuxiliaryData)> {
134 Ok((client.clone(), ()))
135 }
136}
137
138pub(crate) trait BaseNodeSpec {
139 type Block: NodeBlock;
140
141 type RuntimeApi: ConstructNodeRuntimeApi<
142 Self::Block,
143 ParachainClient<Self::Block, Self::RuntimeApi>,
144 >;
145
146 type BuildImportQueue: BuildImportQueue<
147 Self::Block,
148 Self::RuntimeApi,
149 <Self::InitBlockImport as InitBlockImport<Self::Block, Self::RuntimeApi>>::BlockImport,
150 >;
151
152 type InitBlockImport: self::InitBlockImport<Self::Block, Self::RuntimeApi>;
153
154 fn parachain_id(
156 client: &ParachainClient<Self::Block, Self::RuntimeApi>,
157 parachain_config: &Configuration,
158 ) -> Option<ParaId> {
159 let best_hash = client.chain_info().best_hash;
160 let para_id = if client
161 .runtime_api()
162 .has_api::<dyn GetParachainInfo<Self::Block>>(best_hash)
163 .ok()
164 .filter(|has_api| *has_api)
165 .is_some()
166 {
167 client
168 .runtime_api()
169 .parachain_id(best_hash)
170 .inspect_err(|err| {
171 log::error!(
172 "`cumulus_primitives_core::GetParachainInfo` runtime API call errored with {}",
173 err
174 );
175 })
176 .ok()?
177 } else {
178 ParaId::from(
179 Extensions::try_get(&*parachain_config.chain_spec).and_then(|ext| ext.para_id())?,
180 )
181 };
182
183 let parachain_account =
184 AccountIdConversion::<polkadot_primitives::AccountId>::into_account_truncating(
185 ¶_id,
186 );
187
188 info!("🪪 Parachain id: {:?}", para_id);
189 info!("🧾 Parachain Account: {}", parachain_account);
190
191 Some(para_id)
192 }
193
194 fn new_partial(
199 config: &Configuration,
200 ) -> sc_service::error::Result<
201 ParachainService<
202 Self::Block,
203 Self::RuntimeApi,
204 <Self::InitBlockImport as InitBlockImport<Self::Block, Self::RuntimeApi>>::BlockImport,
205 <Self::InitBlockImport as InitBlockImport<Self::Block, Self::RuntimeApi>>::BlockImportAuxiliaryData
206 >
207 >{
208 let telemetry = config
209 .telemetry_endpoints
210 .clone()
211 .filter(|x| !x.is_empty())
212 .map(|endpoints| -> Result<_, sc_telemetry::Error> {
213 let worker = TelemetryWorker::new(16)?;
214 let telemetry = worker.handle().new_telemetry(endpoints);
215 Ok((worker, telemetry))
216 })
217 .transpose()?;
218
219 let heap_pages =
220 config.executor.default_heap_pages.map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |h| {
221 HeapAllocStrategy::Static { extra_pages: h as _ }
222 });
223
224 let executor = sc_executor::WasmExecutor::<ParachainHostFunctions>::builder()
225 .with_execution_method(config.executor.wasm_method)
226 .with_max_runtime_instances(config.executor.max_runtime_instances)
227 .with_runtime_cache_size(config.executor.runtime_cache_size)
228 .with_onchain_heap_alloc_strategy(heap_pages)
229 .with_offchain_heap_alloc_strategy(heap_pages)
230 .build();
231
232 let (client, backend, keystore_container, task_manager) =
233 sc_service::new_full_parts_record_import::<Self::Block, Self::RuntimeApi, _>(
234 config,
235 telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
236 executor,
237 true,
238 )?;
239 let client = Arc::new(client);
240
241 let telemetry_worker_handle = telemetry.as_ref().map(|(worker, _)| worker.handle());
242
243 let telemetry = telemetry.map(|(worker, telemetry)| {
244 task_manager.spawn_handle().spawn("telemetry", None, worker.run());
245 telemetry
246 });
247
248 let transaction_pool = Arc::from(
249 sc_transaction_pool::Builder::new(
250 task_manager.spawn_essential_handle(),
251 client.clone(),
252 config.role.is_authority().into(),
253 )
254 .with_options(config.transaction_pool.clone())
255 .with_prometheus(config.prometheus_registry())
256 .build(),
257 );
258
259 let (block_import, block_import_auxiliary_data) =
260 Self::InitBlockImport::init_block_import(client.clone())?;
261
262 let block_import = ParachainBlockImport::new(block_import, backend.clone());
263
264 let import_queue = Self::BuildImportQueue::build_import_queue(
265 client.clone(),
266 block_import.clone(),
267 config,
268 telemetry.as_ref().map(|telemetry| telemetry.handle()),
269 &task_manager,
270 )?;
271
272 Ok(PartialComponents {
273 backend,
274 client,
275 import_queue,
276 keystore_container,
277 task_manager,
278 transaction_pool,
279 select_chain: (),
280 other: (block_import, telemetry, telemetry_worker_handle, block_import_auxiliary_data),
281 })
282 }
283}
284
285pub(crate) trait NodeSpec: BaseNodeSpec {
286 type BuildRpcExtensions: BuildRpcExtensions<
287 ParachainClient<Self::Block, Self::RuntimeApi>,
288 ParachainBackend<Self::Block>,
289 TransactionPoolHandle<Self::Block, ParachainClient<Self::Block, Self::RuntimeApi>>,
290 Store,
291 >;
292
293 type StartConsensus: StartConsensus<
294 Self::Block,
295 Self::RuntimeApi,
296 <Self::InitBlockImport as InitBlockImport<Self::Block, Self::RuntimeApi>>::BlockImport,
297 <Self::InitBlockImport as InitBlockImport<Self::Block, Self::RuntimeApi>>::BlockImportAuxiliaryData,
298 >;
299
300 const SYBIL_RESISTANCE: CollatorSybilResistance;
301
302 fn start_node<Net>(
306 parachain_config: Configuration,
307 polkadot_config: Configuration,
308 collator_options: CollatorOptions,
309 hwbench: Option<sc_sysinfo::HwBench>,
310 node_extra_args: NodeExtraArgs,
311 ) -> Pin<Box<dyn Future<Output = sc_service::error::Result<TaskManager>>>>
312 where
313 Net: NetworkBackend<Self::Block, Hash>,
314 {
315 let fut = async move {
316 let parachain_config = prepare_node_config(parachain_config);
317 let parachain_public_addresses = parachain_config.network.public_addresses.clone();
318 let parachain_fork_id = parachain_config.chain_spec.fork_id().map(ToString::to_string);
319 let advertise_non_global_ips = parachain_config.network.allow_non_globals_in_dht;
320 let params = Self::new_partial(¶chain_config)?;
321 let (block_import, mut telemetry, telemetry_worker_handle, block_import_auxiliary_data) =
322 params.other;
323 let client = params.client.clone();
324 let backend = params.backend.clone();
325 let mut task_manager = params.task_manager;
326
327 let para_id = Self::parachain_id(&client, ¶chain_config)
329 .ok_or("Failed to retrieve the parachain id")?;
330 let relay_chain_fork_id = polkadot_config.chain_spec.fork_id().map(ToString::to_string);
331 let (relay_chain_interface, collator_key, relay_chain_network, paranode_rx) =
332 build_relay_chain_interface(
333 polkadot_config,
334 ¶chain_config,
335 telemetry_worker_handle,
336 &mut task_manager,
337 collator_options.clone(),
338 hwbench.clone(),
339 )
340 .await
341 .map_err(|e| sc_service::Error::Application(Box::new(e)))?;
342
343 let validator = parachain_config.role.is_authority();
344 let prometheus_registry = parachain_config.prometheus_registry().cloned();
345 let transaction_pool = params.transaction_pool.clone();
346 let import_queue_service = params.import_queue.service();
347 let mut net_config = FullNetworkConfiguration::<_, _, Net>::new(
348 ¶chain_config.network,
349 prometheus_registry.clone(),
350 );
351
352 let metrics = Net::register_notification_metrics(
353 parachain_config.prometheus_config.as_ref().map(|config| &config.registry),
354 );
355
356 let statement_handler_proto = node_extra_args.enable_statement_store.then(|| {
357 new_statement_handler_proto(&*client, ¶chain_config, &metrics, &mut net_config)
358 });
359
360 let (network, system_rpc_tx, tx_handler_controller, sync_service) =
361 build_network(BuildNetworkParams {
362 parachain_config: ¶chain_config,
363 net_config,
364 client: client.clone(),
365 transaction_pool: transaction_pool.clone(),
366 para_id,
367 spawn_handle: task_manager.spawn_handle(),
368 relay_chain_interface: relay_chain_interface.clone(),
369 import_queue: params.import_queue,
370 sybil_resistance_level: Self::SYBIL_RESISTANCE,
371 metrics,
372 })
373 .await?;
374
375 let statement_store = statement_handler_proto
376 .map(|statement_handler_proto| {
377 build_statement_store(
378 ¶chain_config,
379 &mut task_manager,
380 client.clone(),
381 network.clone(),
382 sync_service.clone(),
383 params.keystore_container.local_keystore(),
384 statement_handler_proto,
385 )
386 })
387 .transpose()?;
388
389 if parachain_config.offchain_worker.enabled {
390 let custom_extensions = {
391 let statement_store = statement_store.clone();
392 move |_hash| {
393 if let Some(statement_store) = &statement_store {
394 vec![Box::new(statement_store.clone().as_statement_store_ext())
395 as Box<_>]
396 } else {
397 vec![]
398 }
399 }
400 };
401
402 let offchain_workers =
403 sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions {
404 runtime_api_provider: client.clone(),
405 keystore: Some(params.keystore_container.keystore()),
406 offchain_db: backend.offchain_storage(),
407 transaction_pool: Some(OffchainTransactionPoolFactory::new(
408 transaction_pool.clone(),
409 )),
410 network_provider: Arc::new(network.clone()),
411 is_validator: parachain_config.role.is_authority(),
412 enable_http_requests: true,
413 custom_extensions,
414 })?;
415 task_manager.spawn_handle().spawn(
416 "offchain-workers-runner",
417 "offchain-work",
418 offchain_workers.run(client.clone(), task_manager.spawn_handle()).boxed(),
419 );
420 }
421
422 let rpc_builder = {
423 let client = client.clone();
424 let transaction_pool = transaction_pool.clone();
425 let backend_for_rpc = backend.clone();
426 let statement_store = statement_store.clone();
427
428 Box::new(move |_| {
429 Self::BuildRpcExtensions::build_rpc_extensions(
430 client.clone(),
431 backend_for_rpc.clone(),
432 transaction_pool.clone(),
433 statement_store.clone(),
434 )
435 })
436 };
437
438 sc_service::spawn_tasks(sc_service::SpawnTasksParams {
439 rpc_builder,
440 client: client.clone(),
441 transaction_pool: transaction_pool.clone(),
442 task_manager: &mut task_manager,
443 config: parachain_config,
444 keystore: params.keystore_container.keystore(),
445 backend: backend.clone(),
446 network: network.clone(),
447 sync_service: sync_service.clone(),
448 system_rpc_tx,
449 tx_handler_controller,
450 telemetry: telemetry.as_mut(),
451 })?;
452
453 if let Some(hwbench) = hwbench {
454 sc_sysinfo::print_hwbench(&hwbench);
455 if validator {
456 warn_if_slow_hardware(&hwbench);
457 }
458
459 if let Some(ref mut telemetry) = telemetry {
460 let telemetry_handle = telemetry.handle();
461 task_manager.spawn_handle().spawn(
462 "telemetry_hwbench",
463 None,
464 sc_sysinfo::initialize_hwbench_telemetry(telemetry_handle, hwbench),
465 );
466 }
467 }
468
469 let announce_block = {
470 let sync_service = sync_service.clone();
471 Arc::new(move |hash, data| sync_service.announce_block(hash, data))
472 };
473
474 let relay_chain_slot_duration = Duration::from_secs(6);
475
476 let overseer_handle = relay_chain_interface
477 .overseer_handle()
478 .map_err(|e| sc_service::Error::Application(Box::new(e)))?;
479
480 start_relay_chain_tasks(StartRelayChainTasksParams {
481 client: client.clone(),
482 announce_block: announce_block.clone(),
483 para_id,
484 relay_chain_interface: relay_chain_interface.clone(),
485 task_manager: &mut task_manager,
486 da_recovery_profile: if validator {
487 DARecoveryProfile::Collator
488 } else {
489 DARecoveryProfile::FullNode
490 },
491 import_queue: import_queue_service,
492 relay_chain_slot_duration,
493 recovery_handle: Box::new(overseer_handle.clone()),
494 sync_service,
495 prometheus_registry: prometheus_registry.as_ref(),
496 })?;
497
498 start_bootnode_tasks(StartBootnodeTasksParams {
499 embedded_dht_bootnode: collator_options.embedded_dht_bootnode,
500 dht_bootnode_discovery: collator_options.dht_bootnode_discovery,
501 para_id,
502 task_manager: &mut task_manager,
503 relay_chain_interface: relay_chain_interface.clone(),
504 relay_chain_fork_id,
505 relay_chain_network,
506 request_receiver: paranode_rx,
507 parachain_network: network,
508 advertise_non_global_ips,
509 parachain_genesis_hash: client.chain_info().genesis_hash,
510 parachain_fork_id,
511 parachain_public_addresses,
512 });
513
514 if validator {
515 Self::StartConsensus::start_consensus(
516 client.clone(),
517 block_import,
518 prometheus_registry.as_ref(),
519 telemetry.as_ref().map(|t| t.handle()),
520 &task_manager,
521 relay_chain_interface.clone(),
522 transaction_pool,
523 params.keystore_container.keystore(),
524 relay_chain_slot_duration,
525 para_id,
526 collator_key.expect("Command line arguments do not allow this. qed"),
527 overseer_handle,
528 announce_block,
529 backend.clone(),
530 node_extra_args,
531 block_import_auxiliary_data,
532 )?;
533 }
534
535 Ok(task_manager)
536 };
537
538 Box::pin(Instrument::instrument(
539 fut,
540 sc_tracing::tracing::info_span!(
541 sc_tracing::logging::PREFIX_LOG_SPAN,
542 name = "Parachain"
543 ),
544 ))
545 }
546}
547
/// Non-generic entry point for starting a node.
///
/// Unlike [`NodeSpec::start_node`], the method here takes `self: Box<Self>` and has no network
/// backend type parameter.
pub(crate) trait DynNodeSpec: NodeCommandRunner {
    /// Starts the node and returns a future resolving to its [`TaskManager`].
    ///
    /// # Parameters
    /// - `parachain_config`: configuration of the parachain node.
    /// - `polkadot_config`: configuration of the relay chain side.
    /// - `collator_options`: collator related options.
    /// - `hwbench`: optional hardware benchmark results.
    /// - `node_extra_args`: additional node arguments.
    fn start_node(
        self: Box<Self>,
        parachain_config: Configuration,
        polkadot_config: Configuration,
        collator_options: CollatorOptions,
        hwbench: Option<HwBench>,
        node_extra_args: NodeExtraArgs,
    ) -> Pin<Box<dyn Future<Output = sc_service::error::Result<TaskManager>>>>;
}
558
559impl<T> DynNodeSpec for T
560where
561 T: NodeSpec + NodeCommandRunner,
562{
563 fn start_node(
564 self: Box<Self>,
565 parachain_config: Configuration,
566 polkadot_config: Configuration,
567 collator_options: CollatorOptions,
568 hwbench: Option<HwBench>,
569 node_extra_args: NodeExtraArgs,
570 ) -> Pin<Box<dyn Future<Output = sc_service::error::Result<TaskManager>>>> {
571 match parachain_config.network.network_backend {
572 sc_network::config::NetworkBackendType::Libp2p =>
573 <Self as NodeSpec>::start_node::<sc_network::NetworkWorker<_, _>>(
574 parachain_config,
575 polkadot_config,
576 collator_options,
577 hwbench,
578 node_extra_args,
579 ),
580 sc_network::config::NetworkBackendType::Litep2p =>
581 <Self as NodeSpec>::start_node::<sc_network::Litep2pNetworkBackend>(
582 parachain_config,
583 polkadot_config,
584 collator_options,
585 hwbench,
586 node_extra_args,
587 ),
588 }
589 }
590}