sc_service/
builder.rs

// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use crate::{
	build_network_future, build_system_rpc_future,
	client::{Client, ClientConfig},
	config::{Configuration, ExecutorConfiguration, KeystoreConfig, Multiaddr, PrometheusConfig},
	error::Error,
	metrics::MetricsService,
	start_rpc_servers, BuildGenesisBlock, GenesisBlockBuilder, RpcHandlers, SpawnTaskHandle,
	TaskManager, TransactionPoolAdapter,
};
use futures::{channel::oneshot, future::ready, FutureExt, StreamExt};
use jsonrpsee::RpcModule;
use log::info;
use prometheus_endpoint::Registry;
use sc_chain_spec::{get_extension, ChainSpec};
use sc_client_api::{
	execution_extensions::ExecutionExtensions, proof_provider::ProofProvider, BadBlocks,
	BlockBackend, BlockchainEvents, ExecutorProvider, ForkBlocks, StorageProvider, UsageProvider,
};
use sc_client_db::{Backend, BlocksPruning, DatabaseSettings, PruningMode};
use sc_consensus::import_queue::ImportQueue;
use sc_executor::{
	sp_wasm_interface::HostFunctions, HeapAllocStrategy, NativeExecutionDispatch, RuntimeVersionOf,
	WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY,
};
use sc_keystore::LocalKeystore;
use sc_network::{
	config::{FullNetworkConfiguration, ProtocolId, SyncMode},
	multiaddr::Protocol,
	service::{
		traits::{PeerStore, RequestResponseConfig},
		NotificationMetrics,
	},
	NetworkBackend, NetworkStateInfo,
};
use sc_network_common::role::Roles;
use sc_network_light::light_client_requests::handler::LightClientRequestHandler;
use sc_network_sync::{
	block_relay_protocol::BlockRelayParams,
	block_request_handler::BlockRequestHandler,
	engine::SyncingEngine,
	service::network::NetworkServiceProvider,
	state_request_handler::StateRequestHandler,
	strategy::{PolkadotSyncingStrategy, SyncingConfig, SyncingStrategy},
	warp_request_handler::RequestHandler as WarpSyncRequestHandler,
	SyncingService, WarpSyncConfig,
};
use sc_rpc::{
	author::AuthorApiServer,
	chain::ChainApiServer,
	offchain::OffchainApiServer,
	state::{ChildStateApiServer, StateApiServer},
	system::SystemApiServer,
	DenyUnsafe, SubscriptionTaskExecutor,
};
use sc_rpc_spec_v2::{
	archive::ArchiveApiServer,
	chain_head::ChainHeadApiServer,
	chain_spec::ChainSpecApiServer,
	transaction::{TransactionApiServer, TransactionBroadcastApiServer},
};
use sc_telemetry::{telemetry, ConnectionMessage, Telemetry, TelemetryHandle, SUBSTRATE_INFO};
use sc_transaction_pool_api::{MaintainedTransactionPool, TransactionPool};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
use sp_api::{CallApiAt, ProvideRuntimeApi};
use sp_blockchain::{HeaderBackend, HeaderMetadata};
use sp_consensus::block_validation::{
	BlockAnnounceValidator, Chain, DefaultBlockAnnounceValidator,
};
use sp_core::traits::{CodeExecutor, SpawnNamed};
use sp_keystore::KeystorePtr;
use sp_runtime::traits::{Block as BlockT, BlockIdTo, NumberFor, Zero};
use std::{str::FromStr, sync::Arc, time::SystemTime};

/// Full client type.
pub type TFullClient<TBl, TRtApi, TExec> =
	Client<TFullBackend<TBl>, TFullCallExecutor<TBl, TExec>, TBl, TRtApi>;

/// Full client backend type.
pub type TFullBackend<TBl> = Backend<TBl>;

/// Full client call executor type.
pub type TFullCallExecutor<TBl, TExec> = crate::client::LocalCallExecutor<TBl, Backend<TBl>, TExec>;

type TFullParts<TBl, TRtApi, TExec> =
	(TFullClient<TBl, TRtApi, TExec>, Arc<TFullBackend<TBl>>, KeystoreContainer, TaskManager);

/// Construct a local keystore shareable container
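///
/// # Example
///
/// A minimal usage sketch (assuming an in-memory keystore is acceptable for the node):
///
/// ```ignore
/// let keystores = KeystoreContainer::new(&KeystoreConfig::InMemory)?;
/// // A `KeystorePtr` that can be handed to other services.
/// let keystore: KeystorePtr = keystores.keystore();
/// ```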
pub struct KeystoreContainer(Arc<LocalKeystore>);

impl KeystoreContainer {
	/// Construct KeystoreContainer
	pub fn new(config: &KeystoreConfig) -> Result<Self, Error> {
		let keystore = Arc::new(match config {
			KeystoreConfig::Path { path, password } =>
				LocalKeystore::open(path.clone(), password.clone())?,
			KeystoreConfig::InMemory => LocalKeystore::in_memory(),
		});

		Ok(Self(keystore))
	}

	/// Returns a shared reference to a dynamic `Keystore` trait implementation.
	pub fn keystore(&self) -> KeystorePtr {
		self.0.clone()
	}

	/// Returns a shared reference to the local keystore.
	pub fn local_keystore(&self) -> Arc<LocalKeystore> {
		self.0.clone()
	}
}

/// Creates a new full client for the given config.
pub fn new_full_client<TBl, TRtApi, TExec>(
	config: &Configuration,
	telemetry: Option<TelemetryHandle>,
	executor: TExec,
) -> Result<TFullClient<TBl, TRtApi, TExec>, Error>
where
	TBl: BlockT,
	TExec: CodeExecutor + RuntimeVersionOf + Clone,
{
	new_full_parts(config, telemetry, executor).map(|parts| parts.0)
}

/// Create the initial parts of a full node with the default genesis block builder, optionally
/// enabling proof recording during block import.
pub fn new_full_parts_record_import<TBl, TRtApi, TExec>(
	config: &Configuration,
	telemetry: Option<TelemetryHandle>,
	executor: TExec,
	enable_import_proof_recording: bool,
) -> Result<TFullParts<TBl, TRtApi, TExec>, Error>
where
	TBl: BlockT,
	TExec: CodeExecutor + RuntimeVersionOf + Clone,
{
	let backend = new_db_backend(config.db_config())?;

	let genesis_block_builder = GenesisBlockBuilder::new(
		config.chain_spec.as_storage_builder(),
		!config.no_genesis(),
		backend.clone(),
		executor.clone(),
	)?;

	new_full_parts_with_genesis_builder(
		config,
		telemetry,
		executor,
		backend,
		genesis_block_builder,
		enable_import_proof_recording,
	)
}

/// Create the initial parts of a full node with the default genesis block builder.
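///
/// # Example
///
/// A sketch of a typical call, assuming `Block` and `RuntimeApi` are the node's own block and
/// runtime API types, `telemetry` is an `Option<TelemetryHandle>`, and `executor` was created
/// with [`new_wasm_executor`]:
///
/// ```ignore
/// let (client, backend, keystore_container, task_manager) =
/// 	new_full_parts::<Block, RuntimeApi, _>(&config, telemetry, executor)?;
/// ```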
pub fn new_full_parts<TBl, TRtApi, TExec>(
	config: &Configuration,
	telemetry: Option<TelemetryHandle>,
	executor: TExec,
) -> Result<TFullParts<TBl, TRtApi, TExec>, Error>
where
	TBl: BlockT,
	TExec: CodeExecutor + RuntimeVersionOf + Clone,
{
	new_full_parts_record_import(config, telemetry, executor, false)
}

/// Create the initial parts of a full node.
pub fn new_full_parts_with_genesis_builder<TBl, TRtApi, TExec, TBuildGenesisBlock>(
	config: &Configuration,
	telemetry: Option<TelemetryHandle>,
	executor: TExec,
	backend: Arc<TFullBackend<TBl>>,
	genesis_block_builder: TBuildGenesisBlock,
	enable_import_proof_recording: bool,
) -> Result<TFullParts<TBl, TRtApi, TExec>, Error>
where
	TBl: BlockT,
	TExec: CodeExecutor + RuntimeVersionOf + Clone,
	TBuildGenesisBlock: BuildGenesisBlock<
		TBl,
		BlockImportOperation = <Backend<TBl> as sc_client_api::backend::Backend<TBl>>::BlockImportOperation
	>,
{
	let keystore_container = KeystoreContainer::new(&config.keystore)?;

	let task_manager = {
		let registry = config.prometheus_config.as_ref().map(|cfg| &cfg.registry);
		TaskManager::new(config.tokio_handle.clone(), registry)?
	};

	let chain_spec = &config.chain_spec;
	let fork_blocks = get_extension::<ForkBlocks<TBl>>(chain_spec.extensions())
		.cloned()
		.unwrap_or_default();

	let bad_blocks = get_extension::<BadBlocks<TBl>>(chain_spec.extensions())
		.cloned()
		.unwrap_or_default();

	let client = {
		let extensions = ExecutionExtensions::new(None, Arc::new(executor.clone()));

		let wasm_runtime_substitutes = config
			.chain_spec
			.code_substitutes()
			.into_iter()
			.map(|(n, c)| {
				let number = NumberFor::<TBl>::from_str(&n).map_err(|_| {
					Error::Application(Box::from(format!(
						"Failed to parse `{}` as block number for code substitutes. \
						 In an old version the key for code substitute was a block hash. \
						 Please update the chain spec to a version that is compatible with your node.",
						n
					)))
				})?;
				Ok((number, c))
			})
			.collect::<Result<std::collections::HashMap<_, _>, Error>>()?;

		let client = new_client(
			backend.clone(),
			executor,
			genesis_block_builder,
			fork_blocks,
			bad_blocks,
			extensions,
			Box::new(task_manager.spawn_handle()),
			config.prometheus_config.as_ref().map(|config| config.registry.clone()),
			telemetry,
			ClientConfig {
				offchain_worker_enabled: config.offchain_worker.enabled,
				offchain_indexing_api: config.offchain_worker.indexing_enabled,
				wasm_runtime_overrides: config.wasm_runtime_overrides.clone(),
				no_genesis: config.no_genesis(),
				wasm_runtime_substitutes,
				enable_import_proof_recording,
			},
		)?;

		client
	};

	Ok((client, backend, keystore_container, task_manager))
}

/// Creates a [`NativeElseWasmExecutor`](sc_executor::NativeElseWasmExecutor) according to
/// [`Configuration`].
#[deprecated(note = "Please switch to `new_wasm_executor`. Will be removed at end of 2024.")]
#[allow(deprecated)]
pub fn new_native_or_wasm_executor<D: NativeExecutionDispatch>(
	config: &Configuration,
) -> sc_executor::NativeElseWasmExecutor<D> {
	#[allow(deprecated)]
	sc_executor::NativeElseWasmExecutor::new_with_wasm_executor(new_wasm_executor(&config.executor))
}

/// Creates a [`WasmExecutor`] according to [`ExecutorConfiguration`].
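///
/// # Example
///
/// A minimal sketch, assuming a [`Configuration`] value named `config` is in scope and the node
/// uses the standard Substrate host functions:
///
/// ```ignore
/// let executor = new_wasm_executor::<sp_io::SubstrateHostFunctions>(&config.executor);
/// ```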
pub fn new_wasm_executor<H: HostFunctions>(config: &ExecutorConfiguration) -> WasmExecutor<H> {
	let strategy = config
		.default_heap_pages
		.map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |p| HeapAllocStrategy::Static { extra_pages: p as _ });
	WasmExecutor::<H>::builder()
		.with_execution_method(config.wasm_method)
		.with_onchain_heap_alloc_strategy(strategy)
		.with_offchain_heap_alloc_strategy(strategy)
		.with_max_runtime_instances(config.max_runtime_instances)
		.with_runtime_cache_size(config.runtime_cache_size)
		.build()
}

/// Create an instance of the default DB-backed backend.
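///
/// # Example
///
/// A sketch, assuming the [`DatabaseSettings`] are taken from a [`Configuration`] named `config`
/// (the same pattern used by `new_full_parts_record_import` above):
///
/// ```ignore
/// let backend = new_db_backend::<Block>(config.db_config())?;
/// ```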
pub fn new_db_backend<Block>(
	settings: DatabaseSettings,
) -> Result<Arc<Backend<Block>>, sp_blockchain::Error>
where
	Block: BlockT,
{
	const CANONICALIZATION_DELAY: u64 = 4096;

	Ok(Arc::new(Backend::new(settings, CANONICALIZATION_DELAY)?))
}

/// Create an instance of a client backed by the given backend.
pub fn new_client<E, Block, RA, G>(
	backend: Arc<Backend<Block>>,
	executor: E,
	genesis_block_builder: G,
	fork_blocks: ForkBlocks<Block>,
	bad_blocks: BadBlocks<Block>,
	execution_extensions: ExecutionExtensions<Block>,
	spawn_handle: Box<dyn SpawnNamed>,
	prometheus_registry: Option<Registry>,
	telemetry: Option<TelemetryHandle>,
	config: ClientConfig<Block>,
) -> Result<
	Client<
		Backend<Block>,
		crate::client::LocalCallExecutor<Block, Backend<Block>, E>,
		Block,
		RA,
	>,
	sp_blockchain::Error,
>
where
	Block: BlockT,
	E: CodeExecutor + RuntimeVersionOf,
	G: BuildGenesisBlock<
		Block,
		BlockImportOperation = <Backend<Block> as sc_client_api::backend::Backend<Block>>::BlockImportOperation
	>,
{
	let executor = crate::client::LocalCallExecutor::new(
		backend.clone(),
		executor,
		config.clone(),
		execution_extensions,
	)?;

	Client::new(
		backend,
		executor,
		spawn_handle,
		genesis_block_builder,
		fork_blocks,
		bad_blocks,
		prometheus_registry,
		telemetry,
		config,
	)
}

/// Parameters to pass into [`spawn_tasks`].
pub struct SpawnTasksParams<'a, TBl: BlockT, TCl, TExPool, TRpc, Backend> {
	/// The service configuration.
	pub config: Configuration,
	/// A shared client returned by `new_full_parts`.
	pub client: Arc<TCl>,
	/// A shared backend returned by `new_full_parts`.
	pub backend: Arc<Backend>,
	/// A task manager returned by `new_full_parts`.
	pub task_manager: &'a mut TaskManager,
	/// A shared keystore returned by `new_full_parts`.
	pub keystore: KeystorePtr,
	/// A shared transaction pool.
	pub transaction_pool: Arc<TExPool>,
	/// Builds additional [`RpcModule`]s that should be added to the server.
	pub rpc_builder: Box<dyn Fn(SubscriptionTaskExecutor) -> Result<RpcModule<TRpc>, Error>>,
	/// A shared network instance.
	pub network: Arc<dyn sc_network::service::traits::NetworkService>,
	/// A sender for RPC requests.
	pub system_rpc_tx: TracingUnboundedSender<sc_rpc::system::Request<TBl>>,
	/// Controller for transaction handlers.
	pub tx_handler_controller:
		sc_network_transactions::TransactionsHandlerController<<TBl as BlockT>::Hash>,
	/// Syncing service.
	pub sync_service: Arc<SyncingService<TBl>>,
	/// Telemetry instance for this node.
	pub telemetry: Option<&'a mut Telemetry>,
}

/// Spawn the tasks that are required to run a node.
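///
/// # Example
///
/// A sketch of how a node wires this up; all values shown are assumed to have been produced by
/// the earlier builder steps (e.g. `new_full_parts` and `build_network`):
///
/// ```ignore
/// let rpc_handlers = spawn_tasks(SpawnTasksParams {
/// 	config,
/// 	client: client.clone(),
/// 	backend: backend.clone(),
/// 	task_manager: &mut task_manager,
/// 	keystore: keystore_container.keystore(),
/// 	transaction_pool: transaction_pool.clone(),
/// 	rpc_builder,
/// 	network: network.clone(),
/// 	system_rpc_tx,
/// 	tx_handler_controller,
/// 	sync_service: sync_service.clone(),
/// 	telemetry: telemetry.as_mut(),
/// })?;
/// ```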
pub fn spawn_tasks<TBl, TBackend, TExPool, TRpc, TCl>(
	params: SpawnTasksParams<TBl, TCl, TExPool, TRpc, TBackend>,
) -> Result<RpcHandlers, Error>
where
	TCl: ProvideRuntimeApi<TBl>
		+ HeaderMetadata<TBl, Error = sp_blockchain::Error>
		+ Chain<TBl>
		+ BlockBackend<TBl>
		+ BlockIdTo<TBl, Error = sp_blockchain::Error>
		+ ProofProvider<TBl>
		+ HeaderBackend<TBl>
		+ BlockchainEvents<TBl>
		+ ExecutorProvider<TBl>
		+ UsageProvider<TBl>
		+ StorageProvider<TBl, TBackend>
		+ CallApiAt<TBl>
		+ Send
		+ 'static,
	<TCl as ProvideRuntimeApi<TBl>>::Api: sp_api::Metadata<TBl>
		+ sp_transaction_pool::runtime_api::TaggedTransactionQueue<TBl>
		+ sp_session::SessionKeys<TBl>
		+ sp_api::ApiExt<TBl>,
	TBl: BlockT,
	TBl::Hash: Unpin,
	TBl::Header: Unpin,
	TBackend: 'static + sc_client_api::backend::Backend<TBl> + Send,
	TExPool: MaintainedTransactionPool<Block = TBl, Hash = <TBl as BlockT>::Hash> + 'static,
{
	let SpawnTasksParams {
		mut config,
		task_manager,
		client,
		backend,
		keystore,
		transaction_pool,
		rpc_builder,
		network,
		system_rpc_tx,
		tx_handler_controller,
		sync_service,
		telemetry,
	} = params;

	let chain_info = client.usage_info().chain;

	sp_session::generate_initial_session_keys(
		client.clone(),
		chain_info.best_hash,
		config.dev_key_seed.clone().map(|s| vec![s]).unwrap_or_default(),
		keystore.clone(),
	)
	.map_err(|e| Error::Application(Box::new(e)))?;

	let sysinfo = sc_sysinfo::gather_sysinfo();
	sc_sysinfo::print_sysinfo(&sysinfo);

	let telemetry = telemetry
		.map(|telemetry| {
			init_telemetry(
				config.network.node_name.clone(),
				config.impl_name.clone(),
				config.impl_version.clone(),
				config.chain_spec.name().to_string(),
				config.role.is_authority(),
				network.clone(),
				client.clone(),
				telemetry,
				Some(sysinfo),
			)
		})
		.transpose()?;

	info!("📦 Highest known block at #{}", chain_info.best_number);

	let spawn_handle = task_manager.spawn_handle();

	// Inform the tx pool about imported and finalized blocks.
	spawn_handle.spawn(
		"txpool-notifications",
		Some("transaction-pool"),
		sc_transaction_pool::notification_future(client.clone(), transaction_pool.clone()),
	);

	spawn_handle.spawn(
		"on-transaction-imported",
		Some("transaction-pool"),
		propagate_transaction_notifications(
			transaction_pool.clone(),
			tx_handler_controller,
			telemetry.clone(),
		),
	);

	// Prometheus metrics.
	let metrics_service =
		if let Some(PrometheusConfig { port, registry }) = config.prometheus_config.clone() {
			// Set static metrics.
			let metrics = MetricsService::with_prometheus(
				telemetry,
				&registry,
				config.role,
				&config.network.node_name,
				&config.impl_version,
			)?;
			spawn_handle.spawn(
				"prometheus-endpoint",
				None,
				prometheus_endpoint::init_prometheus(port, registry).map(drop),
			);

			metrics
		} else {
			MetricsService::new(telemetry)
		};

	// Periodically updated metrics and telemetry updates.
	spawn_handle.spawn(
		"telemetry-periodic-send",
		None,
		metrics_service.run(
			client.clone(),
			transaction_pool.clone(),
			network.clone(),
			sync_service.clone(),
		),
	);

	let rpc_id_provider = config.rpc.id_provider.take();

	// jsonrpsee RPC
	let gen_rpc_module = || {
		gen_rpc_module(
			task_manager.spawn_handle(),
			client.clone(),
			transaction_pool.clone(),
			keystore.clone(),
			system_rpc_tx.clone(),
			config.impl_name.clone(),
			config.impl_version.clone(),
			config.chain_spec.as_ref(),
			&config.state_pruning,
			config.blocks_pruning,
			backend.clone(),
			&*rpc_builder,
		)
	};

	let rpc_server_handle = start_rpc_servers(
		&config.rpc,
		config.prometheus_registry(),
		&config.tokio_handle,
		gen_rpc_module,
		rpc_id_provider,
	)?;

	let listen_addrs = rpc_server_handle
		.listen_addrs()
		.into_iter()
		.map(|socket_addr| {
			let mut multiaddr: Multiaddr = socket_addr.ip().into();
			multiaddr.push(Protocol::Tcp(socket_addr.port()));
			multiaddr
		})
		.collect();

	let in_memory_rpc = {
		let mut module = gen_rpc_module()?;
		module.extensions_mut().insert(DenyUnsafe::No);
		module
	};

	let in_memory_rpc_handle = RpcHandlers::new(Arc::new(in_memory_rpc), listen_addrs);

	// Spawn informant task
	spawn_handle.spawn(
		"informant",
		None,
		sc_informant::build(client.clone(), network, sync_service.clone()),
	);

	task_manager.keep_alive((config.base_path, rpc_server_handle));

	Ok(in_memory_rpc_handle)
}

/// Returns a future that forwards imported transactions to the transaction networking protocol.
pub async fn propagate_transaction_notifications<Block, ExPool>(
	transaction_pool: Arc<ExPool>,
	tx_handler_controller: sc_network_transactions::TransactionsHandlerController<
		<Block as BlockT>::Hash,
	>,
	telemetry: Option<TelemetryHandle>,
) where
	Block: BlockT,
	ExPool: MaintainedTransactionPool<Block = Block, Hash = <Block as BlockT>::Hash>,
{
	// transaction notifications
	transaction_pool
		.import_notification_stream()
		.for_each(move |hash| {
			tx_handler_controller.propagate_transaction(hash);
			let status = transaction_pool.status();
			telemetry!(
				telemetry;
				SUBSTRATE_INFO;
				"txpool.import";
				"ready" => status.ready,
				"future" => status.future,
			);
			ready(())
		})
		.await;
}

/// Initialize telemetry with the provided configuration and return the telemetry handle.
pub fn init_telemetry<Block, Client, Network>(
	name: String,
	implementation: String,
	version: String,
	chain: String,
	authority: bool,
	network: Network,
	client: Arc<Client>,
	telemetry: &mut Telemetry,
	sysinfo: Option<sc_telemetry::SysInfo>,
) -> sc_telemetry::Result<TelemetryHandle>
where
	Block: BlockT,
	Client: BlockBackend<Block>,
	Network: NetworkStateInfo,
{
	let genesis_hash = client.block_hash(Zero::zero()).ok().flatten().unwrap_or_default();
	let connection_message = ConnectionMessage {
		name,
		implementation,
		version,
		target_os: sc_sysinfo::TARGET_OS.into(),
		target_arch: sc_sysinfo::TARGET_ARCH.into(),
		target_env: sc_sysinfo::TARGET_ENV.into(),
		config: String::new(),
		chain,
		genesis_hash: format!("{:?}", genesis_hash),
		authority,
		startup_time: SystemTime::UNIX_EPOCH
			.elapsed()
			.map(|dur| dur.as_millis())
			.unwrap_or(0)
			.to_string(),
		network_id: network.local_peer_id().to_base58(),
		sysinfo,
	};

	telemetry.start_telemetry(connection_message)?;

	Ok(telemetry.handle())
}

/// Generate RPC module using provided configuration
pub fn gen_rpc_module<TBl, TBackend, TCl, TRpc, TExPool>(
	spawn_handle: SpawnTaskHandle,
	client: Arc<TCl>,
	transaction_pool: Arc<TExPool>,
	keystore: KeystorePtr,
	system_rpc_tx: TracingUnboundedSender<sc_rpc::system::Request<TBl>>,
	impl_name: String,
	impl_version: String,
	chain_spec: &dyn ChainSpec,
	state_pruning: &Option<PruningMode>,
	blocks_pruning: BlocksPruning,
	backend: Arc<TBackend>,
	rpc_builder: &(dyn Fn(SubscriptionTaskExecutor) -> Result<RpcModule<TRpc>, Error>),
) -> Result<RpcModule<()>, Error>
where
	TBl: BlockT,
	TCl: ProvideRuntimeApi<TBl>
		+ BlockchainEvents<TBl>
		+ HeaderBackend<TBl>
		+ HeaderMetadata<TBl, Error = sp_blockchain::Error>
		+ ExecutorProvider<TBl>
		+ CallApiAt<TBl>
		+ ProofProvider<TBl>
		+ StorageProvider<TBl, TBackend>
		+ BlockBackend<TBl>
		+ Send
		+ Sync
		+ 'static,
	TBackend: sc_client_api::backend::Backend<TBl> + 'static,
	<TCl as ProvideRuntimeApi<TBl>>::Api: sp_session::SessionKeys<TBl> + sp_api::Metadata<TBl>,
	TExPool: MaintainedTransactionPool<Block = TBl, Hash = <TBl as BlockT>::Hash> + 'static,
	TBl::Hash: Unpin,
	TBl::Header: Unpin,
{
	let system_info = sc_rpc::system::SystemInfo {
		chain_name: chain_spec.name().into(),
		impl_name,
		impl_version,
		properties: chain_spec.properties(),
		chain_type: chain_spec.chain_type(),
	};

	let mut rpc_api = RpcModule::new(());
	let task_executor = Arc::new(spawn_handle);

	let (chain, state, child_state) = {
		let chain = sc_rpc::chain::new_full(client.clone(), task_executor.clone()).into_rpc();
		let (state, child_state) = sc_rpc::state::new_full(client.clone(), task_executor.clone());
		let state = state.into_rpc();
		let child_state = child_state.into_rpc();

		(chain, state, child_state)
	};

	const MAX_TRANSACTION_PER_CONNECTION: usize = 16;

	let transaction_broadcast_rpc_v2 = sc_rpc_spec_v2::transaction::TransactionBroadcast::new(
		client.clone(),
		transaction_pool.clone(),
		task_executor.clone(),
		MAX_TRANSACTION_PER_CONNECTION,
	)
	.into_rpc();

	let transaction_v2 = sc_rpc_spec_v2::transaction::Transaction::new(
		client.clone(),
		transaction_pool.clone(),
		task_executor.clone(),
	)
	.into_rpc();

	let chain_head_v2 = sc_rpc_spec_v2::chain_head::ChainHead::new(
		client.clone(),
		backend.clone(),
		task_executor.clone(),
		// Defaults to sensible limits for the `ChainHead`.
		sc_rpc_spec_v2::chain_head::ChainHeadConfig::default(),
	)
	.into_rpc();

	// Part of the RPC v2 spec.
	// An archive node that can respond to the `archive` RPC-v2 queries is a node with:
	// - state pruning in archive mode: The storage of blocks is kept around
	// - block pruning in archive mode: The block's body is kept around
	let is_archive_node = state_pruning.as_ref().map(|sp| sp.is_archive()).unwrap_or(false) &&
		blocks_pruning.is_archive();
	let genesis_hash = client.hash(Zero::zero()).ok().flatten().expect("Genesis block exists; qed");
	if is_archive_node {
		let archive_v2 = sc_rpc_spec_v2::archive::Archive::new(
			client.clone(),
			backend.clone(),
			genesis_hash,
			// Defaults to sensible limits for the `Archive`.
			sc_rpc_spec_v2::archive::ArchiveConfig::default(),
		)
		.into_rpc();
		rpc_api.merge(archive_v2).map_err(|e| Error::Application(e.into()))?;
	}

	// ChainSpec RPC-v2.
	let chain_spec_v2 = sc_rpc_spec_v2::chain_spec::ChainSpec::new(
		chain_spec.name().into(),
		genesis_hash,
		chain_spec.properties(),
	)
	.into_rpc();

	let author = sc_rpc::author::Author::new(
		client.clone(),
		transaction_pool,
		keystore,
		task_executor.clone(),
	)
	.into_rpc();

	let system = sc_rpc::system::System::new(system_info, system_rpc_tx).into_rpc();

	if let Some(storage) = backend.offchain_storage() {
		let offchain = sc_rpc::offchain::Offchain::new(storage).into_rpc();

		rpc_api.merge(offchain).map_err(|e| Error::Application(e.into()))?;
	}

	// Part of the RPC v2 spec.
	rpc_api.merge(transaction_v2).map_err(|e| Error::Application(e.into()))?;
	rpc_api
		.merge(transaction_broadcast_rpc_v2)
		.map_err(|e| Error::Application(e.into()))?;
	rpc_api.merge(chain_head_v2).map_err(|e| Error::Application(e.into()))?;
	rpc_api.merge(chain_spec_v2).map_err(|e| Error::Application(e.into()))?;

	// Part of the old RPC spec.
	rpc_api.merge(chain).map_err(|e| Error::Application(e.into()))?;
	rpc_api.merge(author).map_err(|e| Error::Application(e.into()))?;
	rpc_api.merge(system).map_err(|e| Error::Application(e.into()))?;
	rpc_api.merge(state).map_err(|e| Error::Application(e.into()))?;
	rpc_api.merge(child_state).map_err(|e| Error::Application(e.into()))?;
	// Additional [`RpcModule`]s defined in the node to fit the specific blockchain
	let extra_rpcs = rpc_builder(task_executor.clone())?;
	rpc_api.merge(extra_rpcs).map_err(|e| Error::Application(e.into()))?;

	Ok(rpc_api)
}

/// Parameters to pass into `build_network`.
pub struct BuildNetworkParams<'a, Block, Net, TxPool, IQ, Client>
where
	Block: BlockT,
	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
{
	/// The service configuration.
	pub config: &'a Configuration,
	/// Full network configuration.
	pub net_config: FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
	/// A shared client returned by `new_full_parts`.
	pub client: Arc<Client>,
	/// A shared transaction pool.
	pub transaction_pool: Arc<TxPool>,
	/// A handle for spawning tasks.
	pub spawn_handle: SpawnTaskHandle,
	/// An import queue.
	pub import_queue: IQ,
	/// A block announce validator builder.
	pub block_announce_validator_builder: Option<
		Box<dyn FnOnce(Arc<Client>) -> Box<dyn BlockAnnounceValidator<Block> + Send> + Send>,
	>,
	/// Syncing strategy to use in syncing engine.
	pub syncing_strategy: Box<dyn SyncingStrategy<Block>>,
	/// User specified block relay params. If not specified, the default
	/// block request handler will be used.
	pub block_relay: Option<BlockRelayParams<Block, Net>>,
	/// Metrics.
	pub metrics: NotificationMetrics,
}

/// Build the network service together with the system RPC sender, the transaction handler
/// controller, the [`NetworkStarter`] and the syncing service.
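///
/// # Example
///
/// A sketch of a typical call, assuming the other values (`config`, `net_config`, `client`,
/// `transaction_pool`, `import_queue`, `syncing_strategy`, `metrics`) were created by the
/// surrounding node-building code:
///
/// ```ignore
/// let (network, system_rpc_tx, tx_handler_controller, network_starter, sync_service) =
/// 	build_network(BuildNetworkParams {
/// 		config: &config,
/// 		net_config,
/// 		client: client.clone(),
/// 		transaction_pool: transaction_pool.clone(),
/// 		spawn_handle: task_manager.spawn_handle(),
/// 		import_queue,
/// 		block_announce_validator_builder: None,
/// 		syncing_strategy,
/// 		block_relay: None,
/// 		metrics,
/// 	})?;
/// ```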
pub fn build_network<Block, Net, TxPool, IQ, Client>(
	params: BuildNetworkParams<Block, Net, TxPool, IQ, Client>,
) -> Result<
	(
		Arc<dyn sc_network::service::traits::NetworkService>,
		TracingUnboundedSender<sc_rpc::system::Request<Block>>,
		sc_network_transactions::TransactionsHandlerController<<Block as BlockT>::Hash>,
		NetworkStarter,
		Arc<SyncingService<Block>>,
	),
	Error,
>
where
	Block: BlockT,
	Client: ProvideRuntimeApi<Block>
		+ HeaderMetadata<Block, Error = sp_blockchain::Error>
		+ Chain<Block>
		+ BlockBackend<Block>
		+ BlockIdTo<Block, Error = sp_blockchain::Error>
		+ ProofProvider<Block>
		+ HeaderBackend<Block>
		+ BlockchainEvents<Block>
		+ 'static,
	TxPool: TransactionPool<Block = Block, Hash = <Block as BlockT>::Hash> + 'static,
	IQ: ImportQueue<Block> + 'static,
	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
{
	let BuildNetworkParams {
		config,
		mut net_config,
		client,
		transaction_pool,
		spawn_handle,
		import_queue,
		block_announce_validator_builder,
		syncing_strategy,
		block_relay,
		metrics,
	} = params;

	let protocol_id = config.protocol_id();
	let genesis_hash = client.info().genesis_hash;

	let block_announce_validator = if let Some(f) = block_announce_validator_builder {
		f(client.clone())
	} else {
		Box::new(DefaultBlockAnnounceValidator)
	};

	let (chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new();
	let (mut block_server, block_downloader, block_request_protocol_config) = match block_relay {
		Some(params) => (params.server, params.downloader, params.request_response_config),
		None => {
			// Custom protocol was not specified, use the default block handler.
			// Allow both outgoing and incoming requests.
			let params = BlockRequestHandler::new::<Net>(
				chain_sync_network_handle.clone(),
				&protocol_id,
				config.chain_spec.fork_id(),
				client.clone(),
				config.network.default_peers_set.in_peers as usize +
					config.network.default_peers_set.out_peers as usize,
			);
			(params.server, params.downloader, params.request_response_config)
		},
	};
	spawn_handle.spawn("block-request-handler", Some("networking"), async move {
		block_server.run().await;
	});

	let light_client_request_protocol_config = {
		// Allow both outgoing and incoming requests.
		let (handler, protocol_config) = LightClientRequestHandler::new::<Net>(
			&protocol_id,
			config.chain_spec.fork_id(),
			client.clone(),
		);
		spawn_handle.spawn("light-client-request-handler", Some("networking"), handler.run());
		protocol_config
	};

	// Install the request handlers into the `FullNetworkConfiguration`.
	net_config.add_request_response_protocol(block_request_protocol_config);
	net_config.add_request_response_protocol(light_client_request_protocol_config);

	let bitswap_config = config.network.ipfs_server.then(|| {
		let (handler, config) = Net::bitswap_server(client.clone());
		spawn_handle.spawn("bitswap-request-handler", Some("networking"), handler);

		config
	});

	// Create the transactions protocol and add it to the list of protocols supported by `net_config`.
	let peer_store_handle = net_config.peer_store_handle();
	let (transactions_handler_proto, transactions_config) =
		sc_network_transactions::TransactionsHandlerPrototype::new::<_, Block, Net>(
			protocol_id.clone(),
			genesis_hash,
			config.chain_spec.fork_id(),
			metrics.clone(),
			Arc::clone(&peer_store_handle),
		);
	net_config.add_notification_protocol(transactions_config);

	// Start task for `PeerStore`
	let peer_store = net_config.take_peer_store();
	let peer_store_handle = peer_store.handle();
	spawn_handle.spawn("peer-store", Some("networking"), peer_store.run());

	let (engine, sync_service, block_announce_config) = SyncingEngine::new(
		Roles::from(&config.role),
		client.clone(),
		config.prometheus_config.as_ref().map(|config| config.registry.clone()).as_ref(),
		metrics.clone(),
		&net_config,
		protocol_id.clone(),
		&config.chain_spec.fork_id().map(ToOwned::to_owned),
		block_announce_validator,
		syncing_strategy,
		chain_sync_network_handle,
		import_queue.service(),
		block_downloader,
		Arc::clone(&peer_store_handle),
	)?;
	let sync_service_import_queue = sync_service.clone();
	let sync_service = Arc::new(sync_service);

	let network_params = sc_network::config::Params::<Block, <Block as BlockT>::Hash, Net> {
		role: config.role,
		executor: {
			let spawn_handle = Clone::clone(&spawn_handle);
			Box::new(move |fut| {
				spawn_handle.spawn("libp2p-node", Some("networking"), fut);
			})
		},
		network_config: net_config,
		genesis_hash,
		protocol_id,
		fork_id: config.chain_spec.fork_id().map(ToOwned::to_owned),
		metrics_registry: config.prometheus_config.as_ref().map(|config| config.registry.clone()),
		block_announce_config,
		bitswap_config,
		notification_metrics: metrics,
	};

	let has_bootnodes = !network_params.network_config.network_config.boot_nodes.is_empty();
	let network_mut = Net::new(network_params)?;
	let network = network_mut.network_service().clone();

	let (tx_handler, tx_handler_controller) = transactions_handler_proto.build(
		network.clone(),
		sync_service.clone(),
		Arc::new(TransactionPoolAdapter { pool: transaction_pool, client: client.clone() }),
		config.prometheus_config.as_ref().map(|config| &config.registry),
	)?;
	spawn_handle.spawn_blocking(
		"network-transactions-handler",
		Some("networking"),
		tx_handler.run(),
	);

	spawn_handle.spawn_blocking(
		"chain-sync-network-service-provider",
		Some("networking"),
		chain_sync_network_provider.run(Arc::new(network.clone())),
	);
	spawn_handle.spawn("import-queue", None, import_queue.run(Box::new(sync_service_import_queue)));
	spawn_handle.spawn_blocking("syncing", None, engine.run());

	let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc", 10_000);
	spawn_handle.spawn(
		"system-rpc-handler",
		Some("networking"),
		build_system_rpc_future::<_, _, <Block as BlockT>::Hash>(
			config.role,
			network_mut.network_service(),
			sync_service.clone(),
			client.clone(),
			system_rpc_rx,
			has_bootnodes,
		),
	);

	let future = build_network_future::<_, _, <Block as BlockT>::Hash, _>(
		network_mut,
		client,
		sync_service.clone(),
		config.announce_block,
	);

	// TODO: Normally, one is supposed to pass a list of notifications protocols supported by the
	// node through the `NetworkConfiguration` struct. But because this function doesn't know in
	// advance which components, such as GrandPa or Polkadot, will be plugged on top of the
	// service, it is unfortunately not possible to do so without some deep refactoring. To
	// bypass this problem, the `NetworkService` provides a `register_notifications_protocol`
	// method that can be called even after the network has been initialized. However, we want to
	// avoid the situation where `register_notifications_protocol` is called *after* the network
	// actually connects to other peers. For this reason, we delay the process of the network
	// future until the user calls `NetworkStarter::start_network`.
	//
	// This entire hack should eventually be removed in favour of passing the list of protocols
	// through the configuration.
	//
	// See also https://github.com/paritytech/substrate/issues/6827
	let (network_start_tx, network_start_rx) = oneshot::channel();

	// The network worker is responsible for gathering all network messages and processing
	// them. This is quite a heavy task, and at the time of the writing of this comment it
	// frequently happens that this future takes several seconds or in some situations
	// even more than a minute until it has processed its entire queue. This is clearly an
	// issue, and ideally we would like to fix the network future to take as little time as
	// possible, but we also take the extra harm-prevention measure to execute the networking
	// future using `spawn_blocking`.
	spawn_handle.spawn_blocking("network-worker", Some("networking"), async move {
		if network_start_rx.await.is_err() {
			log::warn!(
				"The `NetworkStarter` returned as part of `build_network` has been silently dropped"
			);
			// This `return` might seem unnecessary, but we don't want to make it look like
			// everything is working as normal even though the user is clearly misusing the API.
			return
		}

		future.await
	});

	Ok((
		network,
		system_rpc_tx,
		tx_handler_controller,
		NetworkStarter(network_start_tx),
		sync_service.clone(),
	))
}

/// Build the standard Polkadot syncing strategy.
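///
/// # Example
///
/// A sketch of how a node might call this while setting up networking; all values shown are
/// assumed to come from the surrounding builder code (e.g. `config`, `net_config`,
/// `warp_sync_config`):
///
/// ```ignore
/// let syncing_strategy = build_polkadot_syncing_strategy(
/// 	config.protocol_id(),
/// 	config.chain_spec.fork_id(),
/// 	&mut net_config,
/// 	warp_sync_config,
/// 	client.clone(),
/// 	&task_manager.spawn_handle(),
/// 	config.prometheus_config.as_ref().map(|cfg| &cfg.registry),
/// )?;
/// ```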
pub fn build_polkadot_syncing_strategy<Block, Client, Net>(
	protocol_id: ProtocolId,
	fork_id: Option<&str>,
	net_config: &mut FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
	warp_sync_config: Option<WarpSyncConfig<Block>>,
	client: Arc<Client>,
	spawn_handle: &SpawnTaskHandle,
	metrics_registry: Option<&Registry>,
) -> Result<Box<dyn SyncingStrategy<Block>>, Error>
where
	Block: BlockT,
	Client: HeaderBackend<Block>
		+ BlockBackend<Block>
		+ HeaderMetadata<Block, Error = sp_blockchain::Error>
		+ ProofProvider<Block>
		+ Send
		+ Sync
		+ 'static,
	Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
{
	if warp_sync_config.is_none() && net_config.network_config.sync_mode.is_warp() {
		return Err("Warp sync enabled, but no warp sync provider configured.".into())
	}

	if client.requires_full_sync() {
		match net_config.network_config.sync_mode {
			SyncMode::LightState { .. } =>
				return Err("Fast sync doesn't work for archive nodes".into()),
			SyncMode::Warp => return Err("Warp sync doesn't work for archive nodes".into()),
			SyncMode::Full => {},
		}
	}

	let genesis_hash = client.info().genesis_hash;

	let (state_request_protocol_config, state_request_protocol_name) = {
		let num_peer_hint = net_config.network_config.default_peers_set_num_full as usize +
			net_config.network_config.default_peers_set.reserved_nodes.len();
		// Allow both outgoing and incoming requests.
		let (handler, protocol_config) =
			StateRequestHandler::new::<Net>(&protocol_id, fork_id, client.clone(), num_peer_hint);
		let config_name = protocol_config.protocol_name().clone();

		spawn_handle.spawn("state-request-handler", Some("networking"), handler.run());
		(protocol_config, config_name)
	};
	net_config.add_request_response_protocol(state_request_protocol_config);

	let (warp_sync_protocol_config, warp_sync_protocol_name) = match warp_sync_config.as_ref() {
		Some(WarpSyncConfig::WithProvider(warp_with_provider)) => {
			// Allow both outgoing and incoming requests.
			let (handler, protocol_config) = WarpSyncRequestHandler::new::<_, Net>(
				protocol_id,
				genesis_hash,
				fork_id,
				warp_with_provider.clone(),
			);
			let config_name = protocol_config.protocol_name().clone();

			spawn_handle.spawn("warp-sync-request-handler", Some("networking"), handler.run());
			(Some(protocol_config), Some(config_name))
		},
		_ => (None, None),
	};
	if let Some(config) = warp_sync_protocol_config {
		net_config.add_request_response_protocol(config);
	}

	let syncing_config = SyncingConfig {
		mode: net_config.network_config.sync_mode,
		max_parallel_downloads: net_config.network_config.max_parallel_downloads,
		max_blocks_per_request: net_config.network_config.max_blocks_per_request,
		metrics_registry: metrics_registry.cloned(),
		state_request_protocol_name,
	};
	Ok(Box::new(PolkadotSyncingStrategy::new(
		syncing_config,
		client,
		warp_sync_config,
		warp_sync_protocol_name,
	)?))
}

/// Object used to start the network.
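///
/// # Example
///
/// A sketch of the intended call order, assuming `network_starter` is the [`NetworkStarter`]
/// returned by [`build_network`]:
///
/// ```ignore
/// // ... spawn the remaining node tasks first ...
/// network_starter.start_network();
/// ```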
#[must_use]
pub struct NetworkStarter(oneshot::Sender<()>);

impl NetworkStarter {
	/// Create a new NetworkStarter
	pub fn new(sender: oneshot::Sender<()>) -> Self {
		NetworkStarter(sender)
	}

	/// Start the network. Call this after all sub-components have been initialized.
	///
	/// > **Note**: If you don't call this function, the networking will not work.
	pub fn start_network(self) {
		let _ = self.0.send(());
	}
}