1use std::{sync::Arc, time::Duration};
5
6use parachain_template_runtime::{
8 apis::RuntimeApi,
9 opaque::{Block, Hash},
10};
11
12use polkadot_sdk::*;
13
14use cumulus_client_bootnodes::{start_bootnode_tasks, StartBootnodeTasksParams};
16use cumulus_client_cli::CollatorOptions;
17use cumulus_client_collator::service::CollatorService;
18#[docify::export(lookahead_collator)]
19use cumulus_client_consensus_aura::collators::lookahead::{self as aura, Params as AuraParams};
20use cumulus_client_consensus_common::ParachainBlockImport as TParachainBlockImport;
21use cumulus_client_service::{
22 build_network, build_relay_chain_interface, prepare_node_config, start_relay_chain_tasks,
23 BuildNetworkParams, CollatorSybilResistance, DARecoveryProfile, ParachainHostFunctions,
24 StartRelayChainTasksParams,
25};
26#[docify::export(cumulus_primitives)]
27use cumulus_primitives_core::{
28 relay_chain::{CollatorPair, ValidationCode},
29 GetParachainInfo, ParaId,
30};
31use cumulus_relay_chain_interface::{OverseerHandle, RelayChainInterface};
32
33use frame_benchmarking_cli::SUBSTRATE_REFERENCE_HARDWARE;
35use prometheus_endpoint::Registry;
36use sc_client_api::Backend;
37use sc_consensus::ImportQueue;
38use sc_executor::{HeapAllocStrategy, WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY};
39use sc_network::{NetworkBackend, NetworkBlock};
40use sc_service::{Configuration, PartialComponents, TFullBackend, TFullClient, TaskManager};
41use sc_telemetry::{Telemetry, TelemetryHandle, TelemetryWorker, TelemetryWorkerHandle};
42use sc_transaction_pool_api::OffchainTransactionPoolFactory;
43use sp_api::ProvideRuntimeApi;
44use sp_keystore::KeystorePtr;
45
#[docify::export(wasm_executor)]
/// Wasm executor used by this node: a [`WasmExecutor`] parameterised with the
/// [`ParachainHostFunctions`] host-function set.
type ParachainExecutor = WasmExecutor<ParachainHostFunctions>;
48
/// Full client over the template runtime's [`Block`] and [`RuntimeApi`], executing
/// through [`ParachainExecutor`].
type ParachainClient = TFullClient<Block, RuntimeApi, ParachainExecutor>;
50
/// Full backend type for the template runtime's [`Block`].
type ParachainBackend = TFullBackend<Block>;
52
/// Cumulus parachain block import instantiated with the node's shared
/// [`ParachainClient`] and [`ParachainBackend`].
type ParachainBlockImport = TParachainBlockImport<Block, Arc<ParachainClient>, ParachainBackend>;
54
/// The partially-built node returned by [`new_partial`].
///
/// The select-chain slot is `()`, and the `other` slot carries the
/// [`ParachainBlockImport`], the optional [`Telemetry`] instance and the optional
/// [`TelemetryWorkerHandle`], in that order.
pub type Service = PartialComponents<
    ParachainClient,
    ParachainBackend,
    (),
    sc_consensus::DefaultImportQueue<Block>,
    sc_transaction_pool::TransactionPoolHandle<Block, ParachainClient>,
    (ParachainBlockImport, Option<Telemetry>, Option<TelemetryWorkerHandle>),
>;
64
65#[docify::export(component_instantiation)]
70pub fn new_partial(config: &Configuration) -> Result<Service, sc_service::Error> {
71 let telemetry = config
72 .telemetry_endpoints
73 .clone()
74 .filter(|x| !x.is_empty())
75 .map(|endpoints| -> Result<_, sc_telemetry::Error> {
76 let worker = TelemetryWorker::new(16)?;
77 let telemetry = worker.handle().new_telemetry(endpoints);
78 Ok((worker, telemetry))
79 })
80 .transpose()?;
81
82 let heap_pages = config
83 .executor
84 .default_heap_pages
85 .map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |h| HeapAllocStrategy::Static { extra_pages: h as _ });
86
87 let executor = ParachainExecutor::builder()
88 .with_execution_method(config.executor.wasm_method)
89 .with_onchain_heap_alloc_strategy(heap_pages)
90 .with_offchain_heap_alloc_strategy(heap_pages)
91 .with_max_runtime_instances(config.executor.max_runtime_instances)
92 .with_runtime_cache_size(config.executor.runtime_cache_size)
93 .build();
94
95 let (client, backend, keystore_container, task_manager) =
96 sc_service::new_full_parts_record_import::<Block, RuntimeApi, _>(
97 config,
98 telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
99 executor,
100 true,
101 )?;
102 let client = Arc::new(client);
103
104 let telemetry_worker_handle = telemetry.as_ref().map(|(worker, _)| worker.handle());
105
106 let telemetry = telemetry.map(|(worker, telemetry)| {
107 task_manager.spawn_handle().spawn("telemetry", None, worker.run());
108 telemetry
109 });
110
111 let transaction_pool = Arc::from(
112 sc_transaction_pool::Builder::new(
113 task_manager.spawn_essential_handle(),
114 client.clone(),
115 config.role.is_authority().into(),
116 )
117 .with_options(config.transaction_pool.clone())
118 .with_prometheus(config.prometheus_registry())
119 .build(),
120 );
121
122 let block_import = ParachainBlockImport::new(client.clone(), backend.clone());
123
124 let import_queue = build_import_queue(
125 client.clone(),
126 block_import.clone(),
127 config,
128 telemetry.as_ref().map(|telemetry| telemetry.handle()),
129 &task_manager,
130 );
131
132 Ok(PartialComponents {
133 backend,
134 client,
135 import_queue,
136 keystore_container,
137 task_manager,
138 transaction_pool,
139 select_chain: (),
140 other: (block_import, telemetry, telemetry_worker_handle),
141 })
142}
143
144fn build_import_queue(
146 client: Arc<ParachainClient>,
147 block_import: ParachainBlockImport,
148 config: &Configuration,
149 telemetry: Option<TelemetryHandle>,
150 task_manager: &TaskManager,
151) -> sc_consensus::DefaultImportQueue<Block> {
152 cumulus_client_consensus_aura::equivocation_import_queue::fully_verifying_import_queue::<
153 sp_consensus_aura::sr25519::AuthorityPair,
154 _,
155 _,
156 _,
157 _,
158 >(
159 client,
160 block_import,
161 move |_, _| async move {
162 let timestamp = sp_timestamp::InherentDataProvider::from_system_time();
163 Ok(timestamp)
164 },
165 &task_manager.spawn_essential_handle(),
166 config.prometheus_registry(),
167 telemetry,
168 )
169}
170
171#[allow(clippy::too_many_arguments)]
172fn start_consensus(
173 client: Arc<ParachainClient>,
174 backend: Arc<ParachainBackend>,
175 block_import: ParachainBlockImport,
176 prometheus_registry: Option<&Registry>,
177 telemetry: Option<TelemetryHandle>,
178 task_manager: &TaskManager,
179 relay_chain_interface: Arc<dyn RelayChainInterface>,
180 transaction_pool: Arc<sc_transaction_pool::TransactionPoolHandle<Block, ParachainClient>>,
181 keystore: KeystorePtr,
182 relay_chain_slot_duration: Duration,
183 para_id: ParaId,
184 collator_key: CollatorPair,
185 overseer_handle: OverseerHandle,
186 announce_block: Arc<dyn Fn(Hash, Option<Vec<u8>>) + Send + Sync>,
187) -> Result<(), sc_service::Error> {
188 let proposer = sc_basic_authorship::ProposerFactory::with_proof_recording(
189 task_manager.spawn_handle(),
190 client.clone(),
191 transaction_pool,
192 prometheus_registry,
193 telemetry.clone(),
194 );
195
196 let collator_service = CollatorService::new(
197 client.clone(),
198 Arc::new(task_manager.spawn_handle()),
199 announce_block,
200 client.clone(),
201 );
202
203 let params = AuraParams {
204 create_inherent_data_providers: move |_, ()| async move { Ok(()) },
205 block_import,
206 para_client: client.clone(),
207 para_backend: backend,
208 relay_client: relay_chain_interface,
209 code_hash_provider: move |block_hash| {
210 client.code_at(block_hash).ok().map(|c| ValidationCode::from(c).hash())
211 },
212 keystore,
213 collator_key,
214 para_id,
215 overseer_handle,
216 relay_chain_slot_duration,
217 proposer,
218 collator_service,
219 authoring_duration: Duration::from_millis(2000),
220 reinitialize: false,
221 max_pov_percentage: None,
222 };
223 let fut = aura::run::<Block, sp_consensus_aura::sr25519::AuthorityPair, _, _, _, _, _, _, _, _>(
224 params,
225 );
226 task_manager.spawn_essential_handle().spawn("aura", None, fut);
227
228 Ok(())
229}
230
231#[sc_tracing::logging::prefix_logs_with("Parachain")]
233pub async fn start_parachain_node(
234 parachain_config: Configuration,
235 polkadot_config: Configuration,
236 collator_options: CollatorOptions,
237 hwbench: Option<sc_sysinfo::HwBench>,
238) -> sc_service::error::Result<(TaskManager, Arc<ParachainClient>)> {
239 let parachain_config = prepare_node_config(parachain_config);
240
241 let params = new_partial(¶chain_config)?;
242 let (block_import, mut telemetry, telemetry_worker_handle) = params.other;
243
244 let prometheus_registry = parachain_config.prometheus_registry().cloned();
245 let net_config = sc_network::config::FullNetworkConfiguration::<
246 _,
247 _,
248 sc_network::NetworkWorker<Block, Hash>,
249 >::new(¶chain_config.network, prometheus_registry.clone());
250
251 let client = params.client.clone();
252 let backend = params.backend.clone();
253 let mut task_manager = params.task_manager;
254
255 let relay_chain_fork_id = polkadot_config.chain_spec.fork_id().map(ToString::to_string);
256 let parachain_fork_id = parachain_config.chain_spec.fork_id().map(ToString::to_string);
257 let advertise_non_global_ips = parachain_config.network.allow_non_globals_in_dht;
258 let parachain_public_addresses = parachain_config.network.public_addresses.clone();
259
260 let (relay_chain_interface, collator_key, relay_chain_network, paranode_rx) =
261 build_relay_chain_interface(
262 polkadot_config,
263 ¶chain_config,
264 telemetry_worker_handle,
265 &mut task_manager,
266 collator_options.clone(),
267 hwbench.clone(),
268 )
269 .await
270 .map_err(|e| sc_service::Error::Application(Box::new(e) as Box<_>))?;
271
272 let validator = parachain_config.role.is_authority();
273 let transaction_pool = params.transaction_pool.clone();
274 let import_queue_service = params.import_queue.service();
275
276 let best_hash = client.chain_info().best_hash;
278 let para_id = client
279 .runtime_api()
280 .parachain_id(best_hash)
281 .map_err(|_| "Failed to retrieve parachain id from runtime. Make sure you implement `cumulus_primitives_core::GetParachaiNidentity` runtime API.")?;
282
283 let (network, system_rpc_tx, tx_handler_controller, sync_service) =
286 build_network(BuildNetworkParams {
287 parachain_config: ¶chain_config,
288 net_config,
289 client: client.clone(),
290 transaction_pool: transaction_pool.clone(),
291 para_id,
292 spawn_handle: task_manager.spawn_handle(),
293 relay_chain_interface: relay_chain_interface.clone(),
294 import_queue: params.import_queue,
295 sybil_resistance_level: CollatorSybilResistance::Resistant, metrics: sc_network::NetworkWorker::<Block, Hash>::register_notification_metrics(
297 parachain_config.prometheus_config.as_ref().map(|config| &config.registry),
298 ),
299 })
300 .await?;
301
302 if parachain_config.offchain_worker.enabled {
303 use futures::FutureExt;
304
305 let offchain_workers =
306 sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions {
307 runtime_api_provider: client.clone(),
308 keystore: Some(params.keystore_container.keystore()),
309 offchain_db: backend.offchain_storage(),
310 transaction_pool: Some(OffchainTransactionPoolFactory::new(
311 transaction_pool.clone(),
312 )),
313 network_provider: Arc::new(network.clone()),
314 is_validator: parachain_config.role.is_authority(),
315 enable_http_requests: false,
316 custom_extensions: move |_| vec![],
317 })?;
318 task_manager.spawn_handle().spawn(
319 "offchain-workers-runner",
320 "offchain-work",
321 offchain_workers.run(client.clone(), task_manager.spawn_handle()).boxed(),
322 );
323 }
324
325 let rpc_builder = {
326 let client = client.clone();
327 let transaction_pool = transaction_pool.clone();
328
329 Box::new(move |_| {
330 let deps =
331 crate::rpc::FullDeps { client: client.clone(), pool: transaction_pool.clone() };
332
333 crate::rpc::create_full(deps).map_err(Into::into)
334 })
335 };
336
337 sc_service::spawn_tasks(sc_service::SpawnTasksParams {
338 rpc_builder,
339 client: client.clone(),
340 transaction_pool: transaction_pool.clone(),
341 task_manager: &mut task_manager,
342 config: parachain_config,
343 keystore: params.keystore_container.keystore(),
344 backend: backend.clone(),
345 network: network.clone(),
346 sync_service: sync_service.clone(),
347 system_rpc_tx,
348 tx_handler_controller,
349 telemetry: telemetry.as_mut(),
350 })?;
351
352 if let Some(hwbench) = hwbench {
353 sc_sysinfo::print_hwbench(&hwbench);
354 match SUBSTRATE_REFERENCE_HARDWARE.check_hardware(&hwbench, false) {
358 Err(err) if validator => {
359 log::warn!(
360 "⚠️ The hardware does not meet the minimal requirements {} for role 'Authority'.",
361 err
362 );
363 },
364 _ => {},
365 }
366
367 if let Some(ref mut telemetry) = telemetry {
368 let telemetry_handle = telemetry.handle();
369 task_manager.spawn_handle().spawn(
370 "telemetry_hwbench",
371 None,
372 sc_sysinfo::initialize_hwbench_telemetry(telemetry_handle, hwbench),
373 );
374 }
375 }
376
377 let announce_block = {
378 let sync_service = sync_service.clone();
379 Arc::new(move |hash, data| sync_service.announce_block(hash, data))
380 };
381
382 let relay_chain_slot_duration = Duration::from_secs(6);
383
384 let overseer_handle = relay_chain_interface
385 .overseer_handle()
386 .map_err(|e| sc_service::Error::Application(Box::new(e)))?;
387
388 start_relay_chain_tasks(StartRelayChainTasksParams {
389 client: client.clone(),
390 announce_block: announce_block.clone(),
391 para_id,
392 relay_chain_interface: relay_chain_interface.clone(),
393 task_manager: &mut task_manager,
394 da_recovery_profile: if validator {
395 DARecoveryProfile::Collator
396 } else {
397 DARecoveryProfile::FullNode
398 },
399 import_queue: import_queue_service,
400 relay_chain_slot_duration,
401 recovery_handle: Box::new(overseer_handle.clone()),
402 sync_service: sync_service.clone(),
403 prometheus_registry: prometheus_registry.as_ref(),
404 })?;
405
406 start_bootnode_tasks(StartBootnodeTasksParams {
407 embedded_dht_bootnode: collator_options.embedded_dht_bootnode,
408 dht_bootnode_discovery: collator_options.dht_bootnode_discovery,
409 para_id,
410 task_manager: &mut task_manager,
411 relay_chain_interface: relay_chain_interface.clone(),
412 relay_chain_fork_id,
413 relay_chain_network,
414 request_receiver: paranode_rx,
415 parachain_network: network,
416 advertise_non_global_ips,
417 parachain_genesis_hash: client.chain_info().genesis_hash,
418 parachain_fork_id,
419 parachain_public_addresses,
420 });
421
422 if validator {
423 start_consensus(
424 client.clone(),
425 backend,
426 block_import,
427 prometheus_registry.as_ref(),
428 telemetry.as_ref().map(|t| t.handle()),
429 &task_manager,
430 relay_chain_interface,
431 transaction_pool,
432 params.keystore_container.keystore(),
433 relay_chain_slot_duration,
434 para_id,
435 collator_key.expect("Command line arguments do not allow this. qed"),
436 overseer_handle,
437 announce_block,
438 )?;
439 }
440
441 Ok((task_manager, client))
442}