// cumulus_client_consensus_aura/collators/lookahead.rs
1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Cumulus.
3// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5// Cumulus is free software: you can redistribute it and/or modify
6// it under the terms of the GNU General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9
10// Cumulus is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14
15// You should have received a copy of the GNU General Public License
16// along with Cumulus. If not, see <https://www.gnu.org/licenses/>.
17
18//! A collator for Aura that looks ahead of the most recently included parachain block
19//! when determining what to build upon.
20//!
21//! This collator also builds additional blocks when the maximum backlog is not saturated.
22//! The size of the backlog is determined by invoking a runtime API. If that runtime API
23//! is not supported, this assumes a maximum backlog size of 1.
24//!
25//! This takes more advantage of asynchronous backing, though not complete advantage.
26//! When the backlog is not saturated, this approach lets the backlog temporarily 'catch up'
27//! with periods of higher throughput. When the backlog is saturated, we typically
28//! fall back to the limited cadence of a single parachain block per relay-chain block.
29//!
30//! Despite this, the fact that there is a backlog at all allows us to spend more time
31//! building the block, as there is some buffer before it can get posted to the relay-chain.
32//! The main limitation is block propagation time - i.e. the new blocks created by an author
33//! must be propagated to the next author before their turn.
34
35use codec::{Codec, Encode};
36use cumulus_client_collator::service::ServiceInterface as CollatorServiceInterface;
37use cumulus_client_consensus_common::{self as consensus_common, ParachainBlockImportMarker};
38use cumulus_primitives_aura::AuraUnincludedSegmentApi;
39use cumulus_primitives_core::{CollectCollationInfo, PersistedValidationData};
40use cumulus_relay_chain_interface::RelayChainInterface;
41use sp_consensus::Environment;
42
43use polkadot_node_primitives::SubmitCollationParams;
44use polkadot_node_subsystem::messages::CollationGenerationMessage;
45use polkadot_overseer::Handle as OverseerHandle;
46use polkadot_primitives::{CollatorPair, Id as ParaId, OccupiedCoreAssumption};
47
48use crate::{
49	collator as collator_util,
50	collators::{claim_queue_at, BackingGroupConnectionHelper},
51	export_pov_to_path,
52};
53use futures::prelude::*;
54use sc_client_api::{backend::AuxStore, BlockBackend, BlockOf};
55use sc_consensus::BlockImport;
56use sc_network_types::PeerId;
57use sp_api::ProvideRuntimeApi;
58use sp_application_crypto::AppPublic;
59use sp_blockchain::HeaderBackend;
60use sp_consensus_aura::{AuraApi, Slot};
61use sp_core::crypto::Pair;
62use sp_inherents::CreateInherentDataProviders;
63use sp_keystore::KeystorePtr;
64use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Member};
65use sp_timestamp::Timestamp;
66use std::{path::PathBuf, sync::Arc, time::Duration};
67
/// Parameters for [`run`].
///
/// Bundles every dependency the lookahead collator needs: block authoring
/// (proposer, inherent providers, keystore), block import, para/relay clients,
/// and the overseer handle used to submit collations to validators.
pub struct Params<BI, CIDP, Client, Backend, RClient, CHP, ProposerFactory, CS> {
	/// Inherent data providers. Only non-consensus inherent data should be provided, i.e.
	/// the timestamp, slot, and paras inherents should be omitted, as they are set by this
	/// collator.
	pub create_inherent_data_providers: CIDP,
	/// Used to actually import blocks.
	pub block_import: BI,
	/// The underlying para client.
	pub para_client: Arc<Client>,
	/// The para client's backend, used to access the database.
	pub para_backend: Arc<Backend>,
	/// A handle to the relay-chain client.
	pub relay_client: RClient,
	/// A validation code hash provider, used to get the current validation code hash.
	pub code_hash_provider: CHP,
	/// The underlying keystore, which should contain Aura consensus keys.
	pub keystore: KeystorePtr,
	/// The collator key used to sign collations before submitting to validators.
	pub collator_key: CollatorPair,
	/// The collator network peer id.
	pub collator_peer_id: PeerId,
	/// The para's ID.
	pub para_id: ParaId,
	/// A handle to the relay-chain client's "Overseer" or task orchestrator.
	pub overseer_handle: OverseerHandle,
	/// The length of slots in the relay chain.
	pub relay_chain_slot_duration: Duration,
	/// The proposer for building blocks.
	pub proposer: ProposerFactory,
	/// The generic collator service used to plug into this consensus engine.
	pub collator_service: CS,
	/// The amount of time to spend authoring each block.
	pub authoring_duration: Duration,
	/// Whether we should reinitialize the collator config (i.e. we are transitioning to aura).
	pub reinitialize: bool,
	/// The maximum percentage of the maximum PoV size that the collator can use.
	/// It will be removed once <https://github.com/paritytech/polkadot-sdk/issues/6020> is fixed.
	/// When `None`, the collator falls back to a built-in 85% reservation.
	pub max_pov_percentage: Option<u32>,
}
108
109/// Get the current parachain slot from a given block hash.
110///
111/// Returns the parachain slot, relay chain slot, and timestamp.
112fn get_parachain_slot<Block, Client, P>(
113	para_client: &Client,
114	block_hash: Block::Hash,
115	relay_parent_header: &polkadot_primitives::Header,
116	relay_chain_slot_duration: Duration,
117) -> Option<(Slot, Slot, Timestamp)>
118where
119	Block: BlockT,
120	Client: ProvideRuntimeApi<Block>,
121	Client::Api: AuraApi<Block, P>,
122	P: Codec,
123{
124	let slot_duration =
125		match sc_consensus_aura::standalone::slot_duration_at(para_client, block_hash) {
126			Ok(sd) => sd,
127			Err(err) => {
128				tracing::error!(target: crate::LOG_TARGET, ?err, "Failed to acquire parachain slot duration");
129				return None
130			},
131		};
132
133	tracing::debug!(target: crate::LOG_TARGET, ?slot_duration, ?block_hash, "Parachain slot duration acquired");
134
135	let (relay_slot, timestamp) =
136		consensus_common::relay_slot_and_timestamp(relay_parent_header, relay_chain_slot_duration)?;
137
138	let slot_now = Slot::from_timestamp(timestamp, slot_duration);
139
140	tracing::debug!(
141		target: crate::LOG_TARGET,
142		?relay_slot,
143		para_slot = ?slot_now,
144		?timestamp,
145		?slot_duration,
146		?relay_chain_slot_duration,
147		"Adjusted relay-chain slot to parachain slot"
148	);
149
150	Some((slot_now, relay_slot, timestamp))
151}
152
153/// Run async-backing-friendly Aura.
154pub fn run<Block, P, BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS>(
155	params: Params<BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS>,
156) -> impl Future<Output = ()> + Send + 'static
157where
158	Block: BlockT,
159	Client: ProvideRuntimeApi<Block>
160		+ BlockOf
161		+ AuxStore
162		+ HeaderBackend<Block>
163		+ BlockBackend<Block>
164		+ Send
165		+ Sync
166		+ 'static,
167	Client::Api:
168		AuraApi<Block, P::Public> + CollectCollationInfo<Block> + AuraUnincludedSegmentApi<Block>,
169	Backend: sc_client_api::Backend<Block> + 'static,
170	RClient: RelayChainInterface + Clone + 'static,
171	CIDP: CreateInherentDataProviders<Block, ()> + 'static,
172	CIDP::InherentDataProviders: Send,
173	BI: BlockImport<Block> + ParachainBlockImportMarker + Send + Sync + 'static,
174	Proposer: Environment<Block> + Clone + Send + Sync + 'static,
175	CS: CollatorServiceInterface<Block> + Send + Sync + 'static,
176	CHP: consensus_common::ValidationCodeHashProvider<Block::Hash> + Send + 'static,
177	P: Pair + Send + Sync + 'static,
178	P::Public: AppPublic + Member + Codec,
179	P::Signature: TryFrom<Vec<u8>> + Member + Codec,
180{
181	run_with_export::<_, P, _, _, _, _, _, _, _, _>(ParamsWithExport { params, export_pov: None })
182}
183
/// Parameters for [`run_with_export`].
///
/// Extends [`Params`] with an optional filesystem path for PoV export.
pub struct ParamsWithExport<BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS> {
	/// The parameters.
	pub params: Params<BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS>,

	/// When set, the collator will export every produced `POV` to this folder.
	pub export_pov: Option<PathBuf>,
}
192
/// Run async-backing-friendly Aura.
///
/// This is exactly the same as [`run`], but it supports the optional export of each produced `POV`
/// to the file system.
///
/// The returned future drives the whole collation loop: for every relay-chain
/// import notification it checks whether the para is scheduled, finds the best
/// parent to build on, and then authors and submits up to two blocks.
pub fn run_with_export<Block, P, BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS>(
	ParamsWithExport { mut params, export_pov }: ParamsWithExport<
		BI,
		CIDP,
		Client,
		Backend,
		RClient,
		CHP,
		Proposer,
		CS,
	>,
) -> impl Future<Output = ()> + Send + 'static
where
	Block: BlockT,
	Client: ProvideRuntimeApi<Block>
		+ BlockOf
		+ AuxStore
		+ HeaderBackend<Block>
		+ BlockBackend<Block>
		+ Send
		+ Sync
		+ 'static,
	Client::Api:
		AuraApi<Block, P::Public> + CollectCollationInfo<Block> + AuraUnincludedSegmentApi<Block>,
	Backend: sc_client_api::Backend<Block> + 'static,
	RClient: RelayChainInterface + Clone + 'static,
	CIDP: CreateInherentDataProviders<Block, ()> + 'static,
	CIDP::InherentDataProviders: Send,
	BI: BlockImport<Block> + ParachainBlockImportMarker + Send + Sync + 'static,
	Proposer: Environment<Block> + Clone + Send + Sync + 'static,
	CS: CollatorServiceInterface<Block> + Send + Sync + 'static,
	CHP: consensus_common::ValidationCodeHashProvider<Block::Hash> + Send + 'static,
	P: Pair + Send + Sync + 'static,
	P::Public: AppPublic + Member + Codec,
	P::Signature: TryFrom<Vec<u8>> + Member + Codec,
{
	async move {
		// Register this collator with the overseer's collation-related subsystems
		// before processing any relay-chain blocks.
		cumulus_client_collator::initialize_collator_subsystems(
			&mut params.overseer_handle,
			params.collator_key,
			params.para_id,
			params.reinitialize,
		)
		.await;

		// The main loop below is driven by relay-chain import notifications; if the
		// stream cannot be obtained there is nothing to do, so bail out entirely.
		let mut import_notifications = match params.relay_client.import_notification_stream().await
		{
			Ok(s) => s,
			Err(err) => {
				tracing::error!(
					target: crate::LOG_TARGET,
					?err,
					"Failed to initialize consensus: no relay chain import notification stream"
				);

				return
			},
		};

		// Wrap block import, proposer, inherent providers, and the collator service
		// into the shared collator utility that performs the actual block authoring.
		let mut collator = {
			let params = collator_util::Params {
				create_inherent_data_providers: params.create_inherent_data_providers,
				block_import: params.block_import,
				relay_client: params.relay_client.clone(),
				keystore: params.keystore.clone(),
				collator_peer_id: params.collator_peer_id,
				para_id: params.para_id,
				proposer: params.proposer,
				collator_service: params.collator_service,
			};

			collator_util::Collator::<Block, P, _, _, _, _, _>::new(params)
		};

		// Helper used further below to pre-connect to the backing group ahead of
		// this collator's authoring slot.
		let mut connection_helper = BackingGroupConnectionHelper::new(
			params.keystore.clone(),
			params.overseer_handle.clone(),
		);

		// One iteration per imported relay-chain block.
		while let Some(relay_parent_header) = import_notifications.next().await {
			let relay_parent = relay_parent_header.hash();

			// Take the first core claimed by our para at depth 0 of the claim queue;
			// if the para is not scheduled anywhere, skip this relay block.
			let Some(core_index) = claim_queue_at(relay_parent, &mut params.relay_client)
				.await
				.iter_claims_at_depth_for_para(0, params.para_id)
				.next()
			else {
				tracing::trace!(
					target: crate::LOG_TARGET,
					?relay_parent,
					?params.para_id,
					"Para is not scheduled on any core, skipping import notification",
				);

				continue
			};

			// Fetch the max PoV size from the persisted validation data (assuming the
			// previous candidate is included). `Ok(None)` means no data is available
			// for this relay parent, so skip it.
			let max_pov_size = match params
				.relay_client
				.persisted_validation_data(
					relay_parent,
					params.para_id,
					OccupiedCoreAssumption::Included,
				)
				.await
			{
				Ok(None) => continue,
				Ok(Some(pvd)) => pvd.max_pov_size,
				Err(err) => {
					tracing::error!(target: crate::LOG_TARGET, ?err, "Failed to gather information from relay-client");
					continue
				},
			};

			// Determine the latest included para block and the best parent to build
			// on top of (which may be ahead of the included block — the "lookahead").
			let (included_block, initial_parent) = match crate::collators::find_parent(
				relay_parent,
				params.para_id,
				&*params.para_backend,
				&params.relay_client,
			)
			.await
			{
				Some(value) => value,
				None => continue,
			};

			let para_client = &*params.para_client;
			let keystore = &params.keystore;
			// Closure checking whether we may author on top of `block_hash`:
			// `None` if slot information is unavailable, otherwise a future that
			// resolves to the slot claim (or `None` when we are not the author or
			// the unincluded segment is full).
			let can_build_upon = |block_hash| {
				let (slot_now, relay_slot, timestamp) = get_parachain_slot::<_, _, P::Public>(
					para_client,
					block_hash,
					&relay_parent_header,
					params.relay_chain_slot_duration,
				)?;

				Some(super::can_build_upon::<_, _, P>(
					slot_now,
					relay_slot,
					timestamp,
					block_hash,
					included_block.hash(),
					para_client,
					&keystore,
				))
			};

			// Build in a loop until not allowed. Note that the authorities can change
			// at any block, so we need to re-claim our slot every time.
			let mut parent_hash = initial_parent.hash;
			let mut parent_header = initial_parent.header;
			let overseer_handle = &mut params.overseer_handle;

			// Do not try to build upon an unknown, pruned or bad block
			if !collator.collator_service().check_block_status(parent_hash, &parent_header) {
				continue
			}

			// Trigger pre-connect to backing groups if necessary.
			if let (Some((slot_now, _relay_slot, _timestamp)), Ok(authorities)) = (
				get_parachain_slot::<_, _, P::Public>(
					para_client,
					parent_hash,
					&relay_parent_header,
					params.relay_chain_slot_duration,
				),
				para_client.runtime_api().authorities(parent_hash),
			) {
				connection_helper.update::<P>(slot_now, &authorities).await;
			}

			// This needs to change to support elastic scaling, but for continuously
			// scheduled chains this ensures that the backlog will grow steadily.
			for n_built in 0..2 {
				// Re-check the slot claim before every block, since authorities can
				// change at any block. Either failure mode ends the building loop.
				let slot_claim = match can_build_upon(parent_hash) {
					Some(fut) => match fut.await {
						None => break,
						Some(c) => c,
					},
					None => break,
				};

				tracing::debug!(
					target: crate::LOG_TARGET,
					?relay_parent,
					unincluded_segment_len = initial_parent.depth + n_built,
					"Slot claimed. Building"
				);

				let validation_data = PersistedValidationData {
					parent_head: parent_header.encode().into(),
					relay_parent_number: *relay_parent_header.number(),
					relay_parent_storage_root: *relay_parent_header.state_root(),
					max_pov_size,
				};

				// Build and announce collations recursively until
				// `can_build_upon` fails or building a collation fails.
				let (parachain_inherent_data, other_inherent_data) = match collator
					.create_inherent_data(
						relay_parent,
						&validation_data,
						parent_hash,
						slot_claim.timestamp(),
						params.collator_peer_id,
					)
					.await
				{
					Err(err) => {
						tracing::error!(target: crate::LOG_TARGET, ?err);
						break
					},
					Ok(x) => x,
				};

				// The validation code hash as of the parent block; needed for the
				// submitted collation to be accepted by validators.
				let Some(validation_code_hash) =
					params.code_hash_provider.code_hash_at(parent_hash)
				else {
					tracing::error!(target: crate::LOG_TARGET, ?parent_hash, "Could not fetch validation code hash");
					break
				};

				// Logs a warning if the local validation code does not match the one
				// registered on the relay chain (does not abort building).
				super::check_validation_code_or_log(
					&validation_code_hash,
					params.para_id,
					&params.relay_client,
					relay_parent,
				)
				.await;

				let allowed_pov_size = if let Some(max_pov_percentage) = params.max_pov_percentage {
					validation_data.max_pov_size * max_pov_percentage / 100
				} else {
					// Set the block limit to 85% of the maximum PoV size.
					//
					// Once https://github.com/paritytech/polkadot-sdk/issues/6020 issue is
					// fixed, the reservation should be removed.
					validation_data.max_pov_size * 85 / 100
				} as usize;

				match collator
					.collate(
						&parent_header,
						&slot_claim,
						None,
						(parachain_inherent_data, other_inherent_data),
						params.authoring_duration,
						allowed_pov_size,
					)
					.await
				{
					Ok(Some((collation, block_data))) => {
						// The header of the first (and, for non-elastic chains, only)
						// block in the produced PoV.
						let Some(new_block_header) =
							block_data.blocks().first().map(|b| b.header().clone())
						else {
							tracing::error!(target: crate::LOG_TARGET,  "Produced PoV doesn't contain any blocks");
							break
						};

						let new_block_hash = new_block_header.hash();

						// Here we are assuming that the import logic protects against equivocations
						// and provides sybil-resistance, as it should.
						collator.collator_service().announce_block(new_block_hash, None);

						// Optionally dump the compressed PoV to disk for debugging/benchmarking.
						if let Some(ref export_pov) = export_pov {
							export_pov_to_path::<Block>(
								export_pov.clone(),
								collation.proof_of_validity.clone().into_compressed(),
								new_block_hash,
								*new_block_header.number(),
								parent_header.clone(),
								*relay_parent_header.state_root(),
								*relay_parent_header.number(),
								validation_data.max_pov_size,
							);
						}

						// Send a submit-collation message to the collation generation subsystem,
						// which then distributes this to validators.
						//
						// Here we are assuming that the leaf is imported, as we've gotten an
						// import notification.
						overseer_handle
							.send_msg(
								CollationGenerationMessage::SubmitCollation(
									SubmitCollationParams {
										relay_parent,
										collation,
										parent_head: parent_header.encode().into(),
										validation_code_hash,
										result_sender: None,
										core_index,
									},
								),
								"SubmitCollation",
							)
							.await;

						// Chain the next iteration onto the block we just built.
						parent_hash = new_block_hash;
						parent_header = new_block_header;
					},
					Ok(None) => {
						tracing::debug!(target: crate::LOG_TARGET, "No block proposal");
						break
					},
					Err(err) => {
						tracing::error!(target: crate::LOG_TARGET, ?err);
						break
					},
				}
			}
		}
	}
}