use crate::{
build_network_future, build_system_rpc_future,
client::{Client, ClientConfig},
config::{Configuration, KeystoreConfig, PrometheusConfig},
error::Error,
metrics::MetricsService,
start_rpc_servers, BuildGenesisBlock, GenesisBlockBuilder, RpcHandlers, SpawnTaskHandle,
TaskManager, TransactionPoolAdapter,
};
use futures::{channel::oneshot, future::ready, FutureExt, StreamExt};
use jsonrpsee::RpcModule;
use log::info;
use prometheus_endpoint::Registry;
use sc_chain_spec::get_extension;
use sc_client_api::{
execution_extensions::ExecutionExtensions, proof_provider::ProofProvider, BadBlocks,
BlockBackend, BlockchainEvents, ExecutorProvider, ForkBlocks, StorageProvider, UsageProvider,
};
use sc_client_db::{Backend, DatabaseSettings};
use sc_consensus::import_queue::ImportQueue;
use sc_executor::{
sp_wasm_interface::HostFunctions, HeapAllocStrategy, NativeElseWasmExecutor,
NativeExecutionDispatch, RuntimeVersionOf, WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY,
};
use sc_keystore::LocalKeystore;
use sc_network::{
config::{FullNetworkConfiguration, SyncMode},
peer_store::PeerStore,
NetworkService, NetworkStateInfo, NetworkStatusProvider,
};
use sc_network_bitswap::BitswapRequestHandler;
use sc_network_common::{role::Roles, sync::warp::WarpSyncParams};
use sc_network_light::light_client_requests::handler::LightClientRequestHandler;
use sc_network_sync::{
block_request_handler::BlockRequestHandler, engine::SyncingEngine,
service::network::NetworkServiceProvider, state_request_handler::StateRequestHandler,
warp_request_handler::RequestHandler as WarpSyncRequestHandler, SyncingService,
};
use sc_rpc::{
author::AuthorApiServer,
chain::ChainApiServer,
offchain::OffchainApiServer,
state::{ChildStateApiServer, StateApiServer},
system::SystemApiServer,
DenyUnsafe, SubscriptionTaskExecutor,
};
use sc_rpc_spec_v2::{chain_head::ChainHeadApiServer, transaction::TransactionApiServer};
use sc_telemetry::{telemetry, ConnectionMessage, Telemetry, TelemetryHandle, SUBSTRATE_INFO};
use sc_transaction_pool_api::{MaintainedTransactionPool, TransactionPool};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
use sp_api::{CallApiAt, ProvideRuntimeApi};
use sp_blockchain::{HeaderBackend, HeaderMetadata};
use sp_consensus::block_validation::{
BlockAnnounceValidator, Chain, DefaultBlockAnnounceValidator,
};
use sp_core::traits::{CodeExecutor, SpawnNamed};
use sp_keystore::KeystorePtr;
use sp_runtime::traits::{Block as BlockT, BlockIdTo, NumberFor, Zero};
use std::{str::FromStr, sync::Arc, time::SystemTime};
/// Full client type: a [`Client`] using the full backend and the full call executor.
pub type TFullClient<TBl, TRtApi, TExec> =
Client<TFullBackend<TBl>, TFullCallExecutor<TBl, TExec>, TBl, TRtApi>;
/// Full client backend type (the `sc_client_db` [`Backend`]).
pub type TFullBackend<TBl> = Backend<TBl>;
/// Full client call executor type: a `LocalCallExecutor` running on top of the full backend.
pub type TFullCallExecutor<TBl, TExec> = crate::client::LocalCallExecutor<TBl, Backend<TBl>, TExec>;
/// The parts produced when building a full client, in order:
/// the client, the shared backend, the keystore container and the task manager.
type TFullParts<TBl, TRtApi, TExec> =
(TFullClient<TBl, TRtApi, TExec>, Arc<TFullBackend<TBl>>, KeystoreContainer, TaskManager);
/// Holds the node's [`LocalKeystore`] behind a shared pointer.
pub struct KeystoreContainer(Arc<LocalKeystore>);

impl KeystoreContainer {
	/// Creates the keystore described by `config`.
	///
	/// A `Path` configuration opens a keystore with the given path and password, an
	/// `InMemory` configuration creates an in-memory one.
	///
	/// # Errors
	///
	/// Propagates the error returned by `LocalKeystore::open`.
	pub fn new(config: &KeystoreConfig) -> Result<Self, Error> {
		let local = match config {
			KeystoreConfig::InMemory => LocalKeystore::in_memory(),
			KeystoreConfig::Path { path, password } =>
				LocalKeystore::open(path.clone(), password.clone())?,
		};
		Ok(Self(Arc::new(local)))
	}

	/// Returns a shared handle to the keystore as a [`KeystorePtr`].
	pub fn keystore(&self) -> KeystorePtr {
		// The concrete `Arc<LocalKeystore>` is coerced to the pointer type on return.
		self.local_keystore()
	}

	/// Returns a shared handle to the concrete [`LocalKeystore`].
	pub fn local_keystore(&self) -> Arc<LocalKeystore> {
		let Self(inner) = self;
		inner.clone()
	}
}
/// Creates a new full client for the given configuration.
///
/// This builds all full parts via [`new_full_parts`] and keeps only the client; the
/// remaining parts are dropped.
///
/// # Errors
///
/// Returns whatever error [`new_full_parts`] produces.
pub fn new_full_client<TBl, TRtApi, TExec>(
	config: &Configuration,
	telemetry: Option<TelemetryHandle>,
	executor: TExec,
) -> Result<TFullClient<TBl, TRtApi, TExec>, Error>
where
	TBl: BlockT,
	TExec: CodeExecutor + RuntimeVersionOf + Clone,
{
	let parts = new_full_parts::<TBl, TRtApi, TExec>(config, telemetry, executor)?;
	Ok(parts.0)
}
/// Creates the initial parts of a full node using the default genesis block builder.
///
/// Opens the database backend from `config.db_config()`, creates a [`GenesisBlockBuilder`]
/// from the chain spec, and delegates to [`new_full_parts_with_genesis_builder`].
///
/// # Errors
///
/// Fails if the backend cannot be created, if the genesis block builder cannot be
/// constructed, or if [`new_full_parts_with_genesis_builder`] fails.
pub fn new_full_parts<TBl, TRtApi, TExec>(
	config: &Configuration,
	telemetry: Option<TelemetryHandle>,
	executor: TExec,
) -> Result<TFullParts<TBl, TRtApi, TExec>, Error>
where
	TBl: BlockT,
	TExec: CodeExecutor + RuntimeVersionOf + Clone,
{
	let backend = new_db_backend(config.db_config())?;

	let storage_builder = config.chain_spec.as_storage_builder();
	// Second builder argument is the negation of `config.no_genesis()`.
	let has_genesis = !config.no_genesis();
	let genesis_block_builder =
		GenesisBlockBuilder::new(storage_builder, has_genesis, backend.clone(), executor.clone())?;

	new_full_parts_with_genesis_builder(config, telemetry, executor, backend, genesis_block_builder)
}
pub fn new_full_parts_with_genesis_builder<TBl, TRtApi, TExec, TBuildGenesisBlock>(
config: &Configuration,
telemetry: Option<TelemetryHandle>,
executor: TExec,
backend: Arc<TFullBackend<TBl>>,
genesis_block_builder: TBuildGenesisBlock,
) -> Result<TFullParts<TBl, TRtApi, TExec>, Error>
where
TBl: BlockT,
TExec: CodeExecutor + RuntimeVersionOf + Clone,
TBuildGenesisBlock: BuildGenesisBlock<
TBl,
BlockImportOperation = <Backend<TBl> as sc_client_api::backend::Backend<TBl>>::BlockImportOperation
>,
{
let keystore_container = KeystoreContainer::new(&config.keystore)?;
let task_manager = {
let registry = config.prometheus_config.as_ref().map(|cfg| &cfg.registry);
TaskManager::new(config.tokio_handle.clone(), registry)?
};
let chain_spec = &config.chain_spec;
let fork_blocks = get_extension::<ForkBlocks<TBl>>(chain_spec.extensions())
.cloned()
.unwrap_or_default();
let bad_blocks = get_extension::<BadBlocks<TBl>>(chain_spec.extensions())
.cloned()
.unwrap_or_default();
let client = {
let extensions = sc_client_api::execution_extensions::ExecutionExtensions::new(
None,
Arc::new(executor.clone()),
);
let wasm_runtime_substitutes = config
.chain_spec
.code_substitutes()
.into_iter()
.map(|(n, c)| {
let number = NumberFor::<TBl>::from_str(&n).map_err(|_| {
Error::Application(Box::from(format!(
"Failed to parse `{}` as block number for code substitutes. \
In an old version the key for code substitute was a block hash. \
Please update the chain spec to a version that is compatible with your node.",
n
)))
})?;
Ok((number, c))
})
.collect::<Result<std::collections::HashMap<_, _>, Error>>()?;
let client = new_client(
backend.clone(),
executor,
genesis_block_builder,
fork_blocks,
bad_blocks,
extensions,
Box::new(task_manager.spawn_handle()),
config.prometheus_config.as_ref().map(|config| config.registry.clone()),
telemetry,
ClientConfig {
offchain_worker_enabled: config.offchain_worker.enabled,
offchain_indexing_api: config.offchain_worker.indexing_enabled,
wasm_runtime_overrides: config.wasm_runtime_overrides.clone(),
no_genesis: matches!(
config.network.sync_mode,
SyncMode::LightState { .. } | SyncMode::Warp { .. }
),
wasm_runtime_substitutes,
},
)?;
client
};
Ok((client, backend, keystore_container, task_manager))
}
/// Creates a [`NativeElseWasmExecutor`] wrapping the WASM executor built by
/// [`new_wasm_executor`] from `config`.
pub fn new_native_or_wasm_executor<D: NativeExecutionDispatch>(
	config: &Configuration,
) -> NativeElseWasmExecutor<D> {
	let wasm_executor = new_wasm_executor(config);
	NativeElseWasmExecutor::new_with_wasm_executor(wasm_executor)
}
/// Creates a [`WasmExecutor`] configured from `config`.
///
/// When `config.default_heap_pages` is set, both the on-chain and off-chain heap
/// allocation strategies are `HeapAllocStrategy::Static` with that many extra pages;
/// otherwise [`DEFAULT_HEAP_ALLOC_STRATEGY`] is used for both.
pub fn new_wasm_executor<H: HostFunctions>(config: &Configuration) -> WasmExecutor<H> {
	let heap_strategy = match config.default_heap_pages {
		Some(pages) => HeapAllocStrategy::Static { extra_pages: pages as _ },
		None => DEFAULT_HEAP_ALLOC_STRATEGY,
	};

	let builder = WasmExecutor::<H>::builder()
		.with_execution_method(config.wasm_method)
		.with_onchain_heap_alloc_strategy(heap_strategy)
		.with_offchain_heap_alloc_strategy(heap_strategy)
		.with_max_runtime_instances(config.max_runtime_instances)
		.with_runtime_cache_size(config.runtime_cache_size);
	builder.build()
}
/// Creates a database backend from the given settings and wraps it in an [`Arc`].
///
/// # Errors
///
/// Propagates the error returned by `Backend::new`.
pub fn new_db_backend<Block>(
	settings: DatabaseSettings,
) -> Result<Arc<Backend<Block>>, sp_blockchain::Error>
where
	Block: BlockT,
{
	// Passed as the second argument to `Backend::new`.
	const CANONICALIZATION_DELAY: u64 = 4096;

	let db_backend = Backend::new(settings, CANONICALIZATION_DELAY)?;
	Ok(Arc::new(db_backend))
}
/// Creates a client on top of the given database backend.
///
/// A `LocalCallExecutor` is created first from the backend, the code executor, a copy of
/// the client configuration and the execution extensions; it is then handed to
/// `Client::new` together with the remaining arguments.
///
/// # Errors
///
/// Fails if either the call executor or the client cannot be constructed.
pub fn new_client<E, Block, RA, G>(
	backend: Arc<Backend<Block>>,
	executor: E,
	genesis_block_builder: G,
	fork_blocks: ForkBlocks<Block>,
	bad_blocks: BadBlocks<Block>,
	execution_extensions: ExecutionExtensions<Block>,
	spawn_handle: Box<dyn SpawnNamed>,
	prometheus_registry: Option<Registry>,
	telemetry: Option<TelemetryHandle>,
	config: ClientConfig<Block>,
) -> Result<
	Client<
		Backend<Block>,
		crate::client::LocalCallExecutor<Block, Backend<Block>, E>,
		Block,
		RA,
	>,
	sp_blockchain::Error,
>
where
	Block: BlockT,
	E: CodeExecutor + RuntimeVersionOf,
	G: BuildGenesisBlock<
		Block,
		BlockImportOperation = <Backend<Block> as sc_client_api::backend::Backend<Block>>::BlockImportOperation
	>,
{
	// The call executor gets its own handle to the backend and its own copy of the config.
	let executor_backend = backend.clone();
	let executor_config = config.clone();
	let call_executor = crate::client::LocalCallExecutor::new(
		executor_backend,
		executor,
		executor_config,
		execution_extensions,
	)?;

	Client::new(
		backend,
		call_executor,
		spawn_handle,
		genesis_block_builder,
		fork_blocks,
		bad_blocks,
		prometheus_registry,
		telemetry,
		config,
	)
}
/// Shorthand for the combination of bounds required of the network handle stored in
/// `SpawnTasksParams::network`: [`NetworkStateInfo`] + [`NetworkStatusProvider`] +
/// `Send + Sync + 'static`.
///
/// The trait has no methods of its own.
pub trait SpawnTaskNetwork<Block: BlockT>:
NetworkStateInfo + NetworkStatusProvider + Send + Sync + 'static
{
}
// Blanket implementation: every type that satisfies the supertrait bounds is a
// `SpawnTaskNetwork` for any block type.
impl<T, Block> SpawnTaskNetwork<Block> for T
where
Block: BlockT,
T: NetworkStateInfo + NetworkStatusProvider + Send + Sync + 'static,
{
}
/// Parameters to pass into `spawn_tasks`.
pub struct SpawnTasksParams<'a, TBl: BlockT, TCl, TExPool, TRpc, Backend> {
/// The service configuration. Taken by value.
pub config: Configuration,
/// A shared client.
pub client: Arc<TCl>,
/// A shared backend.
pub backend: Arc<Backend>,
/// The task manager used to obtain spawn handles and to keep resources alive.
pub task_manager: &'a mut TaskManager,
/// A shared keystore.
pub keystore: KeystorePtr,
/// A shared transaction pool.
pub transaction_pool: Arc<TExPool>,
/// Builds additional RPC methods; the resulting module is merged into the node's RPC module.
pub rpc_builder:
Box<dyn Fn(DenyUnsafe, SubscriptionTaskExecutor) -> Result<RpcModule<TRpc>, Error>>,
/// A shared network handle (see [`SpawnTaskNetwork`]).
pub network: Arc<dyn SpawnTaskNetwork<TBl>>,
/// Sender half of the channel carrying system RPC requests.
pub system_rpc_tx: TracingUnboundedSender<sc_rpc::system::Request<TBl>>,
/// Controller used to propagate transactions when they are imported into the pool.
pub tx_handler_controller:
sc_network_transactions::TransactionsHandlerController<<TBl as BlockT>::Hash>,
/// A shared syncing service.
pub sync_service: Arc<SyncingService<TBl>>,
/// Optional telemetry instance; when present it is started by `spawn_tasks`.
pub telemetry: Option<&'a mut Telemetry>,
}
/// Spawns the tasks that are required to run a node and returns the in-process RPC handlers.
///
/// In order, this:
/// 1. generates the initial session keys from `config.dev_key_seed` (if any),
/// 2. gathers and prints system information,
/// 3. starts telemetry when a [`Telemetry`] instance was supplied,
/// 4. spawns the transaction pool notification tasks,
/// 5. creates the metrics service (spawning the Prometheus endpoint when configured) and
///    spawns its periodic task,
/// 6. starts the RPC servers and builds the in-process RPC handlers,
/// 7. spawns the informant,
/// 8. hands the base path and the RPC servers to the task manager to keep them alive.
///
/// # Errors
///
/// Returns an error if session key generation, telemetry initialisation, metrics service
/// creation, RPC module generation or RPC server start-up fails.
pub fn spawn_tasks<TBl, TBackend, TExPool, TRpc, TCl>(
	params: SpawnTasksParams<TBl, TCl, TExPool, TRpc, TBackend>,
) -> Result<RpcHandlers, Error>
where
	TCl: ProvideRuntimeApi<TBl>
		+ HeaderMetadata<TBl, Error = sp_blockchain::Error>
		+ Chain<TBl>
		+ BlockBackend<TBl>
		+ BlockIdTo<TBl, Error = sp_blockchain::Error>
		+ ProofProvider<TBl>
		+ HeaderBackend<TBl>
		+ BlockchainEvents<TBl>
		+ ExecutorProvider<TBl>
		+ UsageProvider<TBl>
		+ StorageProvider<TBl, TBackend>
		+ CallApiAt<TBl>
		+ Send
		+ 'static,
	<TCl as ProvideRuntimeApi<TBl>>::Api: sp_api::Metadata<TBl>
		+ sp_transaction_pool::runtime_api::TaggedTransactionQueue<TBl>
		+ sp_session::SessionKeys<TBl>
		+ sp_api::ApiExt<TBl>,
	TBl: BlockT,
	TBl::Hash: Unpin,
	TBl::Header: Unpin,
	TBackend: 'static + sc_client_api::backend::Backend<TBl> + Send,
	TExPool: MaintainedTransactionPool<Block = TBl, Hash = <TBl as BlockT>::Hash> + 'static,
{
	let SpawnTasksParams {
		mut config,
		task_manager,
		client,
		backend,
		keystore,
		transaction_pool,
		rpc_builder,
		network,
		system_rpc_tx,
		tx_handler_controller,
		sync_service,
		telemetry,
	} = params;

	let chain_info = client.usage_info().chain;

	// Seed the keystore with session keys derived from the dev seed, if one is configured.
	sp_session::generate_initial_session_keys(
		client.clone(),
		chain_info.best_hash,
		config.dev_key_seed.clone().map(|s| vec![s]).unwrap_or_default(),
		keystore.clone(),
	)
	.map_err(|e| Error::Application(Box::new(e)))?;

	let sysinfo = sc_sysinfo::gather_sysinfo();
	sc_sysinfo::print_sysinfo(&sysinfo);

	// Telemetry is optional; when present, start it and keep only the handle.
	let telemetry = telemetry
		.map(|telemetry| {
			init_telemetry(&mut config, network.clone(), client.clone(), telemetry, Some(sysinfo))
		})
		.transpose()?;

	info!("📦 Highest known block at #{}", chain_info.best_number);

	let spawn_handle = task_manager.spawn_handle();

	// Inform the transaction pool about imported and finalized blocks.
	spawn_handle.spawn(
		"txpool-notifications",
		Some("transaction-pool"),
		sc_transaction_pool::notification_future(client.clone(), transaction_pool.clone()),
	);

	// Propagate imported transactions and report pool status to telemetry.
	spawn_handle.spawn(
		"on-transaction-imported",
		Some("transaction-pool"),
		transaction_notifications(
			transaction_pool.clone(),
			tx_handler_controller,
			telemetry.clone(),
		),
	);

	// Prometheus metrics: only register and serve them when a Prometheus config exists.
	let metrics_service =
		if let Some(PrometheusConfig { port, registry }) = config.prometheus_config.clone() {
			// Set static metrics.
			let metrics = MetricsService::with_prometheus(telemetry, &registry, &config)?;
			spawn_handle.spawn(
				"prometheus-endpoint",
				None,
				prometheus_endpoint::init_prometheus(port, registry).map(drop),
			);

			metrics
		} else {
			MetricsService::new(telemetry)
		};

	// Periodically notify the telemetry.
	spawn_handle.spawn(
		"telemetry-periodic-send",
		None,
		metrics_service.run(
			client.clone(),
			transaction_pool.clone(),
			network.clone(),
			sync_service.clone(),
		),
	);

	// The id provider is moved out of the config because it is handed to the RPC servers.
	let rpc_id_provider = config.rpc_id_provider.take();

	// Builds a fresh RPC module for the requested `DenyUnsafe` policy. Inside the closure
	// body `gen_rpc_module` still names the free function, not this binding.
	let gen_rpc_module = |deny_unsafe: DenyUnsafe| {
		gen_rpc_module(
			deny_unsafe,
			task_manager.spawn_handle(),
			client.clone(),
			transaction_pool.clone(),
			keystore.clone(),
			system_rpc_tx.clone(),
			&config,
			backend.clone(),
			&*rpc_builder,
		)
	};

	let rpc = start_rpc_servers(&config, gen_rpc_module, rpc_id_provider)?;
	// The in-process handlers are built with `DenyUnsafe::No`.
	let rpc_handlers = RpcHandlers(Arc::new(gen_rpc_module(sc_rpc::DenyUnsafe::No)?.into()));

	// Spawn informant task.
	spawn_handle.spawn(
		"informant",
		None,
		sc_informant::build(
			client.clone(),
			network,
			sync_service.clone(),
			config.informant_output_format,
		),
	);

	// Keep the base path and the RPC servers alive for as long as the task manager lives.
	task_manager.keep_alive((config.base_path, rpc));

	Ok(rpc_handlers)
}
/// Drives the transaction pool's import notification stream until it ends.
///
/// For every imported transaction hash the transaction is propagated through
/// `tx_handler_controller`, and the pool's current `ready`/`future` counts are sent to
/// telemetry as a `txpool.import` message.
async fn transaction_notifications<Block, ExPool>(
	transaction_pool: Arc<ExPool>,
	tx_handler_controller: sc_network_transactions::TransactionsHandlerController<
		<Block as BlockT>::Hash,
	>,
	telemetry: Option<TelemetryHandle>,
) where
	Block: BlockT,
	ExPool: MaintainedTransactionPool<Block = Block, Hash = <Block as BlockT>::Hash>,
{
	// Obtain the stream first; the handler below takes ownership of the pool handle.
	let imported = transaction_pool.import_notification_stream();

	let on_import = move |hash: <Block as BlockT>::Hash| {
		tx_handler_controller.propagate_transaction(hash);

		let status = transaction_pool.status();
		telemetry!(
			telemetry;
			SUBSTRATE_INFO;
			"txpool.import";
			"ready" => status.ready,
			"future" => status.future,
		);

		ready(())
	};

	imported.for_each(on_import).await;
}
/// Starts telemetry with a connection message describing this node and returns a handle.
///
/// The genesis hash is looked up through `client`; if the lookup fails or yields nothing,
/// the hash type's default value is reported instead. The startup time is the number of
/// milliseconds since the Unix epoch, or `0` if that cannot be computed.
///
/// # Errors
///
/// Propagates the error returned by `Telemetry::start_telemetry`.
fn init_telemetry<Block, Client, Network>(
	config: &mut Configuration,
	network: Network,
	client: Arc<Client>,
	telemetry: &mut Telemetry,
	sysinfo: Option<sc_telemetry::SysInfo>,
) -> sc_telemetry::Result<TelemetryHandle>
where
	Block: BlockT,
	Client: BlockBackend<Block>,
	Network: NetworkStateInfo,
{
	let genesis_hash = client.block_hash(Zero::zero()).ok().flatten().unwrap_or_default();

	let connection_message = ConnectionMessage {
		name: config.network.node_name.to_owned(),
		implementation: config.impl_name.clone(),
		version: config.impl_version.clone(),
		target_os: sc_sysinfo::TARGET_OS.into(),
		target_arch: sc_sysinfo::TARGET_ARCH.into(),
		target_env: sc_sysinfo::TARGET_ENV.into(),
		config: String::new(),
		chain: config.chain_spec.name().to_owned(),
		genesis_hash: format!("{:?}", genesis_hash),
		authority: config.role.is_authority(),
		startup_time: match SystemTime::UNIX_EPOCH.elapsed() {
			Ok(since_epoch) => since_epoch.as_millis().to_string(),
			Err(_) => 0u128.to_string(),
		},
		network_id: network.local_peer_id().to_base58(),
		sysinfo,
	};

	telemetry.start_telemetry(connection_message)?;
	let handle = telemetry.handle();
	Ok(handle)
}
/// Assembles the node's complete RPC module.
///
/// The module combines the chain, state, child-state, author and system APIs, the
/// `transaction` and `chainHead` APIs from `sc_rpc_spec_v2`, the offchain API (only when
/// the backend exposes offchain storage) and whatever extra methods `rpc_builder` returns.
///
/// # Errors
///
/// Fails if `rpc_builder` fails or if merging any module into the result fails.
fn gen_rpc_module<TBl, TBackend, TCl, TRpc, TExPool>(
	deny_unsafe: DenyUnsafe,
	spawn_handle: SpawnTaskHandle,
	client: Arc<TCl>,
	transaction_pool: Arc<TExPool>,
	keystore: KeystorePtr,
	system_rpc_tx: TracingUnboundedSender<sc_rpc::system::Request<TBl>>,
	config: &Configuration,
	backend: Arc<TBackend>,
	rpc_builder: &(dyn Fn(DenyUnsafe, SubscriptionTaskExecutor) -> Result<RpcModule<TRpc>, Error>),
) -> Result<RpcModule<()>, Error>
where
	TBl: BlockT,
	TCl: ProvideRuntimeApi<TBl>
		+ BlockchainEvents<TBl>
		+ HeaderBackend<TBl>
		+ HeaderMetadata<TBl, Error = sp_blockchain::Error>
		+ ExecutorProvider<TBl>
		+ CallApiAt<TBl>
		+ ProofProvider<TBl>
		+ StorageProvider<TBl, TBackend>
		+ BlockBackend<TBl>
		+ Send
		+ Sync
		+ 'static,
	TBackend: sc_client_api::backend::Backend<TBl> + 'static,
	<TCl as ProvideRuntimeApi<TBl>>::Api: sp_session::SessionKeys<TBl> + sp_api::Metadata<TBl>,
	TExPool: MaintainedTransactionPool<Block = TBl, Hash = <TBl as BlockT>::Hash> + 'static,
	TBl::Hash: Unpin,
	TBl::Header: Unpin,
{
	let mut rpc_api = RpcModule::new(());
	let task_executor = Arc::new(spawn_handle);

	// --- Build the individual API modules (construction order is kept as-is). ---
	let chain = sc_rpc::chain::new_full(client.clone(), task_executor.clone()).into_rpc();

	let (state_api, child_state_api) =
		sc_rpc::state::new_full(client.clone(), task_executor.clone(), deny_unsafe);
	let state = state_api.into_rpc();
	let child_state = child_state_api.into_rpc();

	let transaction_v2 = sc_rpc_spec_v2::transaction::Transaction::new(
		client.clone(),
		transaction_pool.clone(),
		task_executor.clone(),
	)
	.into_rpc();

	let genesis_hash = client.info().genesis_hash;
	let chain_head_v2 = sc_rpc_spec_v2::chain_head::ChainHead::new(
		client.clone(),
		backend.clone(),
		task_executor.clone(),
		genesis_hash,
		sc_rpc_spec_v2::chain_head::ChainHeadConfig::default(),
	)
	.into_rpc();

	let author = sc_rpc::author::Author::new(
		client.clone(),
		transaction_pool,
		keystore,
		deny_unsafe,
		task_executor.clone(),
	)
	.into_rpc();

	let system_info = sc_rpc::system::SystemInfo {
		chain_name: config.chain_spec.name().into(),
		impl_name: config.impl_name.clone(),
		impl_version: config.impl_version.clone(),
		properties: config.chain_spec.properties(),
		chain_type: config.chain_spec.chain_type(),
	};
	let system = sc_rpc::system::System::new(system_info, system_rpc_tx, deny_unsafe).into_rpc();

	// --- Merge everything into the final module. ---

	// The offchain API is only available when the backend provides offchain storage.
	if let Some(storage) = backend.offchain_storage() {
		let offchain = sc_rpc::offchain::Offchain::new(storage, deny_unsafe).into_rpc();
		rpc_api.merge(offchain).map_err(|err| Error::Application(err.into()))?;
	}

	// APIs from `sc_rpc_spec_v2`.
	rpc_api.merge(transaction_v2).map_err(|err| Error::Application(err.into()))?;
	rpc_api.merge(chain_head_v2).map_err(|err| Error::Application(err.into()))?;

	// APIs from `sc_rpc`.
	rpc_api.merge(chain).map_err(|err| Error::Application(err.into()))?;
	rpc_api.merge(author).map_err(|err| Error::Application(err.into()))?;
	rpc_api.merge(system).map_err(|err| Error::Application(err.into()))?;
	rpc_api.merge(state).map_err(|err| Error::Application(err.into()))?;
	rpc_api.merge(child_state).map_err(|err| Error::Application(err.into()))?;

	// Additional methods supplied by the caller.
	let extra_rpcs = rpc_builder(deny_unsafe, task_executor.clone())?;
	rpc_api.merge(extra_rpcs).map_err(|err| Error::Application(err.into()))?;

	Ok(rpc_api)
}
/// Parameters to pass into `build_network`.
pub struct BuildNetworkParams<'a, TBl: BlockT, TExPool, TImpQu, TCl> {
/// The service configuration.
pub config: &'a Configuration,
/// Full network configuration; request-response and notification protocols are added to it.
pub net_config: FullNetworkConfiguration,
/// A shared client.
pub client: Arc<TCl>,
/// A shared transaction pool.
pub transaction_pool: Arc<TExPool>,
/// A handle for spawning tasks.
pub spawn_handle: SpawnTaskHandle,
/// An import queue.
pub import_queue: TImpQu,
/// Optional builder for a block announce validator. When `None`,
/// `DefaultBlockAnnounceValidator` is used.
pub block_announce_validator_builder:
Option<Box<dyn FnOnce(Arc<TCl>) -> Box<dyn BlockAnnounceValidator<TBl> + Send> + Send>>,
/// Optional warp sync parameters. Must be `Some` when the configured sync mode is warp.
pub warp_sync_params: Option<WarpSyncParams<TBl>>,
}
/// Builds the network service together with the tasks that support it.
///
/// Spawns the block/state/warp-sync/light-client (and optionally bitswap) request handlers,
/// the peer store, the transactions handler, the network service provider used by chain
/// sync, the import queue, the syncing engine, the system RPC handler and the network worker.
///
/// Returns, in order: the network service, the sender for system RPC requests, the
/// transactions handler controller, a [`NetworkStarter`] and the syncing service.
/// The network worker future is only awaited once the returned [`NetworkStarter`] has
/// sent its start signal; if the starter is dropped instead, a warning is logged and the
/// worker task returns without running the network future.
///
/// # Errors
///
/// Returns an error if warp sync is configured without `warp_sync_params`, if the client
/// requires full sync while the sync mode is `LightState` or `Warp`, or if constructing the
/// syncing engine, the network worker or the transactions handler fails.
///
/// # Panics
///
/// Panics if the genesis block hash cannot be read from the client.
pub fn build_network<TBl, TExPool, TImpQu, TCl>(
params: BuildNetworkParams<TBl, TExPool, TImpQu, TCl>,
) -> Result<
(
Arc<NetworkService<TBl, <TBl as BlockT>::Hash>>,
TracingUnboundedSender<sc_rpc::system::Request<TBl>>,
sc_network_transactions::TransactionsHandlerController<<TBl as BlockT>::Hash>,
NetworkStarter,
Arc<SyncingService<TBl>>,
),
Error,
>
where
TBl: BlockT,
TCl: ProvideRuntimeApi<TBl>
+ HeaderMetadata<TBl, Error = sp_blockchain::Error>
+ Chain<TBl>
+ BlockBackend<TBl>
+ BlockIdTo<TBl, Error = sp_blockchain::Error>
+ ProofProvider<TBl>
+ HeaderBackend<TBl>
+ BlockchainEvents<TBl>
+ 'static,
TExPool: TransactionPool<Block = TBl, Hash = <TBl as BlockT>::Hash> + 'static,
TImpQu: ImportQueue<TBl> + 'static,
{
let BuildNetworkParams {
config,
mut net_config,
client,
transaction_pool,
spawn_handle,
import_queue,
block_announce_validator_builder,
warp_sync_params,
} = params;
// Warp sync mode cannot be used without warp sync parameters.
if warp_sync_params.is_none() && config.network.sync_mode.is_warp() {
return Err("Warp sync enabled, but no warp sync provider configured.".into())
}
// A client that requires full sync is only compatible with `SyncMode::Full`.
if client.requires_full_sync() {
match config.network.sync_mode {
SyncMode::LightState { .. } =>
return Err("Fast sync doesn't work for archive nodes".into()),
SyncMode::Warp => return Err("Warp sync doesn't work for archive nodes".into()),
SyncMode::Full => {},
}
}
let protocol_id = config.protocol_id();
// Use the caller-provided validator builder if there is one, otherwise the default validator.
let block_announce_validator = if let Some(f) = block_announce_validator_builder {
f(client.clone())
} else {
Box::new(DefaultBlockAnnounceValidator)
};
// Block request handler: spawn it and keep its protocol config and name.
let (block_request_protocol_config, block_request_protocol_name) = {
let (handler, protocol_config) = BlockRequestHandler::new(
&protocol_id,
config.chain_spec.fork_id(),
client.clone(),
// Sum of the configured inbound and outbound peer counts of the default peer set.
net_config.network_config.default_peers_set.in_peers as usize +
net_config.network_config.default_peers_set.out_peers as usize,
);
let config_name = protocol_config.name.clone();
spawn_handle.spawn("block-request-handler", Some("networking"), handler.run());
(protocol_config, config_name)
};
// State request handler: spawn it and keep its protocol config and name.
let (state_request_protocol_config, state_request_protocol_name) = {
// Number of full peers in the default set plus the number of reserved nodes.
let num_peer_hint = net_config.network_config.default_peers_set_num_full as usize +
net_config.network_config.default_peers_set.reserved_nodes.len();
let (handler, protocol_config) = StateRequestHandler::new(
&protocol_id,
config.chain_spec.fork_id(),
client.clone(),
num_peer_hint,
);
let config_name = protocol_config.name.clone();
spawn_handle.spawn("state-request-handler", Some("networking"), handler.run());
(protocol_config, config_name)
};
// Warp sync request handler: only created for `WarpSyncParams::WithProvider`.
let (warp_sync_protocol_config, warp_request_protocol_name) = match warp_sync_params.as_ref() {
Some(WarpSyncParams::WithProvider(warp_with_provider)) => {
let (handler, protocol_config) = WarpSyncRequestHandler::new(
protocol_id.clone(),
client
.block_hash(0u32.into())
.ok()
.flatten()
.expect("Genesis block exists; qed"),
config.chain_spec.fork_id(),
warp_with_provider.clone(),
);
let config_name = protocol_config.name.clone();
spawn_handle.spawn("warp-sync-request-handler", Some("networking"), handler.run());
(Some(protocol_config), Some(config_name))
},
_ => (None, None),
};
// Light client request handler.
let light_client_request_protocol_config = {
let (handler, protocol_config) = LightClientRequestHandler::new(
&protocol_id,
config.chain_spec.fork_id(),
client.clone(),
);
spawn_handle.spawn("light-client-request-handler", Some("networking"), handler.run());
protocol_config
};
// Register the request-response protocols created above with the network configuration.
net_config.add_request_response_protocol(block_request_protocol_config);
net_config.add_request_response_protocol(state_request_protocol_config);
net_config.add_request_response_protocol(light_client_request_protocol_config);
if let Some(config) = warp_sync_protocol_config {
net_config.add_request_response_protocol(config);
}
// The bitswap handler is only spawned and registered when the IPFS server is enabled.
if config.network.ipfs_server {
let (handler, protocol_config) = BitswapRequestHandler::new(client.clone());
spawn_handle.spawn("bitswap-request-handler", Some("networking"), handler.run());
net_config.add_request_response_protocol(protocol_config);
}
// Create the transactions protocol prototype and register its notification protocol.
let transactions_handler_proto = sc_network_transactions::TransactionsHandlerPrototype::new(
protocol_id.clone(),
client
.block_hash(0u32.into())
.ok()
.flatten()
.expect("Genesis block exists; qed"),
config.chain_spec.fork_id(),
);
net_config.add_notification_protocol(transactions_handler_proto.set_config());
// The peer store is created from the peer ids of the configured boot nodes.
let peer_store = PeerStore::new(
net_config
.network_config
.boot_nodes
.iter()
.map(|bootnode| bootnode.peer_id)
.collect(),
);
let peer_store_handle = peer_store.handle();
spawn_handle.spawn("peer-store", Some("networking"), peer_store.run());
// Channel whose receiver goes to the syncing engine and whose sender goes to the
// network parameters.
let (tx, rx) = sc_utils::mpsc::tracing_unbounded("mpsc_syncing_engine_protocol", 100_000);
let (chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new();
let (engine, sync_service, block_announce_config) = SyncingEngine::new(
Roles::from(&config.role),
client.clone(),
config.prometheus_config.as_ref().map(|config| config.registry.clone()).as_ref(),
&net_config,
protocol_id.clone(),
&config.chain_spec.fork_id().map(ToOwned::to_owned),
block_announce_validator,
warp_sync_params,
chain_sync_network_handle,
import_queue.service(),
block_request_protocol_name,
state_request_protocol_name,
warp_request_protocol_name,
rx,
)?;
// One copy of the sync service goes to the import queue; the other is shared via `Arc`.
let sync_service_import_queue = sync_service.clone();
let sync_service = Arc::new(sync_service);
let genesis_hash = client.hash(Zero::zero()).ok().flatten().expect("Genesis block exists; qed");
let network_params = sc_network::config::Params::<TBl> {
role: config.role.clone(),
// The network spawns its own futures as "libp2p-node" tasks through the spawn handle.
executor: {
let spawn_handle = Clone::clone(&spawn_handle);
Box::new(move |fut| {
spawn_handle.spawn("libp2p-node", Some("networking"), fut);
})
},
network_config: net_config,
peer_store: peer_store_handle,
genesis_hash,
protocol_id: protocol_id.clone(),
fork_id: config.chain_spec.fork_id().map(ToOwned::to_owned),
metrics_registry: config.prometheus_config.as_ref().map(|config| config.registry.clone()),
block_announce_config,
tx,
};
// Must be read before `network_params` is moved into the network worker.
let has_bootnodes = !network_params.network_config.network_config.boot_nodes.is_empty();
let network_mut = sc_network::NetworkWorker::new(network_params)?;
let network = network_mut.service().clone();
// Build and spawn the transactions handler on top of the network service.
let (tx_handler, tx_handler_controller) = transactions_handler_proto.build(
network.clone(),
sync_service.clone(),
Arc::new(TransactionPoolAdapter { pool: transaction_pool, client: client.clone() }),
config.prometheus_config.as_ref().map(|config| &config.registry),
)?;
spawn_handle.spawn("network-transactions-handler", Some("networking"), tx_handler.run());
spawn_handle.spawn_blocking(
"chain-sync-network-service-provider",
Some("networking"),
chain_sync_network_provider.run(network.clone()),
);
spawn_handle.spawn("import-queue", None, import_queue.run(Box::new(sync_service_import_queue)));
spawn_handle.spawn_blocking("syncing", None, engine.run());
// Channel for system RPC requests; the sender is returned to the caller.
let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc", 10_000);
spawn_handle.spawn(
"system-rpc-handler",
Some("networking"),
build_system_rpc_future(
config.role.clone(),
network_mut.service().clone(),
sync_service.clone(),
client.clone(),
system_rpc_rx,
has_bootnodes,
),
);
let future =
build_network_future(network_mut, client, sync_service.clone(), config.announce_block);
// The network worker task waits for the start signal before awaiting the network future.
let (network_start_tx, network_start_rx) = oneshot::channel();
spawn_handle.spawn_blocking("network-worker", Some("networking"), async move {
// An error here means the sending half (held by `NetworkStarter`) was dropped.
if network_start_rx.await.is_err() {
log::warn!(
"The NetworkStart returned as part of `build_network` has been silently dropped"
);
return
}
future.await
});
Ok((
network,
system_rpc_tx,
tx_handler_controller,
NetworkStarter(network_start_tx),
sync_service.clone(),
))
}
/// Holds the sending half of the one-shot channel that signals the network to start.
#[must_use]
pub struct NetworkStarter(oneshot::Sender<()>);

impl NetworkStarter {
	/// Creates a `NetworkStarter` from the given one-shot sender.
	pub fn new(sender: oneshot::Sender<()>) -> Self {
		Self(sender)
	}

	/// Sends the start signal, consuming the starter.
	///
	/// The result of the send is deliberately ignored.
	pub fn start_network(self) {
		let Self(start_signal) = self;
		let _ = start_signal.send(());
	}
}