referrerpolicy=no-referrer-when-downgrade

cumulus_relay_chain_minimal_node/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Cumulus.
3// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5// Cumulus is free software: you can redistribute it and/or modify
6// it under the terms of the GNU General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9
10// Cumulus is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14
15// You should have received a copy of the GNU General Public License
16// along with Cumulus. If not, see <https://www.gnu.org/licenses/>.
17
18use collator_overseer::NewMinimalNode;
19
20use cumulus_client_bootnodes::bootnode_request_response_config;
21use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface, RelayChainResult};
22use cumulus_relay_chain_rpc_interface::{RelayChainRpcClient, RelayChainRpcInterface, Url};
23use network::build_collator_network;
24use polkadot_network_bridge::{peer_sets_info, IsAuthority};
25use polkadot_node_network_protocol::{
26	peer_set::{PeerSet, PeerSetProtocolNames},
27	request_response::{
28		v1, v2, IncomingRequest, IncomingRequestReceiver, Protocol, ReqProtocolNames,
29	},
30};
31
32use polkadot_core_primitives::{Block as RelayBlock, Hash as RelayHash};
33use polkadot_node_subsystem_util::metrics::prometheus::Registry;
34use polkadot_primitives::CollatorPair;
35use polkadot_service::{overseer::OverseerGenArgs, IsParachainNode};
36
37use sc_authority_discovery::Service as AuthorityDiscoveryService;
38use sc_network::{
39	config::FullNetworkConfiguration, request_responses::IncomingRequest as GenericIncomingRequest,
40	service::traits::NetworkService, Event, NetworkBackend, NetworkEventStream,
41};
42use sc_service::{config::PrometheusConfig, Configuration, TaskManager};
43use sp_runtime::{app_crypto::Pair, traits::Block as BlockT};
44
45use futures::{FutureExt, StreamExt};
46use std::sync::Arc;
47
48mod blockchain_rpc_client;
49mod collator_overseer;
50mod network;
51
52pub use blockchain_rpc_client::BlockChainRpcClient;
53
54const LOG_TARGET: &str = "minimal-relaychain-node";
55
56fn build_authority_discovery_service<Block: BlockT>(
57	task_manager: &TaskManager,
58	client: Arc<BlockChainRpcClient>,
59	config: &Configuration,
60	network: Arc<dyn NetworkService>,
61	prometheus_registry: Option<Registry>,
62) -> AuthorityDiscoveryService {
63	let auth_disc_publish_non_global_ips = config.network.allow_non_globals_in_dht;
64	let auth_disc_public_addresses = config.network.public_addresses.clone();
65	let authority_discovery_role = sc_authority_discovery::Role::Discover;
66	let dht_event_stream = network.event_stream("authority-discovery").filter_map(|e| async move {
67		match e {
68			Event::Dht(e) => Some(e),
69			_ => None,
70		}
71	});
72	let net_config_path = config.network.net_config_path.clone();
73	let (worker, service) = sc_authority_discovery::new_worker_and_service_with_config(
74		sc_authority_discovery::WorkerConfig {
75			publish_non_global_ips: auth_disc_publish_non_global_ips,
76			public_addresses: auth_disc_public_addresses,
77			// Require that authority discovery records are signed.
78			strict_record_validation: true,
79			persisted_cache_directory: net_config_path,
80			..Default::default()
81		},
82		client,
83		Arc::new(network.clone()),
84		Box::pin(dht_event_stream),
85		authority_discovery_role,
86		prometheus_registry,
87		task_manager.spawn_handle(),
88	);
89
90	task_manager.spawn_handle().spawn(
91		"authority-discovery-worker",
92		Some("authority-discovery"),
93		worker.run(),
94	);
95	service
96}
97
98async fn build_interface(
99	polkadot_config: Configuration,
100	task_manager: &mut TaskManager,
101	client: RelayChainRpcClient,
102) -> RelayChainResult<(
103	Arc<(dyn RelayChainInterface + 'static)>,
104	Option<CollatorPair>,
105	Arc<dyn NetworkService>,
106	async_channel::Receiver<GenericIncomingRequest>,
107)> {
108	let collator_pair = CollatorPair::generate().0;
109	let blockchain_rpc_client = Arc::new(BlockChainRpcClient::new(client.clone()));
110	let collator_node = match polkadot_config.network.network_backend {
111		sc_network::config::NetworkBackendType::Libp2p =>
112			new_minimal_relay_chain::<RelayBlock, sc_network::NetworkWorker<RelayBlock, RelayHash>>(
113				polkadot_config,
114				collator_pair.clone(),
115				blockchain_rpc_client,
116			)
117			.await?,
118		sc_network::config::NetworkBackendType::Litep2p =>
119			new_minimal_relay_chain::<RelayBlock, sc_network::Litep2pNetworkBackend>(
120				polkadot_config,
121				collator_pair.clone(),
122				blockchain_rpc_client,
123			)
124			.await?,
125	};
126	task_manager.add_child(collator_node.task_manager);
127	Ok((
128		Arc::new(RelayChainRpcInterface::new(client, collator_node.overseer_handle)),
129		Some(collator_pair),
130		collator_node.network_service,
131		collator_node.paranode_rx,
132	))
133}
134
135pub async fn build_minimal_relay_chain_node_with_rpc(
136	relay_chain_config: Configuration,
137	parachain_prometheus_registry: Option<&Registry>,
138	task_manager: &mut TaskManager,
139	relay_chain_url: Vec<Url>,
140) -> RelayChainResult<(
141	Arc<(dyn RelayChainInterface + 'static)>,
142	Option<CollatorPair>,
143	Arc<dyn NetworkService>,
144	async_channel::Receiver<GenericIncomingRequest>,
145)> {
146	let client = cumulus_relay_chain_rpc_interface::create_client_and_start_worker(
147		relay_chain_url,
148		task_manager,
149		parachain_prometheus_registry,
150	)
151	.await?;
152
153	build_interface(relay_chain_config, task_manager, client).await
154}
155
156/// Builds a minimal relay chain node. Chain data is fetched
157/// via [`BlockChainRpcClient`] and fed into the overseer and its subsystems.
158///
159/// Instead of spawning all subsystems, this minimal node will only spawn subsystems
160/// required to collate:
161/// - AvailabilityRecovery
162/// - CollationGeneration
163/// - CollatorProtocol
164/// - NetworkBridgeRx
165/// - NetworkBridgeTx
166/// - RuntimeApi
167#[sc_tracing::logging::prefix_logs_with("Relaychain")]
168async fn new_minimal_relay_chain<Block: BlockT, Network: NetworkBackend<RelayBlock, RelayHash>>(
169	config: Configuration,
170	collator_pair: CollatorPair,
171	relay_chain_rpc_client: Arc<BlockChainRpcClient>,
172) -> Result<NewMinimalNode, RelayChainError> {
173	let role = config.role;
174	let mut net_config = sc_network::config::FullNetworkConfiguration::<_, _, Network>::new(
175		&config.network,
176		config.prometheus_config.as_ref().map(|cfg| cfg.registry.clone()),
177	);
178	let metrics = Network::register_notification_metrics(
179		config.prometheus_config.as_ref().map(|cfg| &cfg.registry),
180	);
181	let peer_store_handle = net_config.peer_store_handle();
182
183	let prometheus_registry = config.prometheus_registry();
184	let task_manager = TaskManager::new(config.tokio_handle.clone(), prometheus_registry)?;
185
186	if let Some(PrometheusConfig { port, registry }) = config.prometheus_config.clone() {
187		task_manager.spawn_handle().spawn(
188			"prometheus-endpoint",
189			None,
190			prometheus_endpoint::init_prometheus(port, registry).map(drop),
191		);
192	}
193
194	let genesis_hash = relay_chain_rpc_client.block_get_hash(Some(0)).await?.unwrap_or_default();
195	let peerset_protocol_names =
196		PeerSetProtocolNames::new(genesis_hash, config.chain_spec.fork_id());
197	let is_authority = if role.is_authority() { IsAuthority::Yes } else { IsAuthority::No };
198	let notification_services = peer_sets_info::<_, Network>(
199		is_authority,
200		&peerset_protocol_names,
201		metrics.clone(),
202		Arc::clone(&peer_store_handle),
203	)
204	.into_iter()
205	.map(|(config, (peerset, service))| {
206		net_config.add_notification_protocol(config);
207		(peerset, service)
208	})
209	.collect::<std::collections::HashMap<PeerSet, Box<dyn sc_network::NotificationService>>>();
210
211	let request_protocol_names = ReqProtocolNames::new(genesis_hash, config.chain_spec.fork_id());
212	let (collation_req_v1_receiver, collation_req_v2_receiver, available_data_req_receiver) =
213		build_request_response_protocol_receivers(&request_protocol_names, &mut net_config);
214
215	let (cfg, paranode_rx) = bootnode_request_response_config::<_, _, Network>(
216		genesis_hash,
217		config.chain_spec.fork_id(),
218	);
219	net_config.add_request_response_protocol(cfg);
220
221	let best_header = relay_chain_rpc_client
222		.chain_get_header(None)
223		.await?
224		.ok_or_else(|| RelayChainError::RpcCallError("Unable to fetch best header".to_string()))?;
225	let (network, sync_service) = build_collator_network::<Network>(
226		&config,
227		net_config,
228		task_manager.spawn_handle(),
229		genesis_hash,
230		best_header,
231		metrics,
232	)
233	.map_err(|e| RelayChainError::Application(Box::new(e)))?;
234
235	let authority_discovery_service = build_authority_discovery_service::<Block>(
236		&task_manager,
237		relay_chain_rpc_client.clone(),
238		&config,
239		network.clone(),
240		prometheus_registry.cloned(),
241	);
242
243	let overseer_args = OverseerGenArgs {
244		runtime_client: relay_chain_rpc_client.clone(),
245		network_service: network.clone(),
246		sync_service,
247		authority_discovery_service,
248		collation_req_v1_receiver,
249		collation_req_v2_receiver,
250		available_data_req_receiver,
251		registry: prometheus_registry,
252		spawner: task_manager.spawn_handle(),
253		is_parachain_node: IsParachainNode::Collator(collator_pair),
254		overseer_message_channel_capacity_override: None,
255		req_protocol_names: request_protocol_names,
256		peerset_protocol_names,
257		notification_services,
258	};
259
260	let overseer_handle =
261		collator_overseer::spawn_overseer(overseer_args, &task_manager, relay_chain_rpc_client)?;
262
263	Ok(NewMinimalNode { task_manager, overseer_handle, network_service: network, paranode_rx })
264}
265
266fn build_request_response_protocol_receivers<
267	Block: BlockT,
268	Network: NetworkBackend<Block, <Block as BlockT>::Hash>,
269>(
270	request_protocol_names: &ReqProtocolNames,
271	config: &mut FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Network>,
272) -> (
273	IncomingRequestReceiver<v1::CollationFetchingRequest>,
274	IncomingRequestReceiver<v2::CollationFetchingRequest>,
275	IncomingRequestReceiver<v1::AvailableDataFetchingRequest>,
276) {
277	let (collation_req_v1_receiver, cfg) =
278		IncomingRequest::get_config_receiver::<_, Network>(request_protocol_names);
279	config.add_request_response_protocol(cfg);
280	let (collation_req_v2_receiver, cfg) =
281		IncomingRequest::get_config_receiver::<_, Network>(request_protocol_names);
282	config.add_request_response_protocol(cfg);
283	let (available_data_req_receiver, cfg) =
284		IncomingRequest::get_config_receiver::<_, Network>(request_protocol_names);
285	config.add_request_response_protocol(cfg);
286	let cfg =
287		Protocol::ChunkFetchingV1.get_outbound_only_config::<_, Network>(request_protocol_names);
288	config.add_request_response_protocol(cfg);
289	let cfg =
290		Protocol::ChunkFetchingV2.get_outbound_only_config::<_, Network>(request_protocol_names);
291	config.add_request_response_protocol(cfg);
292	(collation_req_v1_receiver, collation_req_v2_receiver, available_data_req_receiver)
293}