1use crate::{
18 chain_spec::DeprecatedExtensions,
19 common::{
20 command::NodeCommandRunner,
21 rpc::BuildRpcExtensions,
22 statement_store::{build_statement_store, new_statement_handler_proto},
23 types::{
24 ParachainBackend, ParachainBlockImport, ParachainClient, ParachainHostFunctions,
25 ParachainService,
26 },
27 ConstructNodeRuntimeApi, NodeBlock, NodeExtraArgs,
28 },
29};
30use cumulus_client_bootnodes::{start_bootnode_tasks, StartBootnodeTasksParams};
31use cumulus_client_cli::CollatorOptions;
32use cumulus_client_service::{
33 build_network, build_relay_chain_interface, prepare_node_config, start_relay_chain_tasks,
34 BuildNetworkParams, CollatorSybilResistance, DARecoveryProfile, StartRelayChainTasksParams,
35};
36use cumulus_primitives_core::{BlockT, GetParachainInfo, ParaId};
37use cumulus_relay_chain_interface::{OverseerHandle, RelayChainInterface};
38use futures::FutureExt;
39use log::info;
40use parachains_common::Hash;
41use polkadot_primitives::CollatorPair;
42use prometheus_endpoint::Registry;
43use sc_client_api::Backend;
44use sc_consensus::DefaultImportQueue;
45use sc_executor::{HeapAllocStrategy, DEFAULT_HEAP_ALLOC_STRATEGY};
46use sc_network::{config::FullNetworkConfiguration, NetworkBackend, NetworkBlock};
47use sc_service::{Configuration, ImportQueue, PartialComponents, TaskManager};
48use sc_statement_store::Store;
49use sc_sysinfo::HwBench;
50use sc_telemetry::{TelemetryHandle, TelemetryWorker};
51use sc_tracing::tracing::Instrument;
52use sc_transaction_pool::TransactionPoolHandle;
53use sc_transaction_pool_api::OffchainTransactionPoolFactory;
54use sp_api::ProvideRuntimeApi;
55use sp_keystore::KeystorePtr;
56use sp_runtime::traits::AccountIdConversion;
57use std::{future::Future, pin::Pin, sync::Arc, time::Duration};
58
59pub(crate) trait BuildImportQueue<
60 Block: BlockT,
61 RuntimeApi,
62 BlockImport: sc_consensus::BlockImport<Block>,
63>
64{
65 fn build_import_queue(
66 client: Arc<ParachainClient<Block, RuntimeApi>>,
67 block_import: ParachainBlockImport<Block, BlockImport>,
68 config: &Configuration,
69 telemetry_handle: Option<TelemetryHandle>,
70 task_manager: &TaskManager,
71 ) -> sc_service::error::Result<DefaultImportQueue<Block>>;
72}
73
74pub(crate) trait StartConsensus<Block: BlockT, RuntimeApi, BI, BIAuxiliaryData>
75where
76 RuntimeApi: ConstructNodeRuntimeApi<Block, ParachainClient<Block, RuntimeApi>>,
77{
78 fn start_consensus(
79 client: Arc<ParachainClient<Block, RuntimeApi>>,
80 block_import: ParachainBlockImport<Block, BI>,
81 prometheus_registry: Option<&Registry>,
82 telemetry: Option<TelemetryHandle>,
83 task_manager: &TaskManager,
84 relay_chain_interface: Arc<dyn RelayChainInterface>,
85 transaction_pool: Arc<TransactionPoolHandle<Block, ParachainClient<Block, RuntimeApi>>>,
86 keystore: KeystorePtr,
87 relay_chain_slot_duration: Duration,
88 para_id: ParaId,
89 collator_key: CollatorPair,
90 overseer_handle: OverseerHandle,
91 announce_block: Arc<dyn Fn(Hash, Option<Vec<u8>>) + Send + Sync>,
92 backend: Arc<ParachainBackend<Block>>,
93 node_extra_args: NodeExtraArgs,
94 block_import_extra_return_value: BIAuxiliaryData,
95 ) -> Result<(), sc_service::Error>;
96}
97
98fn warn_if_slow_hardware(hwbench: &sc_sysinfo::HwBench) {
100 if let Err(err) =
103 frame_benchmarking_cli::SUBSTRATE_REFERENCE_HARDWARE.check_hardware(hwbench, false)
104 {
105 log::warn!(
106 "⚠️ The hardware does not meet the minimal requirements {} for role 'Authority' find out more at:\n\
107 https://wiki.polkadot.network/docs/maintain-guides-how-to-validate-polkadot#reference-hardware",
108 err
109 );
110 }
111}
112
113pub(crate) trait InitBlockImport<Block: BlockT, RuntimeApi> {
114 type BlockImport: sc_consensus::BlockImport<Block> + Clone + Send + Sync;
115 type BlockImportAuxiliaryData;
116
117 fn init_block_import(
118 client: Arc<ParachainClient<Block, RuntimeApi>>,
119 ) -> sc_service::error::Result<(Self::BlockImport, Self::BlockImportAuxiliaryData)>;
120}
121
122pub(crate) struct ClientBlockImport;
123
124impl<Block: BlockT, RuntimeApi> InitBlockImport<Block, RuntimeApi> for ClientBlockImport
125where
126 RuntimeApi: Send + ConstructNodeRuntimeApi<Block, ParachainClient<Block, RuntimeApi>>,
127{
128 type BlockImport = Arc<ParachainClient<Block, RuntimeApi>>;
129 type BlockImportAuxiliaryData = ();
130
131 fn init_block_import(
132 client: Arc<ParachainClient<Block, RuntimeApi>>,
133 ) -> sc_service::error::Result<(Self::BlockImport, Self::BlockImportAuxiliaryData)> {
134 Ok((client.clone(), ()))
135 }
136}
137
138pub(crate) trait BaseNodeSpec {
139 type Block: NodeBlock;
140
141 type RuntimeApi: ConstructNodeRuntimeApi<
142 Self::Block,
143 ParachainClient<Self::Block, Self::RuntimeApi>,
144 >;
145
146 type BuildImportQueue: BuildImportQueue<
147 Self::Block,
148 Self::RuntimeApi,
149 <Self::InitBlockImport as InitBlockImport<Self::Block, Self::RuntimeApi>>::BlockImport,
150 >;
151
152 type InitBlockImport: self::InitBlockImport<Self::Block, Self::RuntimeApi>;
153
154 fn parachain_id(
156 client: &ParachainClient<Self::Block, Self::RuntimeApi>,
157 parachain_config: &Configuration,
158 ) -> Option<ParaId> {
159 let best_hash = client.chain_info().best_hash;
160 let para_id = if let Ok(para_id) = client.runtime_api().parachain_id(best_hash) {
161 para_id
162 } else {
163 #[allow(deprecated)]
165 let id = ParaId::from(
166 DeprecatedExtensions::try_get(&*parachain_config.chain_spec)
167 .and_then(|ext| ext.para_id)?,
168 );
169 log::info!("Deprecation notice: the parachain id was provided via the chain spec. This way of providing the parachain id to the node is not recommended. The alternative is to implement the `cumulus_primitives_core::GetParachainInfo` runtime API in the runtime, and upgrade it on-chain. Starting with `stable2512` providing the parachain id via the chain spec will not be supported anymore.");
172 id
173 };
174
175 let parachain_account =
176 AccountIdConversion::<polkadot_primitives::AccountId>::into_account_truncating(
177 ¶_id,
178 );
179
180 info!("🪪 Parachain id: {:?}", para_id);
181 info!("🧾 Parachain Account: {}", parachain_account);
182
183 Some(para_id)
184 }
185
186 fn new_partial(
191 config: &Configuration,
192 ) -> sc_service::error::Result<
193 ParachainService<
194 Self::Block,
195 Self::RuntimeApi,
196 <Self::InitBlockImport as InitBlockImport<Self::Block, Self::RuntimeApi>>::BlockImport,
197 <Self::InitBlockImport as InitBlockImport<Self::Block, Self::RuntimeApi>>::BlockImportAuxiliaryData
198 >
199 >{
200 let telemetry = config
201 .telemetry_endpoints
202 .clone()
203 .filter(|x| !x.is_empty())
204 .map(|endpoints| -> Result<_, sc_telemetry::Error> {
205 let worker = TelemetryWorker::new(16)?;
206 let telemetry = worker.handle().new_telemetry(endpoints);
207 Ok((worker, telemetry))
208 })
209 .transpose()?;
210
211 let heap_pages =
212 config.executor.default_heap_pages.map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |h| {
213 HeapAllocStrategy::Static { extra_pages: h as _ }
214 });
215
216 let executor = sc_executor::WasmExecutor::<ParachainHostFunctions>::builder()
217 .with_execution_method(config.executor.wasm_method)
218 .with_max_runtime_instances(config.executor.max_runtime_instances)
219 .with_runtime_cache_size(config.executor.runtime_cache_size)
220 .with_onchain_heap_alloc_strategy(heap_pages)
221 .with_offchain_heap_alloc_strategy(heap_pages)
222 .build();
223
224 let (client, backend, keystore_container, task_manager) =
225 sc_service::new_full_parts_record_import::<Self::Block, Self::RuntimeApi, _>(
226 config,
227 telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
228 executor,
229 true,
230 )?;
231 let client = Arc::new(client);
232
233 let telemetry_worker_handle = telemetry.as_ref().map(|(worker, _)| worker.handle());
234
235 let telemetry = telemetry.map(|(worker, telemetry)| {
236 task_manager.spawn_handle().spawn("telemetry", None, worker.run());
237 telemetry
238 });
239
240 let transaction_pool = Arc::from(
241 sc_transaction_pool::Builder::new(
242 task_manager.spawn_essential_handle(),
243 client.clone(),
244 config.role.is_authority().into(),
245 )
246 .with_options(config.transaction_pool.clone())
247 .with_prometheus(config.prometheus_registry())
248 .build(),
249 );
250
251 let (block_import, block_import_auxiliary_data) =
252 Self::InitBlockImport::init_block_import(client.clone())?;
253
254 let block_import = ParachainBlockImport::new(block_import, backend.clone());
255
256 let import_queue = Self::BuildImportQueue::build_import_queue(
257 client.clone(),
258 block_import.clone(),
259 config,
260 telemetry.as_ref().map(|telemetry| telemetry.handle()),
261 &task_manager,
262 )?;
263
264 Ok(PartialComponents {
265 backend,
266 client,
267 import_queue,
268 keystore_container,
269 task_manager,
270 transaction_pool,
271 select_chain: (),
272 other: (block_import, telemetry, telemetry_worker_handle, block_import_auxiliary_data),
273 })
274 }
275}
276
277pub(crate) trait NodeSpec: BaseNodeSpec {
278 type BuildRpcExtensions: BuildRpcExtensions<
279 ParachainClient<Self::Block, Self::RuntimeApi>,
280 ParachainBackend<Self::Block>,
281 TransactionPoolHandle<Self::Block, ParachainClient<Self::Block, Self::RuntimeApi>>,
282 Store,
283 >;
284
285 type StartConsensus: StartConsensus<
286 Self::Block,
287 Self::RuntimeApi,
288 <Self::InitBlockImport as InitBlockImport<Self::Block, Self::RuntimeApi>>::BlockImport,
289 <Self::InitBlockImport as InitBlockImport<Self::Block, Self::RuntimeApi>>::BlockImportAuxiliaryData,
290 >;
291
292 const SYBIL_RESISTANCE: CollatorSybilResistance;
293
294 fn start_node<Net>(
298 parachain_config: Configuration,
299 polkadot_config: Configuration,
300 collator_options: CollatorOptions,
301 hwbench: Option<sc_sysinfo::HwBench>,
302 node_extra_args: NodeExtraArgs,
303 ) -> Pin<Box<dyn Future<Output = sc_service::error::Result<TaskManager>>>>
304 where
305 Net: NetworkBackend<Self::Block, Hash>,
306 {
307 let fut = async move {
308 let parachain_config = prepare_node_config(parachain_config);
309 let parachain_public_addresses = parachain_config.network.public_addresses.clone();
310 let parachain_fork_id = parachain_config.chain_spec.fork_id().map(ToString::to_string);
311 let advertise_non_global_ips = parachain_config.network.allow_non_globals_in_dht;
312 let params = Self::new_partial(¶chain_config)?;
313 let (block_import, mut telemetry, telemetry_worker_handle, block_import_auxiliary_data) =
314 params.other;
315 let client = params.client.clone();
316 let backend = params.backend.clone();
317 let mut task_manager = params.task_manager;
318
319 let para_id = Self::parachain_id(&client, ¶chain_config)
321 .ok_or("Failed to retrieve the parachain id")?;
322 let relay_chain_fork_id = polkadot_config.chain_spec.fork_id().map(ToString::to_string);
323 let (relay_chain_interface, collator_key, relay_chain_network, paranode_rx) =
324 build_relay_chain_interface(
325 polkadot_config,
326 ¶chain_config,
327 telemetry_worker_handle,
328 &mut task_manager,
329 collator_options.clone(),
330 hwbench.clone(),
331 )
332 .await
333 .map_err(|e| sc_service::Error::Application(Box::new(e)))?;
334
335 let validator = parachain_config.role.is_authority();
336 let prometheus_registry = parachain_config.prometheus_registry().cloned();
337 let transaction_pool = params.transaction_pool.clone();
338 let import_queue_service = params.import_queue.service();
339 let mut net_config = FullNetworkConfiguration::<_, _, Net>::new(
340 ¶chain_config.network,
341 prometheus_registry.clone(),
342 );
343
344 let metrics = Net::register_notification_metrics(
345 parachain_config.prometheus_config.as_ref().map(|config| &config.registry),
346 );
347
348 let statement_handler_proto = node_extra_args.enable_statement_store.then(|| {
349 new_statement_handler_proto(&*client, ¶chain_config, &metrics, &mut net_config)
350 });
351
352 let (network, system_rpc_tx, tx_handler_controller, sync_service) =
353 build_network(BuildNetworkParams {
354 parachain_config: ¶chain_config,
355 net_config,
356 client: client.clone(),
357 transaction_pool: transaction_pool.clone(),
358 para_id,
359 spawn_handle: task_manager.spawn_handle(),
360 relay_chain_interface: relay_chain_interface.clone(),
361 import_queue: params.import_queue,
362 sybil_resistance_level: Self::SYBIL_RESISTANCE,
363 metrics,
364 })
365 .await?;
366
367 let statement_store = statement_handler_proto
368 .map(|statement_handler_proto| {
369 build_statement_store(
370 ¶chain_config,
371 &mut task_manager,
372 client.clone(),
373 network.clone(),
374 sync_service.clone(),
375 params.keystore_container.local_keystore(),
376 statement_handler_proto,
377 )
378 })
379 .transpose()?;
380
381 if parachain_config.offchain_worker.enabled {
382 let custom_extensions = {
383 let statement_store = statement_store.clone();
384 move |_hash| {
385 if let Some(statement_store) = &statement_store {
386 vec![Box::new(statement_store.clone().as_statement_store_ext())
387 as Box<_>]
388 } else {
389 vec![]
390 }
391 }
392 };
393
394 let offchain_workers =
395 sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions {
396 runtime_api_provider: client.clone(),
397 keystore: Some(params.keystore_container.keystore()),
398 offchain_db: backend.offchain_storage(),
399 transaction_pool: Some(OffchainTransactionPoolFactory::new(
400 transaction_pool.clone(),
401 )),
402 network_provider: Arc::new(network.clone()),
403 is_validator: parachain_config.role.is_authority(),
404 enable_http_requests: true,
405 custom_extensions,
406 })?;
407 task_manager.spawn_handle().spawn(
408 "offchain-workers-runner",
409 "offchain-work",
410 offchain_workers.run(client.clone(), task_manager.spawn_handle()).boxed(),
411 );
412 }
413
414 let rpc_builder = {
415 let client = client.clone();
416 let transaction_pool = transaction_pool.clone();
417 let backend_for_rpc = backend.clone();
418 let statement_store = statement_store.clone();
419
420 Box::new(move |_| {
421 Self::BuildRpcExtensions::build_rpc_extensions(
422 client.clone(),
423 backend_for_rpc.clone(),
424 transaction_pool.clone(),
425 statement_store.clone(),
426 )
427 })
428 };
429
430 sc_service::spawn_tasks(sc_service::SpawnTasksParams {
431 rpc_builder,
432 client: client.clone(),
433 transaction_pool: transaction_pool.clone(),
434 task_manager: &mut task_manager,
435 config: parachain_config,
436 keystore: params.keystore_container.keystore(),
437 backend: backend.clone(),
438 network: network.clone(),
439 sync_service: sync_service.clone(),
440 system_rpc_tx,
441 tx_handler_controller,
442 telemetry: telemetry.as_mut(),
443 })?;
444
445 if let Some(hwbench) = hwbench {
446 sc_sysinfo::print_hwbench(&hwbench);
447 if validator {
448 warn_if_slow_hardware(&hwbench);
449 }
450
451 if let Some(ref mut telemetry) = telemetry {
452 let telemetry_handle = telemetry.handle();
453 task_manager.spawn_handle().spawn(
454 "telemetry_hwbench",
455 None,
456 sc_sysinfo::initialize_hwbench_telemetry(telemetry_handle, hwbench),
457 );
458 }
459 }
460
461 let announce_block = {
462 let sync_service = sync_service.clone();
463 Arc::new(move |hash, data| sync_service.announce_block(hash, data))
464 };
465
466 let relay_chain_slot_duration = Duration::from_secs(6);
467
468 let overseer_handle = relay_chain_interface
469 .overseer_handle()
470 .map_err(|e| sc_service::Error::Application(Box::new(e)))?;
471
472 start_relay_chain_tasks(StartRelayChainTasksParams {
473 client: client.clone(),
474 announce_block: announce_block.clone(),
475 para_id,
476 relay_chain_interface: relay_chain_interface.clone(),
477 task_manager: &mut task_manager,
478 da_recovery_profile: if validator {
479 DARecoveryProfile::Collator
480 } else {
481 DARecoveryProfile::FullNode
482 },
483 import_queue: import_queue_service,
484 relay_chain_slot_duration,
485 recovery_handle: Box::new(overseer_handle.clone()),
486 sync_service,
487 prometheus_registry: prometheus_registry.as_ref(),
488 })?;
489
490 start_bootnode_tasks(StartBootnodeTasksParams {
491 embedded_dht_bootnode: collator_options.embedded_dht_bootnode,
492 dht_bootnode_discovery: collator_options.dht_bootnode_discovery,
493 para_id,
494 task_manager: &mut task_manager,
495 relay_chain_interface: relay_chain_interface.clone(),
496 relay_chain_fork_id,
497 relay_chain_network,
498 request_receiver: paranode_rx,
499 parachain_network: network,
500 advertise_non_global_ips,
501 parachain_genesis_hash: client.chain_info().genesis_hash,
502 parachain_fork_id,
503 parachain_public_addresses,
504 });
505
506 if validator {
507 Self::StartConsensus::start_consensus(
508 client.clone(),
509 block_import,
510 prometheus_registry.as_ref(),
511 telemetry.as_ref().map(|t| t.handle()),
512 &task_manager,
513 relay_chain_interface.clone(),
514 transaction_pool,
515 params.keystore_container.keystore(),
516 relay_chain_slot_duration,
517 para_id,
518 collator_key.expect("Command line arguments do not allow this. qed"),
519 overseer_handle,
520 announce_block,
521 backend.clone(),
522 node_extra_args,
523 block_import_auxiliary_data,
524 )?;
525 }
526
527 Ok(task_manager)
528 };
529
530 Box::pin(Instrument::instrument(
531 fut,
532 sc_tracing::tracing::info_span!(
533 sc_tracing::logging::PREFIX_LOG_SPAN,
534 name = "Parachain"
535 ),
536 ))
537 }
538}
539
540pub(crate) trait DynNodeSpec: NodeCommandRunner {
541 fn start_node(
542 self: Box<Self>,
543 parachain_config: Configuration,
544 polkadot_config: Configuration,
545 collator_options: CollatorOptions,
546 hwbench: Option<HwBench>,
547 node_extra_args: NodeExtraArgs,
548 ) -> Pin<Box<dyn Future<Output = sc_service::error::Result<TaskManager>>>>;
549}
550
551impl<T> DynNodeSpec for T
552where
553 T: NodeSpec + NodeCommandRunner,
554{
555 fn start_node(
556 self: Box<Self>,
557 parachain_config: Configuration,
558 polkadot_config: Configuration,
559 collator_options: CollatorOptions,
560 hwbench: Option<HwBench>,
561 node_extra_args: NodeExtraArgs,
562 ) -> Pin<Box<dyn Future<Output = sc_service::error::Result<TaskManager>>>> {
563 match parachain_config.network.network_backend {
564 sc_network::config::NetworkBackendType::Libp2p =>
565 <Self as NodeSpec>::start_node::<sc_network::NetworkWorker<_, _>>(
566 parachain_config,
567 polkadot_config,
568 collator_options,
569 hwbench,
570 node_extra_args,
571 ),
572 sc_network::config::NetworkBackendType::Litep2p =>
573 <Self as NodeSpec>::start_node::<sc_network::Litep2pNetworkBackend>(
574 parachain_config,
575 polkadot_config,
576 collator_options,
577 hwbench,
578 node_extra_args,
579 ),
580 }
581 }
582}