referrerpolicy=no-referrer-when-downgrade

parachain_template_node/
service.rs

1//! Service and ServiceFactory implementation. Specialized wrapper over substrate service.
2
3// std
4use std::{sync::Arc, time::Duration};
5
6// Local Runtime Types
7use parachain_template_runtime::{
8	apis::RuntimeApi,
9	opaque::{Block, Hash},
10};
11
12use polkadot_sdk::*;
13
14// Cumulus Imports
15use cumulus_client_bootnodes::{start_bootnode_tasks, StartBootnodeTasksParams};
16use cumulus_client_cli::CollatorOptions;
17use cumulus_client_collator::service::CollatorService;
18#[docify::export(lookahead_collator)]
19use cumulus_client_consensus_aura::collators::lookahead::{self as aura, Params as AuraParams};
20use cumulus_client_consensus_common::ParachainBlockImport as TParachainBlockImport;
21use cumulus_client_service::{
22	build_network, build_relay_chain_interface, prepare_node_config, start_relay_chain_tasks,
23	BuildNetworkParams, CollatorSybilResistance, DARecoveryProfile, ParachainHostFunctions,
24	StartRelayChainTasksParams,
25};
26#[docify::export(cumulus_primitives)]
27use cumulus_primitives_core::{
28	relay_chain::{CollatorPair, ValidationCode},
29	GetParachainInfo, ParaId,
30};
31use cumulus_relay_chain_interface::{OverseerHandle, RelayChainInterface};
32
33// Substrate Imports
34use frame_benchmarking_cli::SUBSTRATE_REFERENCE_HARDWARE;
35use prometheus_endpoint::Registry;
36use sc_client_api::Backend;
37use sc_consensus::ImportQueue;
38use sc_executor::{HeapAllocStrategy, WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY};
39use sc_network::{NetworkBackend, NetworkBlock};
40use sc_service::{Configuration, PartialComponents, TFullBackend, TFullClient, TaskManager};
41use sc_telemetry::{Telemetry, TelemetryHandle, TelemetryWorker, TelemetryWorkerHandle};
42use sc_transaction_pool_api::OffchainTransactionPoolFactory;
43use sp_api::ProvideRuntimeApi;
44use sp_keystore::KeystorePtr;
45
46#[docify::export(wasm_executor)]
47type ParachainExecutor = WasmExecutor<ParachainHostFunctions>;
48
49type ParachainClient = TFullClient<Block, RuntimeApi, ParachainExecutor>;
50
51type ParachainBackend = TFullBackend<Block>;
52
53type ParachainBlockImport = TParachainBlockImport<Block, Arc<ParachainClient>, ParachainBackend>;
54
55/// Assembly of PartialComponents (enough to run chain ops subcommands)
56pub type Service = PartialComponents<
57	ParachainClient,
58	ParachainBackend,
59	(),
60	sc_consensus::DefaultImportQueue<Block>,
61	sc_transaction_pool::TransactionPoolHandle<Block, ParachainClient>,
62	(ParachainBlockImport, Option<Telemetry>, Option<TelemetryWorkerHandle>),
63>;
64
65/// Starts a `ServiceBuilder` for a full service.
66///
67/// Use this macro if you don't actually need the full service, but just the builder in order to
68/// be able to perform chain operations.
69#[docify::export(component_instantiation)]
70pub fn new_partial(config: &Configuration) -> Result<Service, sc_service::Error> {
71	let telemetry = config
72		.telemetry_endpoints
73		.clone()
74		.filter(|x| !x.is_empty())
75		.map(|endpoints| -> Result<_, sc_telemetry::Error> {
76			let worker = TelemetryWorker::new(16)?;
77			let telemetry = worker.handle().new_telemetry(endpoints);
78			Ok((worker, telemetry))
79		})
80		.transpose()?;
81
82	let heap_pages = config
83		.executor
84		.default_heap_pages
85		.map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |h| HeapAllocStrategy::Static { extra_pages: h as _ });
86
87	let executor = ParachainExecutor::builder()
88		.with_execution_method(config.executor.wasm_method)
89		.with_onchain_heap_alloc_strategy(heap_pages)
90		.with_offchain_heap_alloc_strategy(heap_pages)
91		.with_max_runtime_instances(config.executor.max_runtime_instances)
92		.with_runtime_cache_size(config.executor.runtime_cache_size)
93		.build();
94
95	let (client, backend, keystore_container, task_manager) =
96		sc_service::new_full_parts_record_import::<Block, RuntimeApi, _>(
97			config,
98			telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
99			executor,
100			true,
101		)?;
102	let client = Arc::new(client);
103
104	let telemetry_worker_handle = telemetry.as_ref().map(|(worker, _)| worker.handle());
105
106	let telemetry = telemetry.map(|(worker, telemetry)| {
107		task_manager.spawn_handle().spawn("telemetry", None, worker.run());
108		telemetry
109	});
110
111	let transaction_pool = Arc::from(
112		sc_transaction_pool::Builder::new(
113			task_manager.spawn_essential_handle(),
114			client.clone(),
115			config.role.is_authority().into(),
116		)
117		.with_options(config.transaction_pool.clone())
118		.with_prometheus(config.prometheus_registry())
119		.build(),
120	);
121
122	let block_import = ParachainBlockImport::new(client.clone(), backend.clone());
123
124	let import_queue = build_import_queue(
125		client.clone(),
126		block_import.clone(),
127		config,
128		telemetry.as_ref().map(|telemetry| telemetry.handle()),
129		&task_manager,
130	);
131
132	Ok(PartialComponents {
133		backend,
134		client,
135		import_queue,
136		keystore_container,
137		task_manager,
138		transaction_pool,
139		select_chain: (),
140		other: (block_import, telemetry, telemetry_worker_handle),
141	})
142}
143
144/// Build the import queue for the parachain runtime.
145fn build_import_queue(
146	client: Arc<ParachainClient>,
147	block_import: ParachainBlockImport,
148	config: &Configuration,
149	telemetry: Option<TelemetryHandle>,
150	task_manager: &TaskManager,
151) -> sc_consensus::DefaultImportQueue<Block> {
152	cumulus_client_consensus_aura::equivocation_import_queue::fully_verifying_import_queue::<
153		sp_consensus_aura::sr25519::AuthorityPair,
154		_,
155		_,
156		_,
157		_,
158	>(
159		client,
160		block_import,
161		move |_, _| async move {
162			let timestamp = sp_timestamp::InherentDataProvider::from_system_time();
163			Ok(timestamp)
164		},
165		&task_manager.spawn_essential_handle(),
166		config.prometheus_registry(),
167		telemetry,
168	)
169}
170
171#[allow(clippy::too_many_arguments)]
172fn start_consensus(
173	client: Arc<ParachainClient>,
174	backend: Arc<ParachainBackend>,
175	block_import: ParachainBlockImport,
176	prometheus_registry: Option<&Registry>,
177	telemetry: Option<TelemetryHandle>,
178	task_manager: &TaskManager,
179	relay_chain_interface: Arc<dyn RelayChainInterface>,
180	transaction_pool: Arc<sc_transaction_pool::TransactionPoolHandle<Block, ParachainClient>>,
181	keystore: KeystorePtr,
182	relay_chain_slot_duration: Duration,
183	para_id: ParaId,
184	collator_key: CollatorPair,
185	overseer_handle: OverseerHandle,
186	announce_block: Arc<dyn Fn(Hash, Option<Vec<u8>>) + Send + Sync>,
187) -> Result<(), sc_service::Error> {
188	let proposer = sc_basic_authorship::ProposerFactory::with_proof_recording(
189		task_manager.spawn_handle(),
190		client.clone(),
191		transaction_pool,
192		prometheus_registry,
193		telemetry.clone(),
194	);
195
196	let collator_service = CollatorService::new(
197		client.clone(),
198		Arc::new(task_manager.spawn_handle()),
199		announce_block,
200		client.clone(),
201	);
202
203	let params = AuraParams {
204		create_inherent_data_providers: move |_, ()| async move { Ok(()) },
205		block_import,
206		para_client: client.clone(),
207		para_backend: backend,
208		relay_client: relay_chain_interface,
209		code_hash_provider: move |block_hash| {
210			client.code_at(block_hash).ok().map(|c| ValidationCode::from(c).hash())
211		},
212		keystore,
213		collator_key,
214		para_id,
215		overseer_handle,
216		relay_chain_slot_duration,
217		proposer,
218		collator_service,
219		authoring_duration: Duration::from_millis(2000),
220		reinitialize: false,
221		max_pov_percentage: None,
222	};
223	let fut = aura::run::<Block, sp_consensus_aura::sr25519::AuthorityPair, _, _, _, _, _, _, _, _>(
224		params,
225	);
226	task_manager.spawn_essential_handle().spawn("aura", None, fut);
227
228	Ok(())
229}
230
231/// Start a node with the given parachain `Configuration` and relay chain `Configuration`.
232#[sc_tracing::logging::prefix_logs_with("Parachain")]
233pub async fn start_parachain_node(
234	parachain_config: Configuration,
235	polkadot_config: Configuration,
236	collator_options: CollatorOptions,
237	hwbench: Option<sc_sysinfo::HwBench>,
238) -> sc_service::error::Result<(TaskManager, Arc<ParachainClient>)> {
239	let parachain_config = prepare_node_config(parachain_config);
240
241	let params = new_partial(&parachain_config)?;
242	let (block_import, mut telemetry, telemetry_worker_handle) = params.other;
243
244	let prometheus_registry = parachain_config.prometheus_registry().cloned();
245	let net_config = sc_network::config::FullNetworkConfiguration::<
246		_,
247		_,
248		sc_network::NetworkWorker<Block, Hash>,
249	>::new(&parachain_config.network, prometheus_registry.clone());
250
251	let client = params.client.clone();
252	let backend = params.backend.clone();
253	let mut task_manager = params.task_manager;
254
255	let relay_chain_fork_id = polkadot_config.chain_spec.fork_id().map(ToString::to_string);
256	let parachain_fork_id = parachain_config.chain_spec.fork_id().map(ToString::to_string);
257	let advertise_non_global_ips = parachain_config.network.allow_non_globals_in_dht;
258	let parachain_public_addresses = parachain_config.network.public_addresses.clone();
259
260	let (relay_chain_interface, collator_key, relay_chain_network, paranode_rx) =
261		build_relay_chain_interface(
262			polkadot_config,
263			&parachain_config,
264			telemetry_worker_handle,
265			&mut task_manager,
266			collator_options.clone(),
267			hwbench.clone(),
268		)
269		.await
270		.map_err(|e| sc_service::Error::Application(Box::new(e) as Box<_>))?;
271
272	let validator = parachain_config.role.is_authority();
273	let transaction_pool = params.transaction_pool.clone();
274	let import_queue_service = params.import_queue.service();
275
276	// Take parachain id from runtime.
277	let best_hash = client.chain_info().best_hash;
278	let para_id = client
279		.runtime_api()
280		.parachain_id(best_hash)
281		.map_err(|_| "Failed to retrieve parachain id from runtime. Make sure you implement `cumulus_primitives_core::GetParachaiNidentity` runtime API.")?;
282
283	// NOTE: because we use Aura here explicitly, we can use `CollatorSybilResistance::Resistant`
284	// when starting the network.
285	let (network, system_rpc_tx, tx_handler_controller, sync_service) =
286		build_network(BuildNetworkParams {
287			parachain_config: &parachain_config,
288			net_config,
289			client: client.clone(),
290			transaction_pool: transaction_pool.clone(),
291			para_id,
292			spawn_handle: task_manager.spawn_handle(),
293			relay_chain_interface: relay_chain_interface.clone(),
294			import_queue: params.import_queue,
295			sybil_resistance_level: CollatorSybilResistance::Resistant, // because of Aura
296			metrics: sc_network::NetworkWorker::<Block, Hash>::register_notification_metrics(
297				parachain_config.prometheus_config.as_ref().map(|config| &config.registry),
298			),
299		})
300		.await?;
301
302	if parachain_config.offchain_worker.enabled {
303		use futures::FutureExt;
304
305		let offchain_workers =
306			sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions {
307				runtime_api_provider: client.clone(),
308				keystore: Some(params.keystore_container.keystore()),
309				offchain_db: backend.offchain_storage(),
310				transaction_pool: Some(OffchainTransactionPoolFactory::new(
311					transaction_pool.clone(),
312				)),
313				network_provider: Arc::new(network.clone()),
314				is_validator: parachain_config.role.is_authority(),
315				enable_http_requests: false,
316				custom_extensions: move |_| vec![],
317			})?;
318		task_manager.spawn_handle().spawn(
319			"offchain-workers-runner",
320			"offchain-work",
321			offchain_workers.run(client.clone(), task_manager.spawn_handle()).boxed(),
322		);
323	}
324
325	let rpc_builder = {
326		let client = client.clone();
327		let transaction_pool = transaction_pool.clone();
328
329		Box::new(move |_| {
330			let deps =
331				crate::rpc::FullDeps { client: client.clone(), pool: transaction_pool.clone() };
332
333			crate::rpc::create_full(deps).map_err(Into::into)
334		})
335	};
336
337	sc_service::spawn_tasks(sc_service::SpawnTasksParams {
338		rpc_builder,
339		client: client.clone(),
340		transaction_pool: transaction_pool.clone(),
341		task_manager: &mut task_manager,
342		config: parachain_config,
343		keystore: params.keystore_container.keystore(),
344		backend: backend.clone(),
345		network: network.clone(),
346		sync_service: sync_service.clone(),
347		system_rpc_tx,
348		tx_handler_controller,
349		telemetry: telemetry.as_mut(),
350	})?;
351
352	if let Some(hwbench) = hwbench {
353		sc_sysinfo::print_hwbench(&hwbench);
354		// Here you can check whether the hardware meets your chains' requirements. Putting a link
355		// in there and swapping out the requirements for your own are probably a good idea. The
356		// requirements for a para-chain are dictated by its relay-chain.
357		match SUBSTRATE_REFERENCE_HARDWARE.check_hardware(&hwbench, false) {
358			Err(err) if validator => {
359				log::warn!(
360				"⚠️  The hardware does not meet the minimal requirements {} for role 'Authority'.",
361				err
362			);
363			},
364			_ => {},
365		}
366
367		if let Some(ref mut telemetry) = telemetry {
368			let telemetry_handle = telemetry.handle();
369			task_manager.spawn_handle().spawn(
370				"telemetry_hwbench",
371				None,
372				sc_sysinfo::initialize_hwbench_telemetry(telemetry_handle, hwbench),
373			);
374		}
375	}
376
377	let announce_block = {
378		let sync_service = sync_service.clone();
379		Arc::new(move |hash, data| sync_service.announce_block(hash, data))
380	};
381
382	let relay_chain_slot_duration = Duration::from_secs(6);
383
384	let overseer_handle = relay_chain_interface
385		.overseer_handle()
386		.map_err(|e| sc_service::Error::Application(Box::new(e)))?;
387
388	start_relay_chain_tasks(StartRelayChainTasksParams {
389		client: client.clone(),
390		announce_block: announce_block.clone(),
391		para_id,
392		relay_chain_interface: relay_chain_interface.clone(),
393		task_manager: &mut task_manager,
394		da_recovery_profile: if validator {
395			DARecoveryProfile::Collator
396		} else {
397			DARecoveryProfile::FullNode
398		},
399		import_queue: import_queue_service,
400		relay_chain_slot_duration,
401		recovery_handle: Box::new(overseer_handle.clone()),
402		sync_service: sync_service.clone(),
403		prometheus_registry: prometheus_registry.as_ref(),
404	})?;
405
406	start_bootnode_tasks(StartBootnodeTasksParams {
407		embedded_dht_bootnode: collator_options.embedded_dht_bootnode,
408		dht_bootnode_discovery: collator_options.dht_bootnode_discovery,
409		para_id,
410		task_manager: &mut task_manager,
411		relay_chain_interface: relay_chain_interface.clone(),
412		relay_chain_fork_id,
413		relay_chain_network,
414		request_receiver: paranode_rx,
415		parachain_network: network,
416		advertise_non_global_ips,
417		parachain_genesis_hash: client.chain_info().genesis_hash,
418		parachain_fork_id,
419		parachain_public_addresses,
420	});
421
422	if validator {
423		start_consensus(
424			client.clone(),
425			backend,
426			block_import,
427			prometheus_registry.as_ref(),
428			telemetry.as_ref().map(|t| t.handle()),
429			&task_manager,
430			relay_chain_interface,
431			transaction_pool,
432			params.keystore_container.keystore(),
433			relay_chain_slot_duration,
434			para_id,
435			collator_key.expect("Command line arguments do not allow this. qed"),
436			overseer_handle,
437			announce_block,
438		)?;
439	}
440
441	Ok((task_manager, client))
442}