sc_service/
lib.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Substrate service. Starts a thread that spins up the network, client, and extrinsic pool.
20//! Manages communication between them.
21
22#![warn(missing_docs)]
23#![recursion_limit = "1024"]
24
25pub mod chain_ops;
26pub mod config;
27pub mod error;
28
29mod builder;
30#[cfg(feature = "test-helpers")]
31pub mod client;
32#[cfg(not(feature = "test-helpers"))]
33mod client;
34mod metrics;
35mod task_manager;
36
37use crate::config::Multiaddr;
38use std::{
39	collections::HashMap,
40	net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6},
41};
42
43use codec::{Decode, Encode};
44use futures::{pin_mut, FutureExt, StreamExt};
45use jsonrpsee::RpcModule;
46use log::{debug, error, warn};
47use sc_client_api::{blockchain::HeaderBackend, BlockBackend, BlockchainEvents, ProofProvider};
48use sc_network::{
49	config::MultiaddrWithPeerId, service::traits::NetworkService, NetworkBackend, NetworkBlock,
50	NetworkPeers, NetworkStateInfo,
51};
52use sc_network_sync::SyncingService;
53use sc_network_types::PeerId;
54use sc_rpc_server::Server;
55use sc_utils::mpsc::TracingUnboundedReceiver;
56use sp_blockchain::HeaderMetadata;
57use sp_consensus::SyncOracle;
58use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
59
60pub use self::{
61	builder::{
62		build_network, build_polkadot_syncing_strategy, gen_rpc_module, init_telemetry, new_client,
63		new_db_backend, new_full_client, new_full_parts, new_full_parts_record_import,
64		new_full_parts_with_genesis_builder, new_wasm_executor,
65		propagate_transaction_notifications, spawn_tasks, BuildNetworkParams, KeystoreContainer,
66		NetworkStarter, SpawnTasksParams, TFullBackend, TFullCallExecutor, TFullClient,
67	},
68	client::{ClientConfig, LocalCallExecutor},
69	error::Error,
70	metrics::MetricsService,
71};
72#[allow(deprecated)]
73pub use builder::new_native_or_wasm_executor;
74
75pub use sc_chain_spec::{
76	construct_genesis_block, resolve_state_version_from_wasm, BuildGenesisBlock,
77	GenesisBlockBuilder,
78};
79
80pub use config::{
81	BasePath, BlocksPruning, Configuration, DatabaseSource, PruningMode, Role, RpcMethods, TaskType,
82};
83pub use sc_chain_spec::{
84	ChainSpec, ChainType, Extension as ChainSpecExtension, GenericChainSpec, NoExtension,
85	Properties,
86};
87
88use crate::config::RpcConfiguration;
89use prometheus_endpoint::Registry;
90pub use sc_consensus::ImportQueue;
91pub use sc_executor::NativeExecutionDispatch;
92pub use sc_network_sync::WarpSyncConfig;
93#[doc(hidden)]
94pub use sc_network_transactions::config::{TransactionImport, TransactionImportFuture};
95pub use sc_rpc::{RandomIntegerSubscriptionId, RandomStringSubscriptionId};
96pub use sc_tracing::TracingReceiver;
97pub use sc_transaction_pool::Options as TransactionPoolOptions;
98pub use sc_transaction_pool_api::{error::IntoPoolError, InPoolTransaction, TransactionPool};
99#[doc(hidden)]
100pub use std::{ops::Deref, result::Result, sync::Arc};
101pub use task_manager::{SpawnTaskHandle, Task, TaskManager, TaskRegistry, DEFAULT_GROUP_NAME};
102use tokio::runtime::Handle;
103
// Default protocol identifier string.
// NOTE(review): no use of this constant is visible in this part of the file — confirm where it
// is consumed before changing or removing it.
const DEFAULT_PROTOCOL_ID: &str = "sup";
105
106/// A running RPC service that can perform in-memory RPC queries.
107#[derive(Clone)]
108pub struct RpcHandlers {
109	// This is legacy and may be removed at some point, it was for WASM stuff before smoldot was a
110	// thing. https://github.com/paritytech/polkadot-sdk/pull/5038#discussion_r1694971805
111	rpc_module: Arc<RpcModule<()>>,
112
113	// This can be used to introspect the port the RPC server is listening on. SDK consumers are
114	// depending on this and it should be supported even if in-memory query support is removed.
115	listen_addresses: Vec<Multiaddr>,
116}
117
118impl RpcHandlers {
119	/// Create PRC handlers instance.
120	pub fn new(rpc_module: Arc<RpcModule<()>>, listen_addresses: Vec<Multiaddr>) -> Self {
121		Self { rpc_module, listen_addresses }
122	}
123
124	/// Starts an RPC query.
125	///
126	/// The query is passed as a string and must be valid JSON-RPC request object.
127	///
128	/// Returns a response and a stream if the call successful, fails if the
129	/// query could not be decoded as a JSON-RPC request object.
130	///
131	/// If the request subscribes you to events, the `stream` can be used to
132	/// retrieve the events.
133	pub async fn rpc_query(
134		&self,
135		json_query: &str,
136	) -> Result<(String, tokio::sync::mpsc::Receiver<String>), serde_json::Error> {
137		// Because `tokio::sync::mpsc::channel` is used under the hood
138		// it will panic if it's set to usize::MAX.
139		//
140		// This limit is used to prevent panics and is large enough.
141		const TOKIO_MPSC_MAX_SIZE: usize = tokio::sync::Semaphore::MAX_PERMITS;
142
143		self.rpc_module.raw_json_request(json_query, TOKIO_MPSC_MAX_SIZE).await
144	}
145
146	/// Provides access to the underlying `RpcModule`
147	pub fn handle(&self) -> Arc<RpcModule<()>> {
148		self.rpc_module.clone()
149	}
150
151	/// Provides access to listen addresses
152	pub fn listen_addresses(&self) -> &[Multiaddr] {
153		&self.listen_addresses[..]
154	}
155}
156
/// An incomplete set of chain components, but enough to run the chain ops subcommands.
///
/// The struct is generic over every component type so that each node implementation can plug
/// in its own client, backend, chain selection, import queue and transaction pool, plus an
/// arbitrary `Other` payload.
pub struct PartialComponents<Client, Backend, SelectChain, ImportQueue, TransactionPool, Other> {
	/// A shared client instance.
	pub client: Arc<Client>,
	/// A shared backend instance.
	pub backend: Arc<Backend>,
	/// The chain task manager.
	pub task_manager: TaskManager,
	/// A keystore container instance.
	pub keystore_container: KeystoreContainer,
	/// A chain selection algorithm instance.
	pub select_chain: SelectChain,
	/// An import queue.
	pub import_queue: ImportQueue,
	/// A shared transaction pool.
	pub transaction_pool: Arc<TransactionPool>,
	/// Everything else that needs to be passed into the main build function.
	pub other: Other,
}
176
177/// Builds a future that continuously polls the network.
178async fn build_network_future<
179	B: BlockT,
180	C: BlockchainEvents<B>
181		+ HeaderBackend<B>
182		+ BlockBackend<B>
183		+ HeaderMetadata<B, Error = sp_blockchain::Error>
184		+ ProofProvider<B>
185		+ Send
186		+ Sync
187		+ 'static,
188	H: sc_network_common::ExHashT,
189	N: NetworkBackend<B, <B as BlockT>::Hash>,
190>(
191	network: N,
192	client: Arc<C>,
193	sync_service: Arc<SyncingService<B>>,
194	announce_imported_blocks: bool,
195) {
196	let mut imported_blocks_stream = client.import_notification_stream().fuse();
197
198	// Stream of finalized blocks reported by the client.
199	let mut finality_notification_stream = client.finality_notification_stream().fuse();
200
201	let network_run = network.run().fuse();
202	pin_mut!(network_run);
203
204	loop {
205		futures::select! {
206			// List of blocks that the client has imported.
207			notification = imported_blocks_stream.next() => {
208				let notification = match notification {
209					Some(n) => n,
210					// If this stream is shut down, that means the client has shut down, and the
211					// most appropriate thing to do for the network future is to shut down too.
212					None => {
213						debug!("Block import stream has terminated, shutting down the network future.");
214						return
215					},
216				};
217
218				if announce_imported_blocks {
219					sync_service.announce_block(notification.hash, None);
220				}
221
222				if notification.is_new_best {
223					sync_service.new_best_block_imported(
224						notification.hash,
225						*notification.header.number(),
226					);
227				}
228			}
229
230			// List of blocks that the client has finalized.
231			notification = finality_notification_stream.select_next_some() => {
232				sync_service.on_block_finalized(notification.hash, notification.header);
233			}
234
235			// Drive the network. Shut down the network future if `NetworkWorker` has terminated.
236			_ = network_run => {
237				debug!("`NetworkWorker` has terminated, shutting down the network future.");
238				return
239			}
240		}
241	}
242}
243
244/// Builds a future that processes system RPC requests.
245pub async fn build_system_rpc_future<
246	B: BlockT,
247	C: BlockchainEvents<B>
248		+ HeaderBackend<B>
249		+ BlockBackend<B>
250		+ HeaderMetadata<B, Error = sp_blockchain::Error>
251		+ ProofProvider<B>
252		+ Send
253		+ Sync
254		+ 'static,
255	H: sc_network_common::ExHashT,
256>(
257	role: Role,
258	network_service: Arc<dyn NetworkService>,
259	sync_service: Arc<SyncingService<B>>,
260	client: Arc<C>,
261	mut rpc_rx: TracingUnboundedReceiver<sc_rpc::system::Request<B>>,
262	should_have_peers: bool,
263) {
264	// Current best block at initialization, to report to the RPC layer.
265	let starting_block = client.info().best_number;
266
267	loop {
268		// Answer incoming RPC requests.
269		let Some(req) = rpc_rx.next().await else {
270			debug!("RPC requests stream has terminated, shutting down the system RPC future.");
271			return
272		};
273
274		match req {
275			sc_rpc::system::Request::Health(sender) => match sync_service.peers_info().await {
276				Ok(info) => {
277					let _ = sender.send(sc_rpc::system::Health {
278						peers: info.len(),
279						is_syncing: sync_service.is_major_syncing(),
280						should_have_peers,
281					});
282				},
283				Err(_) => log::error!("`SyncingEngine` shut down"),
284			},
285			sc_rpc::system::Request::LocalPeerId(sender) => {
286				let _ = sender.send(network_service.local_peer_id().to_base58());
287			},
288			sc_rpc::system::Request::LocalListenAddresses(sender) => {
289				let peer_id = (network_service.local_peer_id()).into();
290				let p2p_proto_suffix = sc_network::multiaddr::Protocol::P2p(peer_id);
291				let addresses = network_service
292					.listen_addresses()
293					.iter()
294					.map(|addr| addr.clone().with(p2p_proto_suffix.clone()).to_string())
295					.collect();
296				let _ = sender.send(addresses);
297			},
298			sc_rpc::system::Request::Peers(sender) => match sync_service.peers_info().await {
299				Ok(info) => {
300					let _ = sender.send(
301						info.into_iter()
302							.map(|(peer_id, p)| sc_rpc::system::PeerInfo {
303								peer_id: peer_id.to_base58(),
304								roles: format!("{:?}", p.roles),
305								best_hash: p.best_hash,
306								best_number: p.best_number,
307							})
308							.collect(),
309					);
310				},
311				Err(_) => log::error!("`SyncingEngine` shut down"),
312			},
313			sc_rpc::system::Request::NetworkState(sender) => {
314				let network_state = network_service.network_state().await;
315				if let Ok(network_state) = network_state {
316					if let Ok(network_state) = serde_json::to_value(network_state) {
317						let _ = sender.send(network_state);
318					}
319				} else {
320					break
321				}
322			},
323			sc_rpc::system::Request::NetworkAddReservedPeer(peer_addr, sender) => {
324				let result = match MultiaddrWithPeerId::try_from(peer_addr) {
325					Ok(peer) => network_service.add_reserved_peer(peer),
326					Err(err) => Err(err.to_string()),
327				};
328				let x = result.map_err(sc_rpc::system::error::Error::MalformattedPeerArg);
329				let _ = sender.send(x);
330			},
331			sc_rpc::system::Request::NetworkRemoveReservedPeer(peer_id, sender) => {
332				let _ = match peer_id.parse::<PeerId>() {
333					Ok(peer_id) => {
334						network_service.remove_reserved_peer(peer_id);
335						sender.send(Ok(()))
336					},
337					Err(e) => sender.send(Err(sc_rpc::system::error::Error::MalformattedPeerArg(
338						e.to_string(),
339					))),
340				};
341			},
342			sc_rpc::system::Request::NetworkReservedPeers(sender) => {
343				let Ok(reserved_peers) = network_service.reserved_peers().await else {
344					break;
345				};
346
347				let _ =
348					sender.send(reserved_peers.iter().map(|peer_id| peer_id.to_base58()).collect());
349			},
350			sc_rpc::system::Request::NodeRoles(sender) => {
351				use sc_rpc::system::NodeRole;
352
353				let node_role = match role {
354					Role::Authority { .. } => NodeRole::Authority,
355					Role::Full => NodeRole::Full,
356				};
357
358				let _ = sender.send(vec![node_role]);
359			},
360			sc_rpc::system::Request::SyncState(sender) => {
361				use sc_rpc::system::SyncState;
362
363				match sync_service.status().await.map(|status| status.best_seen_block) {
364					Ok(best_seen_block) => {
365						let best_number = client.info().best_number;
366						let _ = sender.send(SyncState {
367							starting_block,
368							current_block: best_number,
369							highest_block: best_seen_block.unwrap_or(best_number),
370						});
371					},
372					Err(_) => log::error!("`SyncingEngine` shut down"),
373				}
374			},
375		}
376	}
377
378	debug!("`NetworkWorker` has terminated, shutting down the system RPC future.");
379}
380
381/// Starts RPC servers.
382pub fn start_rpc_servers<R>(
383	rpc_configuration: &RpcConfiguration,
384	registry: Option<&Registry>,
385	tokio_handle: &Handle,
386	gen_rpc_module: R,
387	rpc_id_provider: Option<Box<dyn sc_rpc_server::SubscriptionIdProvider>>,
388) -> Result<Server, error::Error>
389where
390	R: Fn() -> Result<RpcModule<()>, Error>,
391{
392	let endpoints: Vec<sc_rpc_server::RpcEndpoint> = if let Some(endpoints) =
393		rpc_configuration.addr.as_ref()
394	{
395		endpoints.clone()
396	} else {
397		let ipv6 =
398			SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::LOCALHOST, rpc_configuration.port, 0, 0));
399		let ipv4 = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, rpc_configuration.port));
400
401		vec![
402			sc_rpc_server::RpcEndpoint {
403				batch_config: rpc_configuration.batch_config,
404				cors: rpc_configuration.cors.clone(),
405				listen_addr: ipv4,
406				max_buffer_capacity_per_connection: rpc_configuration.message_buffer_capacity,
407				max_connections: rpc_configuration.max_connections,
408				max_payload_in_mb: rpc_configuration.max_request_size,
409				max_payload_out_mb: rpc_configuration.max_response_size,
410				max_subscriptions_per_connection: rpc_configuration.max_subs_per_conn,
411				rpc_methods: rpc_configuration.methods.into(),
412				rate_limit: rpc_configuration.rate_limit,
413				rate_limit_trust_proxy_headers: rpc_configuration.rate_limit_trust_proxy_headers,
414				rate_limit_whitelisted_ips: rpc_configuration.rate_limit_whitelisted_ips.clone(),
415				retry_random_port: true,
416				is_optional: false,
417			},
418			sc_rpc_server::RpcEndpoint {
419				batch_config: rpc_configuration.batch_config,
420				cors: rpc_configuration.cors.clone(),
421				listen_addr: ipv6,
422				max_buffer_capacity_per_connection: rpc_configuration.message_buffer_capacity,
423				max_connections: rpc_configuration.max_connections,
424				max_payload_in_mb: rpc_configuration.max_request_size,
425				max_payload_out_mb: rpc_configuration.max_response_size,
426				max_subscriptions_per_connection: rpc_configuration.max_subs_per_conn,
427				rpc_methods: rpc_configuration.methods.into(),
428				rate_limit: rpc_configuration.rate_limit,
429				rate_limit_trust_proxy_headers: rpc_configuration.rate_limit_trust_proxy_headers,
430				rate_limit_whitelisted_ips: rpc_configuration.rate_limit_whitelisted_ips.clone(),
431				retry_random_port: true,
432				is_optional: true,
433			},
434		]
435	};
436
437	let metrics = sc_rpc_server::RpcMetrics::new(registry)?;
438	let rpc_api = gen_rpc_module()?;
439
440	let server_config = sc_rpc_server::Config {
441		endpoints,
442		rpc_api,
443		metrics,
444		id_provider: rpc_id_provider,
445		tokio_handle: tokio_handle.clone(),
446	};
447
448	// TODO: https://github.com/paritytech/substrate/issues/13773
449	//
450	// `block_in_place` is a hack to allow callers to call `block_on` prior to
451	// calling `start_rpc_servers`.
452	match tokio::task::block_in_place(|| {
453		tokio_handle.block_on(sc_rpc_server::start_server(server_config))
454	}) {
455		Ok(server) => Ok(server),
456		Err(e) => Err(Error::Application(e)),
457	}
458}
459
/// Transaction pool adapter.
///
/// Bundles a shared transaction pool with a shared client.
pub struct TransactionPoolAdapter<C, P> {
	// Shared handle to the wrapped transaction pool.
	pool: Arc<P>,
	// Shared handle to the client.
	client: Arc<C>,
}

impl<C, P> TransactionPoolAdapter<C, P> {
	/// Constructs a new instance of [`TransactionPoolAdapter`] from a pool and a client.
	pub fn new(pool: Arc<P>, client: Arc<C>) -> Self {
		TransactionPoolAdapter { pool, client }
	}
}
472
473/// Get transactions for propagation.
474///
475/// Function extracted to simplify the test and prevent creating `ServiceFactory`.
476fn transactions_to_propagate<Pool, B, H, E>(pool: &Pool) -> Vec<(H, B::Extrinsic)>
477where
478	Pool: TransactionPool<Block = B, Hash = H, Error = E>,
479	B: BlockT,
480	H: std::hash::Hash + Eq + sp_runtime::traits::Member + sp_runtime::traits::MaybeSerialize,
481	E: IntoPoolError + From<sc_transaction_pool_api::error::Error>,
482{
483	pool.ready()
484		.filter(|t| t.is_propagable())
485		.map(|t| {
486			let hash = t.hash().clone();
487			let ex: B::Extrinsic = t.data().clone();
488			(hash, ex)
489		})
490		.collect()
491}
492
493impl<B, H, C, Pool, E> sc_network_transactions::config::TransactionPool<H, B>
494	for TransactionPoolAdapter<C, Pool>
495where
496	C: HeaderBackend<B>
497		+ BlockBackend<B>
498		+ HeaderMetadata<B, Error = sp_blockchain::Error>
499		+ ProofProvider<B>
500		+ Send
501		+ Sync
502		+ 'static,
503	Pool: 'static + TransactionPool<Block = B, Hash = H, Error = E>,
504	B: BlockT,
505	H: std::hash::Hash + Eq + sp_runtime::traits::Member + sp_runtime::traits::MaybeSerialize,
506	E: 'static + IntoPoolError + From<sc_transaction_pool_api::error::Error>,
507{
508	fn transactions(&self) -> Vec<(H, B::Extrinsic)> {
509		transactions_to_propagate(&*self.pool)
510	}
511
512	fn hash_of(&self, transaction: &B::Extrinsic) -> H {
513		self.pool.hash_of(transaction)
514	}
515
516	fn import(&self, transaction: B::Extrinsic) -> TransactionImportFuture {
517		let encoded = transaction.encode();
518		let uxt = match Decode::decode(&mut &encoded[..]) {
519			Ok(uxt) => uxt,
520			Err(e) => {
521				debug!("Transaction invalid: {:?}", e);
522				return Box::pin(futures::future::ready(TransactionImport::Bad))
523			},
524		};
525
526		let import_future = self.pool.submit_one(
527			self.client.info().best_hash,
528			sc_transaction_pool_api::TransactionSource::External,
529			uxt,
530		);
531		Box::pin(async move {
532			match import_future.await {
533				Ok(_) => TransactionImport::NewGood,
534				Err(e) => match e.into_pool_error() {
535					Ok(sc_transaction_pool_api::error::Error::AlreadyImported(_)) =>
536						TransactionImport::KnownGood,
537					Ok(e) => {
538						debug!("Error adding transaction to the pool: {:?}", e);
539						TransactionImport::Bad
540					},
541					Err(e) => {
542						debug!("Error converting pool error: {}", e);
543						// it is not bad at least, just some internal node logic error, so peer is
544						// innocent.
545						TransactionImport::KnownGood
546					},
547				},
548			}
549		})
550	}
551
552	fn on_broadcasted(&self, propagations: HashMap<H, Vec<String>>) {
553		self.pool.on_broadcasted(propagations)
554	}
555
556	fn transaction(&self, hash: &H) -> Option<B::Extrinsic> {
557		self.pool.ready_transaction(hash).and_then(
558			// Only propagable transactions should be resolved for network service.
559			|tx| if tx.is_propagable() { Some(tx.data().clone()) } else { None },
560		)
561	}
562}
563
#[cfg(test)]
mod tests {
	use super::*;
	use futures::executor::block_on;
	use sc_transaction_pool::BasicPool;
	use sp_consensus::SelectChain;
	use substrate_test_runtime_client::{
		prelude::*,
		runtime::{ExtrinsicBuilder, Transfer, TransferData},
	};

	/// A ready transaction that is marked as non-propagable must not be returned by
	/// `transactions_to_propagate`, while a regular transfer must be.
	#[test]
	fn should_not_propagate_transactions_that_are_marked_as_such() {
		// given: a pool holding one regular transfer and one "do not propagate" extrinsic
		let (client, select_chain) = TestClientBuilder::new().build_with_longest_chain();
		let client = Arc::new(client);
		let pool = BasicPool::new_full(
			Default::default(),
			true.into(),
			None,
			sp_core::testing::TaskExecutor::new(),
			client.clone(),
		);
		let source = sp_runtime::transaction_validity::TransactionSource::External;
		let best_header = block_on(select_chain.best_chain()).unwrap();

		let transfer = Transfer {
			amount: 5,
			nonce: 0,
			from: AccountKeyring::Alice.into(),
			to: AccountKeyring::Bob.into(),
		}
		.into_unchecked_extrinsic();
		let not_propagable = ExtrinsicBuilder::new_call_do_not_propagate().nonce(1).build();

		block_on(pool.submit_one(best_header.hash(), source, transfer.clone())).unwrap();
		block_on(pool.submit_one(best_header.hash(), source, not_propagable)).unwrap();
		assert_eq!(pool.status().ready, 2);

		// when
		let propagated = transactions_to_propagate(&*pool);

		// then: only the transfer is left
		assert_eq!(propagated.len(), 1);
		assert!(TransferData::try_from(&propagated[0].1).is_ok());
	}
}
608}