
cumulus_client_consensus_aura/collators/slot_based/block_builder_task.rs

// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Cumulus.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// Cumulus is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Cumulus is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Cumulus. If not, see <https://www.gnu.org/licenses/>.
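
//! Block-building task of the slot-based collator.
//!
//! The task wakes up on every parachain slot, resolves the relay parent (optionally offset from
//! the relay chain tip), determines which core is scheduled for the parachain, builds and imports
//! a new block, and forwards the built candidate to the collation task over a channel.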

use codec::{Codec, Encode};

use super::CollatorMessage;
use crate::{
	collator as collator_util,
	collators::{
		check_validation_code_or_log,
		slot_based::{
			relay_chain_data_cache::{RelayChainData, RelayChainDataCache},
			slot_timer::{SlotInfo, SlotTimer},
		},
		RelayParentData,
	},
	LOG_TARGET,
};
use cumulus_client_collator::service::ServiceInterface as CollatorServiceInterface;
use cumulus_client_consensus_common::{self as consensus_common, ParachainBlockImportMarker};
use cumulus_client_consensus_proposer::ProposerInterface;
use cumulus_primitives_aura::{AuraUnincludedSegmentApi, Slot};
use cumulus_primitives_core::{
	extract_relay_parent, rpsr_digest, ClaimQueueOffset, CoreInfo, CoreSelector, CumulusDigestItem,
	PersistedValidationData, RelayParentOffsetApi,
};
use cumulus_relay_chain_interface::RelayChainInterface;
use futures::prelude::*;
use polkadot_primitives::{
	Block as RelayBlock, CoreIndex, Hash as RelayHash, Header as RelayHeader, Id as ParaId,
};
use sc_client_api::{backend::AuxStore, BlockBackend, BlockOf, UsageProvider};
use sc_consensus::BlockImport;
use sc_consensus_aura::SlotDuration;
use sp_api::ProvideRuntimeApi;
use sp_application_crypto::AppPublic;
use sp_blockchain::HeaderBackend;
use sp_consensus_aura::AuraApi;
use sp_core::crypto::Pair;
use sp_inherents::CreateInherentDataProviders;
use sp_keystore::KeystorePtr;
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Member, Zero};
use std::{collections::VecDeque, sync::Arc, time::Duration};

/// Parameters for [`run_block_builder`].
pub struct BuilderTaskParams<
	Block: BlockT,
	BI,
	CIDP,
	Client,
	Backend,
	RelayClient,
	CHP,
	Proposer,
	CS,
> {
	/// Inherent data providers. Only non-consensus inherent data should be provided, i.e.
	/// the timestamp, slot, and paras inherents should be omitted, as they are set by this
	/// collator.
	pub create_inherent_data_providers: CIDP,
	/// Used to actually import blocks.
	pub block_import: BI,
	/// The underlying para client.
	pub para_client: Arc<Client>,
	/// The para client's backend, used to access the database.
	pub para_backend: Arc<Backend>,
	/// A handle to the relay-chain client.
	pub relay_client: RelayClient,
	/// A validation code hash provider, used to get the current validation code hash.
	pub code_hash_provider: CHP,
	/// The underlying keystore, which should contain Aura consensus keys.
	pub keystore: KeystorePtr,
	/// The para's ID.
	pub para_id: ParaId,
	/// The underlying block proposer this should call into.
	pub proposer: Proposer,
	/// The generic collator service used to plug into this consensus engine.
	pub collator_service: CS,
	/// The amount of time to spend authoring each block.
	pub authoring_duration: Duration,
	/// Channel to send built blocks to the collation task.
	pub collator_sender: sc_utils::mpsc::TracingUnboundedSender<CollatorMessage<Block>>,
	/// Slot duration of the relay chain.
	pub relay_chain_slot_duration: Duration,
	/// Offset all time operations by this duration.
	///
	/// This is a time quantity that is subtracted from the actual timestamp when computing
	/// the time left until the next slot. In practice, this *left-shifts* the clock with the
	/// intent of keeping our "clock" slightly behind the relay chain one, thereby reducing the
	/// likelihood of unfavorable notification arrival timings (i.e. we don't want to wait for
	/// relay chain notifications because we woke up too early).
	pub slot_offset: Duration,
	/// The maximum percentage of the maximum PoV size that the collator can use.
	///
	/// This option will be removed once https://github.com/paritytech/polkadot-sdk/issues/6020
	/// is fixed.
	pub max_pov_percentage: Option<u32>,
}

/// Run the block-builder task.
pub fn run_block_builder<Block, P, BI, CIDP, Client, Backend, RelayClient, CHP, Proposer, CS>(
	params: BuilderTaskParams<Block, BI, CIDP, Client, Backend, RelayClient, CHP, Proposer, CS>,
) -> impl Future<Output = ()> + Send + 'static
where
	Block: BlockT,
	Client: ProvideRuntimeApi<Block>
		+ UsageProvider<Block>
		+ BlockOf
		+ AuxStore
		+ HeaderBackend<Block>
		+ BlockBackend<Block>
		+ Send
		+ Sync
		+ 'static,
	Client::Api:
		AuraApi<Block, P::Public> + RelayParentOffsetApi<Block> + AuraUnincludedSegmentApi<Block>,
	Backend: sc_client_api::Backend<Block> + 'static,
	RelayClient: RelayChainInterface + Clone + 'static,
	CIDP: CreateInherentDataProviders<Block, ()> + 'static,
	CIDP::InherentDataProviders: Send,
	BI: BlockImport<Block> + ParachainBlockImportMarker + Send + Sync + 'static,
	Proposer: ProposerInterface<Block> + Send + Sync + 'static,
	CS: CollatorServiceInterface<Block> + Send + Sync + 'static,
	CHP: consensus_common::ValidationCodeHashProvider<Block::Hash> + Send + 'static,
	P: Pair,
	P::Public: AppPublic + Member + Codec,
	P::Signature: TryFrom<Vec<u8>> + Member + Codec,
{
	async move {
		tracing::info!(target: LOG_TARGET, "Starting slot-based block-builder task.");
		let BuilderTaskParams {
			relay_client,
			create_inherent_data_providers,
			para_client,
			keystore,
			block_import,
			para_id,
			proposer,
			collator_service,
			collator_sender,
			code_hash_provider,
			authoring_duration,
			relay_chain_slot_duration,
			para_backend,
			slot_offset,
			max_pov_percentage,
		} = params;

		let mut slot_timer = SlotTimer::<_, _, P>::new_with_offset(
			para_client.clone(),
			slot_offset,
			relay_chain_slot_duration,
		);

		let mut collator = {
			let params = collator_util::Params {
				create_inherent_data_providers,
				block_import,
				relay_client: relay_client.clone(),
				keystore: keystore.clone(),
				para_id,
				proposer,
				collator_service,
			};

			collator_util::Collator::<Block, P, _, _, _, _, _>::new(params)
		};

		let mut relay_chain_data_cache = RelayChainDataCache::new(relay_client.clone(), para_id);

		loop {
			// We wait here until the next slot arrives.
			if slot_timer.wait_until_next_slot().await.is_err() {
				tracing::error!(target: LOG_TARGET, "Unable to wait for next slot.");
				return;
			};

			let Ok(relay_best_hash) = relay_client.best_block_hash().await else {
				tracing::warn!(target: crate::LOG_TARGET, "Unable to fetch latest relay chain block hash.");
				continue
			};

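			// Query the runtime at the parachain best block for the relay parent offset to use
			// (defaulting to zero if the API call fails).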
			let best_hash = para_client.info().best_hash;
			let relay_parent_offset =
				para_client.runtime_api().relay_parent_offset(best_hash).unwrap_or_default();

			let Ok(para_slot_duration) = crate::slot_duration(&*para_client) else {
				tracing::error!(target: LOG_TARGET, "Failed to fetch slot duration from runtime.");
				continue;
			};

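			// Walk back `relay_parent_offset` blocks from the relay chain tip to find the relay
			// parent, keeping the headers in between as its descendants.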
			let Ok(rp_data) = offset_relay_parent_find_descendants(
				&mut relay_chain_data_cache,
				relay_best_hash,
				relay_parent_offset,
			)
			.await
			else {
				continue
			};

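			// Derive the parachain slot and timestamp from the relay parent's BABE slot.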
			let Some(para_slot) = adjust_para_to_relay_parent_slot(
				rp_data.relay_parent(),
				relay_chain_slot_duration,
				para_slot_duration,
			) else {
				continue;
			};

			let relay_parent = rp_data.relay_parent().hash();
			let relay_parent_header = rp_data.relay_parent().clone();

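			// Find the parachain block to build on, together with the latest header that is
			// included on the relay chain.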
			let Some((included_header, parent)) =
				crate::collators::find_parent(relay_parent, para_id, &*para_backend, &relay_client)
					.await
			else {
				continue
			};

			let parent_hash = parent.hash;
			let parent_header = &parent.header;

			// Retrieve the core.
			let core = match determine_core(
				&mut relay_chain_data_cache,
				&relay_parent_header,
				para_id,
				parent_header,
				relay_parent_offset,
			)
			.await
			{
				Err(()) => {
					tracing::debug!(
						target: LOG_TARGET,
						?relay_parent,
						"Failed to determine core"
					);

					continue
				},
				Ok(Some(cores)) => {
					tracing::debug!(
						target: LOG_TARGET,
						?relay_parent,
						core_selector = ?cores.selector,
						claim_queue_offset = ?cores.claim_queue_offset,
						"Going to claim core",
					);

					cores
				},
				Ok(None) => {
					tracing::debug!(
						target: LOG_TARGET,
						?relay_parent,
						"No core scheduled"
					);

					continue
				},
			};

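			// Fetch the cached relay chain data for the relay parent; it provides the maximum PoV
			// size and tracks the core selector claimed by the previous block.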
			let Ok(RelayChainData { max_pov_size, last_claimed_core_selector, .. }) =
				relay_chain_data_cache.get_mut_relay_chain_data(relay_parent).await
			else {
				continue;
			};

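			// Inform the slot timer about the number of cores currently assigned to the parachain.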
			slot_timer.update_scheduling(core.total_cores().into());

			// We mainly call this to inform users at genesis if there is a mismatch with the
			// on-chain data.
			collator.collator_service().check_block_status(parent_hash, parent_header);

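			// Extract the relay chain slot from the relay parent's BABE pre-digest; it is needed
			// to decide whether we are allowed to build on the chosen parent.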
			let Ok(relay_slot) =
				sc_consensus_babe::find_pre_digest::<RelayBlock>(&relay_parent_header)
					.map(|babe_pre_digest| babe_pre_digest.slot())
			else {
				tracing::error!(target: crate::LOG_TARGET, "Relay chain does not contain babe slot. This should never happen.");
				continue;
			};

			let included_header_hash = included_header.hash();

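			// Check that we are the author for this slot and that the unincluded segment rules
			// allow building on top of the chosen parent.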
			let slot_claim = match crate::collators::can_build_upon::<_, _, P>(
				para_slot.slot,
				relay_slot,
				para_slot.timestamp,
				parent_hash,
				included_header_hash,
				&*para_client,
				&keystore,
			)
			.await
			{
				Some(slot) => slot,
				None => {
					tracing::debug!(
						target: crate::LOG_TARGET,
						unincluded_segment_len = parent.depth,
						relay_parent = ?relay_parent,
						relay_parent_num = %relay_parent_header.number(),
						included_hash = ?included_header_hash,
						included_num = %included_header.number(),
						parent = ?parent_hash,
						slot = ?para_slot.slot,
						"Not building block."
					);
					continue
				},
			};

			tracing::debug!(
				target: crate::LOG_TARGET,
				unincluded_segment_len = parent.depth,
				relay_parent = %relay_parent,
				relay_parent_num = %relay_parent_header.number(),
				relay_parent_offset,
				included_hash = %included_header_hash,
				included_num = %included_header.number(),
				parent = %parent_hash,
				slot = ?para_slot.slot,
				"Building block."
			);

			let validation_data = PersistedValidationData {
				parent_head: parent_header.encode().into(),
				relay_parent_number: *relay_parent_header.number(),
				relay_parent_storage_root: *relay_parent_header.state_root(),
				max_pov_size: *max_pov_size,
			};

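			// Create the parachain inherent data (including the relay parent and its descendants)
			// together with any additional inherents from the provided providers.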
			let (parachain_inherent_data, other_inherent_data) = match collator
				.create_inherent_data_with_rp_offset(
					relay_parent,
					&validation_data,
					parent_hash,
					slot_claim.timestamp(),
					Some(rp_data),
				)
				.await
			{
				Err(err) => {
					tracing::error!(target: crate::LOG_TARGET, ?err);
					break
				},
				Ok(x) => x,
			};

			let validation_code_hash = match code_hash_provider.code_hash_at(parent_hash) {
				None => {
					tracing::error!(target: crate::LOG_TARGET, ?parent_hash, "Could not fetch validation code hash");
					break
				},
				Some(v) => v,
			};

			check_validation_code_or_log(
				&validation_code_hash,
				para_id,
				&relay_client,
				relay_parent,
			)
			.await;

			let allowed_pov_size = if let Some(max_pov_percentage) = max_pov_percentage {
				validation_data.max_pov_size * max_pov_percentage / 100
			} else {
				// Set the block limit to 85% of the maximum PoV size.
				//
				// Once https://github.com/paritytech/polkadot-sdk/issues/6020 is fixed,
				// this default should be removed.
				validation_data.max_pov_size * 85 / 100
			} as usize;

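			// Never author for longer than the time remaining until the next slot.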
			let adjusted_authoring_duration = match slot_timer.time_until_next_slot() {
				Ok((duration, _slot)) => std::cmp::min(authoring_duration, duration),
				Err(_) => authoring_duration,
			};

			tracing::debug!(target: crate::LOG_TARGET, duration = ?adjusted_authoring_duration, "Adjusted proposal duration.");

			let Ok(Some(candidate)) = collator
				.build_block_and_import(
					&parent_header,
					&slot_claim,
					Some(vec![CumulusDigestItem::CoreInfo(core.core_info()).to_digest_item()]),
					(parachain_inherent_data, other_inherent_data),
					adjusted_authoring_duration,
					allowed_pov_size,
				)
				.await
			else {
				tracing::error!(target: crate::LOG_TARGET, "Unable to build block at slot.");
				continue;
			};

			let new_block_hash = candidate.block.header().hash();

			// Announce the newly built block to our peers.
			collator.collator_service().announce_block(new_block_hash, None);

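			// Remember the core selector we claimed; `determine_core` uses it as a fallback when
			// the parent block carries no core info digest.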
			*last_claimed_core_selector = Some(core.core_selector());

			if let Err(err) = collator_sender.unbounded_send(CollatorMessage {
				relay_parent,
				parent_header: parent_header.clone(),
				parachain_candidate: candidate,
				validation_code_hash,
				core_index: core.core_index(),
				max_pov_size: validation_data.max_pov_size,
			}) {
				tracing::error!(target: crate::LOG_TARGET, ?err, "Unable to send block to collation task.");
				return
			}
		}
	}
}

/// Translate the slot of the relay parent to the slot of the parachain.
fn adjust_para_to_relay_parent_slot(
	relay_header: &RelayHeader,
	relay_chain_slot_duration: Duration,
	para_slot_duration: SlotDuration,
) -> Option<SlotInfo> {
	let relay_slot = sc_consensus_babe::find_pre_digest::<RelayBlock>(&relay_header)
		.map(|babe_pre_digest| babe_pre_digest.slot())
		.ok()?;
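	// Convert the relay chain slot into a timestamp and map it onto the corresponding
	// parachain slot.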
	let new_slot = Slot::from_timestamp(
		relay_slot
			.timestamp(SlotDuration::from_millis(relay_chain_slot_duration.as_millis() as u64))?,
		para_slot_duration,
	);
	let para_slot = SlotInfo { slot: new_slot, timestamp: new_slot.timestamp(para_slot_duration)? };
	tracing::debug!(
		target: LOG_TARGET,
		timestamp = ?para_slot.timestamp,
		slot = ?para_slot.slot,
		"Parachain slot adjusted to relay chain.",
	);
	Some(para_slot)
}

/// Finds a relay chain parent block at a specified offset from the best block, collecting its
/// descendants.
///
/// # Returns
/// * `Ok(RelayParentData)` - Contains the target relay parent and its ordered list of descendants
/// * `Err(())` - If any relay chain block header cannot be retrieved
///
/// The function traverses backwards from the best block until it finds the block at the specified
/// offset, collecting all blocks in between to maintain the chain of ancestry.
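///
/// For example, with `relay_parent_offset = 2` and best relay block `N`, the returned relay
/// parent is block `N - 2` and the descendants are blocks `N - 1` and `N` (in ascending order).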
pub(crate) async fn offset_relay_parent_find_descendants<RelayClient>(
	relay_chain_data_cache: &mut RelayChainDataCache<RelayClient>,
	relay_best_block: RelayHash,
	relay_parent_offset: u32,
) -> Result<RelayParentData, ()>
where
	RelayClient: RelayChainInterface + Clone + 'static,
{
	let Ok(mut relay_header) = relay_chain_data_cache
		.get_mut_relay_chain_data(relay_best_block)
		.await
		.map(|d| d.relay_parent_header.clone())
	else {
		tracing::error!(target: LOG_TARGET, ?relay_best_block, "Unable to fetch best relay chain block header.");
		return Err(())
	};

	if relay_parent_offset == 0 {
		return Ok(RelayParentData::new(relay_header));
	}

	let mut required_ancestors: VecDeque<RelayHeader> = Default::default();
	required_ancestors.push_front(relay_header.clone());
	while required_ancestors.len() < relay_parent_offset as usize {
		let next_header = relay_chain_data_cache
			.get_mut_relay_chain_data(*relay_header.parent_hash())
			.await?
			.relay_parent_header
			.clone();
		required_ancestors.push_front(next_header.clone());
		relay_header = next_header;
	}

	let relay_parent = relay_chain_data_cache
		.get_mut_relay_chain_data(*relay_header.parent_hash())
		.await?
		.relay_parent_header
		.clone();

	tracing::debug!(
		target: LOG_TARGET,
		relay_parent_hash = %relay_parent.hash(),
		relay_parent_num = relay_parent.number(),
		num_descendants = required_ancestors.len(),
		"Relay parent descendants."
	);

	Ok(RelayParentData::new_with_descendants(relay_parent, required_ancestors.into()))
}

/// Return value of [`determine_core`].
pub(crate) struct Core {
	selector: CoreSelector,
	claim_queue_offset: ClaimQueueOffset,
	core_index: CoreIndex,
	number_of_cores: u16,
}

impl Core {
	/// Returns the current [`CoreInfo`].
	fn core_info(&self) -> CoreInfo {
		CoreInfo {
			selector: self.selector,
			claim_queue_offset: self.claim_queue_offset,
			number_of_cores: self.number_of_cores.into(),
		}
	}

	/// Returns the current [`CoreSelector`].
	pub(crate) fn core_selector(&self) -> CoreSelector {
		self.selector
	}

	/// Returns the current [`CoreIndex`].
	pub(crate) fn core_index(&self) -> CoreIndex {
		self.core_index
	}

	/// Returns the total number of cores.
	pub(crate) fn total_cores(&self) -> u16 {
		self.number_of_cores
	}
}

/// Determine the core for the given `para_id`.
pub(crate) async fn determine_core<H: HeaderT, RI: RelayChainInterface + 'static>(
	relay_chain_data_cache: &mut RelayChainDataCache<RI>,
	relay_parent: &RelayHeader,
	para_id: ParaId,
	para_parent: &H,
	relay_parent_offset: u32,
) -> Result<Option<Core>, ()> {
	let cores_at_offset = &relay_chain_data_cache
		.get_mut_relay_chain_data(relay_parent.hash())
		.await?
		.claim_queue
		.iter_claims_at_depth_for_para(relay_parent_offset as usize, para_id)
		.collect::<Vec<_>>();

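	// Determine whether the parachain parent block was built on a different relay parent, either
	// via the explicit relay parent digest or via the relay parent storage root digest.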
	let is_new_relay_parent = if para_parent.number().is_zero() {
		true
	} else {
		match extract_relay_parent(para_parent.digest()) {
			Some(last_relay_parent) => last_relay_parent != relay_parent.hash(),
			None =>
				rpsr_digest::extract_relay_parent_storage_root(para_parent.digest())
					.ok_or(())?
					.0 != *relay_parent.state_root(),
		}
	};

	let core_info = CumulusDigestItem::find_core_info(para_parent.digest());

	// If we are building on a new relay parent, core selection starts again from the first core.
	let (selector, core_index) = if is_new_relay_parent {
		let Some(core_index) = cores_at_offset.get(0) else { return Ok(None) };

		(0, *core_index)
	} else if let Some(core_info) = core_info {
		let selector = core_info.selector.0 as usize + 1;
		let Some(core_index) = cores_at_offset.get(selector) else { return Ok(None) };

		(selector, *core_index)
	} else {
		let last_claimed_core_selector = relay_chain_data_cache
			.get_mut_relay_chain_data(relay_parent.hash())
			.await?
			.last_claimed_core_selector;

		let selector = last_claimed_core_selector.map_or(0, |cs| cs.0 as usize) + 1;
		let Some(core_index) = cores_at_offset.get(selector) else { return Ok(None) };

		(selector, *core_index)
	};

	Ok(Some(Core {
		selector: CoreSelector(selector as u8),
		core_index,
		claim_queue_offset: ClaimQueueOffset(relay_parent_offset as u8),
		number_of_cores: cores_at_offset.len() as u16,
	}))
}