referrerpolicy=no-referrer-when-downgrade

polkadot_omni_node_lib/common/
spec.rs

// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Cumulus.
// SPDX-License-Identifier: Apache-2.0

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// 	http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
16
17use crate::{
18	chain_spec::DeprecatedExtensions,
19	common::{
20		command::NodeCommandRunner,
21		rpc::BuildRpcExtensions,
22		statement_store::{build_statement_store, new_statement_handler_proto},
23		types::{
24			ParachainBackend, ParachainBlockImport, ParachainClient, ParachainHostFunctions,
25			ParachainService,
26		},
27		ConstructNodeRuntimeApi, NodeBlock, NodeExtraArgs,
28	},
29};
30use cumulus_client_bootnodes::{start_bootnode_tasks, StartBootnodeTasksParams};
31use cumulus_client_cli::CollatorOptions;
32use cumulus_client_service::{
33	build_network, build_relay_chain_interface, prepare_node_config, start_relay_chain_tasks,
34	BuildNetworkParams, CollatorSybilResistance, DARecoveryProfile, StartRelayChainTasksParams,
35};
36use cumulus_primitives_core::{BlockT, GetParachainInfo, ParaId};
37use cumulus_relay_chain_interface::{OverseerHandle, RelayChainInterface};
38use futures::FutureExt;
39use log::info;
40use parachains_common::Hash;
41use polkadot_primitives::CollatorPair;
42use prometheus_endpoint::Registry;
43use sc_client_api::Backend;
44use sc_consensus::DefaultImportQueue;
45use sc_executor::{HeapAllocStrategy, DEFAULT_HEAP_ALLOC_STRATEGY};
46use sc_network::{config::FullNetworkConfiguration, NetworkBackend, NetworkBlock};
47use sc_service::{Configuration, ImportQueue, PartialComponents, TaskManager};
48use sc_statement_store::Store;
49use sc_sysinfo::HwBench;
50use sc_telemetry::{TelemetryHandle, TelemetryWorker};
51use sc_tracing::tracing::Instrument;
52use sc_transaction_pool::TransactionPoolHandle;
53use sc_transaction_pool_api::OffchainTransactionPoolFactory;
54use sp_api::ProvideRuntimeApi;
55use sp_keystore::KeystorePtr;
56use sp_runtime::traits::AccountIdConversion;
57use std::{future::Future, pin::Pin, sync::Arc, time::Duration};
58
59pub(crate) trait BuildImportQueue<
60	Block: BlockT,
61	RuntimeApi,
62	BlockImport: sc_consensus::BlockImport<Block>,
63>
64{
65	fn build_import_queue(
66		client: Arc<ParachainClient<Block, RuntimeApi>>,
67		block_import: ParachainBlockImport<Block, BlockImport>,
68		config: &Configuration,
69		telemetry_handle: Option<TelemetryHandle>,
70		task_manager: &TaskManager,
71	) -> sc_service::error::Result<DefaultImportQueue<Block>>;
72}
73
74pub(crate) trait StartConsensus<Block: BlockT, RuntimeApi, BI, BIAuxiliaryData>
75where
76	RuntimeApi: ConstructNodeRuntimeApi<Block, ParachainClient<Block, RuntimeApi>>,
77{
78	fn start_consensus(
79		client: Arc<ParachainClient<Block, RuntimeApi>>,
80		block_import: ParachainBlockImport<Block, BI>,
81		prometheus_registry: Option<&Registry>,
82		telemetry: Option<TelemetryHandle>,
83		task_manager: &TaskManager,
84		relay_chain_interface: Arc<dyn RelayChainInterface>,
85		transaction_pool: Arc<TransactionPoolHandle<Block, ParachainClient<Block, RuntimeApi>>>,
86		keystore: KeystorePtr,
87		relay_chain_slot_duration: Duration,
88		para_id: ParaId,
89		collator_key: CollatorPair,
90		overseer_handle: OverseerHandle,
91		announce_block: Arc<dyn Fn(Hash, Option<Vec<u8>>) + Send + Sync>,
92		backend: Arc<ParachainBackend<Block>>,
93		node_extra_args: NodeExtraArgs,
94		block_import_extra_return_value: BIAuxiliaryData,
95	) -> Result<(), sc_service::Error>;
96}
97
98/// Checks that the hardware meets the requirements and print a warning otherwise.
99fn warn_if_slow_hardware(hwbench: &sc_sysinfo::HwBench) {
100	// Polkadot para-chains should generally use these requirements to ensure that the relay-chain
101	// will not take longer than expected to import its blocks.
102	if let Err(err) =
103		frame_benchmarking_cli::SUBSTRATE_REFERENCE_HARDWARE.check_hardware(hwbench, false)
104	{
105		log::warn!(
106			"⚠️  The hardware does not meet the minimal requirements {} for role 'Authority' find out more at:\n\
107			https://wiki.polkadot.network/docs/maintain-guides-how-to-validate-polkadot#reference-hardware",
108			err
109		);
110	}
111}
112
113pub(crate) trait InitBlockImport<Block: BlockT, RuntimeApi> {
114	type BlockImport: sc_consensus::BlockImport<Block> + Clone + Send + Sync;
115	type BlockImportAuxiliaryData;
116
117	fn init_block_import(
118		client: Arc<ParachainClient<Block, RuntimeApi>>,
119	) -> sc_service::error::Result<(Self::BlockImport, Self::BlockImportAuxiliaryData)>;
120}
121
122pub(crate) struct ClientBlockImport;
123
124impl<Block: BlockT, RuntimeApi> InitBlockImport<Block, RuntimeApi> for ClientBlockImport
125where
126	RuntimeApi: Send + ConstructNodeRuntimeApi<Block, ParachainClient<Block, RuntimeApi>>,
127{
128	type BlockImport = Arc<ParachainClient<Block, RuntimeApi>>;
129	type BlockImportAuxiliaryData = ();
130
131	fn init_block_import(
132		client: Arc<ParachainClient<Block, RuntimeApi>>,
133	) -> sc_service::error::Result<(Self::BlockImport, Self::BlockImportAuxiliaryData)> {
134		Ok((client.clone(), ()))
135	}
136}
137
138pub(crate) trait BaseNodeSpec {
139	type Block: NodeBlock;
140
141	type RuntimeApi: ConstructNodeRuntimeApi<
142		Self::Block,
143		ParachainClient<Self::Block, Self::RuntimeApi>,
144	>;
145
146	type BuildImportQueue: BuildImportQueue<
147		Self::Block,
148		Self::RuntimeApi,
149		<Self::InitBlockImport as InitBlockImport<Self::Block, Self::RuntimeApi>>::BlockImport,
150	>;
151
152	type InitBlockImport: self::InitBlockImport<Self::Block, Self::RuntimeApi>;
153
154	/// Retrieves parachain id.
155	fn parachain_id(
156		client: &ParachainClient<Self::Block, Self::RuntimeApi>,
157		parachain_config: &Configuration,
158	) -> Option<ParaId> {
159		let best_hash = client.chain_info().best_hash;
160		let para_id = if let Ok(para_id) = client.runtime_api().parachain_id(best_hash) {
161			para_id
162		} else {
163			// TODO: remove this once `para_id` extension is removed: https://github.com/paritytech/polkadot-sdk/issues/8740
164			#[allow(deprecated)]
165			let id = ParaId::from(
166				DeprecatedExtensions::try_get(&*parachain_config.chain_spec)
167					.and_then(|ext| ext.para_id)?,
168			);
169			// TODO: https://github.com/paritytech/polkadot-sdk/issues/8747
170			// TODO: https://github.com/paritytech/polkadot-sdk/issues/8740
171			log::info!("Deprecation notice: the parachain id was provided via the chain spec. This way of providing the parachain id to the node is not recommended. The alternative is to implement the `cumulus_primitives_core::GetParachainInfo` runtime API in the runtime, and upgrade it on-chain. Starting with `stable2512` providing the parachain id via the chain spec will not be supported anymore.");
172			id
173		};
174
175		let parachain_account =
176			AccountIdConversion::<polkadot_primitives::AccountId>::into_account_truncating(
177				&para_id,
178			);
179
180		info!("🪪 Parachain id: {:?}", para_id);
181		info!("🧾 Parachain Account: {}", parachain_account);
182
183		Some(para_id)
184	}
185
186	/// Starts a `ServiceBuilder` for a full service.
187	///
188	/// Use this macro if you don't actually need the full service, but just the builder in order to
189	/// be able to perform chain operations.
190	fn new_partial(
191		config: &Configuration,
192	) -> sc_service::error::Result<
193		ParachainService<
194			Self::Block,
195			Self::RuntimeApi,
196			<Self::InitBlockImport as InitBlockImport<Self::Block, Self::RuntimeApi>>::BlockImport,
197			<Self::InitBlockImport as InitBlockImport<Self::Block, Self::RuntimeApi>>::BlockImportAuxiliaryData
198		>
199	>{
200		let telemetry = config
201			.telemetry_endpoints
202			.clone()
203			.filter(|x| !x.is_empty())
204			.map(|endpoints| -> Result<_, sc_telemetry::Error> {
205				let worker = TelemetryWorker::new(16)?;
206				let telemetry = worker.handle().new_telemetry(endpoints);
207				Ok((worker, telemetry))
208			})
209			.transpose()?;
210
211		let heap_pages =
212			config.executor.default_heap_pages.map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |h| {
213				HeapAllocStrategy::Static { extra_pages: h as _ }
214			});
215
216		let executor = sc_executor::WasmExecutor::<ParachainHostFunctions>::builder()
217			.with_execution_method(config.executor.wasm_method)
218			.with_max_runtime_instances(config.executor.max_runtime_instances)
219			.with_runtime_cache_size(config.executor.runtime_cache_size)
220			.with_onchain_heap_alloc_strategy(heap_pages)
221			.with_offchain_heap_alloc_strategy(heap_pages)
222			.build();
223
224		let (client, backend, keystore_container, task_manager) =
225			sc_service::new_full_parts_record_import::<Self::Block, Self::RuntimeApi, _>(
226				config,
227				telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
228				executor,
229				true,
230			)?;
231		let client = Arc::new(client);
232
233		let telemetry_worker_handle = telemetry.as_ref().map(|(worker, _)| worker.handle());
234
235		let telemetry = telemetry.map(|(worker, telemetry)| {
236			task_manager.spawn_handle().spawn("telemetry", None, worker.run());
237			telemetry
238		});
239
240		let transaction_pool = Arc::from(
241			sc_transaction_pool::Builder::new(
242				task_manager.spawn_essential_handle(),
243				client.clone(),
244				config.role.is_authority().into(),
245			)
246			.with_options(config.transaction_pool.clone())
247			.with_prometheus(config.prometheus_registry())
248			.build(),
249		);
250
251		let (block_import, block_import_auxiliary_data) =
252			Self::InitBlockImport::init_block_import(client.clone())?;
253
254		let block_import = ParachainBlockImport::new(block_import, backend.clone());
255
256		let import_queue = Self::BuildImportQueue::build_import_queue(
257			client.clone(),
258			block_import.clone(),
259			config,
260			telemetry.as_ref().map(|telemetry| telemetry.handle()),
261			&task_manager,
262		)?;
263
264		Ok(PartialComponents {
265			backend,
266			client,
267			import_queue,
268			keystore_container,
269			task_manager,
270			transaction_pool,
271			select_chain: (),
272			other: (block_import, telemetry, telemetry_worker_handle, block_import_auxiliary_data),
273		})
274	}
275}
276
277pub(crate) trait NodeSpec: BaseNodeSpec {
278	type BuildRpcExtensions: BuildRpcExtensions<
279		ParachainClient<Self::Block, Self::RuntimeApi>,
280		ParachainBackend<Self::Block>,
281		TransactionPoolHandle<Self::Block, ParachainClient<Self::Block, Self::RuntimeApi>>,
282		Store,
283	>;
284
285	type StartConsensus: StartConsensus<
286		Self::Block,
287		Self::RuntimeApi,
288		<Self::InitBlockImport as InitBlockImport<Self::Block, Self::RuntimeApi>>::BlockImport,
289		<Self::InitBlockImport as InitBlockImport<Self::Block, Self::RuntimeApi>>::BlockImportAuxiliaryData,
290	>;
291
292	const SYBIL_RESISTANCE: CollatorSybilResistance;
293
294	/// Start a node with the given parachain spec.
295	///
296	/// This is the actual implementation that is abstract over the executor and the runtime api.
297	fn start_node<Net>(
298		parachain_config: Configuration,
299		polkadot_config: Configuration,
300		collator_options: CollatorOptions,
301		hwbench: Option<sc_sysinfo::HwBench>,
302		node_extra_args: NodeExtraArgs,
303	) -> Pin<Box<dyn Future<Output = sc_service::error::Result<TaskManager>>>>
304	where
305		Net: NetworkBackend<Self::Block, Hash>,
306	{
307		let fut = async move {
308			let parachain_config = prepare_node_config(parachain_config);
309			let parachain_public_addresses = parachain_config.network.public_addresses.clone();
310			let parachain_fork_id = parachain_config.chain_spec.fork_id().map(ToString::to_string);
311			let advertise_non_global_ips = parachain_config.network.allow_non_globals_in_dht;
312			let params = Self::new_partial(&parachain_config)?;
313			let (block_import, mut telemetry, telemetry_worker_handle, block_import_auxiliary_data) =
314				params.other;
315			let client = params.client.clone();
316			let backend = params.backend.clone();
317			let mut task_manager = params.task_manager;
318
319			// Resolve parachain id based on runtime, or based on chain spec.
320			let para_id = Self::parachain_id(&client, &parachain_config)
321				.ok_or("Failed to retrieve the parachain id")?;
322			let relay_chain_fork_id = polkadot_config.chain_spec.fork_id().map(ToString::to_string);
323			let (relay_chain_interface, collator_key, relay_chain_network, paranode_rx) =
324				build_relay_chain_interface(
325					polkadot_config,
326					&parachain_config,
327					telemetry_worker_handle,
328					&mut task_manager,
329					collator_options.clone(),
330					hwbench.clone(),
331				)
332				.await
333				.map_err(|e| sc_service::Error::Application(Box::new(e)))?;
334
335			let validator = parachain_config.role.is_authority();
336			let prometheus_registry = parachain_config.prometheus_registry().cloned();
337			let transaction_pool = params.transaction_pool.clone();
338			let import_queue_service = params.import_queue.service();
339			let mut net_config = FullNetworkConfiguration::<_, _, Net>::new(
340				&parachain_config.network,
341				prometheus_registry.clone(),
342			);
343
344			let metrics = Net::register_notification_metrics(
345				parachain_config.prometheus_config.as_ref().map(|config| &config.registry),
346			);
347
348			let statement_handler_proto = node_extra_args.enable_statement_store.then(|| {
349				new_statement_handler_proto(&*client, &parachain_config, &metrics, &mut net_config)
350			});
351
352			let (network, system_rpc_tx, tx_handler_controller, sync_service) =
353				build_network(BuildNetworkParams {
354					parachain_config: &parachain_config,
355					net_config,
356					client: client.clone(),
357					transaction_pool: transaction_pool.clone(),
358					para_id,
359					spawn_handle: task_manager.spawn_handle(),
360					relay_chain_interface: relay_chain_interface.clone(),
361					import_queue: params.import_queue,
362					sybil_resistance_level: Self::SYBIL_RESISTANCE,
363					metrics,
364				})
365				.await?;
366
367			let statement_store = statement_handler_proto
368				.map(|statement_handler_proto| {
369					build_statement_store(
370						&parachain_config,
371						&mut task_manager,
372						client.clone(),
373						network.clone(),
374						sync_service.clone(),
375						params.keystore_container.local_keystore(),
376						statement_handler_proto,
377					)
378				})
379				.transpose()?;
380
381			if parachain_config.offchain_worker.enabled {
382				let custom_extensions = {
383					let statement_store = statement_store.clone();
384					move |_hash| {
385						if let Some(statement_store) = &statement_store {
386							vec![Box::new(statement_store.clone().as_statement_store_ext())
387								as Box<_>]
388						} else {
389							vec![]
390						}
391					}
392				};
393
394				let offchain_workers =
395					sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions {
396						runtime_api_provider: client.clone(),
397						keystore: Some(params.keystore_container.keystore()),
398						offchain_db: backend.offchain_storage(),
399						transaction_pool: Some(OffchainTransactionPoolFactory::new(
400							transaction_pool.clone(),
401						)),
402						network_provider: Arc::new(network.clone()),
403						is_validator: parachain_config.role.is_authority(),
404						enable_http_requests: true,
405						custom_extensions,
406					})?;
407				task_manager.spawn_handle().spawn(
408					"offchain-workers-runner",
409					"offchain-work",
410					offchain_workers.run(client.clone(), task_manager.spawn_handle()).boxed(),
411				);
412			}
413
414			let rpc_builder = {
415				let client = client.clone();
416				let transaction_pool = transaction_pool.clone();
417				let backend_for_rpc = backend.clone();
418				let statement_store = statement_store.clone();
419
420				Box::new(move |_| {
421					Self::BuildRpcExtensions::build_rpc_extensions(
422						client.clone(),
423						backend_for_rpc.clone(),
424						transaction_pool.clone(),
425						statement_store.clone(),
426					)
427				})
428			};
429
430			sc_service::spawn_tasks(sc_service::SpawnTasksParams {
431				rpc_builder,
432				client: client.clone(),
433				transaction_pool: transaction_pool.clone(),
434				task_manager: &mut task_manager,
435				config: parachain_config,
436				keystore: params.keystore_container.keystore(),
437				backend: backend.clone(),
438				network: network.clone(),
439				sync_service: sync_service.clone(),
440				system_rpc_tx,
441				tx_handler_controller,
442				telemetry: telemetry.as_mut(),
443			})?;
444
445			if let Some(hwbench) = hwbench {
446				sc_sysinfo::print_hwbench(&hwbench);
447				if validator {
448					warn_if_slow_hardware(&hwbench);
449				}
450
451				if let Some(ref mut telemetry) = telemetry {
452					let telemetry_handle = telemetry.handle();
453					task_manager.spawn_handle().spawn(
454						"telemetry_hwbench",
455						None,
456						sc_sysinfo::initialize_hwbench_telemetry(telemetry_handle, hwbench),
457					);
458				}
459			}
460
461			let announce_block = {
462				let sync_service = sync_service.clone();
463				Arc::new(move |hash, data| sync_service.announce_block(hash, data))
464			};
465
466			let relay_chain_slot_duration = Duration::from_secs(6);
467
468			let overseer_handle = relay_chain_interface
469				.overseer_handle()
470				.map_err(|e| sc_service::Error::Application(Box::new(e)))?;
471
472			start_relay_chain_tasks(StartRelayChainTasksParams {
473				client: client.clone(),
474				announce_block: announce_block.clone(),
475				para_id,
476				relay_chain_interface: relay_chain_interface.clone(),
477				task_manager: &mut task_manager,
478				da_recovery_profile: if validator {
479					DARecoveryProfile::Collator
480				} else {
481					DARecoveryProfile::FullNode
482				},
483				import_queue: import_queue_service,
484				relay_chain_slot_duration,
485				recovery_handle: Box::new(overseer_handle.clone()),
486				sync_service,
487				prometheus_registry: prometheus_registry.as_ref(),
488			})?;
489
490			start_bootnode_tasks(StartBootnodeTasksParams {
491				embedded_dht_bootnode: collator_options.embedded_dht_bootnode,
492				dht_bootnode_discovery: collator_options.dht_bootnode_discovery,
493				para_id,
494				task_manager: &mut task_manager,
495				relay_chain_interface: relay_chain_interface.clone(),
496				relay_chain_fork_id,
497				relay_chain_network,
498				request_receiver: paranode_rx,
499				parachain_network: network,
500				advertise_non_global_ips,
501				parachain_genesis_hash: client.chain_info().genesis_hash,
502				parachain_fork_id,
503				parachain_public_addresses,
504			});
505
506			if validator {
507				Self::StartConsensus::start_consensus(
508					client.clone(),
509					block_import,
510					prometheus_registry.as_ref(),
511					telemetry.as_ref().map(|t| t.handle()),
512					&task_manager,
513					relay_chain_interface.clone(),
514					transaction_pool,
515					params.keystore_container.keystore(),
516					relay_chain_slot_duration,
517					para_id,
518					collator_key.expect("Command line arguments do not allow this. qed"),
519					overseer_handle,
520					announce_block,
521					backend.clone(),
522					node_extra_args,
523					block_import_auxiliary_data,
524				)?;
525			}
526
527			Ok(task_manager)
528		};
529
530		Box::pin(Instrument::instrument(
531			fut,
532			sc_tracing::tracing::info_span!(
533				sc_tracing::logging::PREFIX_LOG_SPAN,
534				name = "Parachain"
535			),
536		))
537	}
538}
539
540pub(crate) trait DynNodeSpec: NodeCommandRunner {
541	fn start_node(
542		self: Box<Self>,
543		parachain_config: Configuration,
544		polkadot_config: Configuration,
545		collator_options: CollatorOptions,
546		hwbench: Option<HwBench>,
547		node_extra_args: NodeExtraArgs,
548	) -> Pin<Box<dyn Future<Output = sc_service::error::Result<TaskManager>>>>;
549}
550
551impl<T> DynNodeSpec for T
552where
553	T: NodeSpec + NodeCommandRunner,
554{
555	fn start_node(
556		self: Box<Self>,
557		parachain_config: Configuration,
558		polkadot_config: Configuration,
559		collator_options: CollatorOptions,
560		hwbench: Option<HwBench>,
561		node_extra_args: NodeExtraArgs,
562	) -> Pin<Box<dyn Future<Output = sc_service::error::Result<TaskManager>>>> {
563		match parachain_config.network.network_backend {
564			sc_network::config::NetworkBackendType::Libp2p =>
565				<Self as NodeSpec>::start_node::<sc_network::NetworkWorker<_, _>>(
566					parachain_config,
567					polkadot_config,
568					collator_options,
569					hwbench,
570					node_extra_args,
571				),
572			sc_network::config::NetworkBackendType::Litep2p =>
573				<Self as NodeSpec>::start_node::<sc_network::Litep2pNetworkBackend>(
574					parachain_config,
575					polkadot_config,
576					collator_options,
577					hwbench,
578					node_extra_args,
579				),
580		}
581	}
582}