1use crate::{
20 build_network_future, build_system_rpc_future,
21 client::{Client, ClientConfig},
22 config::{Configuration, ExecutorConfiguration, KeystoreConfig, Multiaddr, PrometheusConfig},
23 error::Error,
24 metrics::MetricsService,
25 start_rpc_servers, BuildGenesisBlock, GenesisBlockBuilder, RpcHandlers, SpawnTaskHandle,
26 TaskManager, TransactionPoolAdapter,
27};
28use futures::{select, FutureExt, StreamExt};
29use jsonrpsee::RpcModule;
30use log::{debug, error, info};
31use prometheus_endpoint::Registry;
32use sc_chain_spec::{get_extension, ChainSpec};
33use sc_client_api::{
34 execution_extensions::ExecutionExtensions, proof_provider::ProofProvider, BadBlocks,
35 BlockBackend, BlockchainEvents, ExecutorProvider, ForkBlocks, KeysIter, StorageProvider,
36 TrieCacheContext, UsageProvider,
37};
38use sc_client_db::{Backend, BlocksPruning, DatabaseSettings, PruningMode};
39use sc_consensus::import_queue::{ImportQueue, ImportQueueService};
40use sc_executor::{
41 sp_wasm_interface::HostFunctions, HeapAllocStrategy, NativeExecutionDispatch, RuntimeVersionOf,
42 WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY,
43};
44use sc_keystore::LocalKeystore;
45use sc_network::{
46 config::{FullNetworkConfiguration, ProtocolId, SyncMode},
47 multiaddr::Protocol,
48 service::{
49 traits::{PeerStore, RequestResponseConfig},
50 NotificationMetrics,
51 },
52 NetworkBackend, NetworkStateInfo,
53};
54use sc_network_common::role::{Role, Roles};
55use sc_network_light::light_client_requests::handler::LightClientRequestHandler;
56use sc_network_sync::{
57 block_relay_protocol::{BlockDownloader, BlockRelayParams},
58 block_request_handler::BlockRequestHandler,
59 engine::SyncingEngine,
60 service::network::{NetworkServiceHandle, NetworkServiceProvider},
61 state_request_handler::StateRequestHandler,
62 strategy::{
63 polkadot::{PolkadotSyncingStrategy, PolkadotSyncingStrategyConfig},
64 SyncingStrategy,
65 },
66 warp_request_handler::RequestHandler as WarpSyncRequestHandler,
67 SyncingService, WarpSyncConfig,
68};
69use sc_rpc::{
70 author::AuthorApiServer,
71 chain::ChainApiServer,
72 offchain::OffchainApiServer,
73 state::{ChildStateApiServer, StateApiServer},
74 system::SystemApiServer,
75 DenyUnsafe, SubscriptionTaskExecutor,
76};
77use sc_rpc_spec_v2::{
78 archive::ArchiveApiServer,
79 chain_head::ChainHeadApiServer,
80 chain_spec::ChainSpecApiServer,
81 transaction::{TransactionApiServer, TransactionBroadcastApiServer},
82};
83use sc_telemetry::{telemetry, ConnectionMessage, Telemetry, TelemetryHandle, SUBSTRATE_INFO};
84use sc_transaction_pool_api::{MaintainedTransactionPool, TransactionPool};
85use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
86use sp_api::{CallApiAt, ProvideRuntimeApi};
87use sp_blockchain::{HeaderBackend, HeaderMetadata};
88use sp_consensus::block_validation::{
89 BlockAnnounceValidator, Chain, DefaultBlockAnnounceValidator,
90};
91use sp_core::traits::{CodeExecutor, SpawnNamed};
92use sp_keystore::KeystorePtr;
93use sp_runtime::traits::{Block as BlockT, BlockIdTo, NumberFor, Zero};
94use sp_storage::{ChildInfo, ChildType, PrefixedStorageKey};
95use std::{
96 str::FromStr,
97 sync::Arc,
98 time::{Duration, SystemTime},
99};
100
101pub type TFullClient<TBl, TRtApi, TExec> =
103 Client<TFullBackend<TBl>, TFullCallExecutor<TBl, TExec>, TBl, TRtApi>;
104
105pub type TFullBackend<TBl> = Backend<TBl>;
107
108pub type TFullCallExecutor<TBl, TExec> = crate::client::LocalCallExecutor<TBl, Backend<TBl>, TExec>;
110
111type TFullParts<TBl, TRtApi, TExec> =
112 (TFullClient<TBl, TRtApi, TExec>, Arc<TFullBackend<TBl>>, KeystoreContainer, TaskManager);
113
114pub struct KeystoreContainer(Arc<LocalKeystore>);
116
117impl KeystoreContainer {
118 pub fn new(config: &KeystoreConfig) -> Result<Self, Error> {
120 let keystore = Arc::new(match config {
121 KeystoreConfig::Path { path, password } =>
122 LocalKeystore::open(path.clone(), password.clone())?,
123 KeystoreConfig::InMemory => LocalKeystore::in_memory(),
124 });
125
126 Ok(Self(keystore))
127 }
128
129 pub fn keystore(&self) -> KeystorePtr {
131 self.0.clone()
132 }
133
134 pub fn local_keystore(&self) -> Arc<LocalKeystore> {
136 self.0.clone()
137 }
138}
139
140pub fn new_full_client<TBl, TRtApi, TExec>(
142 config: &Configuration,
143 telemetry: Option<TelemetryHandle>,
144 executor: TExec,
145) -> Result<TFullClient<TBl, TRtApi, TExec>, Error>
146where
147 TBl: BlockT,
148 TExec: CodeExecutor + RuntimeVersionOf + Clone,
149{
150 new_full_parts(config, telemetry, executor).map(|parts| parts.0)
151}
152
153pub fn new_full_parts_record_import<TBl, TRtApi, TExec>(
155 config: &Configuration,
156 telemetry: Option<TelemetryHandle>,
157 executor: TExec,
158 enable_import_proof_recording: bool,
159) -> Result<TFullParts<TBl, TRtApi, TExec>, Error>
160where
161 TBl: BlockT,
162 TExec: CodeExecutor + RuntimeVersionOf + Clone,
163{
164 let backend = new_db_backend(config.db_config())?;
165
166 let genesis_block_builder = GenesisBlockBuilder::new(
167 config.chain_spec.as_storage_builder(),
168 !config.no_genesis(),
169 backend.clone(),
170 executor.clone(),
171 )?;
172
173 new_full_parts_with_genesis_builder(
174 config,
175 telemetry,
176 executor,
177 backend,
178 genesis_block_builder,
179 enable_import_proof_recording,
180 )
181}
182pub fn new_full_parts<TBl, TRtApi, TExec>(
184 config: &Configuration,
185 telemetry: Option<TelemetryHandle>,
186 executor: TExec,
187) -> Result<TFullParts<TBl, TRtApi, TExec>, Error>
188where
189 TBl: BlockT,
190 TExec: CodeExecutor + RuntimeVersionOf + Clone,
191{
192 new_full_parts_record_import(config, telemetry, executor, false)
193}
194
195pub fn new_full_parts_with_genesis_builder<TBl, TRtApi, TExec, TBuildGenesisBlock>(
197 config: &Configuration,
198 telemetry: Option<TelemetryHandle>,
199 executor: TExec,
200 backend: Arc<TFullBackend<TBl>>,
201 genesis_block_builder: TBuildGenesisBlock,
202 enable_import_proof_recording: bool,
203) -> Result<TFullParts<TBl, TRtApi, TExec>, Error>
204where
205 TBl: BlockT,
206 TExec: CodeExecutor + RuntimeVersionOf + Clone,
207 TBuildGenesisBlock: BuildGenesisBlock<
208 TBl,
209 BlockImportOperation = <Backend<TBl> as sc_client_api::backend::Backend<TBl>>::BlockImportOperation
210 >,
211{
212 let keystore_container = KeystoreContainer::new(&config.keystore)?;
213
214 let task_manager = {
215 let registry = config.prometheus_config.as_ref().map(|cfg| &cfg.registry);
216 TaskManager::new(config.tokio_handle.clone(), registry)?
217 };
218
219 let chain_spec = &config.chain_spec;
220 let fork_blocks = get_extension::<ForkBlocks<TBl>>(chain_spec.extensions())
221 .cloned()
222 .unwrap_or_default();
223
224 let bad_blocks = get_extension::<BadBlocks<TBl>>(chain_spec.extensions())
225 .cloned()
226 .unwrap_or_default();
227
228 let client = {
229 let extensions = ExecutionExtensions::new(None, Arc::new(executor.clone()));
230
231 let wasm_runtime_substitutes = config
232 .chain_spec
233 .code_substitutes()
234 .into_iter()
235 .map(|(n, c)| {
236 let number = NumberFor::<TBl>::from_str(&n).map_err(|_| {
237 Error::Application(Box::from(format!(
238 "Failed to parse `{}` as block number for code substitutes. \
239 In an old version the key for code substitute was a block hash. \
240 Please update the chain spec to a version that is compatible with your node.",
241 n
242 )))
243 })?;
244 Ok((number, c))
245 })
246 .collect::<Result<std::collections::HashMap<_, _>, Error>>()?;
247
248 let client = new_client(
249 backend.clone(),
250 executor,
251 genesis_block_builder,
252 fork_blocks,
253 bad_blocks,
254 extensions,
255 Box::new(task_manager.spawn_handle()),
256 config.prometheus_config.as_ref().map(|config| config.registry.clone()),
257 telemetry,
258 ClientConfig {
259 offchain_worker_enabled: config.offchain_worker.enabled,
260 offchain_indexing_api: config.offchain_worker.indexing_enabled,
261 wasm_runtime_overrides: config.wasm_runtime_overrides.clone(),
262 no_genesis: config.no_genesis(),
263 wasm_runtime_substitutes,
264 enable_import_proof_recording,
265 },
266 )?;
267
268 if let Some(warm_up_strategy) = config.warm_up_trie_cache {
269 let storage_root = client.usage_info().chain.best_hash;
270 let backend_clone = backend.clone();
271
272 if warm_up_strategy.is_blocking() {
273 warm_up_trie_cache(backend_clone, storage_root)?;
276 } else {
277 task_manager.spawn_handle().spawn_blocking(
278 "warm-up-trie-cache",
279 None,
280 async move {
281 if let Err(e) = warm_up_trie_cache(backend_clone, storage_root) {
282 error!("Failed to warm up trie cache: {e}");
283 }
284 },
285 );
286 }
287 }
288
289 client
290 };
291
292 Ok((client, backend, keystore_container, task_manager))
293}
294
295fn child_info(key: Vec<u8>) -> Option<ChildInfo> {
296 let prefixed_key = PrefixedStorageKey::new(key);
297 ChildType::from_prefixed_key(&prefixed_key).and_then(|(child_type, storage_key)| {
298 (child_type == ChildType::ParentKeyId).then(|| ChildInfo::new_default(storage_key))
299 })
300}
301
302fn warm_up_trie_cache<TBl: BlockT>(
303 backend: Arc<TFullBackend<TBl>>,
304 storage_root: TBl::Hash,
305) -> Result<(), Error> {
306 use sc_client_api::backend::Backend;
307 use sp_state_machine::Backend as StateBackend;
308
309 let untrusted_state = || backend.state_at(storage_root, TrieCacheContext::Untrusted);
310 let trusted_state = || backend.state_at(storage_root, TrieCacheContext::Trusted);
311
312 debug!("Populating trie cache started",);
313 let start_time = std::time::Instant::now();
314 let mut keys_count = 0;
315 let mut child_keys_count = 0;
316 for key in KeysIter::<_, TBl>::new(untrusted_state()?, None, None)? {
317 if keys_count != 0 && keys_count % 100_000 == 0 {
318 debug!("{} keys and {} child keys have been warmed", keys_count, child_keys_count);
319 }
320 match child_info(key.0.clone()) {
321 Some(info) => {
322 for child_key in
323 KeysIter::<_, TBl>::new_child(untrusted_state()?, info.clone(), None, None)?
324 {
325 if trusted_state()?
326 .child_storage(&info, &child_key.0)
327 .unwrap_or_default()
328 .is_none()
329 {
330 debug!("Child storage value unexpectedly empty: {child_key:?}");
331 }
332 child_keys_count += 1;
333 }
334 },
335 None => {
336 if trusted_state()?.storage(&key.0).unwrap_or_default().is_none() {
337 debug!("Storage value unexpectedly empty: {key:?}");
338 }
339 keys_count += 1;
340 },
341 }
342 }
343 debug!(
344 "Trie cache populated with {keys_count} keys and {child_keys_count} child keys in {} s",
345 start_time.elapsed().as_secs_f32()
346 );
347
348 Ok(())
349}
350
351#[deprecated(note = "Please switch to `new_wasm_executor`. Will be removed at end of 2024.")]
354#[allow(deprecated)]
355pub fn new_native_or_wasm_executor<D: NativeExecutionDispatch>(
356 config: &Configuration,
357) -> sc_executor::NativeElseWasmExecutor<D> {
358 #[allow(deprecated)]
359 sc_executor::NativeElseWasmExecutor::new_with_wasm_executor(new_wasm_executor(&config.executor))
360}
361
362pub fn new_wasm_executor<H: HostFunctions>(config: &ExecutorConfiguration) -> WasmExecutor<H> {
364 let strategy = config
365 .default_heap_pages
366 .map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |p| HeapAllocStrategy::Static { extra_pages: p as _ });
367 WasmExecutor::<H>::builder()
368 .with_execution_method(config.wasm_method)
369 .with_onchain_heap_alloc_strategy(strategy)
370 .with_offchain_heap_alloc_strategy(strategy)
371 .with_max_runtime_instances(config.max_runtime_instances)
372 .with_runtime_cache_size(config.runtime_cache_size)
373 .build()
374}
375
376pub fn new_db_backend<Block>(
378 settings: DatabaseSettings,
379) -> Result<Arc<Backend<Block>>, sp_blockchain::Error>
380where
381 Block: BlockT,
382{
383 const CANONICALIZATION_DELAY: u64 = 4096;
384
385 Ok(Arc::new(Backend::new(settings, CANONICALIZATION_DELAY)?))
386}
387
388pub fn new_client<E, Block, RA, G>(
390 backend: Arc<Backend<Block>>,
391 executor: E,
392 genesis_block_builder: G,
393 fork_blocks: ForkBlocks<Block>,
394 bad_blocks: BadBlocks<Block>,
395 execution_extensions: ExecutionExtensions<Block>,
396 spawn_handle: Box<dyn SpawnNamed>,
397 prometheus_registry: Option<Registry>,
398 telemetry: Option<TelemetryHandle>,
399 config: ClientConfig<Block>,
400) -> Result<
401 Client<
402 Backend<Block>,
403 crate::client::LocalCallExecutor<Block, Backend<Block>, E>,
404 Block,
405 RA,
406 >,
407 sp_blockchain::Error,
408>
409where
410 Block: BlockT,
411 E: CodeExecutor + RuntimeVersionOf,
412 G: BuildGenesisBlock<
413 Block,
414 BlockImportOperation = <Backend<Block> as sc_client_api::backend::Backend<Block>>::BlockImportOperation
415 >,
416{
417 let executor = crate::client::LocalCallExecutor::new(
418 backend.clone(),
419 executor,
420 config.clone(),
421 execution_extensions,
422 )?;
423
424 Client::new(
425 backend,
426 executor,
427 spawn_handle,
428 genesis_block_builder,
429 fork_blocks,
430 bad_blocks,
431 prometheus_registry,
432 telemetry,
433 config,
434 )
435}
436
437pub struct SpawnTasksParams<'a, TBl: BlockT, TCl, TExPool, TRpc, Backend> {
439 pub config: Configuration,
441 pub client: Arc<TCl>,
443 pub backend: Arc<Backend>,
445 pub task_manager: &'a mut TaskManager,
447 pub keystore: KeystorePtr,
449 pub transaction_pool: Arc<TExPool>,
451 pub rpc_builder: Box<dyn Fn(SubscriptionTaskExecutor) -> Result<RpcModule<TRpc>, Error>>,
453 pub network: Arc<dyn sc_network::service::traits::NetworkService>,
455 pub system_rpc_tx: TracingUnboundedSender<sc_rpc::system::Request<TBl>>,
457 pub tx_handler_controller:
459 sc_network_transactions::TransactionsHandlerController<<TBl as BlockT>::Hash>,
460 pub sync_service: Arc<SyncingService<TBl>>,
462 pub telemetry: Option<&'a mut Telemetry>,
464}
465
466pub fn spawn_tasks<TBl, TBackend, TExPool, TRpc, TCl>(
468 params: SpawnTasksParams<TBl, TCl, TExPool, TRpc, TBackend>,
469) -> Result<RpcHandlers, Error>
470where
471 TCl: ProvideRuntimeApi<TBl>
472 + HeaderMetadata<TBl, Error = sp_blockchain::Error>
473 + Chain<TBl>
474 + BlockBackend<TBl>
475 + BlockIdTo<TBl, Error = sp_blockchain::Error>
476 + ProofProvider<TBl>
477 + HeaderBackend<TBl>
478 + BlockchainEvents<TBl>
479 + ExecutorProvider<TBl>
480 + UsageProvider<TBl>
481 + StorageProvider<TBl, TBackend>
482 + CallApiAt<TBl>
483 + Send
484 + 'static,
485 <TCl as ProvideRuntimeApi<TBl>>::Api: sp_api::Metadata<TBl>
486 + sp_transaction_pool::runtime_api::TaggedTransactionQueue<TBl>
487 + sp_session::SessionKeys<TBl>
488 + sp_api::ApiExt<TBl>,
489 TBl: BlockT,
490 TBl::Hash: Unpin,
491 TBl::Header: Unpin,
492 TBackend: 'static + sc_client_api::backend::Backend<TBl> + Send,
493 TExPool: MaintainedTransactionPool<Block = TBl, Hash = <TBl as BlockT>::Hash> + 'static,
494{
495 let SpawnTasksParams {
496 mut config,
497 task_manager,
498 client,
499 backend,
500 keystore,
501 transaction_pool,
502 rpc_builder,
503 network,
504 system_rpc_tx,
505 tx_handler_controller,
506 sync_service,
507 telemetry,
508 } = params;
509
510 let chain_info = client.usage_info().chain;
511
512 sp_session::generate_initial_session_keys(
513 client.clone(),
514 chain_info.best_hash,
515 config.dev_key_seed.clone().map(|s| vec![s]).unwrap_or_default(),
516 keystore.clone(),
517 )
518 .map_err(|e| Error::Application(Box::new(e)))?;
519
520 let sysinfo = sc_sysinfo::gather_sysinfo();
521 sc_sysinfo::print_sysinfo(&sysinfo);
522
523 let telemetry = telemetry
524 .map(|telemetry| {
525 init_telemetry(
526 config.network.node_name.clone(),
527 config.impl_name.clone(),
528 config.impl_version.clone(),
529 config.chain_spec.name().to_string(),
530 config.role.is_authority(),
531 network.clone(),
532 client.clone(),
533 telemetry,
534 Some(sysinfo),
535 )
536 })
537 .transpose()?;
538
539 info!("📦 Highest known block at #{}", chain_info.best_number);
540
541 let spawn_handle = task_manager.spawn_handle();
542
543 spawn_handle.spawn(
545 "txpool-notifications",
546 Some("transaction-pool"),
547 sc_transaction_pool::notification_future(client.clone(), transaction_pool.clone()),
548 );
549
550 spawn_handle.spawn(
551 "on-transaction-imported",
552 Some("transaction-pool"),
553 propagate_transaction_notifications(
554 transaction_pool.clone(),
555 tx_handler_controller,
556 telemetry.clone(),
557 ),
558 );
559
560 let metrics_service =
562 if let Some(PrometheusConfig { port, registry }) = config.prometheus_config.clone() {
563 let metrics = MetricsService::with_prometheus(
565 telemetry,
566 ®istry,
567 config.role,
568 &config.network.node_name,
569 &config.impl_version,
570 )?;
571 spawn_handle.spawn(
572 "prometheus-endpoint",
573 None,
574 prometheus_endpoint::init_prometheus(port, registry).map(drop),
575 );
576
577 metrics
578 } else {
579 MetricsService::new(telemetry)
580 };
581
582 spawn_handle.spawn(
584 "telemetry-periodic-send",
585 None,
586 metrics_service.run(
587 client.clone(),
588 transaction_pool.clone(),
589 network.clone(),
590 sync_service.clone(),
591 ),
592 );
593
594 let rpc_id_provider = config.rpc.id_provider.take();
595
596 let rpc_v2_metrics = config
601 .prometheus_registry()
602 .map(|registry| sc_rpc_spec_v2::transaction::TransactionMetrics::new(registry))
603 .transpose()?;
604
605 let gen_rpc_module = || {
606 gen_rpc_module(
607 task_manager.spawn_handle(),
608 client.clone(),
609 transaction_pool.clone(),
610 keystore.clone(),
611 system_rpc_tx.clone(),
612 config.impl_name.clone(),
613 config.impl_version.clone(),
614 config.chain_spec.as_ref(),
615 &config.state_pruning,
616 config.blocks_pruning,
617 backend.clone(),
618 &*rpc_builder,
619 rpc_v2_metrics.clone(),
620 )
621 };
622
623 let rpc_server_handle = start_rpc_servers(
624 &config.rpc,
625 config.prometheus_registry(),
626 &config.tokio_handle,
627 gen_rpc_module,
628 rpc_id_provider,
629 )?;
630
631 let listen_addrs = rpc_server_handle
632 .listen_addrs()
633 .into_iter()
634 .map(|socket_addr| {
635 let mut multiaddr: Multiaddr = socket_addr.ip().into();
636 multiaddr.push(Protocol::Tcp(socket_addr.port()));
637 multiaddr
638 })
639 .collect();
640
641 let in_memory_rpc = {
642 let mut module = gen_rpc_module()?;
643 module.extensions_mut().insert(DenyUnsafe::No);
644 module
645 };
646
647 let in_memory_rpc_handle = RpcHandlers::new(Arc::new(in_memory_rpc), listen_addrs);
648
649 spawn_handle.spawn(
651 "informant",
652 None,
653 sc_informant::build(client.clone(), network, sync_service.clone()),
654 );
655
656 task_manager.keep_alive((config.base_path, rpc_server_handle));
657
658 Ok(in_memory_rpc_handle)
659}
660
661pub async fn propagate_transaction_notifications<Block, ExPool>(
663 transaction_pool: Arc<ExPool>,
664 tx_handler_controller: sc_network_transactions::TransactionsHandlerController<
665 <Block as BlockT>::Hash,
666 >,
667 telemetry: Option<TelemetryHandle>,
668) where
669 Block: BlockT,
670 ExPool: MaintainedTransactionPool<Block = Block, Hash = <Block as BlockT>::Hash>,
671{
672 const TELEMETRY_INTERVAL: Duration = Duration::from_secs(1);
673
674 let mut notifications = transaction_pool.import_notification_stream().fuse();
676 let mut timer = futures_timer::Delay::new(TELEMETRY_INTERVAL).fuse();
677 let mut tx_imported = false;
678
679 loop {
680 select! {
681 notification = notifications.next() => {
682 let Some(hash) = notification else { return };
683
684 tx_handler_controller.propagate_transaction(hash);
685
686 tx_imported = true;
687 },
688 _ = timer => {
689 timer = futures_timer::Delay::new(TELEMETRY_INTERVAL).fuse();
690
691 if !tx_imported {
692 continue;
693 }
694
695 tx_imported = false;
696 let status = transaction_pool.status();
697
698 telemetry!(
699 telemetry;
700 SUBSTRATE_INFO;
701 "txpool.import";
702 "ready" => status.ready,
703 "future" => status.future,
704 );
705 }
706 }
707 }
708}
709
710pub fn init_telemetry<Block, Client, Network>(
712 name: String,
713 implementation: String,
714 version: String,
715 chain: String,
716 authority: bool,
717 network: Network,
718 client: Arc<Client>,
719 telemetry: &mut Telemetry,
720 sysinfo: Option<sc_telemetry::SysInfo>,
721) -> sc_telemetry::Result<TelemetryHandle>
722where
723 Block: BlockT,
724 Client: BlockBackend<Block>,
725 Network: NetworkStateInfo,
726{
727 let genesis_hash = client.block_hash(Zero::zero()).ok().flatten().unwrap_or_default();
728 let connection_message = ConnectionMessage {
729 name,
730 implementation,
731 version,
732 target_os: sc_sysinfo::TARGET_OS.into(),
733 target_arch: sc_sysinfo::TARGET_ARCH.into(),
734 target_env: sc_sysinfo::TARGET_ENV.into(),
735 config: String::new(),
736 chain,
737 genesis_hash: format!("{:?}", genesis_hash),
738 authority,
739 startup_time: SystemTime::UNIX_EPOCH
740 .elapsed()
741 .map(|dur| dur.as_millis())
742 .unwrap_or(0)
743 .to_string(),
744 network_id: network.local_peer_id().to_base58(),
745 sysinfo,
746 };
747
748 telemetry.start_telemetry(connection_message)?;
749
750 Ok(telemetry.handle())
751}
752
753pub fn gen_rpc_module<TBl, TBackend, TCl, TRpc, TExPool>(
755 spawn_handle: SpawnTaskHandle,
756 client: Arc<TCl>,
757 transaction_pool: Arc<TExPool>,
758 keystore: KeystorePtr,
759 system_rpc_tx: TracingUnboundedSender<sc_rpc::system::Request<TBl>>,
760 impl_name: String,
761 impl_version: String,
762 chain_spec: &dyn ChainSpec,
763 state_pruning: &Option<PruningMode>,
764 blocks_pruning: BlocksPruning,
765 backend: Arc<TBackend>,
766 rpc_builder: &(dyn Fn(SubscriptionTaskExecutor) -> Result<RpcModule<TRpc>, Error>),
767 metrics: Option<sc_rpc_spec_v2::transaction::TransactionMetrics>,
768) -> Result<RpcModule<()>, Error>
769where
770 TBl: BlockT,
771 TCl: ProvideRuntimeApi<TBl>
772 + BlockchainEvents<TBl>
773 + HeaderBackend<TBl>
774 + HeaderMetadata<TBl, Error = sp_blockchain::Error>
775 + ExecutorProvider<TBl>
776 + CallApiAt<TBl>
777 + ProofProvider<TBl>
778 + StorageProvider<TBl, TBackend>
779 + BlockBackend<TBl>
780 + Send
781 + Sync
782 + 'static,
783 TBackend: sc_client_api::backend::Backend<TBl> + 'static,
784 <TCl as ProvideRuntimeApi<TBl>>::Api: sp_session::SessionKeys<TBl> + sp_api::Metadata<TBl>,
785 TExPool: MaintainedTransactionPool<Block = TBl, Hash = <TBl as BlockT>::Hash> + 'static,
786 TBl::Hash: Unpin,
787 TBl::Header: Unpin,
788{
789 let system_info = sc_rpc::system::SystemInfo {
790 chain_name: chain_spec.name().into(),
791 impl_name,
792 impl_version,
793 properties: chain_spec.properties(),
794 chain_type: chain_spec.chain_type(),
795 };
796
797 let mut rpc_api = RpcModule::new(());
798 let task_executor = Arc::new(spawn_handle);
799
800 let (chain, state, child_state) = {
801 let chain = sc_rpc::chain::new_full(client.clone(), task_executor.clone()).into_rpc();
802 let (state, child_state) = sc_rpc::state::new_full(client.clone(), task_executor.clone());
803 let state = state.into_rpc();
804 let child_state = child_state.into_rpc();
805
806 (chain, state, child_state)
807 };
808
809 const MAX_TRANSACTION_PER_CONNECTION: usize = 16;
810
811 let transaction_broadcast_rpc_v2 = sc_rpc_spec_v2::transaction::TransactionBroadcast::new(
812 client.clone(),
813 transaction_pool.clone(),
814 task_executor.clone(),
815 MAX_TRANSACTION_PER_CONNECTION,
816 )
817 .into_rpc();
818
819 let transaction_v2 = sc_rpc_spec_v2::transaction::Transaction::new(
820 client.clone(),
821 transaction_pool.clone(),
822 task_executor.clone(),
823 metrics,
824 )
825 .into_rpc();
826
827 let chain_head_v2 = sc_rpc_spec_v2::chain_head::ChainHead::new(
828 client.clone(),
829 backend.clone(),
830 task_executor.clone(),
831 sc_rpc_spec_v2::chain_head::ChainHeadConfig::default(),
833 )
834 .into_rpc();
835
836 let is_archive_node = state_pruning.as_ref().map(|sp| sp.is_archive()).unwrap_or(false) &&
841 blocks_pruning.is_archive();
842 let genesis_hash = client.hash(Zero::zero()).ok().flatten().expect("Genesis block exists; qed");
843 if is_archive_node {
844 let archive_v2 = sc_rpc_spec_v2::archive::Archive::new(
845 client.clone(),
846 backend.clone(),
847 genesis_hash,
848 task_executor.clone(),
849 )
850 .into_rpc();
851 rpc_api.merge(archive_v2).map_err(|e| Error::Application(e.into()))?;
852 }
853
854 let chain_spec_v2 = sc_rpc_spec_v2::chain_spec::ChainSpec::new(
856 chain_spec.name().into(),
857 genesis_hash,
858 chain_spec.properties(),
859 )
860 .into_rpc();
861
862 let author = sc_rpc::author::Author::new(
863 client.clone(),
864 transaction_pool,
865 keystore,
866 task_executor.clone(),
867 )
868 .into_rpc();
869
870 let system = sc_rpc::system::System::new(system_info, system_rpc_tx).into_rpc();
871
872 if let Some(storage) = backend.offchain_storage() {
873 let offchain = sc_rpc::offchain::Offchain::new(storage).into_rpc();
874
875 rpc_api.merge(offchain).map_err(|e| Error::Application(e.into()))?;
876 }
877
878 rpc_api.merge(transaction_v2).map_err(|e| Error::Application(e.into()))?;
880 rpc_api
881 .merge(transaction_broadcast_rpc_v2)
882 .map_err(|e| Error::Application(e.into()))?;
883 rpc_api.merge(chain_head_v2).map_err(|e| Error::Application(e.into()))?;
884 rpc_api.merge(chain_spec_v2).map_err(|e| Error::Application(e.into()))?;
885
886 rpc_api.merge(chain).map_err(|e| Error::Application(e.into()))?;
888 rpc_api.merge(author).map_err(|e| Error::Application(e.into()))?;
889 rpc_api.merge(system).map_err(|e| Error::Application(e.into()))?;
890 rpc_api.merge(state).map_err(|e| Error::Application(e.into()))?;
891 rpc_api.merge(child_state).map_err(|e| Error::Application(e.into()))?;
892 let extra_rpcs = rpc_builder(task_executor.clone())?;
894 rpc_api.merge(extra_rpcs).map_err(|e| Error::Application(e.into()))?;
895
896 Ok(rpc_api)
897}
898
899pub struct BuildNetworkParams<'a, Block, Net, TxPool, IQ, Client>
901where
902 Block: BlockT,
903 Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
904{
905 pub config: &'a Configuration,
907 pub net_config: FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
909 pub client: Arc<Client>,
911 pub transaction_pool: Arc<TxPool>,
913 pub spawn_handle: SpawnTaskHandle,
915 pub import_queue: IQ,
917 pub block_announce_validator_builder: Option<
919 Box<dyn FnOnce(Arc<Client>) -> Box<dyn BlockAnnounceValidator<Block> + Send> + Send>,
920 >,
921 pub warp_sync_config: Option<WarpSyncConfig<Block>>,
923 pub block_relay: Option<BlockRelayParams<Block, Net>>,
926 pub metrics: NotificationMetrics,
928}
929
930pub fn build_network<Block, Net, TxPool, IQ, Client>(
932 params: BuildNetworkParams<Block, Net, TxPool, IQ, Client>,
933) -> Result<
934 (
935 Arc<dyn sc_network::service::traits::NetworkService>,
936 TracingUnboundedSender<sc_rpc::system::Request<Block>>,
937 sc_network_transactions::TransactionsHandlerController<<Block as BlockT>::Hash>,
938 Arc<SyncingService<Block>>,
939 ),
940 Error,
941>
942where
943 Block: BlockT,
944 Client: ProvideRuntimeApi<Block>
945 + HeaderMetadata<Block, Error = sp_blockchain::Error>
946 + Chain<Block>
947 + BlockBackend<Block>
948 + BlockIdTo<Block, Error = sp_blockchain::Error>
949 + ProofProvider<Block>
950 + HeaderBackend<Block>
951 + BlockchainEvents<Block>
952 + 'static,
953 TxPool: TransactionPool<Block = Block, Hash = <Block as BlockT>::Hash> + 'static,
954 IQ: ImportQueue<Block> + 'static,
955 Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
956{
957 let BuildNetworkParams {
958 config,
959 mut net_config,
960 client,
961 transaction_pool,
962 spawn_handle,
963 import_queue,
964 block_announce_validator_builder,
965 warp_sync_config,
966 block_relay,
967 metrics,
968 } = params;
969
970 let block_announce_validator = if let Some(f) = block_announce_validator_builder {
971 f(client.clone())
972 } else {
973 Box::new(DefaultBlockAnnounceValidator)
974 };
975
976 let network_service_provider = NetworkServiceProvider::new();
977 let protocol_id = config.protocol_id();
978 let fork_id = config.chain_spec.fork_id();
979 let metrics_registry = config.prometheus_config.as_ref().map(|config| &config.registry);
980
981 let block_downloader = match block_relay {
982 Some(params) => {
983 let BlockRelayParams { mut server, downloader, request_response_config } = params;
984
985 net_config.add_request_response_protocol(request_response_config);
986
987 spawn_handle.spawn("block-request-handler", Some("networking"), async move {
988 server.run().await;
989 });
990
991 downloader
992 },
993 None => build_default_block_downloader(
994 &protocol_id,
995 fork_id,
996 &mut net_config,
997 network_service_provider.handle(),
998 Arc::clone(&client),
999 config.network.default_peers_set.in_peers as usize +
1000 config.network.default_peers_set.out_peers as usize,
1001 &spawn_handle,
1002 ),
1003 };
1004
1005 let syncing_strategy = build_polkadot_syncing_strategy(
1006 protocol_id.clone(),
1007 fork_id,
1008 &mut net_config,
1009 warp_sync_config,
1010 block_downloader,
1011 client.clone(),
1012 &spawn_handle,
1013 metrics_registry,
1014 )?;
1015
1016 let (syncing_engine, sync_service, block_announce_config) = SyncingEngine::new(
1017 Roles::from(&config.role),
1018 Arc::clone(&client),
1019 metrics_registry,
1020 metrics.clone(),
1021 &net_config,
1022 protocol_id.clone(),
1023 fork_id,
1024 block_announce_validator,
1025 syncing_strategy,
1026 network_service_provider.handle(),
1027 import_queue.service(),
1028 net_config.peer_store_handle(),
1029 )?;
1030
1031 spawn_handle.spawn_blocking("syncing", None, syncing_engine.run());
1032
1033 build_network_advanced(BuildNetworkAdvancedParams {
1034 role: config.role,
1035 protocol_id,
1036 fork_id,
1037 ipfs_server: config.network.ipfs_server,
1038 announce_block: config.announce_block,
1039 net_config,
1040 client,
1041 transaction_pool,
1042 spawn_handle,
1043 import_queue,
1044 sync_service,
1045 block_announce_config,
1046 network_service_provider,
1047 metrics_registry,
1048 metrics,
1049 })
1050}
1051
1052pub struct BuildNetworkAdvancedParams<'a, Block, Net, TxPool, IQ, Client>
1054where
1055 Block: BlockT,
1056 Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1057{
1058 pub role: Role,
1060 pub protocol_id: ProtocolId,
1062 pub fork_id: Option<&'a str>,
1064 pub ipfs_server: bool,
1066 pub announce_block: bool,
1068 pub net_config: FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
1070 pub client: Arc<Client>,
1072 pub transaction_pool: Arc<TxPool>,
1074 pub spawn_handle: SpawnTaskHandle,
1076 pub import_queue: IQ,
1078 pub sync_service: SyncingService<Block>,
1080 pub block_announce_config: Net::NotificationProtocolConfig,
1082 pub network_service_provider: NetworkServiceProvider,
1084 pub metrics_registry: Option<&'a Registry>,
1086 pub metrics: NotificationMetrics,
1088}
1089
1090pub fn build_network_advanced<Block, Net, TxPool, IQ, Client>(
1093 params: BuildNetworkAdvancedParams<Block, Net, TxPool, IQ, Client>,
1094) -> Result<
1095 (
1096 Arc<dyn sc_network::service::traits::NetworkService>,
1097 TracingUnboundedSender<sc_rpc::system::Request<Block>>,
1098 sc_network_transactions::TransactionsHandlerController<<Block as BlockT>::Hash>,
1099 Arc<SyncingService<Block>>,
1100 ),
1101 Error,
1102>
1103where
1104 Block: BlockT,
1105 Client: ProvideRuntimeApi<Block>
1106 + HeaderMetadata<Block, Error = sp_blockchain::Error>
1107 + Chain<Block>
1108 + BlockBackend<Block>
1109 + BlockIdTo<Block, Error = sp_blockchain::Error>
1110 + ProofProvider<Block>
1111 + HeaderBackend<Block>
1112 + BlockchainEvents<Block>
1113 + 'static,
1114 TxPool: TransactionPool<Block = Block, Hash = <Block as BlockT>::Hash> + 'static,
1115 IQ: ImportQueue<Block> + 'static,
1116 Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1117{
1118 let BuildNetworkAdvancedParams {
1119 role,
1120 protocol_id,
1121 fork_id,
1122 ipfs_server,
1123 announce_block,
1124 mut net_config,
1125 client,
1126 transaction_pool,
1127 spawn_handle,
1128 import_queue,
1129 sync_service,
1130 block_announce_config,
1131 network_service_provider,
1132 metrics_registry,
1133 metrics,
1134 } = params;
1135
1136 let genesis_hash = client.info().genesis_hash;
1137
1138 let light_client_request_protocol_config = {
1139 let (handler, protocol_config) =
1141 LightClientRequestHandler::new::<Net>(&protocol_id, fork_id, client.clone());
1142 spawn_handle.spawn("light-client-request-handler", Some("networking"), handler.run());
1143 protocol_config
1144 };
1145
1146 net_config.add_request_response_protocol(light_client_request_protocol_config);
1148
1149 let bitswap_config = ipfs_server.then(|| {
1150 let (handler, config) = Net::bitswap_server(client.clone());
1151 spawn_handle.spawn("bitswap-request-handler", Some("networking"), handler);
1152
1153 config
1154 });
1155
1156 let (transactions_handler_proto, transactions_config) =
1158 sc_network_transactions::TransactionsHandlerPrototype::new::<_, Block, Net>(
1159 protocol_id.clone(),
1160 genesis_hash,
1161 fork_id,
1162 metrics.clone(),
1163 net_config.peer_store_handle(),
1164 );
1165 net_config.add_notification_protocol(transactions_config);
1166
1167 let peer_store = net_config.take_peer_store();
1169 spawn_handle.spawn("peer-store", Some("networking"), peer_store.run());
1170
1171 let sync_service = Arc::new(sync_service);
1172
1173 let network_params = sc_network::config::Params::<Block, <Block as BlockT>::Hash, Net> {
1174 role,
1175 executor: {
1176 let spawn_handle = Clone::clone(&spawn_handle);
1177 Box::new(move |fut| {
1178 spawn_handle.spawn("libp2p-node", Some("networking"), fut);
1179 })
1180 },
1181 network_config: net_config,
1182 genesis_hash,
1183 protocol_id,
1184 fork_id: fork_id.map(ToOwned::to_owned),
1185 metrics_registry: metrics_registry.cloned(),
1186 block_announce_config,
1187 bitswap_config,
1188 notification_metrics: metrics,
1189 };
1190
1191 let has_bootnodes = !network_params.network_config.network_config.boot_nodes.is_empty();
1192 let network_mut = Net::new(network_params)?;
1193 let network = network_mut.network_service().clone();
1194
1195 let (tx_handler, tx_handler_controller) = transactions_handler_proto.build(
1196 network.clone(),
1197 sync_service.clone(),
1198 Arc::new(TransactionPoolAdapter { pool: transaction_pool, client: client.clone() }),
1199 metrics_registry,
1200 )?;
1201 spawn_handle.spawn_blocking(
1202 "network-transactions-handler",
1203 Some("networking"),
1204 tx_handler.run(),
1205 );
1206
1207 spawn_handle.spawn_blocking(
1208 "chain-sync-network-service-provider",
1209 Some("networking"),
1210 network_service_provider.run(Arc::new(network.clone())),
1211 );
1212 spawn_handle.spawn("import-queue", None, {
1213 let sync_service = sync_service.clone();
1214
1215 async move { import_queue.run(sync_service.as_ref()).await }
1216 });
1217
1218 let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc", 10_000);
1219 spawn_handle.spawn(
1220 "system-rpc-handler",
1221 Some("networking"),
1222 build_system_rpc_future::<_, _, <Block as BlockT>::Hash>(
1223 role,
1224 network_mut.network_service(),
1225 sync_service.clone(),
1226 client.clone(),
1227 system_rpc_rx,
1228 has_bootnodes,
1229 ),
1230 );
1231
1232 let future = build_network_future::<_, _, <Block as BlockT>::Hash, _>(
1233 network_mut,
1234 client,
1235 sync_service.clone(),
1236 announce_block,
1237 );
1238
1239 spawn_handle.spawn_blocking("network-worker", Some("networking"), future);
1247
1248 Ok((network, system_rpc_tx, tx_handler_controller, sync_service.clone()))
1249}
1250
1251pub struct DefaultSyncingEngineConfig<'a, Block, Client, Net>
1253where
1254 Block: BlockT,
1255 Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1256{
1257 pub role: Role,
1259 pub protocol_id: ProtocolId,
1261 pub fork_id: Option<&'a str>,
1263 pub net_config: &'a mut FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
1265 pub block_announce_validator: Box<dyn BlockAnnounceValidator<Block> + Send>,
1267 pub network_service_handle: NetworkServiceHandle,
1269 pub warp_sync_config: Option<WarpSyncConfig<Block>>,
1271 pub client: Arc<Client>,
1273 pub import_queue_service: Box<dyn ImportQueueService<Block>>,
1275 pub num_peers_hint: usize,
1277 pub spawn_handle: &'a SpawnTaskHandle,
1279 pub metrics_registry: Option<&'a Registry>,
1281 pub metrics: NotificationMetrics,
1283}
1284
1285pub fn build_default_syncing_engine<Block, Client, Net>(
1288 config: DefaultSyncingEngineConfig<Block, Client, Net>,
1289) -> Result<(SyncingService<Block>, Net::NotificationProtocolConfig), Error>
1290where
1291 Block: BlockT,
1292 Client: HeaderBackend<Block>
1293 + BlockBackend<Block>
1294 + HeaderMetadata<Block, Error = sp_blockchain::Error>
1295 + ProofProvider<Block>
1296 + Send
1297 + Sync
1298 + 'static,
1299 Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1300{
1301 let DefaultSyncingEngineConfig {
1302 role,
1303 protocol_id,
1304 fork_id,
1305 net_config,
1306 block_announce_validator,
1307 network_service_handle,
1308 warp_sync_config,
1309 client,
1310 import_queue_service,
1311 num_peers_hint,
1312 spawn_handle,
1313 metrics_registry,
1314 metrics,
1315 } = config;
1316
1317 let block_downloader = build_default_block_downloader(
1318 &protocol_id,
1319 fork_id,
1320 net_config,
1321 network_service_handle.clone(),
1322 client.clone(),
1323 num_peers_hint,
1324 spawn_handle,
1325 );
1326 let syncing_strategy = build_polkadot_syncing_strategy(
1327 protocol_id.clone(),
1328 fork_id,
1329 net_config,
1330 warp_sync_config,
1331 block_downloader,
1332 client.clone(),
1333 spawn_handle,
1334 metrics_registry,
1335 )?;
1336
1337 let (syncing_engine, sync_service, block_announce_config) = SyncingEngine::new(
1338 Roles::from(&role),
1339 client,
1340 metrics_registry,
1341 metrics,
1342 &net_config,
1343 protocol_id,
1344 fork_id,
1345 block_announce_validator,
1346 syncing_strategy,
1347 network_service_handle,
1348 import_queue_service,
1349 net_config.peer_store_handle(),
1350 )?;
1351
1352 spawn_handle.spawn_blocking("syncing", None, syncing_engine.run());
1353
1354 Ok((sync_service, block_announce_config))
1355}
1356
1357pub fn build_default_block_downloader<Block, Client, Net>(
1359 protocol_id: &ProtocolId,
1360 fork_id: Option<&str>,
1361 net_config: &mut FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
1362 network_service_handle: NetworkServiceHandle,
1363 client: Arc<Client>,
1364 num_peers_hint: usize,
1365 spawn_handle: &SpawnTaskHandle,
1366) -> Arc<dyn BlockDownloader<Block>>
1367where
1368 Block: BlockT,
1369 Client: HeaderBackend<Block> + BlockBackend<Block> + Send + Sync + 'static,
1370 Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1371{
1372 let BlockRelayParams { mut server, downloader, request_response_config } =
1375 BlockRequestHandler::new::<Net>(
1376 network_service_handle,
1377 &protocol_id,
1378 fork_id,
1379 client.clone(),
1380 num_peers_hint,
1381 );
1382
1383 spawn_handle.spawn("block-request-handler", Some("networking"), async move {
1384 server.run().await;
1385 });
1386
1387 net_config.add_request_response_protocol(request_response_config);
1388
1389 downloader
1390}
1391
1392pub fn build_polkadot_syncing_strategy<Block, Client, Net>(
1394 protocol_id: ProtocolId,
1395 fork_id: Option<&str>,
1396 net_config: &mut FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Net>,
1397 warp_sync_config: Option<WarpSyncConfig<Block>>,
1398 block_downloader: Arc<dyn BlockDownloader<Block>>,
1399 client: Arc<Client>,
1400 spawn_handle: &SpawnTaskHandle,
1401 metrics_registry: Option<&Registry>,
1402) -> Result<Box<dyn SyncingStrategy<Block>>, Error>
1403where
1404 Block: BlockT,
1405 Client: HeaderBackend<Block>
1406 + BlockBackend<Block>
1407 + HeaderMetadata<Block, Error = sp_blockchain::Error>
1408 + ProofProvider<Block>
1409 + Send
1410 + Sync
1411 + 'static,
1412 Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
1413{
1414 if warp_sync_config.is_none() && net_config.network_config.sync_mode.is_warp() {
1415 return Err("Warp sync enabled, but no warp sync provider configured.".into())
1416 }
1417
1418 if client.requires_full_sync() {
1419 match net_config.network_config.sync_mode {
1420 SyncMode::LightState { .. } =>
1421 return Err("Fast sync doesn't work for archive nodes".into()),
1422 SyncMode::Warp => return Err("Warp sync doesn't work for archive nodes".into()),
1423 SyncMode::Full => {},
1424 }
1425 }
1426
1427 let genesis_hash = client.info().genesis_hash;
1428
1429 let (state_request_protocol_config, state_request_protocol_name) = {
1430 let num_peer_hint = net_config.network_config.default_peers_set_num_full as usize +
1431 net_config.network_config.default_peers_set.reserved_nodes.len();
1432 let (handler, protocol_config) =
1434 StateRequestHandler::new::<Net>(&protocol_id, fork_id, client.clone(), num_peer_hint);
1435 let config_name = protocol_config.protocol_name().clone();
1436
1437 spawn_handle.spawn("state-request-handler", Some("networking"), handler.run());
1438 (protocol_config, config_name)
1439 };
1440 net_config.add_request_response_protocol(state_request_protocol_config);
1441
1442 let (warp_sync_protocol_config, warp_sync_protocol_name) = match warp_sync_config.as_ref() {
1443 Some(WarpSyncConfig::WithProvider(warp_with_provider)) => {
1444 let (handler, protocol_config) = WarpSyncRequestHandler::new::<_, Net>(
1446 protocol_id,
1447 genesis_hash,
1448 fork_id,
1449 warp_with_provider.clone(),
1450 );
1451 let config_name = protocol_config.protocol_name().clone();
1452
1453 spawn_handle.spawn("warp-sync-request-handler", Some("networking"), handler.run());
1454 (Some(protocol_config), Some(config_name))
1455 },
1456 _ => (None, None),
1457 };
1458 if let Some(config) = warp_sync_protocol_config {
1459 net_config.add_request_response_protocol(config);
1460 }
1461
1462 let syncing_config = PolkadotSyncingStrategyConfig {
1463 mode: net_config.network_config.sync_mode,
1464 max_parallel_downloads: net_config.network_config.max_parallel_downloads,
1465 max_blocks_per_request: net_config.network_config.max_blocks_per_request,
1466 min_peers_to_start_warp_sync: net_config.network_config.min_peers_to_start_warp_sync,
1467 metrics_registry: metrics_registry.cloned(),
1468 state_request_protocol_name,
1469 block_downloader,
1470 };
1471 Ok(Box::new(PolkadotSyncingStrategy::new(
1472 syncing_config,
1473 client,
1474 warp_sync_config,
1475 warp_sync_protocol_name,
1476 )?))
1477}