referrerpolicy=no-referrer-when-downgrade

cumulus_client_collator/
service.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Cumulus.
3// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5// Cumulus is free software: you can redistribute it and/or modify
6// it under the terms of the GNU General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9
10// Cumulus is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14
15// You should have received a copy of the GNU General Public License
16// along with Cumulus. If not, see <https://www.gnu.org/licenses/>.
17
18//! The Cumulus [`CollatorService`] is a utility struct for performing common
19//! operations used in parachain consensus/authoring.
20
21use cumulus_client_network::WaitToAnnounce;
22use cumulus_primitives_core::{CollationInfo, CollectCollationInfo, ParachainBlockData};
23
24use sc_client_api::BlockBackend;
25use sp_api::{ApiExt, ProvideRuntimeApi};
26use sp_consensus::BlockStatus;
27use sp_core::traits::SpawnNamed;
28use sp_runtime::traits::{Block as BlockT, HashingFor, Header as HeaderT, Zero};
29
30use cumulus_client_consensus_common::ParachainCandidate;
31use polkadot_node_primitives::{
32	BlockData, Collation, CollationSecondedSignal, MaybeCompressedPoV, PoV,
33};
34
35use codec::Encode;
36use futures::channel::oneshot;
37use parking_lot::Mutex;
38use std::sync::Arc;
39
40/// The logging target.
41const LOG_TARGET: &str = "cumulus-collator";
42
43/// Utility functions generally applicable to writing collators for Cumulus.
44pub trait ServiceInterface<Block: BlockT> {
45	/// Checks the status of the given block hash in the Parachain.
46	///
47	/// Returns `true` if the block could be found and is good to be build on.
48	fn check_block_status(&self, hash: Block::Hash, header: &Block::Header) -> bool;
49
50	/// Build a full [`Collation`] from a given [`ParachainCandidate`]. This requires
51	/// that the underlying block has been fully imported into the underlying client,
52	/// as implementations will fetch underlying runtime API data.
53	///
54	/// This also returns the unencoded parachain block data, in case that is desired.
55	fn build_collation(
56		&self,
57		parent_header: &Block::Header,
58		block_hash: Block::Hash,
59		candidate: ParachainCandidate<Block>,
60	) -> Option<(Collation, ParachainBlockData<Block>)>;
61
62	/// Inform networking systems that the block should be announced after a signal has
63	/// been received to indicate the block has been seconded by a relay-chain validator.
64	///
65	/// This sets up the barrier and returns the sending side of a channel, for the signal
66	/// to be passed through.
67	fn announce_with_barrier(
68		&self,
69		block_hash: Block::Hash,
70	) -> oneshot::Sender<CollationSecondedSignal>;
71
72	/// Directly announce a block on the network.
73	fn announce_block(&self, block_hash: Block::Hash, data: Option<Vec<u8>>);
74}
75
76/// The [`CollatorService`] provides common utilities for parachain consensus and authoring.
77///
78/// This includes logic for checking the block status of arbitrary parachain headers
79/// gathered from the relay chain state, creating full [`Collation`]s to be shared with validators,
80/// and distributing new parachain blocks along the network.
81pub struct CollatorService<Block: BlockT, BS, RA> {
82	block_status: Arc<BS>,
83	wait_to_announce: Arc<Mutex<WaitToAnnounce<Block>>>,
84	announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
85	runtime_api: Arc<RA>,
86}
87
88impl<Block: BlockT, BS, RA> Clone for CollatorService<Block, BS, RA> {
89	fn clone(&self) -> Self {
90		Self {
91			block_status: self.block_status.clone(),
92			wait_to_announce: self.wait_to_announce.clone(),
93			announce_block: self.announce_block.clone(),
94			runtime_api: self.runtime_api.clone(),
95		}
96	}
97}
98
99impl<Block, BS, RA> CollatorService<Block, BS, RA>
100where
101	Block: BlockT,
102	BS: BlockBackend<Block>,
103	RA: ProvideRuntimeApi<Block>,
104	RA::Api: CollectCollationInfo<Block>,
105{
106	/// Create a new instance.
107	pub fn new(
108		block_status: Arc<BS>,
109		spawner: Arc<dyn SpawnNamed + Send + Sync>,
110		announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
111		runtime_api: Arc<RA>,
112	) -> Self {
113		let wait_to_announce =
114			Arc::new(Mutex::new(WaitToAnnounce::new(spawner, announce_block.clone())));
115
116		Self { block_status, wait_to_announce, announce_block, runtime_api }
117	}
118
119	/// Checks the status of the given block hash in the Parachain.
120	///
121	/// Returns `true` if the block could be found and is good to be build on.
122	pub fn check_block_status(&self, hash: Block::Hash, header: &Block::Header) -> bool {
123		match self.block_status.block_status(hash) {
124			Ok(BlockStatus::Queued) => {
125				tracing::debug!(
126					target: LOG_TARGET,
127					block_hash = ?hash,
128					"Skipping candidate production, because block is still queued for import.",
129				);
130				false
131			},
132			Ok(BlockStatus::InChainWithState) => true,
133			Ok(BlockStatus::InChainPruned) => {
134				tracing::error!(
135					target: LOG_TARGET,
136					"Skipping candidate production, because block `{:?}` is already pruned!",
137					hash,
138				);
139				false
140			},
141			Ok(BlockStatus::KnownBad) => {
142				tracing::error!(
143					target: LOG_TARGET,
144					block_hash = ?hash,
145					"Block is tagged as known bad and is included in the relay chain! Skipping candidate production!",
146				);
147				false
148			},
149			Ok(BlockStatus::Unknown) => {
150				if header.number().is_zero() {
151					tracing::error!(
152						target: LOG_TARGET,
153						block_hash = ?hash,
154						"Could not find the header of the genesis block in the database!",
155					);
156				} else {
157					tracing::debug!(
158						target: LOG_TARGET,
159						block_hash = ?hash,
160						"Skipping candidate production, because block is unknown.",
161					);
162				}
163				false
164			},
165			Err(e) => {
166				tracing::error!(
167					target: LOG_TARGET,
168					block_hash = ?hash,
169					error = ?e,
170					"Failed to get block status.",
171				);
172				false
173			},
174		}
175	}
176
177	/// Fetch the collation info from the runtime.
178	///
179	/// Returns `Ok(Some((CollationInfo, ApiVersion)))` on success, `Err(_)` on error or `Ok(None)`
180	/// if the runtime api isn't implemented by the runtime. `ApiVersion` being the version of the
181	/// [`CollectCollationInfo`] runtime api.
182	pub fn fetch_collation_info(
183		&self,
184		block_hash: Block::Hash,
185		header: &Block::Header,
186	) -> Result<Option<(CollationInfo, u32)>, sp_api::ApiError> {
187		let runtime_api = self.runtime_api.runtime_api();
188
189		let api_version =
190			match runtime_api.api_version::<dyn CollectCollationInfo<Block>>(block_hash)? {
191				Some(version) => version,
192				None => {
193					tracing::error!(
194						target: LOG_TARGET,
195						"Could not fetch `CollectCollationInfo` runtime api version."
196					);
197					return Ok(None)
198				},
199			};
200
201		let collation_info = if api_version < 2 {
202			#[allow(deprecated)]
203			runtime_api
204				.collect_collation_info_before_version_2(block_hash)?
205				.into_latest(header.encode().into())
206		} else {
207			runtime_api.collect_collation_info(block_hash, header)?
208		};
209
210		Ok(Some((collation_info, api_version)))
211	}
212
213	/// Build a full [`Collation`] from a given [`ParachainCandidate`]. This requires
214	/// that the underlying block has been fully imported into the underlying client,
215	/// as it fetches underlying runtime API data.
216	///
217	/// This also returns the unencoded parachain block data, in case that is desired.
218	pub fn build_collation(
219		&self,
220		parent_header: &Block::Header,
221		block_hash: Block::Hash,
222		candidate: ParachainCandidate<Block>,
223	) -> Option<(Collation, ParachainBlockData<Block>)> {
224		let block = candidate.block;
225
226		let compact_proof = match candidate
227			.proof
228			.into_compact_proof::<HashingFor<Block>>(*parent_header.state_root())
229		{
230			Ok(proof) => proof,
231			Err(e) => {
232				tracing::error!(target: "cumulus-collator", "Failed to compact proof: {:?}", e);
233				return None
234			},
235		};
236
237		// Create the parachain block data for the validators.
238		let (collation_info, _api_version) = self
239			.fetch_collation_info(block_hash, block.header())
240			.map_err(|e| {
241				tracing::error!(
242					target: LOG_TARGET,
243					error = ?e,
244					"Failed to collect collation info.",
245				)
246			})
247			.ok()
248			.flatten()?;
249
250		// Workaround for: https://github.com/paritytech/polkadot-sdk/issues/64
251		//
252		// We are always using the `api_version` of the parent block. The `api_version` can only
253		// change with a runtime upgrade and this is when we want to observe the old `api_version`.
254		// Because this old `api_version` is the one used to validate this block. Otherwise we
255		// already assume the `api_version` is higher than what the relay chain will use and this
256		// will lead to validation errors.
257		let api_version = self
258			.runtime_api
259			.runtime_api()
260			.api_version::<dyn CollectCollationInfo<Block>>(parent_header.hash())
261			.ok()
262			.flatten()?;
263
264		let block_data = ParachainBlockData::<Block>::new(vec![block], compact_proof);
265
266		let pov = polkadot_node_primitives::maybe_compress_pov(PoV {
267			block_data: BlockData(if api_version >= 3 {
268				block_data.encode()
269			} else {
270				let block_data = block_data.as_v0();
271
272				if block_data.is_none() {
273					tracing::error!(
274						target: LOG_TARGET,
275						"Trying to submit a collation with multiple blocks is not supported by the current runtime."
276					);
277				}
278
279				block_data?.encode()
280			}),
281		});
282
283		let upward_messages = collation_info
284			.upward_messages
285			.try_into()
286			.map_err(|e| {
287				tracing::error!(
288					target: LOG_TARGET,
289					error = ?e,
290					"Number of upward messages should not be greater than `MAX_UPWARD_MESSAGE_NUM`",
291				)
292			})
293			.ok()?;
294		let horizontal_messages = collation_info
295			.horizontal_messages
296			.try_into()
297			.map_err(|e| {
298				tracing::error!(
299					target: LOG_TARGET,
300					error = ?e,
301					"Number of horizontal messages should not be greater than `MAX_HORIZONTAL_MESSAGE_NUM`",
302				)
303			})
304			.ok()?;
305
306		let collation = Collation {
307			upward_messages,
308			new_validation_code: collation_info.new_validation_code,
309			processed_downward_messages: collation_info.processed_downward_messages,
310			horizontal_messages,
311			hrmp_watermark: collation_info.hrmp_watermark,
312			head_data: collation_info.head_data,
313			proof_of_validity: MaybeCompressedPoV::Compressed(pov),
314		};
315
316		Some((collation, block_data))
317	}
318
319	/// Inform the networking systems that the block should be announced after an appropriate
320	/// signal has been received. This returns the sending half of the signal.
321	pub fn announce_with_barrier(
322		&self,
323		block_hash: Block::Hash,
324	) -> oneshot::Sender<CollationSecondedSignal> {
325		let (result_sender, signed_stmt_recv) = oneshot::channel();
326		self.wait_to_announce.lock().wait_to_announce(block_hash, signed_stmt_recv);
327		result_sender
328	}
329}
330
331impl<Block, BS, RA> ServiceInterface<Block> for CollatorService<Block, BS, RA>
332where
333	Block: BlockT,
334	BS: BlockBackend<Block>,
335	RA: ProvideRuntimeApi<Block>,
336	RA::Api: CollectCollationInfo<Block>,
337{
338	fn check_block_status(&self, hash: Block::Hash, header: &Block::Header) -> bool {
339		CollatorService::check_block_status(self, hash, header)
340	}
341
342	fn build_collation(
343		&self,
344		parent_header: &Block::Header,
345		block_hash: Block::Hash,
346		candidate: ParachainCandidate<Block>,
347	) -> Option<(Collation, ParachainBlockData<Block>)> {
348		CollatorService::build_collation(self, parent_header, block_hash, candidate)
349	}
350
351	fn announce_with_barrier(
352		&self,
353		block_hash: Block::Hash,
354	) -> oneshot::Sender<CollationSecondedSignal> {
355		CollatorService::announce_with_barrier(self, block_hash)
356	}
357
358	fn announce_block(&self, block_hash: Block::Hash, data: Option<Vec<u8>>) {
359		(self.announce_block)(block_hash, data)
360	}
361}