
cumulus_client_consensus_aura/collators/lookahead.rs

// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Cumulus.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// Cumulus is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Cumulus is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Cumulus. If not, see <https://www.gnu.org/licenses/>.

//! A collator for Aura that looks ahead of the most recently included parachain block
//! when determining what to build upon.
//!
//! This collator also builds additional blocks when the maximum backlog is not saturated.
//! The size of the backlog is determined by invoking a runtime API. If that runtime API
//! is not supported, a maximum backlog size of 1 is assumed.
//!
//! This takes greater advantage of asynchronous backing, though not yet full advantage.
//! When the backlog is not saturated, this approach lets block production temporarily
//! 'catch up' during periods of higher throughput. Once the backlog is saturated, we
//! typically fall back to the limited cadence of a single parachain block per
//! relay-chain block.
//!
//! Even so, the very existence of a backlog allows us to spend more time building each
//! block, as there is some buffer before it must be posted to the relay chain. The main
//! limitation is block propagation time - i.e. a new block created by an author
//! must be propagated to the next author before their turn.
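//!
//! As a rough illustration of the cadence (a sketch of the logic below, not a separate
//! guarantee): for each imported relay-chain block, the loop in this module attempts at
//! most two parachain blocks (`for n_built in 0..2`), each attempt individually gated by
//! the runtime via [`AuraUnincludedSegmentApi`], so an unsaturated backlog can grow by up
//! to two blocks per relay-chain block.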

use codec::{Codec, Encode};
use cumulus_client_collator::service::ServiceInterface as CollatorServiceInterface;
use cumulus_client_consensus_common::{self as consensus_common, ParachainBlockImportMarker};
use cumulus_client_consensus_proposer::ProposerInterface;
use cumulus_primitives_aura::AuraUnincludedSegmentApi;
use cumulus_primitives_core::{CollectCollationInfo, PersistedValidationData};
use cumulus_relay_chain_interface::RelayChainInterface;

use polkadot_node_primitives::SubmitCollationParams;
use polkadot_node_subsystem::messages::CollationGenerationMessage;
use polkadot_overseer::Handle as OverseerHandle;
use polkadot_primitives::{CollatorPair, Id as ParaId, OccupiedCoreAssumption};

use crate::{collator as collator_util, collators::claim_queue_at, export_pov_to_path};
use futures::prelude::*;
use sc_client_api::{backend::AuxStore, BlockBackend, BlockOf};
use sc_consensus::BlockImport;
use sp_api::ProvideRuntimeApi;
use sp_application_crypto::AppPublic;
use sp_blockchain::HeaderBackend;
use sp_consensus_aura::{AuraApi, Slot};
use sp_core::crypto::Pair;
use sp_inherents::CreateInherentDataProviders;
use sp_keystore::KeystorePtr;
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Member};
use std::{path::PathBuf, sync::Arc, time::Duration};

/// Parameters for [`run`].
pub struct Params<BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS> {
	/// Inherent data providers. Only non-consensus inherent data should be provided, i.e.
	/// the timestamp, slot, and paras inherents should be omitted, as they are set by this
	/// collator.
	pub create_inherent_data_providers: CIDP,
	/// Used to actually import blocks.
	pub block_import: BI,
	/// The underlying para client.
	pub para_client: Arc<Client>,
	/// The para client's backend, used to access the database.
	pub para_backend: Arc<Backend>,
	/// A handle to the relay-chain client.
	pub relay_client: RClient,
	/// A validation code hash provider, used to get the current validation code hash.
	pub code_hash_provider: CHP,
	/// The underlying keystore, which should contain Aura consensus keys.
	pub keystore: KeystorePtr,
	/// The collator key used to sign collations before submitting to validators.
	pub collator_key: CollatorPair,
	/// The para's ID.
	pub para_id: ParaId,
	/// A handle to the relay-chain client's "Overseer" or task orchestrator.
	pub overseer_handle: OverseerHandle,
	/// The length of slots in the relay chain.
	pub relay_chain_slot_duration: Duration,
	/// The underlying block proposer this should call into.
	pub proposer: Proposer,
	/// The generic collator service used to plug into this consensus engine.
	pub collator_service: CS,
	/// The amount of time to spend authoring each block.
	pub authoring_duration: Duration,
	/// Whether we should reinitialize the collator config (i.e. we are transitioning to Aura).
	pub reinitialize: bool,
	/// The maximum percentage of the maximum PoV size that the collator can use.
	/// It will be removed once <https://github.com/paritytech/polkadot-sdk/issues/6020> is fixed.
	pub max_pov_percentage: Option<u32>,
}

/// Run async-backing-friendly Aura.
pub fn run<Block, P, BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS>(
	params: Params<BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS>,
) -> impl Future<Output = ()> + Send + 'static
where
	Block: BlockT,
	Client: ProvideRuntimeApi<Block>
		+ BlockOf
		+ AuxStore
		+ HeaderBackend<Block>
		+ BlockBackend<Block>
		+ Send
		+ Sync
		+ 'static,
	Client::Api:
		AuraApi<Block, P::Public> + CollectCollationInfo<Block> + AuraUnincludedSegmentApi<Block>,
	Backend: sc_client_api::Backend<Block> + 'static,
	RClient: RelayChainInterface + Clone + 'static,
	CIDP: CreateInherentDataProviders<Block, ()> + 'static,
	CIDP::InherentDataProviders: Send,
	BI: BlockImport<Block> + ParachainBlockImportMarker + Send + Sync + 'static,
	Proposer: ProposerInterface<Block> + Send + Sync + 'static,
	CS: CollatorServiceInterface<Block> + Send + Sync + 'static,
	CHP: consensus_common::ValidationCodeHashProvider<Block::Hash> + Send + 'static,
	P: Pair,
	P::Public: AppPublic + Member + Codec,
	P::Signature: TryFrom<Vec<u8>> + Member + Codec,
{
	run_with_export::<_, P, _, _, _, _, _, _, _, _>(ParamsWithExport { params, export_pov: None })
}

/// Parameters for [`run_with_export`].
pub struct ParamsWithExport<BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS> {
	/// The parameters.
	pub params: Params<BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS>,

	/// When set, the collator will export every produced `PoV` to this folder.
	pub export_pov: Option<PathBuf>,
}

/// Run async-backing-friendly Aura.
///
/// This is exactly the same as [`run`], but it additionally supports exporting each produced
/// `PoV` to the file system.
pub fn run_with_export<Block, P, BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS>(
	ParamsWithExport { mut params, export_pov }: ParamsWithExport<
		BI,
		CIDP,
		Client,
		Backend,
		RClient,
		CHP,
		Proposer,
		CS,
	>,
) -> impl Future<Output = ()> + Send + 'static
where
	Block: BlockT,
	Client: ProvideRuntimeApi<Block>
		+ BlockOf
		+ AuxStore
		+ HeaderBackend<Block>
		+ BlockBackend<Block>
		+ Send
		+ Sync
		+ 'static,
	Client::Api:
		AuraApi<Block, P::Public> + CollectCollationInfo<Block> + AuraUnincludedSegmentApi<Block>,
	Backend: sc_client_api::Backend<Block> + 'static,
	RClient: RelayChainInterface + Clone + 'static,
	CIDP: CreateInherentDataProviders<Block, ()> + 'static,
	CIDP::InherentDataProviders: Send,
	BI: BlockImport<Block> + ParachainBlockImportMarker + Send + Sync + 'static,
	Proposer: ProposerInterface<Block> + Send + Sync + 'static,
	CS: CollatorServiceInterface<Block> + Send + Sync + 'static,
	CHP: consensus_common::ValidationCodeHashProvider<Block::Hash> + Send + 'static,
	P: Pair,
	P::Public: AppPublic + Member + Codec,
	P::Signature: TryFrom<Vec<u8>> + Member + Codec,
{
	async move {
		cumulus_client_collator::initialize_collator_subsystems(
			&mut params.overseer_handle,
			params.collator_key,
			params.para_id,
			params.reinitialize,
		)
		.await;

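		// The whole lookahead loop is driven by relay-chain imports: each newly
		// imported relay-chain block is an opportunity to collate.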
		let mut import_notifications = match params.relay_client.import_notification_stream().await
		{
			Ok(s) => s,
			Err(err) => {
				tracing::error!(
					target: crate::LOG_TARGET,
					?err,
					"Failed to initialize consensus: no relay chain import notification stream"
				);

				return
			},
		};

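		// Set up the shared collation machinery (inherent data creation, block import,
		// proposer, and collator service) behind the crate's generic `Collator` helper.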
		let mut collator = {
			let params = collator_util::Params {
				create_inherent_data_providers: params.create_inherent_data_providers,
				block_import: params.block_import,
				relay_client: params.relay_client.clone(),
				keystore: params.keystore.clone(),
				para_id: params.para_id,
				proposer: params.proposer,
				collator_service: params.collator_service,
			};

			collator_util::Collator::<Block, P, _, _, _, _, _>::new(params)
		};

		while let Some(relay_parent_header) = import_notifications.next().await {
			let relay_parent = relay_parent_header.hash();

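			// Consult the relay chain's claim queue: only collate if the para is actually
			// scheduled on a core at this relay parent, and remember which core to target
			// when submitting the collation.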
			let Some(core_index) = claim_queue_at(relay_parent, &mut params.relay_client)
				.await
				.iter_claims_at_depth_for_para(0, params.para_id)
				.next()
			else {
				tracing::trace!(
					target: crate::LOG_TARGET,
					?relay_parent,
					?params.para_id,
					"Para is not scheduled on any core, skipping import notification",
				);

				continue
			};

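			// Ask the relay chain for the persisted validation data to learn the maximum
			// PoV size currently allowed; skip this relay block if it is unavailable.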
			let max_pov_size = match params
				.relay_client
				.persisted_validation_data(
					relay_parent,
					params.para_id,
					OccupiedCoreAssumption::Included,
				)
				.await
			{
				Ok(None) => continue,
				Ok(Some(pvd)) => pvd.max_pov_size,
				Err(err) => {
					tracing::error!(target: crate::LOG_TARGET, ?err, "Failed to gather information from relay-client");
					continue
				},
			};

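			// Determine the most recently included parachain block and, looking ahead
			// through the unincluded segment, the best block to build upon.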
			let (included_block, initial_parent) = match crate::collators::find_parent(
				relay_parent,
				params.para_id,
				&*params.para_backend,
				&params.relay_client,
			)
			.await
			{
				Some(value) => value,
				None => continue,
			};

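			// Closure deciding whether we are allowed to author on top of a given block:
			// it translates the relay-chain slot and timestamp into a parachain slot and
			// then defers to `can_build_upon`, which checks the keystore for an eligible
			// Aura key and consults the runtime's unincluded-segment rules.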
			let para_client = &*params.para_client;
			let keystore = &params.keystore;
			let can_build_upon = |block_hash| {
				let slot_duration = match sc_consensus_aura::standalone::slot_duration_at(
					&*params.para_client,
					block_hash,
				) {
					Ok(sd) => sd,
					Err(err) => {
						tracing::error!(target: crate::LOG_TARGET, ?err, "Failed to acquire parachain slot duration");
						return None
					},
				};
				tracing::debug!(target: crate::LOG_TARGET, ?slot_duration, ?block_hash, "Parachain slot duration acquired");
				let (relay_slot, timestamp) = consensus_common::relay_slot_and_timestamp(
					&relay_parent_header,
					params.relay_chain_slot_duration,
				)?;
				let slot_now = Slot::from_timestamp(timestamp, slot_duration);
				tracing::debug!(
					target: crate::LOG_TARGET,
					?relay_slot,
					para_slot = ?slot_now,
					?timestamp,
					?slot_duration,
					relay_chain_slot_duration = ?params.relay_chain_slot_duration,
					"Adjusted relay-chain slot to parachain slot"
				);
				Some(super::can_build_upon::<_, _, P>(
					slot_now,
					relay_slot,
					timestamp,
					block_hash,
					included_block.hash(),
					para_client,
					&keystore,
				))
			};

			// Build in a loop until not allowed. Note that the authorities can change
			// at any block, so we need to re-claim our slot every time.
			let mut parent_hash = initial_parent.hash;
			let mut parent_header = initial_parent.header;
			let overseer_handle = &mut params.overseer_handle;

			// Do not try to build upon an unknown, pruned or bad block.
			if !collator.collator_service().check_block_status(parent_hash, &parent_header) {
				continue
			}

			// This needs to change to support elastic scaling, but for continuously
			// scheduled chains this ensures that the backlog will grow steadily.
			for n_built in 0..2 {
				let slot_claim = match can_build_upon(parent_hash) {
					Some(fut) => match fut.await {
						None => break,
						Some(c) => c,
					},
					None => break,
				};

				tracing::debug!(
					target: crate::LOG_TARGET,
					?relay_parent,
					unincluded_segment_len = initial_parent.depth + n_built,
					"Slot claimed. Building"
				);

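				// Assemble the validation data the parachain block is validated against:
				// the parent head we are building on plus the relay parent's block number
				// and storage root.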
				let validation_data = PersistedValidationData {
					parent_head: parent_header.encode().into(),
					relay_parent_number: *relay_parent_header.number(),
					relay_parent_storage_root: *relay_parent_header.state_root(),
					max_pov_size,
				};

				// Build and announce collations in a loop until `can_build_upon` fails
				// or building a collation fails.
				let (parachain_inherent_data, other_inherent_data) = match collator
					.create_inherent_data(
						relay_parent,
						&validation_data,
						parent_hash,
						slot_claim.timestamp(),
					)
					.await
				{
					Err(err) => {
						tracing::error!(target: crate::LOG_TARGET, ?err);
						break
					},
					Ok(x) => x,
				};

				let Some(validation_code_hash) =
					params.code_hash_provider.code_hash_at(parent_hash)
				else {
					tracing::error!(target: crate::LOG_TARGET, ?parent_hash, "Could not fetch validation code hash");
					break
				};

				super::check_validation_code_or_log(
					&validation_code_hash,
					params.para_id,
					&params.relay_client,
					relay_parent,
				)
				.await;

				let allowed_pov_size = if let Some(max_pov_percentage) = params.max_pov_percentage {
					validation_data.max_pov_size * max_pov_percentage / 100
				} else {
					// Set the block limit to 85% of the maximum PoV size.
					//
					// Once <https://github.com/paritytech/polkadot-sdk/issues/6020> is
					// fixed, this reservation should be removed.
					validation_data.max_pov_size * 85 / 100
				} as usize;

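				// Propose, seal, and package the block into a collation, bounded by the
				// configured authoring duration and the PoV size limit computed above.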
				match collator
					.collate(
						&parent_header,
						&slot_claim,
						None,
						(parachain_inherent_data, other_inherent_data),
						params.authoring_duration,
						allowed_pov_size,
					)
					.await
				{
					Ok(Some((collation, block_data))) => {
						let Some(new_block_header) =
							block_data.blocks().first().map(|b| b.header().clone())
						else {
							tracing::error!(target: crate::LOG_TARGET, "Produced PoV doesn't contain any blocks");
							break
						};

						let new_block_hash = new_block_header.hash();

						// Here we are assuming that the import logic protects against equivocations
						// and provides sybil-resistance, as it should.
						collator.collator_service().announce_block(new_block_hash, None);

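						// When an export directory was configured, also dump the compressed
						// PoV to disk, e.g. for offline inspection.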
						if let Some(ref export_pov) = export_pov {
							export_pov_to_path::<Block>(
								export_pov.clone(),
								collation.proof_of_validity.clone().into_compressed(),
								new_block_hash,
								*new_block_header.number(),
								parent_header.clone(),
								*relay_parent_header.state_root(),
								*relay_parent_header.number(),
								validation_data.max_pov_size,
							);
						}

						// Send a submit-collation message to the collation generation subsystem,
						// which then distributes this to validators.
						//
						// Here we are assuming that the leaf is imported, as we've gotten an
						// import notification.
						overseer_handle
							.send_msg(
								CollationGenerationMessage::SubmitCollation(
									SubmitCollationParams {
										relay_parent,
										collation,
										parent_head: parent_header.encode().into(),
										validation_code_hash,
										result_sender: None,
										core_index,
									},
								),
								"SubmitCollation",
							)
							.await;

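						// Chain the next loop iteration onto the block we just built, so
						// the following block (if allowed) extends it.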
						parent_hash = new_block_hash;
						parent_header = new_block_header;
					},
					Ok(None) => {
						tracing::debug!(target: crate::LOG_TARGET, "No block proposal");
						break
					},
					Err(err) => {
						tracing::error!(target: crate::LOG_TARGET, ?err);
						break
					},
				}
			}
		}
	}
}