// File: polkadot_omni_node_lib/common/spec.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Cumulus.
3// SPDX-License-Identifier: Apache-2.0
4
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// 	http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17use crate::{
18	chain_spec::Extensions,
19	common::{
20		command::NodeCommandRunner,
21		rpc::BuildRpcExtensions,
22		statement_store::{build_statement_store, new_statement_handler_proto},
23		types::{
24			ParachainBackend, ParachainBlockImport, ParachainClient, ParachainHostFunctions,
25			ParachainService,
26		},
27		ConstructNodeRuntimeApi, NodeBlock, NodeExtraArgs,
28	},
29};
30use cumulus_client_bootnodes::{start_bootnode_tasks, StartBootnodeTasksParams};
31use cumulus_client_cli::CollatorOptions;
32use cumulus_client_service::{
33	build_network, build_relay_chain_interface, prepare_node_config, start_relay_chain_tasks,
34	BuildNetworkParams, CollatorSybilResistance, DARecoveryProfile, StartRelayChainTasksParams,
35};
36use cumulus_primitives_core::{BlockT, GetParachainInfo, ParaId};
37use cumulus_relay_chain_interface::{OverseerHandle, RelayChainInterface};
38use futures::FutureExt;
39use log::info;
40use parachains_common::Hash;
41use polkadot_primitives::CollatorPair;
42use prometheus_endpoint::Registry;
43use sc_client_api::Backend;
44use sc_consensus::DefaultImportQueue;
45use sc_executor::{HeapAllocStrategy, DEFAULT_HEAP_ALLOC_STRATEGY};
46use sc_network::{config::FullNetworkConfiguration, NetworkBackend, NetworkBlock};
47use sc_service::{Configuration, ImportQueue, PartialComponents, TaskManager};
48use sc_statement_store::Store;
49use sc_sysinfo::HwBench;
50use sc_telemetry::{TelemetryHandle, TelemetryWorker};
51use sc_tracing::tracing::Instrument;
52use sc_transaction_pool::TransactionPoolHandle;
53use sc_transaction_pool_api::OffchainTransactionPoolFactory;
54use sp_api::{ApiExt, ProvideRuntimeApi};
55use sp_keystore::KeystorePtr;
56use sp_runtime::traits::AccountIdConversion;
57use std::{future::Future, pin::Pin, sync::Arc, time::Duration};
58
59pub(crate) trait BuildImportQueue<
60	Block: BlockT,
61	RuntimeApi,
62	BlockImport: sc_consensus::BlockImport<Block>,
63>
64{
65	fn build_import_queue(
66		client: Arc<ParachainClient<Block, RuntimeApi>>,
67		block_import: ParachainBlockImport<Block, BlockImport>,
68		config: &Configuration,
69		telemetry_handle: Option<TelemetryHandle>,
70		task_manager: &TaskManager,
71	) -> sc_service::error::Result<DefaultImportQueue<Block>>;
72}
73
74pub(crate) trait StartConsensus<Block: BlockT, RuntimeApi, BI, BIAuxiliaryData>
75where
76	RuntimeApi: ConstructNodeRuntimeApi<Block, ParachainClient<Block, RuntimeApi>>,
77{
78	fn start_consensus(
79		client: Arc<ParachainClient<Block, RuntimeApi>>,
80		block_import: ParachainBlockImport<Block, BI>,
81		prometheus_registry: Option<&Registry>,
82		telemetry: Option<TelemetryHandle>,
83		task_manager: &TaskManager,
84		relay_chain_interface: Arc<dyn RelayChainInterface>,
85		transaction_pool: Arc<TransactionPoolHandle<Block, ParachainClient<Block, RuntimeApi>>>,
86		keystore: KeystorePtr,
87		relay_chain_slot_duration: Duration,
88		para_id: ParaId,
89		collator_key: CollatorPair,
90		overseer_handle: OverseerHandle,
91		announce_block: Arc<dyn Fn(Hash, Option<Vec<u8>>) + Send + Sync>,
92		backend: Arc<ParachainBackend<Block>>,
93		node_extra_args: NodeExtraArgs,
94		block_import_extra_return_value: BIAuxiliaryData,
95	) -> Result<(), sc_service::Error>;
96}
97
98/// Checks that the hardware meets the requirements and print a warning otherwise.
99fn warn_if_slow_hardware(hwbench: &sc_sysinfo::HwBench) {
100	// Polkadot para-chains should generally use these requirements to ensure that the relay-chain
101	// will not take longer than expected to import its blocks.
102	if let Err(err) =
103		frame_benchmarking_cli::SUBSTRATE_REFERENCE_HARDWARE.check_hardware(hwbench, false)
104	{
105		log::warn!(
106			"⚠️  The hardware does not meet the minimal requirements {} for role 'Authority' find out more at:\n\
107			https://wiki.polkadot.network/docs/maintain-guides-how-to-validate-polkadot#reference-hardware",
108			err
109		);
110	}
111}
112
113pub(crate) trait InitBlockImport<Block: BlockT, RuntimeApi> {
114	type BlockImport: sc_consensus::BlockImport<Block> + Clone + Send + Sync;
115	type BlockImportAuxiliaryData;
116
117	fn init_block_import(
118		client: Arc<ParachainClient<Block, RuntimeApi>>,
119	) -> sc_service::error::Result<(Self::BlockImport, Self::BlockImportAuxiliaryData)>;
120}
121
122pub(crate) struct ClientBlockImport;
123
124impl<Block: BlockT, RuntimeApi> InitBlockImport<Block, RuntimeApi> for ClientBlockImport
125where
126	RuntimeApi: Send + ConstructNodeRuntimeApi<Block, ParachainClient<Block, RuntimeApi>>,
127{
128	type BlockImport = Arc<ParachainClient<Block, RuntimeApi>>;
129	type BlockImportAuxiliaryData = ();
130
131	fn init_block_import(
132		client: Arc<ParachainClient<Block, RuntimeApi>>,
133	) -> sc_service::error::Result<(Self::BlockImport, Self::BlockImportAuxiliaryData)> {
134		Ok((client.clone(), ()))
135	}
136}
137
138pub(crate) trait BaseNodeSpec {
139	type Block: NodeBlock;
140
141	type RuntimeApi: ConstructNodeRuntimeApi<
142		Self::Block,
143		ParachainClient<Self::Block, Self::RuntimeApi>,
144	>;
145
146	type BuildImportQueue: BuildImportQueue<
147		Self::Block,
148		Self::RuntimeApi,
149		<Self::InitBlockImport as InitBlockImport<Self::Block, Self::RuntimeApi>>::BlockImport,
150	>;
151
152	type InitBlockImport: self::InitBlockImport<Self::Block, Self::RuntimeApi>;
153
154	/// Retrieves parachain id.
155	fn parachain_id(
156		client: &ParachainClient<Self::Block, Self::RuntimeApi>,
157		parachain_config: &Configuration,
158	) -> Option<ParaId> {
159		let best_hash = client.chain_info().best_hash;
160		let para_id = if client
161			.runtime_api()
162			.has_api::<dyn GetParachainInfo<Self::Block>>(best_hash)
163			.ok()
164			.filter(|has_api| *has_api)
165			.is_some()
166		{
167			client
168				.runtime_api()
169				.parachain_id(best_hash)
170				.inspect_err(|err| {
171					log::error!(
172								"`cumulus_primitives_core::GetParachainInfo` runtime API call errored with {}",
173								err
174							);
175				})
176				.ok()?
177		} else {
178			ParaId::from(
179				Extensions::try_get(&*parachain_config.chain_spec).and_then(|ext| ext.para_id())?,
180			)
181		};
182
183		let parachain_account =
184			AccountIdConversion::<polkadot_primitives::AccountId>::into_account_truncating(
185				&para_id,
186			);
187
188		info!("🪪 Parachain id: {:?}", para_id);
189		info!("🧾 Parachain Account: {}", parachain_account);
190
191		Some(para_id)
192	}
193
194	/// Starts a `ServiceBuilder` for a full service.
195	///
196	/// Use this macro if you don't actually need the full service, but just the builder in order to
197	/// be able to perform chain operations.
198	fn new_partial(
199		config: &Configuration,
200	) -> sc_service::error::Result<
201		ParachainService<
202			Self::Block,
203			Self::RuntimeApi,
204			<Self::InitBlockImport as InitBlockImport<Self::Block, Self::RuntimeApi>>::BlockImport,
205			<Self::InitBlockImport as InitBlockImport<Self::Block, Self::RuntimeApi>>::BlockImportAuxiliaryData
206		>
207	>{
208		let telemetry = config
209			.telemetry_endpoints
210			.clone()
211			.filter(|x| !x.is_empty())
212			.map(|endpoints| -> Result<_, sc_telemetry::Error> {
213				let worker = TelemetryWorker::new(16)?;
214				let telemetry = worker.handle().new_telemetry(endpoints);
215				Ok((worker, telemetry))
216			})
217			.transpose()?;
218
219		let heap_pages =
220			config.executor.default_heap_pages.map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |h| {
221				HeapAllocStrategy::Static { extra_pages: h as _ }
222			});
223
224		let executor = sc_executor::WasmExecutor::<ParachainHostFunctions>::builder()
225			.with_execution_method(config.executor.wasm_method)
226			.with_max_runtime_instances(config.executor.max_runtime_instances)
227			.with_runtime_cache_size(config.executor.runtime_cache_size)
228			.with_onchain_heap_alloc_strategy(heap_pages)
229			.with_offchain_heap_alloc_strategy(heap_pages)
230			.build();
231
232		let (client, backend, keystore_container, task_manager) =
233			sc_service::new_full_parts_record_import::<Self::Block, Self::RuntimeApi, _>(
234				config,
235				telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
236				executor,
237				true,
238			)?;
239		let client = Arc::new(client);
240
241		let telemetry_worker_handle = telemetry.as_ref().map(|(worker, _)| worker.handle());
242
243		let telemetry = telemetry.map(|(worker, telemetry)| {
244			task_manager.spawn_handle().spawn("telemetry", None, worker.run());
245			telemetry
246		});
247
248		let transaction_pool = Arc::from(
249			sc_transaction_pool::Builder::new(
250				task_manager.spawn_essential_handle(),
251				client.clone(),
252				config.role.is_authority().into(),
253			)
254			.with_options(config.transaction_pool.clone())
255			.with_prometheus(config.prometheus_registry())
256			.build(),
257		);
258
259		let (block_import, block_import_auxiliary_data) =
260			Self::InitBlockImport::init_block_import(client.clone())?;
261
262		let block_import = ParachainBlockImport::new(block_import, backend.clone());
263
264		let import_queue = Self::BuildImportQueue::build_import_queue(
265			client.clone(),
266			block_import.clone(),
267			config,
268			telemetry.as_ref().map(|telemetry| telemetry.handle()),
269			&task_manager,
270		)?;
271
272		Ok(PartialComponents {
273			backend,
274			client,
275			import_queue,
276			keystore_container,
277			task_manager,
278			transaction_pool,
279			select_chain: (),
280			other: (block_import, telemetry, telemetry_worker_handle, block_import_auxiliary_data),
281		})
282	}
283}
284
285pub(crate) trait NodeSpec: BaseNodeSpec {
286	type BuildRpcExtensions: BuildRpcExtensions<
287		ParachainClient<Self::Block, Self::RuntimeApi>,
288		ParachainBackend<Self::Block>,
289		TransactionPoolHandle<Self::Block, ParachainClient<Self::Block, Self::RuntimeApi>>,
290		Store,
291	>;
292
293	type StartConsensus: StartConsensus<
294		Self::Block,
295		Self::RuntimeApi,
296		<Self::InitBlockImport as InitBlockImport<Self::Block, Self::RuntimeApi>>::BlockImport,
297		<Self::InitBlockImport as InitBlockImport<Self::Block, Self::RuntimeApi>>::BlockImportAuxiliaryData,
298	>;
299
300	const SYBIL_RESISTANCE: CollatorSybilResistance;
301
302	/// Start a node with the given parachain spec.
303	///
304	/// This is the actual implementation that is abstract over the executor and the runtime api.
305	fn start_node<Net>(
306		parachain_config: Configuration,
307		polkadot_config: Configuration,
308		collator_options: CollatorOptions,
309		hwbench: Option<sc_sysinfo::HwBench>,
310		node_extra_args: NodeExtraArgs,
311	) -> Pin<Box<dyn Future<Output = sc_service::error::Result<TaskManager>>>>
312	where
313		Net: NetworkBackend<Self::Block, Hash>,
314	{
315		let fut = async move {
316			let parachain_config = prepare_node_config(parachain_config);
317			let parachain_public_addresses = parachain_config.network.public_addresses.clone();
318			let parachain_fork_id = parachain_config.chain_spec.fork_id().map(ToString::to_string);
319			let advertise_non_global_ips = parachain_config.network.allow_non_globals_in_dht;
320			let params = Self::new_partial(&parachain_config)?;
321			let (block_import, mut telemetry, telemetry_worker_handle, block_import_auxiliary_data) =
322				params.other;
323			let client = params.client.clone();
324			let backend = params.backend.clone();
325			let mut task_manager = params.task_manager;
326
327			// Resolve parachain id based on runtime, or based on chain spec.
328			let para_id = Self::parachain_id(&client, &parachain_config)
329				.ok_or("Failed to retrieve the parachain id")?;
330			let relay_chain_fork_id = polkadot_config.chain_spec.fork_id().map(ToString::to_string);
331			let (relay_chain_interface, collator_key, relay_chain_network, paranode_rx) =
332				build_relay_chain_interface(
333					polkadot_config,
334					&parachain_config,
335					telemetry_worker_handle,
336					&mut task_manager,
337					collator_options.clone(),
338					hwbench.clone(),
339				)
340				.await
341				.map_err(|e| sc_service::Error::Application(Box::new(e)))?;
342
343			let validator = parachain_config.role.is_authority();
344			let prometheus_registry = parachain_config.prometheus_registry().cloned();
345			let transaction_pool = params.transaction_pool.clone();
346			let import_queue_service = params.import_queue.service();
347			let mut net_config = FullNetworkConfiguration::<_, _, Net>::new(
348				&parachain_config.network,
349				prometheus_registry.clone(),
350			);
351
352			let metrics = Net::register_notification_metrics(
353				parachain_config.prometheus_config.as_ref().map(|config| &config.registry),
354			);
355
356			let statement_handler_proto = node_extra_args.enable_statement_store.then(|| {
357				new_statement_handler_proto(&*client, &parachain_config, &metrics, &mut net_config)
358			});
359
360			let (network, system_rpc_tx, tx_handler_controller, sync_service) =
361				build_network(BuildNetworkParams {
362					parachain_config: &parachain_config,
363					net_config,
364					client: client.clone(),
365					transaction_pool: transaction_pool.clone(),
366					para_id,
367					spawn_handle: task_manager.spawn_handle(),
368					relay_chain_interface: relay_chain_interface.clone(),
369					import_queue: params.import_queue,
370					sybil_resistance_level: Self::SYBIL_RESISTANCE,
371					metrics,
372				})
373				.await?;
374
375			let statement_store = statement_handler_proto
376				.map(|statement_handler_proto| {
377					build_statement_store(
378						&parachain_config,
379						&mut task_manager,
380						client.clone(),
381						network.clone(),
382						sync_service.clone(),
383						params.keystore_container.local_keystore(),
384						statement_handler_proto,
385					)
386				})
387				.transpose()?;
388
389			if parachain_config.offchain_worker.enabled {
390				let custom_extensions = {
391					let statement_store = statement_store.clone();
392					move |_hash| {
393						if let Some(statement_store) = &statement_store {
394							vec![Box::new(statement_store.clone().as_statement_store_ext())
395								as Box<_>]
396						} else {
397							vec![]
398						}
399					}
400				};
401
402				let offchain_workers =
403					sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions {
404						runtime_api_provider: client.clone(),
405						keystore: Some(params.keystore_container.keystore()),
406						offchain_db: backend.offchain_storage(),
407						transaction_pool: Some(OffchainTransactionPoolFactory::new(
408							transaction_pool.clone(),
409						)),
410						network_provider: Arc::new(network.clone()),
411						is_validator: parachain_config.role.is_authority(),
412						enable_http_requests: true,
413						custom_extensions,
414					})?;
415				task_manager.spawn_handle().spawn(
416					"offchain-workers-runner",
417					"offchain-work",
418					offchain_workers.run(client.clone(), task_manager.spawn_handle()).boxed(),
419				);
420			}
421
422			let rpc_builder = {
423				let client = client.clone();
424				let transaction_pool = transaction_pool.clone();
425				let backend_for_rpc = backend.clone();
426				let statement_store = statement_store.clone();
427
428				Box::new(move |_| {
429					Self::BuildRpcExtensions::build_rpc_extensions(
430						client.clone(),
431						backend_for_rpc.clone(),
432						transaction_pool.clone(),
433						statement_store.clone(),
434					)
435				})
436			};
437
438			sc_service::spawn_tasks(sc_service::SpawnTasksParams {
439				rpc_builder,
440				client: client.clone(),
441				transaction_pool: transaction_pool.clone(),
442				task_manager: &mut task_manager,
443				config: parachain_config,
444				keystore: params.keystore_container.keystore(),
445				backend: backend.clone(),
446				network: network.clone(),
447				sync_service: sync_service.clone(),
448				system_rpc_tx,
449				tx_handler_controller,
450				telemetry: telemetry.as_mut(),
451			})?;
452
453			if let Some(hwbench) = hwbench {
454				sc_sysinfo::print_hwbench(&hwbench);
455				if validator {
456					warn_if_slow_hardware(&hwbench);
457				}
458
459				if let Some(ref mut telemetry) = telemetry {
460					let telemetry_handle = telemetry.handle();
461					task_manager.spawn_handle().spawn(
462						"telemetry_hwbench",
463						None,
464						sc_sysinfo::initialize_hwbench_telemetry(telemetry_handle, hwbench),
465					);
466				}
467			}
468
469			let announce_block = {
470				let sync_service = sync_service.clone();
471				Arc::new(move |hash, data| sync_service.announce_block(hash, data))
472			};
473
474			let relay_chain_slot_duration = Duration::from_secs(6);
475
476			let overseer_handle = relay_chain_interface
477				.overseer_handle()
478				.map_err(|e| sc_service::Error::Application(Box::new(e)))?;
479
480			start_relay_chain_tasks(StartRelayChainTasksParams {
481				client: client.clone(),
482				announce_block: announce_block.clone(),
483				para_id,
484				relay_chain_interface: relay_chain_interface.clone(),
485				task_manager: &mut task_manager,
486				da_recovery_profile: if validator {
487					DARecoveryProfile::Collator
488				} else {
489					DARecoveryProfile::FullNode
490				},
491				import_queue: import_queue_service,
492				relay_chain_slot_duration,
493				recovery_handle: Box::new(overseer_handle.clone()),
494				sync_service,
495				prometheus_registry: prometheus_registry.as_ref(),
496			})?;
497
498			start_bootnode_tasks(StartBootnodeTasksParams {
499				embedded_dht_bootnode: collator_options.embedded_dht_bootnode,
500				dht_bootnode_discovery: collator_options.dht_bootnode_discovery,
501				para_id,
502				task_manager: &mut task_manager,
503				relay_chain_interface: relay_chain_interface.clone(),
504				relay_chain_fork_id,
505				relay_chain_network,
506				request_receiver: paranode_rx,
507				parachain_network: network,
508				advertise_non_global_ips,
509				parachain_genesis_hash: client.chain_info().genesis_hash,
510				parachain_fork_id,
511				parachain_public_addresses,
512			});
513
514			if validator {
515				Self::StartConsensus::start_consensus(
516					client.clone(),
517					block_import,
518					prometheus_registry.as_ref(),
519					telemetry.as_ref().map(|t| t.handle()),
520					&task_manager,
521					relay_chain_interface.clone(),
522					transaction_pool,
523					params.keystore_container.keystore(),
524					relay_chain_slot_duration,
525					para_id,
526					collator_key.expect("Command line arguments do not allow this. qed"),
527					overseer_handle,
528					announce_block,
529					backend.clone(),
530					node_extra_args,
531					block_import_auxiliary_data,
532				)?;
533			}
534
535			Ok(task_manager)
536		};
537
538		Box::pin(Instrument::instrument(
539			fut,
540			sc_tracing::tracing::info_span!(
541				sc_tracing::logging::PREFIX_LOG_SPAN,
542				name = "Parachain"
543			),
544		))
545	}
546}
547
548pub(crate) trait DynNodeSpec: NodeCommandRunner {
549	fn start_node(
550		self: Box<Self>,
551		parachain_config: Configuration,
552		polkadot_config: Configuration,
553		collator_options: CollatorOptions,
554		hwbench: Option<HwBench>,
555		node_extra_args: NodeExtraArgs,
556	) -> Pin<Box<dyn Future<Output = sc_service::error::Result<TaskManager>>>>;
557}
558
559impl<T> DynNodeSpec for T
560where
561	T: NodeSpec + NodeCommandRunner,
562{
563	fn start_node(
564		self: Box<Self>,
565		parachain_config: Configuration,
566		polkadot_config: Configuration,
567		collator_options: CollatorOptions,
568		hwbench: Option<HwBench>,
569		node_extra_args: NodeExtraArgs,
570	) -> Pin<Box<dyn Future<Output = sc_service::error::Result<TaskManager>>>> {
571		match parachain_config.network.network_backend {
572			sc_network::config::NetworkBackendType::Libp2p =>
573				<Self as NodeSpec>::start_node::<sc_network::NetworkWorker<_, _>>(
574					parachain_config,
575					polkadot_config,
576					collator_options,
577					hwbench,
578					node_extra_args,
579				),
580			sc_network::config::NetworkBackendType::Litep2p =>
581				<Self as NodeSpec>::start_node::<sc_network::Litep2pNetworkBackend>(
582					parachain_config,
583					polkadot_config,
584					collator_options,
585					hwbench,
586					node_extra_args,
587				),
588		}
589	}
590}