1use crate::{
20 build_network_future, build_system_rpc_future,
21 client::{Client, ClientConfig},
22 config::{Configuration, ExecutorConfiguration, KeystoreConfig, Multiaddr, PrometheusConfig},
23 error::Error,
24 metrics::MetricsService,
25 start_rpc_servers, BuildGenesisBlock, GenesisBlockBuilder, RpcHandlers, SpawnTaskHandle,
26 TaskManager, TransactionPoolAdapter,
27};
28use futures::{channel::oneshot, future::ready, FutureExt, StreamExt};
29use jsonrpsee::RpcModule;
30use log::info;
31use prometheus_endpoint::Registry;
32use sc_chain_spec::{get_extension, ChainSpec};
33use sc_client_api::{
34 execution_extensions::ExecutionExtensions, proof_provider::ProofProvider, BadBlocks,
35 BlockBackend, BlockchainEvents, ExecutorProvider, ForkBlocks, StorageProvider, UsageProvider,
36};
37use sc_client_db::{Backend, BlocksPruning, DatabaseSettings, PruningMode};
38use sc_consensus::import_queue::ImportQueue;
39use sc_executor::{
40 sp_wasm_interface::HostFunctions, HeapAllocStrategy, NativeExecutionDispatch, RuntimeVersionOf,
41 WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY,
42};
43use sc_keystore::LocalKeystore;
44use sc_network::{
45 config::{FullNetworkConfiguration, ProtocolId, SyncMode},
46 multiaddr::Protocol,
47 service::{
48 traits::{PeerStore, RequestResponseConfig},
49 NotificationMetrics,
50 },
51 NetworkBackend, NetworkStateInfo,
52};
53use sc_network_common::role::Roles;
54use sc_network_light::light_client_requests::handler::LightClientRequestHandler;
55use sc_network_sync::{
56 block_relay_protocol::BlockRelayParams,
57 block_request_handler::BlockRequestHandler,
58 engine::SyncingEngine,
59 service::network::NetworkServiceProvider,
60 state_request_handler::StateRequestHandler,
61 strategy::{PolkadotSyncingStrategy, SyncingConfig, SyncingStrategy},
62 warp_request_handler::RequestHandler as WarpSyncRequestHandler,
63 SyncingService, WarpSyncConfig,
64};
65use sc_rpc::{
66 author::AuthorApiServer,
67 chain::ChainApiServer,
68 offchain::OffchainApiServer,
69 state::{ChildStateApiServer, StateApiServer},
70 system::SystemApiServer,
71 DenyUnsafe, SubscriptionTaskExecutor,
72};
73use sc_rpc_spec_v2::{
74 archive::ArchiveApiServer,
75 chain_head::ChainHeadApiServer,
76 chain_spec::ChainSpecApiServer,
77 transaction::{TransactionApiServer, TransactionBroadcastApiServer},
78};
79use sc_telemetry::{telemetry, ConnectionMessage, Telemetry, TelemetryHandle, SUBSTRATE_INFO};
80use sc_transaction_pool_api::{MaintainedTransactionPool, TransactionPool};
81use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
82use sp_api::{CallApiAt, ProvideRuntimeApi};
83use sp_blockchain::{HeaderBackend, HeaderMetadata};
84use sp_consensus::block_validation::{
85 BlockAnnounceValidator, Chain, DefaultBlockAnnounceValidator,
86};
87use sp_core::traits::{CodeExecutor, SpawnNamed};
88use sp_keystore::KeystorePtr;
89use sp_runtime::traits::{Block as BlockT, BlockIdTo, NumberFor, Zero};
90use std::{str::FromStr, sync::Arc, time::SystemTime};
91
/// Client type that combines the database-backed [`TFullBackend`] with the local
/// [`TFullCallExecutor`].
pub type TFullClient<TBl, TRtApi, TExec> =
    Client<TFullBackend<TBl>, TFullCallExecutor<TBl, TExec>, TBl, TRtApi>;
95
/// Backend type used by [`TFullClient`]; an alias for the `sc_client_db` [`Backend`].
pub type TFullBackend<TBl> = Backend<TBl>;
98
/// Call executor type used by [`TFullClient`]: a `LocalCallExecutor` operating on the
/// `sc_client_db` [`Backend`].
pub type TFullCallExecutor<TBl, TExec> = crate::client::LocalCallExecutor<TBl, Backend<TBl>, TExec>;
101
/// Tuple produced by the `new_full_parts*` constructors: the client, its backend, the keystore
/// container and the task manager, in that order.
type TFullParts<TBl, TRtApi, TExec> =
    (TFullClient<TBl, TRtApi, TExec>, Arc<TFullBackend<TBl>>, KeystoreContainer, TaskManager);
104
105pub struct KeystoreContainer(Arc<LocalKeystore>);
107
108impl KeystoreContainer {
109 pub fn new(config: &KeystoreConfig) -> Result<Self, Error> {
111 let keystore = Arc::new(match config {
112 KeystoreConfig::Path { path, password } =>
113 LocalKeystore::open(path.clone(), password.clone())?,
114 KeystoreConfig::InMemory => LocalKeystore::in_memory(),
115 });
116
117 Ok(Self(keystore))
118 }
119
120 pub fn keystore(&self) -> KeystorePtr {
122 self.0.clone()
123 }
124
125 pub fn local_keystore(&self) -> Arc<LocalKeystore> {
127 self.0.clone()
128 }
129}
130
131pub fn new_full_client<TBl, TRtApi, TExec>(
133 config: &Configuration,
134 telemetry: Option<TelemetryHandle>,
135 executor: TExec,
136) -> Result<TFullClient<TBl, TRtApi, TExec>, Error>
137where
138 TBl: BlockT,
139 TExec: CodeExecutor + RuntimeVersionOf + Clone,
140{
141 new_full_parts(config, telemetry, executor).map(|parts| parts.0)
142}
143
144pub fn new_full_parts_record_import<TBl, TRtApi, TExec>(
146 config: &Configuration,
147 telemetry: Option<TelemetryHandle>,
148 executor: TExec,
149 enable_import_proof_recording: bool,
150) -> Result<TFullParts<TBl, TRtApi, TExec>, Error>
151where
152 TBl: BlockT,
153 TExec: CodeExecutor + RuntimeVersionOf + Clone,
154{
155 let backend = new_db_backend(config.db_config())?;
156
157 let genesis_block_builder = GenesisBlockBuilder::new(
158 config.chain_spec.as_storage_builder(),
159 !config.no_genesis(),
160 backend.clone(),
161 executor.clone(),
162 )?;
163
164 new_full_parts_with_genesis_builder(
165 config,
166 telemetry,
167 executor,
168 backend,
169 genesis_block_builder,
170 enable_import_proof_recording,
171 )
172}
173pub fn new_full_parts<TBl, TRtApi, TExec>(
175 config: &Configuration,
176 telemetry: Option<TelemetryHandle>,
177 executor: TExec,
178) -> Result<TFullParts<TBl, TRtApi, TExec>, Error>
179where
180 TBl: BlockT,
181 TExec: CodeExecutor + RuntimeVersionOf + Clone,
182{
183 new_full_parts_record_import(config, telemetry, executor, false)
184}
185
186pub fn new_full_parts_with_genesis_builder<TBl, TRtApi, TExec, TBuildGenesisBlock>(
188 config: &Configuration,
189 telemetry: Option<TelemetryHandle>,
190 executor: TExec,
191 backend: Arc<TFullBackend<TBl>>,
192 genesis_block_builder: TBuildGenesisBlock,
193 enable_import_proof_recording: bool,
194) -> Result<TFullParts<TBl, TRtApi, TExec>, Error>
195where
196 TBl: BlockT,
197 TExec: CodeExecutor + RuntimeVersionOf + Clone,
198 TBuildGenesisBlock: BuildGenesisBlock<
199 TBl,
200 BlockImportOperation = <Backend<TBl> as sc_client_api::backend::Backend<TBl>>::BlockImportOperation
201 >,
202{
203 let keystore_container = KeystoreContainer::new(&config.keystore)?;
204
205 let task_manager = {
206 let registry = config.prometheus_config.as_ref().map(|cfg| &cfg.registry);
207 TaskManager::new(config.tokio_handle.clone(), registry)?
208 };
209
210 let chain_spec = &config.chain_spec;
211 let fork_blocks = get_extension::<ForkBlocks<TBl>>(chain_spec.extensions())
212 .cloned()
213 .unwrap_or_default();
214
215 let bad_blocks = get_extension::<BadBlocks<TBl>>(chain_spec.extensions())
216 .cloned()
217 .unwrap_or_default();
218
219 let client = {
220 let extensions = ExecutionExtensions::new(None, Arc::new(executor.clone()));
221
222 let wasm_runtime_substitutes = config
223 .chain_spec
224 .code_substitutes()
225 .into_iter()
226 .map(|(n, c)| {
227 let number = NumberFor::<TBl>::from_str(&n).map_err(|_| {
228 Error::Application(Box::from(format!(
229 "Failed to parse `{}` as block number for code substitutes. \
230 In an old version the key for code substitute was a block hash. \
231 Please update the chain spec to a version that is compatible with your node.",
232 n
233 )))
234 })?;
235 Ok((number, c))
236 })
237 .collect::<Result<std::collections::HashMap<_, _>, Error>>()?;
238
239 let client = new_client(
240 backend.clone(),
241 executor,
242 genesis_block_builder,
243 fork_blocks,
244 bad_blocks,
245 extensions,
246 Box::new(task_manager.spawn_handle()),
247 config.prometheus_config.as_ref().map(|config| config.registry.clone()),
248 telemetry,
249 ClientConfig {
250 offchain_worker_enabled: config.offchain_worker.enabled,
251 offchain_indexing_api: config.offchain_worker.indexing_enabled,
252 wasm_runtime_overrides: config.wasm_runtime_overrides.clone(),
253 no_genesis: config.no_genesis(),
254 wasm_runtime_substitutes,
255 enable_import_proof_recording,
256 },
257 )?;
258
259 client
260 };
261
262 Ok((client, backend, keystore_container, task_manager))
263}
264
265#[deprecated(note = "Please switch to `new_wasm_executor`. Will be removed at end of 2024.")]
268#[allow(deprecated)]
269pub fn new_native_or_wasm_executor<D: NativeExecutionDispatch>(
270 config: &Configuration,
271) -> sc_executor::NativeElseWasmExecutor<D> {
272 #[allow(deprecated)]
273 sc_executor::NativeElseWasmExecutor::new_with_wasm_executor(new_wasm_executor(&config.executor))
274}
275
276pub fn new_wasm_executor<H: HostFunctions>(config: &ExecutorConfiguration) -> WasmExecutor<H> {
278 let strategy = config
279 .default_heap_pages
280 .map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |p| HeapAllocStrategy::Static { extra_pages: p as _ });
281 WasmExecutor::<H>::builder()
282 .with_execution_method(config.wasm_method)
283 .with_onchain_heap_alloc_strategy(strategy)
284 .with_offchain_heap_alloc_strategy(strategy)
285 .with_max_runtime_instances(config.max_runtime_instances)
286 .with_runtime_cache_size(config.runtime_cache_size)
287 .build()
288}
289
290pub fn new_db_backend<Block>(
292 settings: DatabaseSettings,
293) -> Result<Arc<Backend<Block>>, sp_blockchain::Error>
294where
295 Block: BlockT,
296{
297 const CANONICALIZATION_DELAY: u64 = 4096;
298
299 Ok(Arc::new(Backend::new(settings, CANONICALIZATION_DELAY)?))
300}
301
302pub fn new_client<E, Block, RA, G>(
304 backend: Arc<Backend<Block>>,
305 executor: E,
306 genesis_block_builder: G,
307 fork_blocks: ForkBlocks<Block>,
308 bad_blocks: BadBlocks<Block>,
309 execution_extensions: ExecutionExtensions<Block>,
310 spawn_handle: Box<dyn SpawnNamed>,
311 prometheus_registry: Option<Registry>,
312 telemetry: Option<TelemetryHandle>,
313 config: ClientConfig<Block>,
314) -> Result<
315 Client<
316 Backend<Block>,
317 crate::client::LocalCallExecutor<Block, Backend<Block>, E>,
318 Block,
319 RA,
320 >,
321 sp_blockchain::Error,
322>
323where
324 Block: BlockT,
325 E: CodeExecutor + RuntimeVersionOf,
326 G: BuildGenesisBlock<
327 Block,
328 BlockImportOperation = <Backend<Block> as sc_client_api::backend::Backend<Block>>::BlockImportOperation
329 >,
330{
331 let executor = crate::client::LocalCallExecutor::new(
332 backend.clone(),
333 executor,
334 config.clone(),
335 execution_extensions,
336 )?;
337
338 Client::new(
339 backend,
340 executor,
341 spawn_handle,
342 genesis_block_builder,
343 fork_blocks,
344 bad_blocks,
345 prometheus_registry,
346 telemetry,
347 config,
348 )
349}
350
/// Parameters consumed by [`spawn_tasks`].
pub struct SpawnTasksParams<'a, TBl: BlockT, TCl, TExPool, TRpc, Backend> {
    /// The service configuration (taken by value).
    pub config: Configuration,
    /// Shared handle to the client.
    pub client: Arc<TCl>,
    /// Shared handle to the client backend.
    pub backend: Arc<Backend>,
    /// Task manager on which the tasks are spawned and which keeps the RPC server alive.
    pub task_manager: &'a mut TaskManager,
    /// Keystore handle; used for session key generation and passed to the RPC module.
    pub keystore: KeystorePtr,
    /// Shared handle to the transaction pool.
    pub transaction_pool: Arc<TExPool>,
    /// Callback producing additional RPC methods that are merged into the RPC module.
    pub rpc_builder: Box<dyn Fn(SubscriptionTaskExecutor) -> Result<RpcModule<TRpc>, Error>>,
    /// Shared handle to the network service.
    pub network: Arc<dyn sc_network::service::traits::NetworkService>,
    /// Sender side of the channel carrying system RPC requests.
    pub system_rpc_tx: TracingUnboundedSender<sc_rpc::system::Request<TBl>>,
    /// Controller used to propagate imported transactions.
    pub tx_handler_controller:
        sc_network_transactions::TransactionsHandlerController<<TBl as BlockT>::Hash>,
    /// Shared handle to the syncing service.
    pub sync_service: Arc<SyncingService<TBl>>,
    /// Optional telemetry worker; when present, telemetry is started by [`spawn_tasks`].
    pub telemetry: Option<&'a mut Telemetry>,
}
379
380pub fn spawn_tasks<TBl, TBackend, TExPool, TRpc, TCl>(
382 params: SpawnTasksParams<TBl, TCl, TExPool, TRpc, TBackend>,
383) -> Result<RpcHandlers, Error>
384where
385 TCl: ProvideRuntimeApi<TBl>
386 + HeaderMetadata<TBl, Error = sp_blockchain::Error>
387 + Chain<TBl>
388 + BlockBackend<TBl>
389 + BlockIdTo<TBl, Error = sp_blockchain::Error>
390 + ProofProvider<TBl>
391 + HeaderBackend<TBl>
392 + BlockchainEvents<TBl>
393 + ExecutorProvider<TBl>
394 + UsageProvider<TBl>
395 + StorageProvider<TBl, TBackend>
396 + CallApiAt<TBl>
397 + Send
398 + 'static,
399 <TCl as ProvideRuntimeApi<TBl>>::Api: sp_api::Metadata<TBl>
400 + sp_transaction_pool::runtime_api::TaggedTransactionQueue<TBl>
401 + sp_session::SessionKeys<TBl>
402 + sp_api::ApiExt<TBl>,
403 TBl: BlockT,
404 TBl::Hash: Unpin,
405 TBl::Header: Unpin,
406 TBackend: 'static + sc_client_api::backend::Backend<TBl> + Send,
407 TExPool: MaintainedTransactionPool<Block = TBl, Hash = <TBl as BlockT>::Hash> + 'static,
408{
409 let SpawnTasksParams {
410 mut config,
411 task_manager,
412 client,
413 backend,
414 keystore,
415 transaction_pool,
416 rpc_builder,
417 network,
418 system_rpc_tx,
419 tx_handler_controller,
420 sync_service,
421 telemetry,
422 } = params;
423
424 let chain_info = client.usage_info().chain;
425
426 sp_session::generate_initial_session_keys(
427 client.clone(),
428 chain_info.best_hash,
429 config.dev_key_seed.clone().map(|s| vec![s]).unwrap_or_default(),
430 keystore.clone(),
431 )
432 .map_err(|e| Error::Application(Box::new(e)))?;
433
434 let sysinfo = sc_sysinfo::gather_sysinfo();
435 sc_sysinfo::print_sysinfo(&sysinfo);
436
437 let telemetry = telemetry
438 .map(|telemetry| {
439 init_telemetry(
440 config.network.node_name.clone(),
441 config.impl_name.clone(),
442 config.impl_version.clone(),
443 config.chain_spec.name().to_string(),
444 config.role.is_authority(),
445 network.clone(),
446 client.clone(),
447 telemetry,
448 Some(sysinfo),
449 )
450 })
451 .transpose()?;
452
453 info!("📦 Highest known block at #{}", chain_info.best_number);
454
455 let spawn_handle = task_manager.spawn_handle();
456
457 spawn_handle.spawn(
459 "txpool-notifications",
460 Some("transaction-pool"),
461 sc_transaction_pool::notification_future(client.clone(), transaction_pool.clone()),
462 );
463
464 spawn_handle.spawn(
465 "on-transaction-imported",
466 Some("transaction-pool"),
467 propagate_transaction_notifications(
468 transaction_pool.clone(),
469 tx_handler_controller,
470 telemetry.clone(),
471 ),
472 );
473
474 let metrics_service =
476 if let Some(PrometheusConfig { port, registry }) = config.prometheus_config.clone() {
477 let metrics = MetricsService::with_prometheus(
479 telemetry,
480 ®istry,
481 config.role,
482 &config.network.node_name,
483 &config.impl_version,
484 )?;
485 spawn_handle.spawn(
486 "prometheus-endpoint",
487 None,
488 prometheus_endpoint::init_prometheus(port, registry).map(drop),
489 );
490
491 metrics
492 } else {
493 MetricsService::new(telemetry)
494 };
495
496 spawn_handle.spawn(
498 "telemetry-periodic-send",
499 None,
500 metrics_service.run(
501 client.clone(),
502 transaction_pool.clone(),
503 network.clone(),
504 sync_service.clone(),
505 ),
506 );
507
508 let rpc_id_provider = config.rpc.id_provider.take();
509
510 let gen_rpc_module = || {
512 gen_rpc_module(
513 task_manager.spawn_handle(),
514 client.clone(),
515 transaction_pool.clone(),
516 keystore.clone(),
517 system_rpc_tx.clone(),
518 config.impl_name.clone(),
519 config.impl_version.clone(),
520 config.chain_spec.as_ref(),
521 &config.state_pruning,
522 config.blocks_pruning,
523 backend.clone(),
524 &*rpc_builder,
525 )
526 };
527
528 let rpc_server_handle = start_rpc_servers(
529 &config.rpc,
530 config.prometheus_registry(),
531 &config.tokio_handle,
532 gen_rpc_module,
533 rpc_id_provider,
534 )?;
535
536 let listen_addrs = rpc_server_handle
537 .listen_addrs()
538 .into_iter()
539 .map(|socket_addr| {
540 let mut multiaddr: Multiaddr = socket_addr.ip().into();
541 multiaddr.push(Protocol::Tcp(socket_addr.port()));
542 multiaddr
543 })
544 .collect();
545
546 let in_memory_rpc = {
547 let mut module = gen_rpc_module()?;
548 module.extensions_mut().insert(DenyUnsafe::No);
549 module
550 };
551
552 let in_memory_rpc_handle = RpcHandlers::new(Arc::new(in_memory_rpc), listen_addrs);
553
554 spawn_handle.spawn(
556 "informant",
557 None,
558 sc_informant::build(client.clone(), network, sync_service.clone()),
559 );
560
561 task_manager.keep_alive((config.base_path, rpc_server_handle));
562
563 Ok(in_memory_rpc_handle)
564}
565
566pub async fn propagate_transaction_notifications<Block, ExPool>(
568 transaction_pool: Arc<ExPool>,
569 tx_handler_controller: sc_network_transactions::TransactionsHandlerController<
570 <Block as BlockT>::Hash,
571 >,
572 telemetry: Option<TelemetryHandle>,
573) where
574 Block: BlockT,
575 ExPool: MaintainedTransactionPool<Block = Block, Hash = <Block as BlockT>::Hash>,
576{
577 transaction_pool
579 .import_notification_stream()
580 .for_each(move |hash| {
581 tx_handler_controller.propagate_transaction(hash);
582 let status = transaction_pool.status();
583 telemetry!(
584 telemetry;
585 SUBSTRATE_INFO;
586 "txpool.import";
587 "ready" => status.ready,
588 "future" => status.future,
589 );
590 ready(())
591 })
592 .await;
593}
594
595pub fn init_telemetry<Block, Client, Network>(
597 name: String,
598 implementation: String,
599 version: String,
600 chain: String,
601 authority: bool,
602 network: Network,
603 client: Arc<Client>,
604 telemetry: &mut Telemetry,
605 sysinfo: Option<sc_telemetry::SysInfo>,
606) -> sc_telemetry::Result<TelemetryHandle>
607where
608 Block: BlockT,
609 Client: BlockBackend<Block>,
610 Network: NetworkStateInfo,
611{
612 let genesis_hash = client.block_hash(Zero::zero()).ok().flatten().unwrap_or_default();
613 let connection_message = ConnectionMessage {
614 name,
615 implementation,
616 version,
617 target_os: sc_sysinfo::TARGET_OS.into(),
618 target_arch: sc_sysinfo::TARGET_ARCH.into(),
619 target_env: sc_sysinfo::TARGET_ENV.into(),
620 config: String::new(),
621 chain,
622 genesis_hash: format!("{:?}", genesis_hash),
623 authority,
624 startup_time: SystemTime::UNIX_EPOCH
625 .elapsed()
626 .map(|dur| dur.as_millis())
627 .unwrap_or(0)
628 .to_string(),
629 network_id: network.local_peer_id().to_base58(),
630 sysinfo,
631 };
632
633 telemetry.start_telemetry(connection_message)?;
634
635 Ok(telemetry.handle())
636}
637
/// Builds the node's complete RPC module.
///
/// The module merges:
/// - the `sc_rpc` APIs: chain, state, child state, author, system, and offchain (only when the
///   backend exposes offchain storage),
/// - the `sc_rpc_spec_v2` APIs: transaction, transaction broadcast, chain head, chain spec,
///   and archive (only when both state pruning and blocks pruning are in archive mode),
/// - the module returned by `rpc_builder`.
///
/// # Errors
///
/// Returns `Error::Application` if merging any module fails, and propagates the error
/// returned by `rpc_builder`.
///
/// # Panics
///
/// Panics if the hash of block zero cannot be obtained from `client`.
pub fn gen_rpc_module<TBl, TBackend, TCl, TRpc, TExPool>(
    spawn_handle: SpawnTaskHandle,
    client: Arc<TCl>,
    transaction_pool: Arc<TExPool>,
    keystore: KeystorePtr,
    system_rpc_tx: TracingUnboundedSender<sc_rpc::system::Request<TBl>>,
    impl_name: String,
    impl_version: String,
    chain_spec: &dyn ChainSpec,
    state_pruning: &Option<PruningMode>,
    blocks_pruning: BlocksPruning,
    backend: Arc<TBackend>,
    rpc_builder: &(dyn Fn(SubscriptionTaskExecutor) -> Result<RpcModule<TRpc>, Error>),
) -> Result<RpcModule<()>, Error>
where
    TBl: BlockT,
    TCl: ProvideRuntimeApi<TBl>
        + BlockchainEvents<TBl>
        + HeaderBackend<TBl>
        + HeaderMetadata<TBl, Error = sp_blockchain::Error>
        + ExecutorProvider<TBl>
        + CallApiAt<TBl>
        + ProofProvider<TBl>
        + StorageProvider<TBl, TBackend>
        + BlockBackend<TBl>
        + Send
        + Sync
        + 'static,
    TBackend: sc_client_api::backend::Backend<TBl> + 'static,
    <TCl as ProvideRuntimeApi<TBl>>::Api: sp_session::SessionKeys<TBl> + sp_api::Metadata<TBl>,
    TExPool: MaintainedTransactionPool<Block = TBl, Hash = <TBl as BlockT>::Hash> + 'static,
    TBl::Hash: Unpin,
    TBl::Header: Unpin,
{
    let system_info = sc_rpc::system::SystemInfo {
        chain_name: chain_spec.name().into(),
        impl_name,
        impl_version,
        properties: chain_spec.properties(),
        chain_type: chain_spec.chain_type(),
    };

    let mut rpc_api = RpcModule::new(());
    // Shared executor handed to every API that needs to spawn tasks.
    let task_executor = Arc::new(spawn_handle);

    let (chain, state, child_state) = {
        let chain = sc_rpc::chain::new_full(client.clone(), task_executor.clone()).into_rpc();
        let (state, child_state) = sc_rpc::state::new_full(client.clone(), task_executor.clone());
        let state = state.into_rpc();
        let child_state = child_state.into_rpc();

        (chain, state, child_state)
    };

    // Limit passed to the transaction broadcast API.
    const MAX_TRANSACTION_PER_CONNECTION: usize = 16;

    let transaction_broadcast_rpc_v2 = sc_rpc_spec_v2::transaction::TransactionBroadcast::new(
        client.clone(),
        transaction_pool.clone(),
        task_executor.clone(),
        MAX_TRANSACTION_PER_CONNECTION,
    )
    .into_rpc();

    let transaction_v2 = sc_rpc_spec_v2::transaction::Transaction::new(
        client.clone(),
        transaction_pool.clone(),
        task_executor.clone(),
    )
    .into_rpc();

    let chain_head_v2 = sc_rpc_spec_v2::chain_head::ChainHead::new(
        client.clone(),
        backend.clone(),
        task_executor.clone(),
        sc_rpc_spec_v2::chain_head::ChainHeadConfig::default(),
    )
    .into_rpc();

    // Archive mode requires both pruning settings to be archive; an unset state pruning mode
    // counts as non-archive.
    let is_archive_node = state_pruning.as_ref().map(|sp| sp.is_archive()).unwrap_or(false) &&
        blocks_pruning.is_archive();
    let genesis_hash = client.hash(Zero::zero()).ok().flatten().expect("Genesis block exists; qed");
    if is_archive_node {
        let archive_v2 = sc_rpc_spec_v2::archive::Archive::new(
            client.clone(),
            backend.clone(),
            genesis_hash,
            sc_rpc_spec_v2::archive::ArchiveConfig::default(),
        )
        .into_rpc();
        rpc_api.merge(archive_v2).map_err(|e| Error::Application(e.into()))?;
    }

    let chain_spec_v2 = sc_rpc_spec_v2::chain_spec::ChainSpec::new(
        chain_spec.name().into(),
        genesis_hash,
        chain_spec.properties(),
    )
    .into_rpc();

    let author = sc_rpc::author::Author::new(
        client.clone(),
        transaction_pool,
        keystore,
        task_executor.clone(),
    )
    .into_rpc();

    let system = sc_rpc::system::System::new(system_info, system_rpc_tx).into_rpc();

    // The offchain API is only exposed when the backend provides offchain storage.
    if let Some(storage) = backend.offchain_storage() {
        let offchain = sc_rpc::offchain::Offchain::new(storage).into_rpc();

        rpc_api.merge(offchain).map_err(|e| Error::Application(e.into()))?;
    }

    // `sc_rpc_spec_v2` APIs.
    rpc_api.merge(transaction_v2).map_err(|e| Error::Application(e.into()))?;
    rpc_api
        .merge(transaction_broadcast_rpc_v2)
        .map_err(|e| Error::Application(e.into()))?;
    rpc_api.merge(chain_head_v2).map_err(|e| Error::Application(e.into()))?;
    rpc_api.merge(chain_spec_v2).map_err(|e| Error::Application(e.into()))?;

    // `sc_rpc` APIs.
    rpc_api.merge(chain).map_err(|e| Error::Application(e.into()))?;
    rpc_api.merge(author).map_err(|e| Error::Application(e.into()))?;
    rpc_api.merge(system).map_err(|e| Error::Application(e.into()))?;
    rpc_api.merge(state).map_err(|e| Error::Application(e.into()))?;
    rpc_api.merge(child_state).map_err(|e| Error::Application(e.into()))?;
    // Caller-supplied APIs are merged last.
    let extra_rpcs = rpc_builder(task_executor.clone())?;
    rpc_api.merge(extra_rpcs).map_err(|e| Error::Application(e.into()))?;

    Ok(rpc_api)
}
782
/// Parameters consumed by [`build_network`].
pub struct BuildNetworkParams<'a, Block, Net, TxPool, IQ, Client>
where
    Block: BlockT,
    Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
{
    /// The service configuration.
    pub config: &'a Configuration,
    /// Full network configuration; protocols are registered on it while building the network.
    pub net_config: FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
    /// Shared handle to the client.
    pub client: Arc<Client>,
    /// Shared handle to the transaction pool.
    pub transaction_pool: Arc<TxPool>,
    /// Handle used to spawn the networking tasks.
    pub spawn_handle: SpawnTaskHandle,
    /// Import queue; run as its own task by [`build_network`].
    pub import_queue: IQ,
    /// Optional factory for the block announce validator. When `None`,
    /// `DefaultBlockAnnounceValidator` is used.
    pub block_announce_validator_builder: Option<
        Box<dyn FnOnce(Arc<Client>) -> Box<dyn BlockAnnounceValidator<Block> + Send> + Send>,
    >,
    /// Syncing strategy handed to the syncing engine.
    pub syncing_strategy: Box<dyn SyncingStrategy<Block>>,
    /// Optional custom block relay. When `None`, a `BlockRequestHandler` is created instead.
    pub block_relay: Option<BlockRelayParams<Block, Net>>,
    /// Notification metrics shared with the transactions protocol, the syncing engine and the
    /// network backend.
    pub metrics: NotificationMetrics,
}
813
/// Builds the network service together with the tasks that drive it.
///
/// Registers the block request, light client request and transactions protocols on the
/// network configuration, optionally sets up the bitswap server, creates the syncing engine
/// and the network backend, and spawns the tasks for all of them.
///
/// Returns, in order: the network service, the sender for system RPC requests, the
/// transactions handler controller, a [`NetworkStarter`] and the syncing service. The network
/// worker only starts running the network future once the [`NetworkStarter`] is triggered.
///
/// # Errors
///
/// Fails if the syncing engine, the network backend or the transactions handler cannot be
/// built.
pub fn build_network<Block, Net, TxPool, IQ, Client>(
    params: BuildNetworkParams<Block, Net, TxPool, IQ, Client>,
) -> Result<
    (
        Arc<dyn sc_network::service::traits::NetworkService>,
        TracingUnboundedSender<sc_rpc::system::Request<Block>>,
        sc_network_transactions::TransactionsHandlerController<<Block as BlockT>::Hash>,
        NetworkStarter,
        Arc<SyncingService<Block>>,
    ),
    Error,
>
where
    Block: BlockT,
    Client: ProvideRuntimeApi<Block>
        + HeaderMetadata<Block, Error = sp_blockchain::Error>
        + Chain<Block>
        + BlockBackend<Block>
        + BlockIdTo<Block, Error = sp_blockchain::Error>
        + ProofProvider<Block>
        + HeaderBackend<Block>
        + BlockchainEvents<Block>
        + 'static,
    TxPool: TransactionPool<Block = Block, Hash = <Block as BlockT>::Hash> + 'static,
    IQ: ImportQueue<Block> + 'static,
    Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
{
    let BuildNetworkParams {
        config,
        mut net_config,
        client,
        transaction_pool,
        spawn_handle,
        import_queue,
        block_announce_validator_builder,
        syncing_strategy,
        block_relay,
        metrics,
    } = params;

    let protocol_id = config.protocol_id();
    let genesis_hash = client.info().genesis_hash;

    // Use the caller-supplied validator factory when present, the default validator otherwise.
    let block_announce_validator = if let Some(f) = block_announce_validator_builder {
        f(client.clone())
    } else {
        Box::new(DefaultBlockAnnounceValidator)
    };

    let (chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new();
    let (mut block_server, block_downloader, block_request_protocol_config) = match block_relay {
        Some(params) => (params.server, params.downloader, params.request_response_config),
        None => {
            // No custom block relay was given: fall back to the block request handler. The
            // last argument is the sum of the configured inbound and outbound peer counts.
            let params = BlockRequestHandler::new::<Net>(
                chain_sync_network_handle.clone(),
                &protocol_id,
                config.chain_spec.fork_id(),
                client.clone(),
                config.network.default_peers_set.in_peers as usize +
                    config.network.default_peers_set.out_peers as usize,
            );
            (params.server, params.downloader, params.request_response_config)
        },
    };
    spawn_handle.spawn("block-request-handler", Some("networking"), async move {
        block_server.run().await;
    });

    let light_client_request_protocol_config = {
        let (handler, protocol_config) = LightClientRequestHandler::new::<Net>(
            &protocol_id,
            config.chain_spec.fork_id(),
            client.clone(),
        );
        spawn_handle.spawn("light-client-request-handler", Some("networking"), handler.run());
        protocol_config
    };

    // Register the request-response protocols on the network configuration.
    net_config.add_request_response_protocol(block_request_protocol_config);
    net_config.add_request_response_protocol(light_client_request_protocol_config);

    // The bitswap server is only created when `ipfs_server` is enabled in the configuration.
    let bitswap_config = config.network.ipfs_server.then(|| {
        let (handler, config) = Net::bitswap_server(client.clone());
        spawn_handle.spawn("bitswap-request-handler", Some("networking"), handler);

        config
    });

    // Create the transactions protocol and register its notification protocol.
    let peer_store_handle = net_config.peer_store_handle();
    let (transactions_handler_proto, transactions_config) =
        sc_network_transactions::TransactionsHandlerPrototype::new::<_, Block, Net>(
            protocol_id.clone(),
            genesis_hash,
            config.chain_spec.fork_id(),
            metrics.clone(),
            Arc::clone(&peer_store_handle),
        );
    net_config.add_notification_protocol(transactions_config);

    // Take the peer store out of the configuration and run it as its own task.
    let peer_store = net_config.take_peer_store();
    let peer_store_handle = peer_store.handle();
    spawn_handle.spawn("peer-store", Some("networking"), peer_store.run());

    let (engine, sync_service, block_announce_config) = SyncingEngine::new(
        Roles::from(&config.role),
        client.clone(),
        config.prometheus_config.as_ref().map(|config| config.registry.clone()).as_ref(),
        metrics.clone(),
        &net_config,
        protocol_id.clone(),
        &config.chain_spec.fork_id().map(ToOwned::to_owned),
        block_announce_validator,
        syncing_strategy,
        chain_sync_network_handle,
        import_queue.service(),
        block_downloader,
        Arc::clone(&peer_store_handle),
    )?;
    // One copy of the syncing service goes to the import queue; the other is shared via `Arc`.
    let sync_service_import_queue = sync_service.clone();
    let sync_service = Arc::new(sync_service);

    let network_params = sc_network::config::Params::<Block, <Block as BlockT>::Hash, Net> {
        role: config.role,
        executor: {
            let spawn_handle = Clone::clone(&spawn_handle);
            Box::new(move |fut| {
                spawn_handle.spawn("libp2p-node", Some("networking"), fut);
            })
        },
        network_config: net_config,
        genesis_hash,
        protocol_id,
        fork_id: config.chain_spec.fork_id().map(ToOwned::to_owned),
        metrics_registry: config.prometheus_config.as_ref().map(|config| config.registry.clone()),
        block_announce_config,
        bitswap_config,
        notification_metrics: metrics,
    };

    // Read before `network_params` is consumed by the network backend.
    let has_bootnodes = !network_params.network_config.network_config.boot_nodes.is_empty();
    let network_mut = Net::new(network_params)?;
    let network = network_mut.network_service().clone();

    let (tx_handler, tx_handler_controller) = transactions_handler_proto.build(
        network.clone(),
        sync_service.clone(),
        Arc::new(TransactionPoolAdapter { pool: transaction_pool, client: client.clone() }),
        config.prometheus_config.as_ref().map(|config| &config.registry),
    )?;
    spawn_handle.spawn_blocking(
        "network-transactions-handler",
        Some("networking"),
        tx_handler.run(),
    );

    spawn_handle.spawn_blocking(
        "chain-sync-network-service-provider",
        Some("networking"),
        chain_sync_network_provider.run(Arc::new(network.clone())),
    );
    spawn_handle.spawn("import-queue", None, import_queue.run(Box::new(sync_service_import_queue)));
    spawn_handle.spawn_blocking("syncing", None, engine.run());

    let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc", 10_000);
    spawn_handle.spawn(
        "system-rpc-handler",
        Some("networking"),
        build_system_rpc_future::<_, _, <Block as BlockT>::Hash>(
            config.role,
            network_mut.network_service(),
            sync_service.clone(),
            client.clone(),
            system_rpc_rx,
            has_bootnodes,
        ),
    );

    let future = build_network_future::<_, _, <Block as BlockT>::Hash, _>(
        network_mut,
        client,
        sync_service.clone(),
        config.announce_block,
    );

    // The sender is returned to the caller inside `NetworkStarter`; the worker below waits on
    // the receiver before it runs the network future.
    let (network_start_tx, network_start_rx) = oneshot::channel();

    spawn_handle.spawn_blocking("network-worker", Some("networking"), async move {
        // An error here means the sender was dropped without sending, so the network is never
        // started.
        if network_start_rx.await.is_err() {
            log::warn!(
                "The NetworkStart returned as part of `build_network` has been silently dropped"
            );
            return
        }

        future.await
    });

    Ok((
        network,
        system_rpc_tx,
        tx_handler_controller,
        NetworkStarter(network_start_tx),
        sync_service.clone(),
    ))
}
1049
/// Builds a boxed [`PolkadotSyncingStrategy`].
///
/// Registers the state request protocol (and the warp sync request protocol when a warp sync
/// provider is configured) on `net_config` and spawns the corresponding request handlers.
///
/// # Errors
///
/// Returns an error when:
/// - the sync mode is warp but `warp_sync_config` is `None`,
/// - the client requires full sync while the sync mode is `LightState` or `Warp`,
/// - `PolkadotSyncingStrategy::new` fails.
pub fn build_polkadot_syncing_strategy<Block, Client, Net>(
    protocol_id: ProtocolId,
    fork_id: Option<&str>,
    net_config: &mut FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
    warp_sync_config: Option<WarpSyncConfig<Block>>,
    client: Arc<Client>,
    spawn_handle: &SpawnTaskHandle,
    metrics_registry: Option<&Registry>,
) -> Result<Box<dyn SyncingStrategy<Block>>, Error>
where
    Block: BlockT,
    Client: HeaderBackend<Block>
        + BlockBackend<Block>
        + HeaderMetadata<Block, Error = sp_blockchain::Error>
        + ProofProvider<Block>
        + Send
        + Sync
        + 'static,

    Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
{
    if warp_sync_config.is_none() && net_config.network_config.sync_mode.is_warp() {
        return Err("Warp sync enabled, but no warp sync provider configured.".into())
    }

    // Only `SyncMode::Full` is accepted when the client requires full sync.
    if client.requires_full_sync() {
        match net_config.network_config.sync_mode {
            SyncMode::LightState { .. } =>
                return Err("Fast sync doesn't work for archive nodes".into()),
            SyncMode::Warp => return Err("Warp sync doesn't work for archive nodes".into()),
            SyncMode::Full => {},
        }
    }

    let genesis_hash = client.info().genesis_hash;

    let (state_request_protocol_config, state_request_protocol_name) = {
        // Peer-count hint: configured number of full peers plus the reserved nodes.
        let num_peer_hint = net_config.network_config.default_peers_set_num_full as usize +
            net_config.network_config.default_peers_set.reserved_nodes.len();
        let (handler, protocol_config) =
            StateRequestHandler::new::<Net>(&protocol_id, fork_id, client.clone(), num_peer_hint);
        let config_name = protocol_config.protocol_name().clone();

        spawn_handle.spawn("state-request-handler", Some("networking"), handler.run());
        (protocol_config, config_name)
    };
    net_config.add_request_response_protocol(state_request_protocol_config);

    // A warp sync request handler is only set up for the `WithProvider` variant.
    let (warp_sync_protocol_config, warp_sync_protocol_name) = match warp_sync_config.as_ref() {
        Some(WarpSyncConfig::WithProvider(warp_with_provider)) => {
            let (handler, protocol_config) = WarpSyncRequestHandler::new::<_, Net>(
                protocol_id,
                genesis_hash,
                fork_id,
                warp_with_provider.clone(),
            );
            let config_name = protocol_config.protocol_name().clone();

            spawn_handle.spawn("warp-sync-request-handler", Some("networking"), handler.run());
            (Some(protocol_config), Some(config_name))
        },
        _ => (None, None),
    };
    if let Some(config) = warp_sync_protocol_config {
        net_config.add_request_response_protocol(config);
    }

    let syncing_config = SyncingConfig {
        mode: net_config.network_config.sync_mode,
        max_parallel_downloads: net_config.network_config.max_parallel_downloads,
        max_blocks_per_request: net_config.network_config.max_blocks_per_request,
        metrics_registry: metrics_registry.cloned(),
        state_request_protocol_name,
    };
    Ok(Box::new(PolkadotSyncingStrategy::new(
        syncing_config,
        client,
        warp_sync_config,
        warp_sync_protocol_name,
    )?))
}
1134
1135#[must_use]
1137pub struct NetworkStarter(oneshot::Sender<()>);
1138
1139impl NetworkStarter {
1140 pub fn new(sender: oneshot::Sender<()>) -> Self {
1142 NetworkStarter(sender)
1143 }
1144
1145 pub fn start_network(self) {
1149 let _ = self.0.send(());
1150 }
1151}