referrerpolicy=no-referrer-when-downgrade

cumulus_relay_chain_minimal_node/
collator_overseer.rs

// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Cumulus.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// Cumulus is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Cumulus is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Cumulus. If not, see <https://www.gnu.org/licenses/>.
17
18use futures::{select, StreamExt};
19use std::sync::Arc;
20
21use polkadot_overseer::{
22	BlockInfo, Handle, Overseer, OverseerConnector, OverseerHandle, SpawnGlue, UnpinHandle,
23};
24use polkadot_service::overseer::{collator_overseer_builder, OverseerGenArgs};
25
26use sc_network::{request_responses::IncomingRequest, service::traits::NetworkService};
27use sc_service::TaskManager;
28use sc_utils::mpsc::tracing_unbounded;
29
30use cumulus_relay_chain_interface::RelayChainError;
31
32use crate::BlockChainRpcClient;
33
34fn build_overseer(
35	connector: OverseerConnector,
36	args: OverseerGenArgs<sc_service::SpawnTaskHandle, BlockChainRpcClient>,
37) -> Result<
38	(Overseer<SpawnGlue<sc_service::SpawnTaskHandle>, Arc<BlockChainRpcClient>>, OverseerHandle),
39	RelayChainError,
40> {
41	let builder =
42		collator_overseer_builder(args).map_err(|e| RelayChainError::Application(e.into()))?;
43
44	builder
45		.build_with_connector(connector)
46		.map_err(|e| RelayChainError::Application(e.into()))
47}
48
49pub(crate) fn spawn_overseer(
50	overseer_args: OverseerGenArgs<sc_service::SpawnTaskHandle, BlockChainRpcClient>,
51	task_manager: &TaskManager,
52	relay_chain_rpc_client: Arc<BlockChainRpcClient>,
53) -> Result<polkadot_overseer::Handle, RelayChainError> {
54	let (overseer, overseer_handle) = build_overseer(OverseerConnector::default(), overseer_args)
55		.map_err(|e| {
56		tracing::error!("Failed to initialize overseer: {}", e);
57		e
58	})?;
59
60	let overseer_handle = Handle::new(overseer_handle);
61	{
62		let handle = overseer_handle.clone();
63		task_manager.spawn_essential_handle().spawn_blocking(
64			"overseer",
65			None,
66			Box::pin(async move {
67				use futures::{pin_mut, FutureExt};
68
69				let forward = forward_collator_events(relay_chain_rpc_client, handle).fuse();
70
71				let overseer_fut = overseer.run().fuse();
72
73				pin_mut!(overseer_fut);
74				pin_mut!(forward);
75
76				select! {
77					_ = forward => (),
78					_ = overseer_fut => (),
79				}
80			}),
81		);
82	}
83	Ok(overseer_handle)
84}
85
86/// Minimal relay chain node representation
87pub struct NewMinimalNode {
88	/// Task manager running all tasks for the minimal node
89	pub task_manager: TaskManager,
90	/// Overseer handle to interact with subsystems
91	pub overseer_handle: Handle,
92	/// Network service
93	pub network_service: Arc<dyn NetworkService>,
94	/// Parachain bootnode request-response protocol receiver
95	pub paranode_rx: async_channel::Receiver<IncomingRequest>,
96}
97
98/// Glues together the [`Overseer`] and `BlockchainEvents` by forwarding
99/// import and finality notifications into the [`OverseerHandle`].
100async fn forward_collator_events(
101	client: Arc<BlockChainRpcClient>,
102	mut handle: Handle,
103) -> Result<(), RelayChainError> {
104	let mut finality = client.finality_notification_stream().await?.fuse();
105	let mut imports = client.import_notification_stream().await?.fuse();
106	// Collators do no need to pin any specific blocks
107	let (dummy_sink, _) = tracing_unbounded("does-not-matter", 42);
108	let dummy_unpin_handle = UnpinHandle::new(Default::default(), dummy_sink);
109
110	loop {
111		select! {
112			f = finality.next() => {
113				match f {
114					Some(header) => {
115						let hash = header.hash();
116						tracing::info!(
117							target: "minimal-polkadot-node",
118							"Received finalized block via RPC: #{} ({} -> {})",
119							header.number,
120							header.parent_hash,
121							hash,
122						);
123						let unpin_handle = dummy_unpin_handle.clone();
124						let block_info = BlockInfo { hash, parent_hash: header.parent_hash, number: header.number, unpin_handle };
125						handle.block_finalized(block_info).await;
126					}
127					None => return Err(RelayChainError::GenericError("Relay chain finality stream ended.".to_string())),
128				}
129			},
130			i = imports.next() => {
131				match i {
132					Some(header) => {
133						let hash = header.hash();
134						tracing::info!(
135							target: "minimal-polkadot-node",
136							"Received imported block via RPC: #{} ({} -> {})",
137							header.number,
138							header.parent_hash,
139							hash,
140						);
141						let unpin_handle = dummy_unpin_handle.clone();
142						let block_info = BlockInfo { hash, parent_hash: header.parent_hash, number: header.number, unpin_handle };
143						handle.block_imported(block_info).await;
144					}
145					None => return Err(RelayChainError::GenericError("Relay chain import stream ended.".to_string())),
146				}
147			}
148		}
149	}
150}