referrerpolicy=no-referrer-when-downgrade

cumulus_client_consensus_aura/collators/slot_based/
collation_task.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Cumulus.
3// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5// Cumulus is free software: you can redistribute it and/or modify
6// it under the terms of the GNU General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9
10// Cumulus is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14
15// You should have received a copy of the GNU General Public License
16// along with Cumulus. If not, see <https://www.gnu.org/licenses/>.
17
18use codec::Encode;
19use std::path::PathBuf;
20
21use cumulus_client_collator::service::ServiceInterface as CollatorServiceInterface;
22use cumulus_relay_chain_interface::RelayChainInterface;
23
24use polkadot_node_primitives::{MaybeCompressedPoV, SubmitCollationParams};
25use polkadot_node_subsystem::messages::CollationGenerationMessage;
26use polkadot_overseer::Handle as OverseerHandle;
27use polkadot_primitives::{CollatorPair, Id as ParaId};
28
29use cumulus_primitives_core::relay_chain::BlockId;
30use futures::prelude::*;
31
32use crate::export_pov_to_path;
33use sc_utils::mpsc::TracingUnboundedReceiver;
34use sp_runtime::traits::{Block as BlockT, Header};
35
36use super::CollatorMessage;
37
/// `tracing` target attached to every log line emitted from this module.
const LOG_TARGET: &str = "aura::cumulus::collation_task";
39
40/// Parameters for the collation task.
41pub struct Params<Block: BlockT, RClient, CS> {
42	/// A handle to the relay-chain client.
43	pub relay_client: RClient,
44	/// The collator key used to sign collations before submitting to validators.
45	pub collator_key: CollatorPair,
46	/// The para's ID.
47	pub para_id: ParaId,
48	/// Whether we should reinitialize the collator config (i.e. we are transitioning to aura).
49	pub reinitialize: bool,
50	/// Collator service interface
51	pub collator_service: CS,
52	/// Receiver channel for communication with the block builder task.
53	pub collator_receiver: TracingUnboundedReceiver<CollatorMessage<Block>>,
54	/// The handle from the special slot based block import.
55	pub block_import_handle: super::SlotBasedBlockImportHandle<Block>,
56	/// When set, the collator will export every produced `POV` to this folder.
57	pub export_pov: Option<PathBuf>,
58}
59
60/// Asynchronously executes the collation task for a parachain.
61///
62/// This function initializes the collator subsystems necessary for producing and submitting
63/// collations to the relay chain. It listens for new best relay chain block notifications and
64/// handles collator messages. If our parachain is scheduled on a core and we have a candidate,
65/// the task will build a collation and send it to the relay chain.
66pub async fn run_collation_task<Block, RClient, CS>(
67	Params {
68		relay_client,
69		collator_key,
70		para_id,
71		reinitialize,
72		collator_service,
73		mut collator_receiver,
74		mut block_import_handle,
75		export_pov,
76	}: Params<Block, RClient, CS>,
77) where
78	Block: BlockT,
79	CS: CollatorServiceInterface<Block> + Send + Sync + 'static,
80	RClient: RelayChainInterface + Clone + 'static,
81{
82	let Ok(mut overseer_handle) = relay_client.overseer_handle() else {
83		tracing::error!(target: LOG_TARGET, "Failed to get overseer handle.");
84		return
85	};
86
87	cumulus_client_collator::initialize_collator_subsystems(
88		&mut overseer_handle,
89		collator_key,
90		para_id,
91		reinitialize,
92	)
93	.await;
94
95	loop {
96		futures::select! {
97			collator_message = collator_receiver.next() => {
98				let Some(message) = collator_message else {
99					return;
100				};
101
102				handle_collation_message(message, &collator_service, &mut overseer_handle,relay_client.clone(),export_pov.clone()).await;
103			},
104			block_import_msg = block_import_handle.next().fuse() => {
105				// TODO: Implement me.
106				// Issue: https://github.com/paritytech/polkadot-sdk/issues/6495
107				let _ = block_import_msg;
108			}
109		}
110	}
111}
112
113/// Handle an incoming collation message from the block builder task.
114/// This builds the collation from the [`CollatorMessage`] and submits it to
115/// the collation-generation subsystem of the relay chain.
116async fn handle_collation_message<Block: BlockT, RClient: RelayChainInterface + Clone + 'static>(
117	message: CollatorMessage<Block>,
118	collator_service: &impl CollatorServiceInterface<Block>,
119	overseer_handle: &mut OverseerHandle,
120	relay_client: RClient,
121	export_pov: Option<PathBuf>,
122) {
123	let CollatorMessage {
124		parent_header,
125		parachain_candidate,
126		validation_code_hash,
127		relay_parent,
128		core_index,
129		max_pov_size,
130	} = message;
131
132	let hash = parachain_candidate.block.header().hash();
133	let number = *parachain_candidate.block.header().number();
134	let (collation, block_data) =
135		match collator_service.build_collation(&parent_header, hash, parachain_candidate) {
136			Some(collation) => collation,
137			None => {
138				tracing::warn!(target: LOG_TARGET, %hash, ?number, ?core_index, "Unable to build collation.");
139				return;
140			},
141		};
142
143	block_data.log_size_info();
144
145	if let MaybeCompressedPoV::Compressed(ref pov) = collation.proof_of_validity {
146		if let Some(pov_path) = export_pov {
147			if let Ok(Some(relay_parent_header)) =
148				relay_client.header(BlockId::Hash(relay_parent)).await
149			{
150				if let Some(header) = block_data.blocks().first().map(|b| b.header()) {
151					export_pov_to_path::<Block>(
152						pov_path.clone(),
153						pov.clone(),
154						header.hash(),
155						*header.number(),
156						parent_header.clone(),
157						relay_parent_header.state_root,
158						relay_parent_header.number,
159						max_pov_size,
160					);
161				}
162			} else {
163				tracing::error!(target: LOG_TARGET, "Failed to get relay parent header from hash: {relay_parent:?}");
164			}
165		}
166
167		tracing::info!(
168			target: LOG_TARGET,
169			"Compressed PoV size: {}kb",
170			pov.block_data.0.len() as f64 / 1024f64,
171		);
172	}
173
174	tracing::debug!(target: LOG_TARGET, ?core_index, ?hash, %number, "Submitting collation for core.");
175
176	overseer_handle
177		.send_msg(
178			CollationGenerationMessage::SubmitCollation(SubmitCollationParams {
179				relay_parent,
180				collation,
181				parent_head: parent_header.encode().into(),
182				validation_code_hash,
183				core_index,
184				result_sender: None,
185			}),
186			"SubmitCollation",
187		)
188		.await;
189}