// sc_service/builder.rs

// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use crate::{
	build_network_future, build_system_rpc_future,
	client::{Client, ClientConfig},
	config::{Configuration, ExecutorConfiguration, KeystoreConfig, Multiaddr, PrometheusConfig},
	error::Error,
	metrics::MetricsService,
	start_rpc_servers, BuildGenesisBlock, GenesisBlockBuilder, RpcHandlers, SpawnTaskHandle,
	TaskManager, TransactionPoolAdapter,
};
use futures::{select, FutureExt, StreamExt};
use jsonrpsee::RpcModule;
use log::{debug, error, info};
use prometheus_endpoint::Registry;
use sc_chain_spec::{get_extension, ChainSpec};
use sc_client_api::{
	execution_extensions::ExecutionExtensions, proof_provider::ProofProvider, BadBlocks,
	BlockBackend, BlockchainEvents, ExecutorProvider, ForkBlocks, KeysIter, StorageProvider,
	TrieCacheContext, UsageProvider,
};
use sc_client_db::{Backend, BlocksPruning, DatabaseSettings, PruningMode};
use sc_consensus::import_queue::{ImportQueue, ImportQueueService};
use sc_executor::{
	sp_wasm_interface::HostFunctions, HeapAllocStrategy, NativeExecutionDispatch, RuntimeVersionOf,
	WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY,
};
use sc_keystore::LocalKeystore;
use sc_network::{
	config::{FullNetworkConfiguration, ProtocolId, SyncMode},
	multiaddr::Protocol,
	service::{
		traits::{PeerStore, RequestResponseConfig},
		NotificationMetrics,
	},
	NetworkBackend, NetworkStateInfo,
};
use sc_network_common::role::{Role, Roles};
use sc_network_light::light_client_requests::handler::LightClientRequestHandler;
use sc_network_sync::{
	block_relay_protocol::{BlockDownloader, BlockRelayParams},
	block_request_handler::BlockRequestHandler,
	engine::SyncingEngine,
	service::network::{NetworkServiceHandle, NetworkServiceProvider},
	state_request_handler::StateRequestHandler,
	strategy::{
		polkadot::{PolkadotSyncingStrategy, PolkadotSyncingStrategyConfig},
		SyncingStrategy,
	},
	warp_request_handler::RequestHandler as WarpSyncRequestHandler,
	SyncingService, WarpSyncConfig,
};
use sc_rpc::{
	author::AuthorApiServer,
	chain::ChainApiServer,
	offchain::OffchainApiServer,
	state::{ChildStateApiServer, StateApiServer},
	system::SystemApiServer,
	DenyUnsafe, SubscriptionTaskExecutor,
};
use sc_rpc_spec_v2::{
	archive::ArchiveApiServer,
	chain_head::ChainHeadApiServer,
	chain_spec::ChainSpecApiServer,
	transaction::{TransactionApiServer, TransactionBroadcastApiServer},
};
use sc_telemetry::{telemetry, ConnectionMessage, Telemetry, TelemetryHandle, SUBSTRATE_INFO};
use sc_transaction_pool_api::{MaintainedTransactionPool, TransactionPool};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
use sp_api::{CallApiAt, ProvideRuntimeApi};
use sp_blockchain::{HeaderBackend, HeaderMetadata};
use sp_consensus::block_validation::{
	BlockAnnounceValidator, Chain, DefaultBlockAnnounceValidator,
};
use sp_core::traits::{CodeExecutor, SpawnNamed};
use sp_keystore::KeystorePtr;
use sp_runtime::traits::{Block as BlockT, BlockIdTo, NumberFor, Zero};
use sp_storage::{ChildInfo, ChildType, PrefixedStorageKey};
use std::{
	str::FromStr,
	sync::Arc,
	time::{Duration, SystemTime},
};

/// Full client type.
pub type TFullClient<TBl, TRtApi, TExec> =
	Client<TFullBackend<TBl>, TFullCallExecutor<TBl, TExec>, TBl, TRtApi>;

/// Full client backend type.
pub type TFullBackend<TBl> = Backend<TBl>;

/// Full client call executor type.
pub type TFullCallExecutor<TBl, TExec> = crate::client::LocalCallExecutor<TBl, Backend<TBl>, TExec>;

type TFullParts<TBl, TRtApi, TExec> =
	(TFullClient<TBl, TRtApi, TExec>, Arc<TFullBackend<TBl>>, KeystoreContainer, TaskManager);

/// A shareable container for a local keystore.
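///
/// # Example
///
/// A minimal usage sketch (not compiled here), assuming a `Configuration` value named
/// `config` is in scope:
///
/// ```ignore
/// let keystore_container = KeystoreContainer::new(&config.keystore)?;
/// // A `KeystorePtr` that can be handed to consensus or RPC components.
/// let keystore: KeystorePtr = keystore_container.keystore();
/// ```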
pub struct KeystoreContainer(Arc<LocalKeystore>);

impl KeystoreContainer {
	/// Construct a `KeystoreContainer`.
	pub fn new(config: &KeystoreConfig) -> Result<Self, Error> {
		let keystore = Arc::new(match config {
			KeystoreConfig::Path { path, password } =>
				LocalKeystore::open(path.clone(), password.clone())?,
			KeystoreConfig::InMemory => LocalKeystore::in_memory(),
		});

		Ok(Self(keystore))
	}

	/// Returns a shared reference to a dynamic `Keystore` trait implementation.
	pub fn keystore(&self) -> KeystorePtr {
		self.0.clone()
	}

	/// Returns a shared reference to the local keystore.
	pub fn local_keystore(&self) -> Arc<LocalKeystore> {
		self.0.clone()
	}
}

/// Creates a new full client for the given config.
pub fn new_full_client<TBl, TRtApi, TExec>(
	config: &Configuration,
	telemetry: Option<TelemetryHandle>,
	executor: TExec,
) -> Result<TFullClient<TBl, TRtApi, TExec>, Error>
where
	TBl: BlockT,
	TExec: CodeExecutor + RuntimeVersionOf + Clone,
{
	new_full_parts(config, telemetry, executor).map(|parts| parts.0)
}

/// Create the initial parts of a full node with the default genesis block builder,
/// optionally enabling proof recording during block import.
pub fn new_full_parts_record_import<TBl, TRtApi, TExec>(
	config: &Configuration,
	telemetry: Option<TelemetryHandle>,
	executor: TExec,
	enable_import_proof_recording: bool,
) -> Result<TFullParts<TBl, TRtApi, TExec>, Error>
where
	TBl: BlockT,
	TExec: CodeExecutor + RuntimeVersionOf + Clone,
{
	let backend = new_db_backend(config.db_config())?;

	let genesis_block_builder = GenesisBlockBuilder::new(
		config.chain_spec.as_storage_builder(),
		!config.no_genesis(),
		backend.clone(),
		executor.clone(),
	)?;

	new_full_parts_with_genesis_builder(
		config,
		telemetry,
		executor,
		backend,
		genesis_block_builder,
		enable_import_proof_recording,
	)
}

/// Create the initial parts of a full node with the default genesis block builder.
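///
/// # Example
///
/// A rough sketch of how a node typically obtains its core parts (not compiled here);
/// `Block`, `RuntimeApi` and `HostFns` stand in for the node's own types, and `telemetry`
/// is an `Option<TelemetryHandle>`:
///
/// ```ignore
/// let executor = new_wasm_executor::<HostFns>(&config.executor);
/// let (client, backend, keystore_container, task_manager) =
/// 	new_full_parts::<Block, RuntimeApi, _>(&config, telemetry, executor)?;
/// ```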
pub fn new_full_parts<TBl, TRtApi, TExec>(
	config: &Configuration,
	telemetry: Option<TelemetryHandle>,
	executor: TExec,
) -> Result<TFullParts<TBl, TRtApi, TExec>, Error>
where
	TBl: BlockT,
	TExec: CodeExecutor + RuntimeVersionOf + Clone,
{
	new_full_parts_record_import(config, telemetry, executor, false)
}

/// Create the initial parts of a full node.
pub fn new_full_parts_with_genesis_builder<TBl, TRtApi, TExec, TBuildGenesisBlock>(
	config: &Configuration,
	telemetry: Option<TelemetryHandle>,
	executor: TExec,
	backend: Arc<TFullBackend<TBl>>,
	genesis_block_builder: TBuildGenesisBlock,
	enable_import_proof_recording: bool,
) -> Result<TFullParts<TBl, TRtApi, TExec>, Error>
where
	TBl: BlockT,
	TExec: CodeExecutor + RuntimeVersionOf + Clone,
	TBuildGenesisBlock: BuildGenesisBlock<
		TBl,
		BlockImportOperation = <Backend<TBl> as sc_client_api::backend::Backend<TBl>>::BlockImportOperation
	>,
{
	let keystore_container = KeystoreContainer::new(&config.keystore)?;

	let task_manager = {
		let registry = config.prometheus_config.as_ref().map(|cfg| &cfg.registry);
		TaskManager::new(config.tokio_handle.clone(), registry)?
	};

	let chain_spec = &config.chain_spec;
	let fork_blocks = get_extension::<ForkBlocks<TBl>>(chain_spec.extensions())
		.cloned()
		.unwrap_or_default();

	let bad_blocks = get_extension::<BadBlocks<TBl>>(chain_spec.extensions())
		.cloned()
		.unwrap_or_default();

	let client = {
		let extensions = ExecutionExtensions::new(None, Arc::new(executor.clone()));

		let wasm_runtime_substitutes = config
			.chain_spec
			.code_substitutes()
			.into_iter()
			.map(|(n, c)| {
				let number = NumberFor::<TBl>::from_str(&n).map_err(|_| {
					Error::Application(Box::from(format!(
						"Failed to parse `{}` as block number for code substitutes. \
						 In an old version the key for code substitute was a block hash. \
						 Please update the chain spec to a version that is compatible with your node.",
						n
					)))
				})?;
				Ok((number, c))
			})
			.collect::<Result<std::collections::HashMap<_, _>, Error>>()?;

		let client = new_client(
			backend.clone(),
			executor,
			genesis_block_builder,
			fork_blocks,
			bad_blocks,
			extensions,
			Box::new(task_manager.spawn_handle()),
			config.prometheus_config.as_ref().map(|config| config.registry.clone()),
			telemetry,
			ClientConfig {
				offchain_worker_enabled: config.offchain_worker.enabled,
				offchain_indexing_api: config.offchain_worker.indexing_enabled,
				wasm_runtime_overrides: config.wasm_runtime_overrides.clone(),
				no_genesis: config.no_genesis(),
				wasm_runtime_substitutes,
				enable_import_proof_recording,
			},
		)?;

		if let Some(warm_up_strategy) = config.warm_up_trie_cache {
			let storage_root = client.usage_info().chain.best_hash;
			let backend_clone = backend.clone();

			if warm_up_strategy.is_blocking() {
				// We use the blocking strategy for testing purposes.
				// So better to error out if it fails.
				warm_up_trie_cache(backend_clone, storage_root)?;
			} else {
				task_manager.spawn_handle().spawn_blocking(
					"warm-up-trie-cache",
					None,
					async move {
						if let Err(e) = warm_up_trie_cache(backend_clone, storage_root) {
							error!("Failed to warm up trie cache: {e}");
						}
					},
				);
			}
		}

		client
	};

	Ok((client, backend, keystore_container, task_manager))
}

fn child_info(key: Vec<u8>) -> Option<ChildInfo> {
	let prefixed_key = PrefixedStorageKey::new(key);
	ChildType::from_prefixed_key(&prefixed_key).and_then(|(child_type, storage_key)| {
		(child_type == ChildType::ParentKeyId).then(|| ChildInfo::new_default(storage_key))
	})
}

fn warm_up_trie_cache<TBl: BlockT>(
	backend: Arc<TFullBackend<TBl>>,
	storage_root: TBl::Hash,
) -> Result<(), Error> {
	use sc_client_api::backend::Backend;
	use sp_state_machine::Backend as StateBackend;

	let untrusted_state = || backend.state_at(storage_root, TrieCacheContext::Untrusted);
	let trusted_state = || backend.state_at(storage_root, TrieCacheContext::Trusted);

	debug!("Populating trie cache started");
	let start_time = std::time::Instant::now();
	let mut keys_count = 0;
	let mut child_keys_count = 0;
	for key in KeysIter::<_, TBl>::new(untrusted_state()?, None, None)? {
		if keys_count != 0 && keys_count % 100_000 == 0 {
			debug!("{} keys and {} child keys have been warmed", keys_count, child_keys_count);
		}
		match child_info(key.0.clone()) {
			Some(info) => {
				for child_key in
					KeysIter::<_, TBl>::new_child(untrusted_state()?, info.clone(), None, None)?
				{
					if trusted_state()?
						.child_storage(&info, &child_key.0)
						.unwrap_or_default()
						.is_none()
					{
						debug!("Child storage value unexpectedly empty: {child_key:?}");
					}
					child_keys_count += 1;
				}
			},
			None => {
				if trusted_state()?.storage(&key.0).unwrap_or_default().is_none() {
					debug!("Storage value unexpectedly empty: {key:?}");
				}
				keys_count += 1;
			},
		}
	}
	debug!(
		"Trie cache populated with {keys_count} keys and {child_keys_count} child keys in {} s",
		start_time.elapsed().as_secs_f32()
	);

	Ok(())
}

/// Creates a [`NativeElseWasmExecutor`](sc_executor::NativeElseWasmExecutor) according to
/// [`Configuration`].
#[deprecated(note = "Please switch to `new_wasm_executor`. Will be removed at end of 2024.")]
#[allow(deprecated)]
pub fn new_native_or_wasm_executor<D: NativeExecutionDispatch>(
	config: &Configuration,
) -> sc_executor::NativeElseWasmExecutor<D> {
	#[allow(deprecated)]
	sc_executor::NativeElseWasmExecutor::new_with_wasm_executor(new_wasm_executor(&config.executor))
}

/// Creates a [`WasmExecutor`] according to [`ExecutorConfiguration`].
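///
/// # Example
///
/// A minimal sketch (not compiled here); `HostFns` stands in for the node's host-function
/// set:
///
/// ```ignore
/// let wasm_executor = new_wasm_executor::<HostFns>(&config.executor);
/// ```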
pub fn new_wasm_executor<H: HostFunctions>(config: &ExecutorConfiguration) -> WasmExecutor<H> {
	let strategy = config
		.default_heap_pages
		.map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |p| HeapAllocStrategy::Static { extra_pages: p as _ });
	WasmExecutor::<H>::builder()
		.with_execution_method(config.wasm_method)
		.with_onchain_heap_alloc_strategy(strategy)
		.with_offchain_heap_alloc_strategy(strategy)
		.with_max_runtime_instances(config.max_runtime_instances)
		.with_runtime_cache_size(config.runtime_cache_size)
		.build()
}

/// Create an instance of the default database backend.
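///
/// # Example
///
/// A minimal sketch (not compiled here), reusing the database settings derived from the
/// service `Configuration`:
///
/// ```ignore
/// let backend = new_db_backend::<Block>(config.db_config())?;
/// ```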
pub fn new_db_backend<Block>(
	settings: DatabaseSettings,
) -> Result<Arc<Backend<Block>>, sp_blockchain::Error>
where
	Block: BlockT,
{
	const CANONICALIZATION_DELAY: u64 = 4096;

	Ok(Arc::new(Backend::new(settings, CANONICALIZATION_DELAY)?))
}

/// Create an instance of a client backed by the given backend.
pub fn new_client<E, Block, RA, G>(
	backend: Arc<Backend<Block>>,
	executor: E,
	genesis_block_builder: G,
	fork_blocks: ForkBlocks<Block>,
	bad_blocks: BadBlocks<Block>,
	execution_extensions: ExecutionExtensions<Block>,
	spawn_handle: Box<dyn SpawnNamed>,
	prometheus_registry: Option<Registry>,
	telemetry: Option<TelemetryHandle>,
	config: ClientConfig<Block>,
) -> Result<
	Client<
		Backend<Block>,
		crate::client::LocalCallExecutor<Block, Backend<Block>, E>,
		Block,
		RA,
	>,
	sp_blockchain::Error,
>
where
	Block: BlockT,
	E: CodeExecutor + RuntimeVersionOf,
	G: BuildGenesisBlock<
		Block,
		BlockImportOperation = <Backend<Block> as sc_client_api::backend::Backend<Block>>::BlockImportOperation
	>,
{
	let executor = crate::client::LocalCallExecutor::new(
		backend.clone(),
		executor,
		config.clone(),
		execution_extensions,
	)?;

	Client::new(
		backend,
		executor,
		spawn_handle,
		genesis_block_builder,
		fork_blocks,
		bad_blocks,
		prometheus_registry,
		telemetry,
		config,
	)
}

/// Parameters to pass into [`spawn_tasks`].
pub struct SpawnTasksParams<'a, TBl: BlockT, TCl, TExPool, TRpc, Backend> {
	/// The service configuration.
	pub config: Configuration,
	/// A shared client returned by `new_full_parts`.
	pub client: Arc<TCl>,
	/// A shared backend returned by `new_full_parts`.
	pub backend: Arc<Backend>,
	/// A task manager returned by `new_full_parts`.
	pub task_manager: &'a mut TaskManager,
	/// A shared keystore returned by `new_full_parts`.
	pub keystore: KeystorePtr,
	/// A shared transaction pool.
	pub transaction_pool: Arc<TExPool>,
	/// Builds additional [`RpcModule`]s that should be added to the server.
	pub rpc_builder: Box<dyn Fn(SubscriptionTaskExecutor) -> Result<RpcModule<TRpc>, Error>>,
	/// A shared network instance.
	pub network: Arc<dyn sc_network::service::traits::NetworkService>,
	/// A sender for RPC requests.
	pub system_rpc_tx: TracingUnboundedSender<sc_rpc::system::Request<TBl>>,
	/// Controller for transaction handlers.
	pub tx_handler_controller:
		sc_network_transactions::TransactionsHandlerController<<TBl as BlockT>::Hash>,
	/// Syncing service.
	pub sync_service: Arc<SyncingService<TBl>>,
	/// Telemetry instance for this node.
	pub telemetry: Option<&'a mut Telemetry>,
}

/// Spawn the tasks that are required to run a node.
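///
/// # Example
///
/// A rough sketch of how a node service wires this up (not compiled here); all bindings
/// are assumed to come from the earlier builder steps in this module:
///
/// ```ignore
/// let rpc_handlers = spawn_tasks(SpawnTasksParams {
/// 	config,
/// 	client: client.clone(),
/// 	backend: backend.clone(),
/// 	task_manager: &mut task_manager,
/// 	keystore: keystore_container.keystore(),
/// 	transaction_pool: transaction_pool.clone(),
/// 	rpc_builder,
/// 	network: network.clone(),
/// 	system_rpc_tx,
/// 	tx_handler_controller,
/// 	sync_service: sync_service.clone(),
/// 	telemetry: telemetry.as_mut(),
/// })?;
/// ```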
pub fn spawn_tasks<TBl, TBackend, TExPool, TRpc, TCl>(
	params: SpawnTasksParams<TBl, TCl, TExPool, TRpc, TBackend>,
) -> Result<RpcHandlers, Error>
where
	TCl: ProvideRuntimeApi<TBl>
		+ HeaderMetadata<TBl, Error = sp_blockchain::Error>
		+ Chain<TBl>
		+ BlockBackend<TBl>
		+ BlockIdTo<TBl, Error = sp_blockchain::Error>
		+ ProofProvider<TBl>
		+ HeaderBackend<TBl>
		+ BlockchainEvents<TBl>
		+ ExecutorProvider<TBl>
		+ UsageProvider<TBl>
		+ StorageProvider<TBl, TBackend>
		+ CallApiAt<TBl>
		+ Send
		+ 'static,
	<TCl as ProvideRuntimeApi<TBl>>::Api: sp_api::Metadata<TBl>
		+ sp_transaction_pool::runtime_api::TaggedTransactionQueue<TBl>
		+ sp_session::SessionKeys<TBl>
		+ sp_api::ApiExt<TBl>,
	TBl: BlockT,
	TBl::Hash: Unpin,
	TBl::Header: Unpin,
	TBackend: 'static + sc_client_api::backend::Backend<TBl> + Send,
	TExPool: MaintainedTransactionPool<Block = TBl, Hash = <TBl as BlockT>::Hash> + 'static,
{
	let SpawnTasksParams {
		mut config,
		task_manager,
		client,
		backend,
		keystore,
		transaction_pool,
		rpc_builder,
		network,
		system_rpc_tx,
		tx_handler_controller,
		sync_service,
		telemetry,
	} = params;

	let chain_info = client.usage_info().chain;

	sp_session::generate_initial_session_keys(
		client.clone(),
		chain_info.best_hash,
		config.dev_key_seed.clone().map(|s| vec![s]).unwrap_or_default(),
		keystore.clone(),
	)
	.map_err(|e| Error::Application(Box::new(e)))?;

	let sysinfo = sc_sysinfo::gather_sysinfo();
	sc_sysinfo::print_sysinfo(&sysinfo);

	let telemetry = telemetry
		.map(|telemetry| {
			init_telemetry(
				config.network.node_name.clone(),
				config.impl_name.clone(),
				config.impl_version.clone(),
				config.chain_spec.name().to_string(),
				config.role.is_authority(),
				network.clone(),
				client.clone(),
				telemetry,
				Some(sysinfo),
			)
		})
		.transpose()?;

	info!("📦 Highest known block at #{}", chain_info.best_number);

	let spawn_handle = task_manager.spawn_handle();

	// Inform the tx pool about imported and finalized blocks.
	spawn_handle.spawn(
		"txpool-notifications",
		Some("transaction-pool"),
		sc_transaction_pool::notification_future(client.clone(), transaction_pool.clone()),
	);

	spawn_handle.spawn(
		"on-transaction-imported",
		Some("transaction-pool"),
		propagate_transaction_notifications(
			transaction_pool.clone(),
			tx_handler_controller,
			telemetry.clone(),
		),
	);

	// Prometheus metrics.
	let metrics_service =
		if let Some(PrometheusConfig { port, registry }) = config.prometheus_config.clone() {
			// Set static metrics.
			let metrics = MetricsService::with_prometheus(
				telemetry,
				&registry,
				config.role,
				&config.network.node_name,
				&config.impl_version,
			)?;
			spawn_handle.spawn(
				"prometheus-endpoint",
				None,
				prometheus_endpoint::init_prometheus(port, registry).map(drop),
			);

			metrics
		} else {
			MetricsService::new(telemetry)
		};

	// Periodic metrics and telemetry updates.
	spawn_handle.spawn(
		"telemetry-periodic-send",
		None,
		metrics_service.run(
			client.clone(),
			transaction_pool.clone(),
			network.clone(),
			sync_service.clone(),
		),
	);

	let rpc_id_provider = config.rpc.id_provider.take();

	// jsonrpsee RPC
	// RPC-V2 specific metrics need to be registered before the RPC server is started,
	// since we might have two instances running (one for the in-memory RPC and one for the network
	// RPC).
	let rpc_v2_metrics = config
		.prometheus_registry()
		.map(|registry| sc_rpc_spec_v2::transaction::TransactionMetrics::new(registry))
		.transpose()?;

	let gen_rpc_module = || {
		gen_rpc_module(
			task_manager.spawn_handle(),
			client.clone(),
			transaction_pool.clone(),
			keystore.clone(),
			system_rpc_tx.clone(),
			config.impl_name.clone(),
			config.impl_version.clone(),
			config.chain_spec.as_ref(),
			&config.state_pruning,
			config.blocks_pruning,
			backend.clone(),
			&*rpc_builder,
			rpc_v2_metrics.clone(),
		)
	};

	let rpc_server_handle = start_rpc_servers(
		&config.rpc,
		config.prometheus_registry(),
		&config.tokio_handle,
		gen_rpc_module,
		rpc_id_provider,
	)?;

	let listen_addrs = rpc_server_handle
		.listen_addrs()
		.into_iter()
		.map(|socket_addr| {
			let mut multiaddr: Multiaddr = socket_addr.ip().into();
			multiaddr.push(Protocol::Tcp(socket_addr.port()));
			multiaddr
		})
		.collect();

	let in_memory_rpc = {
		let mut module = gen_rpc_module()?;
		module.extensions_mut().insert(DenyUnsafe::No);
		module
	};

	let in_memory_rpc_handle = RpcHandlers::new(Arc::new(in_memory_rpc), listen_addrs);

	// Spawn informant task
	spawn_handle.spawn(
		"informant",
		None,
		sc_informant::build(client.clone(), network, sync_service.clone()),
	);

	task_manager.keep_alive((config.base_path, rpc_server_handle));

	Ok(in_memory_rpc_handle)
}

/// Returns a future that forwards imported transactions to the transaction networking protocol.
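///
/// # Example
///
/// This future is normally driven by the task manager rather than awaited directly; a
/// sketch of how [`spawn_tasks`] spawns it (not compiled here):
///
/// ```ignore
/// spawn_handle.spawn(
/// 	"on-transaction-imported",
/// 	Some("transaction-pool"),
/// 	propagate_transaction_notifications(transaction_pool.clone(), tx_handler_controller, telemetry),
/// );
/// ```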
pub async fn propagate_transaction_notifications<Block, ExPool>(
	transaction_pool: Arc<ExPool>,
	tx_handler_controller: sc_network_transactions::TransactionsHandlerController<
		<Block as BlockT>::Hash,
	>,
	telemetry: Option<TelemetryHandle>,
) where
	Block: BlockT,
	ExPool: MaintainedTransactionPool<Block = Block, Hash = <Block as BlockT>::Hash>,
{
	const TELEMETRY_INTERVAL: Duration = Duration::from_secs(1);

	// transaction notifications
	let mut notifications = transaction_pool.import_notification_stream().fuse();
	let mut timer = futures_timer::Delay::new(TELEMETRY_INTERVAL).fuse();
	let mut tx_imported = false;

	loop {
		select! {
			notification = notifications.next() => {
				let Some(hash) = notification else { return };

				tx_handler_controller.propagate_transaction(hash);

				tx_imported = true;
			},
			_ = timer => {
				timer = futures_timer::Delay::new(TELEMETRY_INTERVAL).fuse();

				if !tx_imported {
					continue;
				}

				tx_imported = false;
				let status = transaction_pool.status();

				telemetry!(
					telemetry;
					SUBSTRATE_INFO;
					"txpool.import";
					"ready" => status.ready,
					"future" => status.future,
				);
			}
		}
	}
}

/// Initialize telemetry with the provided configuration and return a telemetry handle.
pub fn init_telemetry<Block, Client, Network>(
	name: String,
	implementation: String,
	version: String,
	chain: String,
	authority: bool,
	network: Network,
	client: Arc<Client>,
	telemetry: &mut Telemetry,
	sysinfo: Option<sc_telemetry::SysInfo>,
) -> sc_telemetry::Result<TelemetryHandle>
where
	Block: BlockT,
	Client: BlockBackend<Block>,
	Network: NetworkStateInfo,
{
	let genesis_hash = client.block_hash(Zero::zero()).ok().flatten().unwrap_or_default();
	let connection_message = ConnectionMessage {
		name,
		implementation,
		version,
		target_os: sc_sysinfo::TARGET_OS.into(),
		target_arch: sc_sysinfo::TARGET_ARCH.into(),
		target_env: sc_sysinfo::TARGET_ENV.into(),
		config: String::new(),
		chain,
		genesis_hash: format!("{:?}", genesis_hash),
		authority,
		startup_time: SystemTime::UNIX_EPOCH
			.elapsed()
			.map(|dur| dur.as_millis())
			.unwrap_or(0)
			.to_string(),
		network_id: network.local_peer_id().to_base58(),
		sysinfo,
	};

	telemetry.start_telemetry(connection_message)?;

	Ok(telemetry.handle())
}

/// Generate an RPC module using the provided configuration.
pub fn gen_rpc_module<TBl, TBackend, TCl, TRpc, TExPool>(
	spawn_handle: SpawnTaskHandle,
	client: Arc<TCl>,
	transaction_pool: Arc<TExPool>,
	keystore: KeystorePtr,
	system_rpc_tx: TracingUnboundedSender<sc_rpc::system::Request<TBl>>,
	impl_name: String,
	impl_version: String,
	chain_spec: &dyn ChainSpec,
	state_pruning: &Option<PruningMode>,
	blocks_pruning: BlocksPruning,
	backend: Arc<TBackend>,
	rpc_builder: &(dyn Fn(SubscriptionTaskExecutor) -> Result<RpcModule<TRpc>, Error>),
	metrics: Option<sc_rpc_spec_v2::transaction::TransactionMetrics>,
) -> Result<RpcModule<()>, Error>
where
	TBl: BlockT,
	TCl: ProvideRuntimeApi<TBl>
		+ BlockchainEvents<TBl>
		+ HeaderBackend<TBl>
		+ HeaderMetadata<TBl, Error = sp_blockchain::Error>
		+ ExecutorProvider<TBl>
		+ CallApiAt<TBl>
		+ ProofProvider<TBl>
		+ StorageProvider<TBl, TBackend>
		+ BlockBackend<TBl>
		+ Send
		+ Sync
		+ 'static,
	TBackend: sc_client_api::backend::Backend<TBl> + 'static,
	<TCl as ProvideRuntimeApi<TBl>>::Api: sp_session::SessionKeys<TBl> + sp_api::Metadata<TBl>,
	TExPool: MaintainedTransactionPool<Block = TBl, Hash = <TBl as BlockT>::Hash> + 'static,
	TBl::Hash: Unpin,
	TBl::Header: Unpin,
{
	let system_info = sc_rpc::system::SystemInfo {
		chain_name: chain_spec.name().into(),
		impl_name,
		impl_version,
		properties: chain_spec.properties(),
		chain_type: chain_spec.chain_type(),
	};

	let mut rpc_api = RpcModule::new(());
	let task_executor = Arc::new(spawn_handle);

	let (chain, state, child_state) = {
		let chain = sc_rpc::chain::new_full(client.clone(), task_executor.clone()).into_rpc();
		let (state, child_state) = sc_rpc::state::new_full(client.clone(), task_executor.clone());
		let state = state.into_rpc();
		let child_state = child_state.into_rpc();

		(chain, state, child_state)
	};

	const MAX_TRANSACTION_PER_CONNECTION: usize = 16;

	let transaction_broadcast_rpc_v2 = sc_rpc_spec_v2::transaction::TransactionBroadcast::new(
		client.clone(),
		transaction_pool.clone(),
		task_executor.clone(),
		MAX_TRANSACTION_PER_CONNECTION,
	)
	.into_rpc();

	let transaction_v2 = sc_rpc_spec_v2::transaction::Transaction::new(
		client.clone(),
		transaction_pool.clone(),
		task_executor.clone(),
		metrics,
	)
	.into_rpc();

	let chain_head_v2 = sc_rpc_spec_v2::chain_head::ChainHead::new(
		client.clone(),
		backend.clone(),
		task_executor.clone(),
		// Defaults to sensible limits for the `ChainHead`.
		sc_rpc_spec_v2::chain_head::ChainHeadConfig::default(),
	)
	.into_rpc();

	// Part of the RPC v2 spec.
	// An archive node that can respond to the `archive` RPC-v2 queries is a node with:
	// - state pruning in archive mode: The storage of blocks is kept around
	// - block pruning in archive mode: The block's body is kept around
	let is_archive_node = state_pruning.as_ref().map(|sp| sp.is_archive()).unwrap_or(false) &&
		blocks_pruning.is_archive();
	let genesis_hash = client.hash(Zero::zero()).ok().flatten().expect("Genesis block exists; qed");
	if is_archive_node {
		let archive_v2 = sc_rpc_spec_v2::archive::Archive::new(
			client.clone(),
			backend.clone(),
			genesis_hash,
			task_executor.clone(),
		)
		.into_rpc();
		rpc_api.merge(archive_v2).map_err(|e| Error::Application(e.into()))?;
	}

	// ChainSpec RPC-v2.
	let chain_spec_v2 = sc_rpc_spec_v2::chain_spec::ChainSpec::new(
		chain_spec.name().into(),
		genesis_hash,
		chain_spec.properties(),
	)
	.into_rpc();

	let author = sc_rpc::author::Author::new(
		client.clone(),
		transaction_pool,
		keystore,
		task_executor.clone(),
	)
	.into_rpc();

	let system = sc_rpc::system::System::new(system_info, system_rpc_tx).into_rpc();

	if let Some(storage) = backend.offchain_storage() {
		let offchain = sc_rpc::offchain::Offchain::new(storage).into_rpc();

		rpc_api.merge(offchain).map_err(|e| Error::Application(e.into()))?;
	}

	// Part of the RPC v2 spec.
	rpc_api.merge(transaction_v2).map_err(|e| Error::Application(e.into()))?;
	rpc_api
		.merge(transaction_broadcast_rpc_v2)
		.map_err(|e| Error::Application(e.into()))?;
	rpc_api.merge(chain_head_v2).map_err(|e| Error::Application(e.into()))?;
	rpc_api.merge(chain_spec_v2).map_err(|e| Error::Application(e.into()))?;

	// Part of the old RPC spec.
	rpc_api.merge(chain).map_err(|e| Error::Application(e.into()))?;
	rpc_api.merge(author).map_err(|e| Error::Application(e.into()))?;
	rpc_api.merge(system).map_err(|e| Error::Application(e.into()))?;
	rpc_api.merge(state).map_err(|e| Error::Application(e.into()))?;
	rpc_api.merge(child_state).map_err(|e| Error::Application(e.into()))?;
	// Additional [`RpcModule`]s defined in the node to fit the specific blockchain.
	let extra_rpcs = rpc_builder(task_executor.clone())?;
	rpc_api.merge(extra_rpcs).map_err(|e| Error::Application(e.into()))?;

	Ok(rpc_api)
}

/// Parameters to pass into [`build_network`].
pub struct BuildNetworkParams<'a, Block, Net, TxPool, IQ, Client>
where
	Block: BlockT,
	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
{
	/// The service configuration.
	pub config: &'a Configuration,
	/// Full network configuration.
	pub net_config: FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
	/// A shared client returned by `new_full_parts`.
	pub client: Arc<Client>,
	/// A shared transaction pool.
	pub transaction_pool: Arc<TxPool>,
	/// A handle for spawning tasks.
	pub spawn_handle: SpawnTaskHandle,
	/// An import queue.
	pub import_queue: IQ,
	/// A block announce validator builder.
	pub block_announce_validator_builder: Option<
		Box<dyn FnOnce(Arc<Client>) -> Box<dyn BlockAnnounceValidator<Block> + Send> + Send>,
	>,
	/// Optional warp sync config.
	pub warp_sync_config: Option<WarpSyncConfig<Block>>,
	/// User-specified block relay params. If not specified, the default
	/// block request handler will be used.
	pub block_relay: Option<BlockRelayParams<Block, Net>>,
	/// Metrics.
	pub metrics: NotificationMetrics,
}

/// Build the network service, the network status sinks and an RPC sender.
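///
/// # Example
///
/// A rough sketch of a typical call site (not compiled here); `net_config`, `import_queue`,
/// `warp_sync_config` and `metrics` are assumed to have been built by the node beforehand:
///
/// ```ignore
/// let (network, system_rpc_tx, tx_handler_controller, sync_service) =
/// 	build_network(BuildNetworkParams {
/// 		config: &config,
/// 		net_config,
/// 		client: client.clone(),
/// 		transaction_pool: transaction_pool.clone(),
/// 		spawn_handle: task_manager.spawn_handle(),
/// 		import_queue,
/// 		block_announce_validator_builder: None,
/// 		warp_sync_config,
/// 		block_relay: None,
/// 		metrics,
/// 	})?;
/// ```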
pub fn build_network<Block, Net, TxPool, IQ, Client>(
	params: BuildNetworkParams<Block, Net, TxPool, IQ, Client>,
) -> Result<
	(
		Arc<dyn sc_network::service::traits::NetworkService>,
		TracingUnboundedSender<sc_rpc::system::Request<Block>>,
		sc_network_transactions::TransactionsHandlerController<<Block as BlockT>::Hash>,
		Arc<SyncingService<Block>>,
	),
	Error,
>
where
	Block: BlockT,
	Client: ProvideRuntimeApi<Block>
		+ HeaderMetadata<Block, Error = sp_blockchain::Error>
		+ Chain<Block>
		+ BlockBackend<Block>
		+ BlockIdTo<Block, Error = sp_blockchain::Error>
		+ ProofProvider<Block>
		+ HeaderBackend<Block>
		+ BlockchainEvents<Block>
		+ 'static,
	TxPool: TransactionPool<Block = Block, Hash = <Block as BlockT>::Hash> + 'static,
	IQ: ImportQueue<Block> + 'static,
	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
{
	let BuildNetworkParams {
		config,
		mut net_config,
		client,
		transaction_pool,
		spawn_handle,
		import_queue,
		block_announce_validator_builder,
		warp_sync_config,
		block_relay,
		metrics,
	} = params;

	let block_announce_validator = if let Some(f) = block_announce_validator_builder {
		f(client.clone())
	} else {
		Box::new(DefaultBlockAnnounceValidator)
	};

	let network_service_provider = NetworkServiceProvider::new();
	let protocol_id = config.protocol_id();
	let fork_id = config.chain_spec.fork_id();
	let metrics_registry = config.prometheus_config.as_ref().map(|config| &config.registry);

	let block_downloader = match block_relay {
		Some(params) => {
			let BlockRelayParams { mut server, downloader, request_response_config } = params;

			net_config.add_request_response_protocol(request_response_config);

			spawn_handle.spawn("block-request-handler", Some("networking"), async move {
				server.run().await;
			});

			downloader
		},
		None => build_default_block_downloader(
			&protocol_id,
			fork_id,
			&mut net_config,
			network_service_provider.handle(),
			Arc::clone(&client),
			config.network.default_peers_set.in_peers as usize +
				config.network.default_peers_set.out_peers as usize,
			&spawn_handle,
		),
	};

	let syncing_strategy = build_polkadot_syncing_strategy(
		protocol_id.clone(),
		fork_id,
		&mut net_config,
		warp_sync_config,
		block_downloader,
		client.clone(),
		&spawn_handle,
		metrics_registry,
	)?;

	let (syncing_engine, sync_service, block_announce_config) = SyncingEngine::new(
		Roles::from(&config.role),
		Arc::clone(&client),
		metrics_registry,
		metrics.clone(),
		&net_config,
		protocol_id.clone(),
		fork_id,
		block_announce_validator,
		syncing_strategy,
		network_service_provider.handle(),
		import_queue.service(),
		net_config.peer_store_handle(),
	)?;

	spawn_handle.spawn_blocking("syncing", None, syncing_engine.run());

	build_network_advanced(BuildNetworkAdvancedParams {
		role: config.role,
		protocol_id,
		fork_id,
		ipfs_server: config.network.ipfs_server,
		announce_block: config.announce_block,
		net_config,
		client,
		transaction_pool,
		spawn_handle,
		import_queue,
		sync_service,
		block_announce_config,
		network_service_provider,
		metrics_registry,
		metrics,
	})
}

/// Parameters to pass into [`build_network_advanced`].
pub struct BuildNetworkAdvancedParams<'a, Block, Net, TxPool, IQ, Client>
where
	Block: BlockT,
	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
{
	/// Role of the local node.
	pub role: Role,
	/// Protocol name prefix.
	pub protocol_id: ProtocolId,
	/// Fork ID.
	pub fork_id: Option<&'a str>,
	/// Enable serving block data over IPFS bitswap.
	pub ipfs_server: bool,
	/// Announce blocks automatically after they have been imported.
	pub announce_block: bool,
	/// Full network configuration.
	pub net_config: FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
	/// A shared client returned by `new_full_parts`.
	pub client: Arc<Client>,
	/// A shared transaction pool.
	pub transaction_pool: Arc<TxPool>,
	/// A handle for spawning tasks.
	pub spawn_handle: SpawnTaskHandle,
	/// An import queue.
	pub import_queue: IQ,
	/// Syncing service to communicate with the syncing engine.
	pub sync_service: SyncingService<Block>,
	/// Block announce config.
	pub block_announce_config: Net::NotificationProtocolConfig,
	/// Network service provider used to drive the network internally.
	pub network_service_provider: NetworkServiceProvider,
	/// Prometheus metrics registry.
	pub metrics_registry: Option<&'a Registry>,
	/// Metrics.
	pub metrics: NotificationMetrics,
}

/// Build the network service, the network status sinks and an RPC sender. This is a lower-level
/// version of [`build_network`] for those needing more control.
pub fn build_network_advanced<Block, Net, TxPool, IQ, Client>(
	params: BuildNetworkAdvancedParams<Block, Net, TxPool, IQ, Client>,
) -> Result<
	(
		Arc<dyn sc_network::service::traits::NetworkService>,
		TracingUnboundedSender<sc_rpc::system::Request<Block>>,
		sc_network_transactions::TransactionsHandlerController<<Block as BlockT>::Hash>,
		Arc<SyncingService<Block>>,
	),
	Error,
>
where
	Block: BlockT,
	Client: ProvideRuntimeApi<Block>
		+ HeaderMetadata<Block, Error = sp_blockchain::Error>
		+ Chain<Block>
		+ BlockBackend<Block>
		+ BlockIdTo<Block, Error = sp_blockchain::Error>
		+ ProofProvider<Block>
		+ HeaderBackend<Block>
		+ BlockchainEvents<Block>
		+ 'static,
	TxPool: TransactionPool<Block = Block, Hash = <Block as BlockT>::Hash> + 'static,
	IQ: ImportQueue<Block> + 'static,
	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
{
	let BuildNetworkAdvancedParams {
		role,
		protocol_id,
		fork_id,
		ipfs_server,
		announce_block,
		mut net_config,
		client,
		transaction_pool,
		spawn_handle,
		import_queue,
		sync_service,
		block_announce_config,
		network_service_provider,
		metrics_registry,
		metrics,
	} = params;

	let genesis_hash = client.info().genesis_hash;

	let light_client_request_protocol_config = {
		// Allow both outgoing and incoming requests.
		let (handler, protocol_config) =
			LightClientRequestHandler::new::<Net>(&protocol_id, fork_id, client.clone());
		spawn_handle.spawn("light-client-request-handler", Some("networking"), handler.run());
		protocol_config
	};

	// Install request handlers into `FullNetworkConfiguration`.
	net_config.add_request_response_protocol(light_client_request_protocol_config);

	let bitswap_config = ipfs_server.then(|| {
		let (handler, config) = Net::bitswap_server(client.clone());
		spawn_handle.spawn("bitswap-request-handler", Some("networking"), handler);

		config
	});

	// Create the transactions protocol and add it to the list of supported protocols.
	let (transactions_handler_proto, transactions_config) =
		sc_network_transactions::TransactionsHandlerPrototype::new::<_, Block, Net>(
			protocol_id.clone(),
			genesis_hash,
			fork_id,
			metrics.clone(),
			net_config.peer_store_handle(),
		);
	net_config.add_notification_protocol(transactions_config);

	// Start task for `PeerStore`
	let peer_store = net_config.take_peer_store();
	spawn_handle.spawn("peer-store", Some("networking"), peer_store.run());

	let sync_service = Arc::new(sync_service);

	let network_params = sc_network::config::Params::<Block, <Block as BlockT>::Hash, Net> {
		role,
		executor: {
			let spawn_handle = Clone::clone(&spawn_handle);
			Box::new(move |fut| {
				spawn_handle.spawn("libp2p-node", Some("networking"), fut);
			})
		},
		network_config: net_config,
		genesis_hash,
		protocol_id,
		fork_id: fork_id.map(ToOwned::to_owned),
		metrics_registry: metrics_registry.cloned(),
		block_announce_config,
		bitswap_config,
		notification_metrics: metrics,
	};

	let has_bootnodes = !network_params.network_config.network_config.boot_nodes.is_empty();
	let network_mut = Net::new(network_params)?;
	let network = network_mut.network_service().clone();

	let (tx_handler, tx_handler_controller) = transactions_handler_proto.build(
		network.clone(),
		sync_service.clone(),
		Arc::new(TransactionPoolAdapter { pool: transaction_pool, client: client.clone() }),
		metrics_registry,
	)?;
	spawn_handle.spawn_blocking(
		"network-transactions-handler",
		Some("networking"),
		tx_handler.run(),
	);

	spawn_handle.spawn_blocking(
		"chain-sync-network-service-provider",
		Some("networking"),
		network_service_provider.run(Arc::new(network.clone())),
	);
	spawn_handle.spawn("import-queue", None, {
		let sync_service = sync_service.clone();

		async move { import_queue.run(sync_service.as_ref()).await }
	});

	let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc", 10_000);
	spawn_handle.spawn(
		"system-rpc-handler",
		Some("networking"),
		build_system_rpc_future::<_, _, <Block as BlockT>::Hash>(
			role,
			network_mut.network_service(),
			sync_service.clone(),
			client.clone(),
			system_rpc_rx,
			has_bootnodes,
		),
	);

	let future = build_network_future::<_, _, <Block as BlockT>::Hash, _>(
		network_mut,
		client,
		sync_service.clone(),
		announce_block,
	);

	// The network worker is responsible for gathering all network messages and processing
	// them. This is quite a heavy task, and at the time of the writing of this comment it
	// frequently happens that this future takes several seconds or in some situations
	// even more than a minute until it has processed its entire queue. This is clearly an
	// issue, and ideally we would like to fix the network future to take as little time as
	// possible, but we also take the extra harm-prevention measure to execute the networking
	// future using `spawn_blocking`.
	spawn_handle.spawn_blocking("network-worker", Some("networking"), future);

	Ok((network, system_rpc_tx, tx_handler_controller, sync_service.clone()))
}

/// Configuration for [`build_default_syncing_engine`].
pub struct DefaultSyncingEngineConfig<'a, Block, Client, Net>
where
	Block: BlockT,
	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
{
	/// Role of the local node.
	pub role: Role,
	/// Protocol name prefix.
	pub protocol_id: ProtocolId,
	/// Fork ID.
	pub fork_id: Option<&'a str>,
	/// Full network configuration.
	pub net_config: &'a mut FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
	/// Validator for incoming block announcements.
	pub block_announce_validator: Box<dyn BlockAnnounceValidator<Block> + Send>,
	/// Handle to communicate with `NetworkService`.
	pub network_service_handle: NetworkServiceHandle,
	/// Warp sync configuration (when used).
	pub warp_sync_config: Option<WarpSyncConfig<Block>>,
	/// A shared client returned by `new_full_parts`.
	pub client: Arc<Client>,
	/// Blocks import queue API.
	pub import_queue_service: Box<dyn ImportQueueService<Block>>,
	/// Expected max total number of peer connections (in + out).
	pub num_peers_hint: usize,
	/// A handle for spawning tasks.
	pub spawn_handle: &'a SpawnTaskHandle,
	/// Prometheus metrics registry.
	pub metrics_registry: Option<&'a Registry>,
	/// Metrics.
	pub metrics: NotificationMetrics,
}

/// Build the default syncing engine using [`build_default_block_downloader`] and
/// [`build_polkadot_syncing_strategy`] internally.
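///
/// # Example
///
/// A rough sketch of a call site (not compiled here); all inputs are assumed to have been
/// prepared by the surrounding node setup:
///
/// ```ignore
/// let (sync_service, block_announce_config) =
/// 	build_default_syncing_engine(DefaultSyncingEngineConfig {
/// 		role: config.role,
/// 		protocol_id: config.protocol_id(),
/// 		fork_id: config.chain_spec.fork_id(),
/// 		net_config: &mut net_config,
/// 		block_announce_validator: Box::new(DefaultBlockAnnounceValidator),
/// 		network_service_handle,
/// 		warp_sync_config,
/// 		client: client.clone(),
/// 		import_queue_service: import_queue.service(),
/// 		num_peers_hint,
/// 		spawn_handle: &spawn_handle,
/// 		metrics_registry,
/// 		metrics,
/// 	})?;
/// ```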
pub fn build_default_syncing_engine<Block, Client, Net>(
	config: DefaultSyncingEngineConfig<Block, Client, Net>,
) -> Result<(SyncingService<Block>, Net::NotificationProtocolConfig), Error>
where
	Block: BlockT,
	Client: HeaderBackend<Block>
		+ BlockBackend<Block>
		+ HeaderMetadata<Block, Error = sp_blockchain::Error>
		+ ProofProvider<Block>
		+ Send
		+ Sync
		+ 'static,
	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
{
	let DefaultSyncingEngineConfig {
		role,
		protocol_id,
		fork_id,
		net_config,
		block_announce_validator,
		network_service_handle,
		warp_sync_config,
		client,
		import_queue_service,
		num_peers_hint,
		spawn_handle,
		metrics_registry,
		metrics,
	} = config;

	let block_downloader = build_default_block_downloader(
		&protocol_id,
		fork_id,
		net_config,
		network_service_handle.clone(),
		client.clone(),
		num_peers_hint,
		spawn_handle,
	);
	let syncing_strategy = build_polkadot_syncing_strategy(
		protocol_id.clone(),
		fork_id,
		net_config,
		warp_sync_config,
		block_downloader,
		client.clone(),
		spawn_handle,
		metrics_registry,
	)?;

	let (syncing_engine, sync_service, block_announce_config) = SyncingEngine::new(
		Roles::from(&role),
		client,
		metrics_registry,
		metrics,
		&net_config,
		protocol_id,
		fork_id,
		block_announce_validator,
		syncing_strategy,
		network_service_handle,
		import_queue_service,
		net_config.peer_store_handle(),
	)?;

	spawn_handle.spawn_blocking("syncing", None, syncing_engine.run());

	Ok((sync_service, block_announce_config))
}

/// Build the default block downloader.
pub fn build_default_block_downloader<Block, Client, Net>(
	protocol_id: &ProtocolId,
	fork_id: Option<&str>,
	net_config: &mut FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
	network_service_handle: NetworkServiceHandle,
	client: Arc<Client>,
	num_peers_hint: usize,
	spawn_handle: &SpawnTaskHandle,
) -> Arc<dyn BlockDownloader<Block>>
where
	Block: BlockT,
	Client: HeaderBackend<Block> + BlockBackend<Block> + Send + Sync + 'static,
	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
{
	// Custom protocol was not specified; use the default block handler.
	// Allow both outgoing and incoming requests.
	let BlockRelayParams { mut server, downloader, request_response_config } =
		BlockRequestHandler::new::<Net>(
			network_service_handle,
			&protocol_id,
			fork_id,
			client.clone(),
			num_peers_hint,
		);

	spawn_handle.spawn("block-request-handler", Some("networking"), async move {
		server.run().await;
	});

	net_config.add_request_response_protocol(request_response_config);

	downloader
}

/// Build the standard Polkadot syncing strategy.
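///
/// # Example
///
/// A rough sketch of a call site (not compiled here), mirroring how [`build_network`] uses
/// this function internally:
///
/// ```ignore
/// let syncing_strategy = build_polkadot_syncing_strategy(
/// 	config.protocol_id(),
/// 	config.chain_spec.fork_id(),
/// 	&mut net_config,
/// 	warp_sync_config,
/// 	block_downloader,
/// 	client.clone(),
/// 	&task_manager.spawn_handle(),
/// 	config.prometheus_registry(),
/// )?;
/// ```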
pub fn build_polkadot_syncing_strategy<Block, Client, Net>(
	protocol_id: ProtocolId,
	fork_id: Option<&str>,
	net_config: &mut FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
	warp_sync_config: Option<WarpSyncConfig<Block>>,
	block_downloader: Arc<dyn BlockDownloader<Block>>,
	client: Arc<Client>,
	spawn_handle: &SpawnTaskHandle,
	metrics_registry: Option<&Registry>,
) -> Result<Box<dyn SyncingStrategy<Block>>, Error>
where
	Block: BlockT,
	Client: HeaderBackend<Block>
		+ BlockBackend<Block>
		+ HeaderMetadata<Block, Error = sp_blockchain::Error>
		+ ProofProvider<Block>
		+ Send
		+ Sync
		+ 'static,
	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
{
	if warp_sync_config.is_none() && net_config.network_config.sync_mode.is_warp() {
		return Err("Warp sync enabled, but no warp sync provider configured.".into())
	}

	if client.requires_full_sync() {
		match net_config.network_config.sync_mode {
			SyncMode::LightState { .. } =>
				return Err("Fast sync doesn't work for archive nodes".into()),
			SyncMode::Warp => return Err("Warp sync doesn't work for archive nodes".into()),
			SyncMode::Full => {},
		}
	}

	let genesis_hash = client.info().genesis_hash;

	let (state_request_protocol_config, state_request_protocol_name) = {
		let num_peer_hint = net_config.network_config.default_peers_set_num_full as usize +
			net_config.network_config.default_peers_set.reserved_nodes.len();
		// Allow both outgoing and incoming requests.
		let (handler, protocol_config) =
			StateRequestHandler::new::<Net>(&protocol_id, fork_id, client.clone(), num_peer_hint);
		let config_name = protocol_config.protocol_name().clone();

		spawn_handle.spawn("state-request-handler", Some("networking"), handler.run());
		(protocol_config, config_name)
	};
	net_config.add_request_response_protocol(state_request_protocol_config);

	let (warp_sync_protocol_config, warp_sync_protocol_name) = match warp_sync_config.as_ref() {
		Some(WarpSyncConfig::WithProvider(warp_with_provider)) => {
			// Allow both outgoing and incoming requests.
			let (handler, protocol_config) = WarpSyncRequestHandler::new::<_, Net>(
				protocol_id,
				genesis_hash,
				fork_id,
				warp_with_provider.clone(),
			);
			let config_name = protocol_config.protocol_name().clone();

			spawn_handle.spawn("warp-sync-request-handler", Some("networking"), handler.run());
			(Some(protocol_config), Some(config_name))
		},
		_ => (None, None),
	};
	if let Some(config) = warp_sync_protocol_config {
		net_config.add_request_response_protocol(config);
	}

	let syncing_config = PolkadotSyncingStrategyConfig {
		mode: net_config.network_config.sync_mode,
		max_parallel_downloads: net_config.network_config.max_parallel_downloads,
		max_blocks_per_request: net_config.network_config.max_blocks_per_request,
		min_peers_to_start_warp_sync: net_config.network_config.min_peers_to_start_warp_sync,
		metrics_registry: metrics_registry.cloned(),
		state_request_protocol_name,
		block_downloader,
	};
	Ok(Box::new(PolkadotSyncingStrategy::new(
		syncing_config,
		client,
		warp_sync_config,
		warp_sync_protocol_name,
	)?))
}