referrerpolicy=no-referrer-when-downgrade

cumulus_client_collator/
service.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Cumulus.
3// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5// Cumulus is free software: you can redistribute it and/or modify
6// it under the terms of the GNU General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9
10// Cumulus is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14
15// You should have received a copy of the GNU General Public License
16// along with Cumulus. If not, see <https://www.gnu.org/licenses/>.
17
18//! The Cumulus [`CollatorService`] is a utility struct for performing common
19//! operations used in parachain consensus/authoring.
20
21use cumulus_client_network::WaitToAnnounce;
22use cumulus_primitives_core::{
23	CollationInfo, CollectCollationInfo, ParachainBlockData, SchedulingProof,
24};
25
26use polkadot_primitives::UMP_SEPARATOR;
27use sc_client_api::BlockBackend;
28use sp_api::{ApiExt, ProvideRuntimeApi, StorageProof};
29use sp_consensus::BlockStatus;
30use sp_core::traits::SpawnNamed;
31use sp_runtime::traits::{Block as BlockT, HashingFor, Header as HeaderT, Zero};
32
33use cumulus_client_consensus_common::ParachainCandidate;
34use polkadot_node_primitives::{
35	BlockData, Collation, CollationSecondedSignal, MaybeCompressedPoV, PoV,
36};
37
38use codec::Encode;
39use futures::channel::oneshot;
40use parking_lot::Mutex;
41use std::sync::Arc;
42/// The logging target.
43const LOG_TARGET: &str = "cumulus-collator";
44
45/// Utility functions generally applicable to writing collators for Cumulus.
46pub trait ServiceInterface<Block: BlockT> {
47	/// Checks the status of the given block hash in the Parachain.
48	///
49	/// Returns `true` if the block could be found and is good to be build on.
50	fn check_block_status(&self, hash: Block::Hash, header: &Block::Header) -> bool;
51
52	/// Build a full [`Collation`] from a given [`ParachainCandidate`]. This requires
53	/// that the underlying block has been fully imported into the underlying client,
54	/// as implementations will fetch underlying runtime API data.
55	///
56	/// `scheduling_proof` is `Some` for V3 candidates (produces [`ParachainBlockData::V2`])
57	/// and `None` for legacy candidates (produces [`ParachainBlockData::V1`]).
58	///
59	/// This also returns the unencoded parachain block data, in case that is desired.
60	fn build_collation(
61		&self,
62		parent_header: &Block::Header,
63		block_hash: Block::Hash,
64		candidate: ParachainCandidate<Block>,
65		scheduling_proof: Option<SchedulingProof>,
66	) -> Option<(Collation, ParachainBlockData<Block>)>;
67
68	/// Build a multi-block collation.
69	///
70	/// Does the same as [`Self::build_collation`], but includes multiple blocks into one collation.
71	/// The given `parent_header` should be the header from the parent of the first block.
72	///
73	/// `scheduling_proof` is `Some` for V3 candidates (produces [`ParachainBlockData::V2`])
74	/// and `None` for legacy candidates (produces [`ParachainBlockData::V1`]).
75	fn build_multi_block_collation(
76		&self,
77		parent_header: &Block::Header,
78		blocks: Vec<Block>,
79		proof: StorageProof,
80		scheduling_proof: Option<SchedulingProof>,
81	) -> Option<(Collation, ParachainBlockData<Block>)>;
82
83	/// Inform networking systems that the block should be announced after a signal has
84	/// been received to indicate the block has been seconded by a relay-chain validator.
85	///
86	/// This sets up the barrier and returns the sending side of a channel, for the signal
87	/// to be passed through.
88	fn announce_with_barrier(
89		&self,
90		block_hash: Block::Hash,
91	) -> oneshot::Sender<CollationSecondedSignal>;
92
93	/// Directly announce a block on the network.
94	fn announce_block(&self, block_hash: Block::Hash, data: Option<Vec<u8>>);
95}
96
97/// The [`CollatorService`] provides common utilities for parachain consensus and authoring.
98///
99/// This includes logic for checking the block status of arbitrary parachain headers
100/// gathered from the relay chain state, creating full [`Collation`]s to be shared with validators,
101/// and distributing new parachain blocks along the network.
102pub struct CollatorService<Block: BlockT, BS, RA> {
103	block_status: Arc<BS>,
104	wait_to_announce: Arc<Mutex<WaitToAnnounce<Block>>>,
105	announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
106	runtime_api: Arc<RA>,
107}
108
109impl<Block: BlockT, BS, RA> Clone for CollatorService<Block, BS, RA> {
110	fn clone(&self) -> Self {
111		Self {
112			block_status: self.block_status.clone(),
113			wait_to_announce: self.wait_to_announce.clone(),
114			announce_block: self.announce_block.clone(),
115			runtime_api: self.runtime_api.clone(),
116		}
117	}
118}
119
120impl<Block, BS, RA> CollatorService<Block, BS, RA>
121where
122	Block: BlockT,
123	BS: BlockBackend<Block>,
124	RA: ProvideRuntimeApi<Block>,
125	RA::Api: CollectCollationInfo<Block>,
126{
127	fn split_at_separator(messages: Vec<Vec<u8>>) -> (Vec<Vec<u8>>, Vec<Vec<u8>>) {
128		let mut parts = messages.splitn(2, |m: &Vec<u8>| m.is_empty());
129		(parts.next().unwrap_or(&[]).to_vec(), parts.next().unwrap_or(&[]).to_vec())
130	}
131
132	/// Create a new instance.
133	pub fn new(
134		block_status: Arc<BS>,
135		spawner: Arc<dyn SpawnNamed + Send + Sync>,
136		announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
137		runtime_api: Arc<RA>,
138	) -> Self {
139		let wait_to_announce =
140			Arc::new(Mutex::new(WaitToAnnounce::new(spawner, announce_block.clone())));
141
142		Self { block_status, wait_to_announce, announce_block, runtime_api }
143	}
144
145	/// Checks the status of the given block hash in the Parachain.
146	///
147	/// Returns `true` if the block could be found and is good to be build on.
148	pub fn check_block_status(&self, hash: Block::Hash, header: &Block::Header) -> bool {
149		match self.block_status.block_status(hash) {
150			Ok(BlockStatus::Queued) => {
151				tracing::debug!(
152					target: LOG_TARGET,
153					block_hash = ?hash,
154					"Skipping candidate production, because block is still queued for import.",
155				);
156				false
157			},
158			Ok(BlockStatus::InChainWithState) => true,
159			Ok(BlockStatus::InChainPruned) => {
160				tracing::error!(
161					target: LOG_TARGET,
162					"Skipping candidate production, because block `{:?}` is already pruned!",
163					hash,
164				);
165				false
166			},
167			Ok(BlockStatus::KnownBad) => {
168				tracing::error!(
169					target: LOG_TARGET,
170					block_hash = ?hash,
171					"Block is tagged as known bad and is included in the relay chain! Skipping candidate production!",
172				);
173				false
174			},
175			Ok(BlockStatus::Unknown) => {
176				if header.number().is_zero() {
177					tracing::error!(
178						target: LOG_TARGET,
179						block_hash = ?hash,
180						"Could not find the header of the genesis block in the database!",
181					);
182				} else {
183					tracing::debug!(
184						target: LOG_TARGET,
185						block_hash = ?hash,
186						"Skipping candidate production, because block is unknown.",
187					);
188				}
189				false
190			},
191			Err(e) => {
192				tracing::error!(
193					target: LOG_TARGET,
194					block_hash = ?hash,
195					error = ?e,
196					"Failed to get block status.",
197				);
198				false
199			},
200		}
201	}
202
203	/// Fetch the collation info from the runtime.
204	///
205	/// Returns `Ok(Some((CollationInfo, ApiVersion)))` on success, `Err(_)` on error or `Ok(None)`
206	/// if the runtime api isn't implemented by the runtime. `ApiVersion` being the version of the
207	/// [`CollectCollationInfo`] runtime api.
208	pub fn fetch_collation_info(
209		&self,
210		block_hash: Block::Hash,
211		header: &Block::Header,
212	) -> Result<Option<(CollationInfo, u32)>, sp_api::ApiError> {
213		let runtime_api = self.runtime_api.runtime_api();
214
215		let api_version =
216			match runtime_api.api_version::<dyn CollectCollationInfo<Block>>(block_hash)? {
217				Some(version) => version,
218				None => {
219					tracing::error!(
220						target: LOG_TARGET,
221						"Could not fetch `CollectCollationInfo` runtime api version."
222					);
223					return Ok(None);
224				},
225			};
226
227		let collation_info = if api_version < 2 {
228			#[allow(deprecated)]
229			runtime_api
230				.collect_collation_info_before_version_2(block_hash)?
231				.into_latest(header.encode().into())
232		} else {
233			runtime_api.collect_collation_info(block_hash, header)?
234		};
235
236		Ok(Some((collation_info, api_version)))
237	}
238
239	/// Build a full [`Collation`] from a given [`ParachainCandidate`]. This requires
240	/// that the underlying block has been fully imported into the underlying client,
241	/// as it fetches underlying runtime API data.
242	///
243	/// This also returns the unencoded parachain block data, in case that is desired.
244	fn build_multi_block_collation(
245		&self,
246		parent_header: &Block::Header,
247		blocks: Vec<Block>,
248		proof: StorageProof,
249		scheduling_proof: Option<SchedulingProof>,
250	) -> Option<(Collation, ParachainBlockData<Block>)> {
251		let compact_proof =
252			match proof.into_compact_proof::<HashingFor<Block>>(*parent_header.state_root()) {
253				Ok(proof) => proof,
254				Err(e) => {
255					tracing::error!(target: "cumulus-collator", "Failed to compact proof: {:?}", e);
256					return None;
257				},
258			};
259
260		// We are always using the `api_version` of the parent block. The `api_version` can only
261		// change with a runtime upgrade and this is when we want to observe the old
262		// `api_version`. Because this old `api_version` is the one used to validate this
263		// block. Otherwise, we already assume the `api_version` is higher than what the relay
264		// chain will use and this will lead to validation errors.
265		let api_version = self
266			.runtime_api
267			.runtime_api()
268			.api_version::<dyn CollectCollationInfo<Block>>(parent_header.hash())
269			.ok()
270			.flatten()?;
271		let mut upward_messages = Vec::new();
272		let mut upward_message_signals = Vec::<Vec<u8>>::with_capacity(4);
273		let mut horizontal_messages = Vec::new();
274		let mut new_validation_code = None;
275		let mut processed_downward_messages = 0;
276		let mut hrmp_watermark = None;
277		let mut head_data = None;
278
279		for block in &blocks {
280			// Create the parachain block data for the validators.
281			let (collation_info, _api_version) = self
282				.fetch_collation_info(block.hash(), block.header())
283				.map_err(|e| {
284					tracing::error!(
285						target: LOG_TARGET,
286						error = ?e,
287						"Failed to collect collation info.",
288					)
289				})
290				.ok()
291				.flatten()?;
292
293			let (messages, signals) = Self::split_at_separator(collation_info.upward_messages);
294
295			upward_messages.extend(messages);
296			upward_message_signals.extend(signals);
297			horizontal_messages.extend(collation_info.horizontal_messages);
298
299			if let Some(new_code) = collation_info.new_validation_code {
300				if new_validation_code.replace(new_code).is_some() {
301					tracing::warn!(
302						target: LOG_TARGET,
303						block = ?block.hash(),
304						"Overwriting validation code from an earlier block in the bundle.",
305					);
306				}
307			}
308			processed_downward_messages += collation_info.processed_downward_messages;
309			hrmp_watermark = Some(collation_info.hrmp_watermark);
310			head_data = Some(collation_info.head_data);
311		}
312
313		// Sort by recipient as required by the relay chain rules.
314		horizontal_messages.sort_by(|a, b| a.recipient.cmp(&b.recipient));
315
316		let block_data = ParachainBlockData::<Block>::new(blocks, compact_proof, scheduling_proof);
317
318		let pov = polkadot_node_primitives::maybe_compress_pov(PoV {
319			block_data: BlockData(if api_version >= 3 {
320				block_data.encode()
321			} else {
322				let block_data = block_data.as_v0();
323
324				if block_data.is_none() {
325					tracing::error!(
326						target: LOG_TARGET,
327						"Trying to submit a collation with multiple blocks is not supported by the current runtime."
328					);
329				}
330
331				block_data?.encode()
332			}),
333		});
334
335		// If we got some signals, push them now.
336		if !upward_message_signals.is_empty() {
337			upward_messages.push(UMP_SEPARATOR);
338			upward_messages.extend(upward_message_signals.into_iter());
339		}
340
341		let upward_messages = upward_messages
342			.try_into()
343			.map_err(|e| {
344				tracing::error!(
345					target: LOG_TARGET,
346					error = ?e,
347					"Number of upward messages should not be greater than `MAX_UPWARD_MESSAGE_NUM`",
348				)
349			})
350			.ok()?;
351		let horizontal_messages = horizontal_messages
352			.try_into()
353			.map_err(|e| {
354				tracing::error!(
355					target: LOG_TARGET,
356					error = ?e,
357					"Number of horizontal messages should not be greater than `MAX_HORIZONTAL_MESSAGE_NUM`",
358				)
359			})
360			.ok()?;
361
362		let collation = Collation {
363			upward_messages,
364			new_validation_code,
365			processed_downward_messages,
366			horizontal_messages,
367			// If these are `None`, there was no block.
368			hrmp_watermark: hrmp_watermark?,
369			head_data: head_data?,
370			proof_of_validity: MaybeCompressedPoV::Compressed(pov),
371		};
372
373		Some((collation, block_data))
374	}
375
376	/// Inform the networking systems that the block should be announced after an appropriate
377	/// signal has been received. This returns the sending half of the signal.
378	pub fn announce_with_barrier(
379		&self,
380		block_hash: Block::Hash,
381	) -> oneshot::Sender<CollationSecondedSignal> {
382		let (result_sender, signed_stmt_recv) = oneshot::channel();
383		self.wait_to_announce.lock().wait_to_announce(block_hash, signed_stmt_recv);
384		result_sender
385	}
386}
387
388impl<Block, BS, RA> ServiceInterface<Block> for CollatorService<Block, BS, RA>
389where
390	Block: BlockT,
391	BS: BlockBackend<Block>,
392	RA: ProvideRuntimeApi<Block>,
393	RA::Api: CollectCollationInfo<Block>,
394{
395	fn check_block_status(&self, hash: Block::Hash, header: &Block::Header) -> bool {
396		CollatorService::check_block_status(self, hash, header)
397	}
398
399	fn build_collation(
400		&self,
401		parent_header: &Block::Header,
402		_: Block::Hash,
403		candidate: ParachainCandidate<Block>,
404		scheduling_proof: Option<SchedulingProof>,
405	) -> Option<(Collation, ParachainBlockData<Block>)> {
406		CollatorService::build_multi_block_collation(
407			self,
408			parent_header,
409			vec![candidate.block],
410			candidate.proof,
411			scheduling_proof,
412		)
413	}
414
415	fn announce_with_barrier(
416		&self,
417		block_hash: Block::Hash,
418	) -> oneshot::Sender<CollationSecondedSignal> {
419		CollatorService::announce_with_barrier(self, block_hash)
420	}
421
422	fn announce_block(&self, block_hash: Block::Hash, data: Option<Vec<u8>>) {
423		(self.announce_block)(block_hash, data)
424	}
425
426	fn build_multi_block_collation(
427		&self,
428		parent_header: &<Block as BlockT>::Header,
429		blocks: Vec<Block>,
430		proof: StorageProof,
431		scheduling_proof: Option<SchedulingProof>,
432	) -> Option<(Collation, ParachainBlockData<Block>)> {
433		CollatorService::build_multi_block_collation(
434			self,
435			parent_header,
436			blocks,
437			proof,
438			scheduling_proof,
439		)
440	}
441}