// cumulus_client_consensus_aura/collators/slot_based/block_builder_task.rs
1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Cumulus.
3// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5// Cumulus is free software: you can redistribute it and/or modify
6// it under the terms of the GNU General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9
10// Cumulus is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14
15// You should have received a copy of the GNU General Public License
16// along with Cumulus. If not, see <https://www.gnu.org/licenses/>.
17
18use codec::{Codec, Encode};
19
20use super::CollatorMessage;
21use crate::{
22	collator as collator_util,
23	collators::{
24		check_validation_code_or_log,
25		slot_based::{
26			relay_chain_data_cache::{RelayChainData, RelayChainDataCache},
27			slot_timer::{SlotInfo, SlotTimer},
28		},
29		RelayParentData,
30	},
31	LOG_TARGET,
32};
33use cumulus_client_collator::service::ServiceInterface as CollatorServiceInterface;
34use cumulus_client_consensus_common::{self as consensus_common, ParachainBlockImportMarker};
35use cumulus_client_consensus_proposer::ProposerInterface;
36use cumulus_primitives_aura::{AuraUnincludedSegmentApi, Slot};
37use cumulus_primitives_core::{
38	extract_relay_parent, rpsr_digest, ClaimQueueOffset, CoreInfo, CoreSelector, CumulusDigestItem,
39	PersistedValidationData, RelayParentOffsetApi,
40};
41use cumulus_relay_chain_interface::RelayChainInterface;
42use futures::prelude::*;
43use polkadot_primitives::{
44	Block as RelayBlock, CoreIndex, Hash as RelayHash, Header as RelayHeader, Id as ParaId,
45};
46use sc_client_api::{backend::AuxStore, BlockBackend, BlockOf, UsageProvider};
47use sc_consensus::BlockImport;
48use sc_consensus_aura::SlotDuration;
49use sp_api::ProvideRuntimeApi;
50use sp_application_crypto::AppPublic;
51use sp_blockchain::HeaderBackend;
52use sp_consensus_aura::AuraApi;
53use sp_core::crypto::Pair;
54use sp_inherents::CreateInherentDataProviders;
55use sp_keystore::KeystorePtr;
56use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Member, Zero};
57use std::{collections::VecDeque, sync::Arc, time::Duration};
58
/// Parameters for [`run_block_builder`].
///
/// The type parameters mirror the generic parameters of [`run_block_builder`]; see its
/// `where` clause for the exact bounds each of them must satisfy.
pub struct BuilderTaskParams<
	Block: BlockT,
	BI,
	CIDP,
	Client,
	Backend,
	RelayClient,
	CHP,
	Proposer,
	CS,
> {
	/// Inherent data providers. Only non-consensus inherent data should be provided, i.e.
	/// the timestamp, slot, and paras inherents should be omitted, as they are set by this
	/// collator.
	pub create_inherent_data_providers: CIDP,
	/// Used to actually import blocks.
	pub block_import: BI,
	/// The underlying para client.
	pub para_client: Arc<Client>,
	/// The para client's backend, used to access the database.
	pub para_backend: Arc<Backend>,
	/// A handle to the relay-chain client.
	pub relay_client: RelayClient,
	/// A validation code hash provider, used to get the current validation code hash.
	pub code_hash_provider: CHP,
	/// The underlying keystore, which should contain Aura consensus keys.
	pub keystore: KeystorePtr,
	/// The para's ID.
	pub para_id: ParaId,
	/// The underlying block proposer this should call into.
	pub proposer: Proposer,
	/// The generic collator service used to plug into this consensus engine.
	pub collator_service: CS,
	/// The amount of time to spend authoring each block.
	pub authoring_duration: Duration,
	/// Channel to send built blocks to the collation task.
	pub collator_sender: sc_utils::mpsc::TracingUnboundedSender<CollatorMessage<Block>>,
	/// Slot duration of the relay chain.
	pub relay_chain_slot_duration: Duration,
	/// Offset all time operations by this duration.
	///
	/// This is a time quantity that is subtracted from the actual timestamp when computing
	/// the time left to enter a new slot. In practice, this *left-shifts* the clock time with the
	/// intent to keep our "clock" slightly behind the relay chain one and thus reducing the
	/// likelihood of encountering unfavorable notification arrival timings (i.e. we don't want to
	/// wait for relay chain notifications because we woke up too early).
	pub slot_offset: Duration,
	/// The maximum percentage of the maximum PoV size that the collator can use.
	/// It will be removed once https://github.com/paritytech/polkadot-sdk/issues/6020 is fixed.
	pub max_pov_percentage: Option<u32>,
}
111
/// Run block-builder.
///
/// The returned future drives an endless loop. On every parachain slot it:
/// 1. determines the relay parent to build on, honoring the runtime-reported relay parent
///    offset,
/// 2. translates the relay parent's slot into a parachain slot,
/// 3. finds the best parachain parent and tries to claim a core,
/// 4. checks via [`crate::collators::can_build_upon`] whether a block should be authored and,
///    if so, builds, imports and announces it,
/// 5. forwards the candidate to the collation task through `collator_sender`.
///
/// Recoverable per-slot failures are logged and the slot is skipped (`continue`); the future
/// only terminates on errors considered unrecoverable (slot timer failure, failing inherent
/// data creation, missing validation code, or a broken channel to the collation task).
pub fn run_block_builder<Block, P, BI, CIDP, Client, Backend, RelayClient, CHP, Proposer, CS>(
	params: BuilderTaskParams<Block, BI, CIDP, Client, Backend, RelayClient, CHP, Proposer, CS>,
) -> impl Future<Output = ()> + Send + 'static
where
	Block: BlockT,
	Client: ProvideRuntimeApi<Block>
		+ UsageProvider<Block>
		+ BlockOf
		+ AuxStore
		+ HeaderBackend<Block>
		+ BlockBackend<Block>
		+ Send
		+ Sync
		+ 'static,
	Client::Api:
		AuraApi<Block, P::Public> + RelayParentOffsetApi<Block> + AuraUnincludedSegmentApi<Block>,
	Backend: sc_client_api::Backend<Block> + 'static,
	RelayClient: RelayChainInterface + Clone + 'static,
	CIDP: CreateInherentDataProviders<Block, ()> + 'static,
	CIDP::InherentDataProviders: Send,
	BI: BlockImport<Block> + ParachainBlockImportMarker + Send + Sync + 'static,
	Proposer: ProposerInterface<Block> + Send + Sync + 'static,
	CS: CollatorServiceInterface<Block> + Send + Sync + 'static,
	CHP: consensus_common::ValidationCodeHashProvider<Block::Hash> + Send + 'static,
	P: Pair,
	P::Public: AppPublic + Member + Codec,
	P::Signature: TryFrom<Vec<u8>> + Member + Codec,
{
	async move {
		tracing::info!(target: LOG_TARGET, "Starting slot-based block-builder task.");
		let BuilderTaskParams {
			relay_client,
			create_inherent_data_providers,
			para_client,
			keystore,
			block_import,
			para_id,
			proposer,
			collator_service,
			collator_sender,
			code_hash_provider,
			authoring_duration,
			relay_chain_slot_duration,
			para_backend,
			slot_offset,
			max_pov_percentage,
		} = params;

		// Timer that wakes us up for each parachain slot, left-shifted by `slot_offset`
		// (see `BuilderTaskParams::slot_offset` for the rationale).
		let mut slot_timer = SlotTimer::<_, _, P>::new_with_offset(
			para_client.clone(),
			slot_offset,
			relay_chain_slot_duration,
		);

		let mut collator = {
			let params = collator_util::Params {
				create_inherent_data_providers,
				block_import,
				relay_client: relay_client.clone(),
				keystore: keystore.clone(),
				para_id,
				proposer,
				collator_service,
			};

			collator_util::Collator::<Block, P, _, _, _, _, _>::new(params)
		};

		// Caches per-relay-block data (header, claim queue, last claimed core selector, …)
		// so the steps below don't refetch it from the relay chain every slot.
		let mut relay_chain_data_cache = RelayChainDataCache::new(relay_client.clone(), para_id);

		loop {
			// We wait here until the next slot arrives.
			if slot_timer.wait_until_next_slot().await.is_err() {
				tracing::error!(target: LOG_TARGET, "Unable to wait for next slot.");
				return;
			};

			let Ok(relay_best_hash) = relay_client.best_block_hash().await else {
				tracing::warn!(target: crate::LOG_TARGET, "Unable to fetch latest relay chain block hash.");
				continue
			};

			// The parachain runtime decides how many blocks behind the relay chain tip we
			// build (0 means building directly on the best relay block).
			let best_hash = para_client.info().best_hash;
			let relay_parent_offset =
				para_client.runtime_api().relay_parent_offset(best_hash).unwrap_or_default();

			let Ok(para_slot_duration) = crate::slot_duration(&*para_client) else {
				tracing::error!(target: LOG_TARGET, "Failed to fetch slot duration from runtime.");
				continue;
			};

			// Resolve the actual relay parent (`relay_parent_offset` blocks below the best
			// block) together with the headers between it and the best block.
			let Ok(rp_data) = offset_relay_parent_find_descendants(
				&mut relay_chain_data_cache,
				relay_best_hash,
				relay_parent_offset,
			)
			.await
			else {
				continue
			};

			// Derive the parachain slot/timestamp from the relay parent's BABE slot so all
			// collators agree on the slot independently of their local clocks.
			let Some(para_slot) = adjust_para_to_relay_parent_slot(
				rp_data.relay_parent(),
				relay_chain_slot_duration,
				para_slot_duration,
			) else {
				continue;
			};

			let relay_parent = rp_data.relay_parent().hash();
			let relay_parent_header = rp_data.relay_parent().clone();

			// Find the parachain block to build on, plus the latest included block (the
			// anchor the unincluded segment is measured from).
			let Some((included_header, parent)) =
				crate::collators::find_parent(relay_parent, para_id, &*para_backend, &relay_client)
					.await
			else {
				continue
			};

			let parent_hash = parent.hash;
			let parent_header = &parent.header;

			// Retrieve the core.
			let core = match determine_core(
				&mut relay_chain_data_cache,
				&relay_parent_header,
				para_id,
				parent_header,
				relay_parent_offset,
			)
			.await
			{
				Err(()) => {
					tracing::debug!(
						target: LOG_TARGET,
						?relay_parent,
						"Failed to determine core"
					);

					continue
				},
				Ok(Some(cores)) => {
					tracing::debug!(
						target: LOG_TARGET,
						?relay_parent,
						core_selector = ?cores.selector,
						claim_queue_offset = ?cores.claim_queue_offset,
						"Going to claim core",
					);

					cores
				},
				Ok(None) => {
					tracing::debug!(
						target: LOG_TARGET,
						?relay_parent,
						"No core scheduled"
					);

					continue
				},
			};

			// `max_pov_size` and `last_claimed_core_selector` are `&mut` bindings into the
			// cache entry; the selector is written back after a successful build below.
			let Ok(RelayChainData { max_pov_size, last_claimed_core_selector, .. }) =
				relay_chain_data_cache.get_mut_relay_chain_data(relay_parent).await
			else {
				continue;
			};

			// Let the slot timer adapt its cadence to the number of cores we have claimed.
			slot_timer.update_scheduling(core.total_cores().into());

			// We mainly call this to inform users at genesis if there is a mismatch with the
			// on-chain data.
			collator.collator_service().check_block_status(parent_hash, parent_header);

			// The relay chain slot (from the BABE pre-digest) is needed by the
			// `can_build_upon` velocity check.
			let Ok(relay_slot) =
				sc_consensus_babe::find_pre_digest::<RelayBlock>(&relay_parent_header)
					.map(|babe_pre_digest| babe_pre_digest.slot())
			else {
				tracing::error!(target: crate::LOG_TARGET, "Relay chain does not contain babe slot. This should never happen.");
				continue;
			};

			let included_header_hash = included_header.hash();

			// Checks slot ownership (keystore) and the unincluded segment rules; `None`
			// means we must not author at this slot.
			let slot_claim = match crate::collators::can_build_upon::<_, _, P>(
				para_slot.slot,
				relay_slot,
				para_slot.timestamp,
				parent_hash,
				included_header_hash,
				&*para_client,
				&keystore,
			)
			.await
			{
				Some(slot) => slot,
				None => {
					tracing::debug!(
						target: crate::LOG_TARGET,
						unincluded_segment_len = parent.depth,
						relay_parent = ?relay_parent,
						relay_parent_num = %relay_parent_header.number(),
						included_hash = ?included_header_hash,
						included_num = %included_header.number(),
						parent = ?parent_hash,
						slot = ?para_slot.slot,
						"Not building block."
					);
					continue
				},
			};

			tracing::debug!(
				target: crate::LOG_TARGET,
				unincluded_segment_len = parent.depth,
				relay_parent = %relay_parent,
				relay_parent_num = %relay_parent_header.number(),
				relay_parent_offset,
				included_hash = %included_header_hash,
				included_num = %included_header.number(),
				parent = %parent_hash,
				slot = ?para_slot.slot,
				"Building block."
			);

			// Validation data as the relay chain will see it when validating the candidate.
			let validation_data = PersistedValidationData {
				parent_head: parent_header.encode().into(),
				relay_parent_number: *relay_parent_header.number(),
				relay_parent_storage_root: *relay_parent_header.state_root(),
				max_pov_size: *max_pov_size,
			};

			// NOTE: inherent-data failures `break` (ending the task) rather than `continue`,
			// as they indicate a persistent local problem rather than a transient one.
			let (parachain_inherent_data, other_inherent_data) = match collator
				.create_inherent_data_with_rp_offset(
					relay_parent,
					&validation_data,
					parent_hash,
					slot_claim.timestamp(),
					Some(rp_data),
				)
				.await
			{
				Err(err) => {
					tracing::error!(target: crate::LOG_TARGET, ?err);
					break
				},
				Ok(x) => x,
			};

			let validation_code_hash = match code_hash_provider.code_hash_at(parent_hash) {
				None => {
					tracing::error!(target: crate::LOG_TARGET, ?parent_hash, "Could not fetch validation code hash");
					break
				},
				Some(v) => v,
			};

			// Warn (log only) if our validation code hash differs from the on-chain one.
			check_validation_code_or_log(
				&validation_code_hash,
				para_id,
				&relay_client,
				relay_parent,
			)
			.await;

			let allowed_pov_size = if let Some(max_pov_percentage) = max_pov_percentage {
				validation_data.max_pov_size * max_pov_percentage / 100
			} else {
				// Set the block limit to 85% of the maximum PoV size.
				//
				// Once https://github.com/paritytech/polkadot-sdk/issues/6020 issue is
				// fixed, this should be removed.
				validation_data.max_pov_size * 85 / 100
			} as usize;

			// Author and import the block; the `CoreInfo` digest records which core/selector
			// this block was built for (consumed by `determine_core` on the next block).
			let Ok(Some(candidate)) = collator
				.build_block_and_import(
					&parent_header,
					&slot_claim,
					Some(vec![CumulusDigestItem::CoreInfo(core.core_info()).to_digest_item()]),
					(parachain_inherent_data, other_inherent_data),
					authoring_duration,
					allowed_pov_size,
				)
				.await
			else {
				tracing::error!(target: crate::LOG_TARGET, "Unable to build block at slot.");
				continue;
			};

			let new_block_hash = candidate.block.header().hash();

			// Announce the newly built block to our peers.
			collator.collator_service().announce_block(new_block_hash, None);

			// Remember which selector we used, for the digest-less fallback path in
			// `determine_core`.
			*last_claimed_core_selector = Some(core.core_selector());

			// Hand the candidate over to the collation task, which submits it to the relay
			// chain. A closed channel means the collation task died — stop this task too.
			if let Err(err) = collator_sender.unbounded_send(CollatorMessage {
				relay_parent,
				parent_header: parent_header.clone(),
				parachain_candidate: candidate,
				validation_code_hash,
				core_index: core.core_index(),
				max_pov_size: validation_data.max_pov_size,
			}) {
				tracing::error!(target: crate::LOG_TARGET, ?err, "Unable to send block to collation task.");
				return
			}
		}
	}
}
425
426/// Translate the slot of the relay parent to the slot of the parachain.
427fn adjust_para_to_relay_parent_slot(
428	relay_header: &RelayHeader,
429	relay_chain_slot_duration: Duration,
430	para_slot_duration: SlotDuration,
431) -> Option<SlotInfo> {
432	let relay_slot = sc_consensus_babe::find_pre_digest::<RelayBlock>(&relay_header)
433		.map(|babe_pre_digest| babe_pre_digest.slot())
434		.ok()?;
435	let new_slot = Slot::from_timestamp(
436		relay_slot
437			.timestamp(SlotDuration::from_millis(relay_chain_slot_duration.as_millis() as u64))?,
438		para_slot_duration,
439	);
440	let para_slot = SlotInfo { slot: new_slot, timestamp: new_slot.timestamp(para_slot_duration)? };
441	tracing::debug!(
442		target: LOG_TARGET,
443		timestamp = ?para_slot.timestamp,
444		slot = ?para_slot.slot,
445		"Parachain slot adjusted to relay chain.",
446	);
447	Some(para_slot)
448}
449
450/// Finds a relay chain parent block at a specified offset from the best block, collecting its
451/// descendants.
452///
453/// # Returns
454/// * `Ok(RelayParentData)` - Contains the target relay parent and its ordered list of descendants
455/// * `Err(())` - If any relay chain block header cannot be retrieved
456///
457/// The function traverses backwards from the best block until it finds the block at the specified
458/// offset, collecting all blocks in between to maintain the chain of ancestry.
459pub(crate) async fn offset_relay_parent_find_descendants<RelayClient>(
460	relay_chain_data_cache: &mut RelayChainDataCache<RelayClient>,
461	relay_best_block: RelayHash,
462	relay_parent_offset: u32,
463) -> Result<RelayParentData, ()>
464where
465	RelayClient: RelayChainInterface + Clone + 'static,
466{
467	let Ok(mut relay_header) = relay_chain_data_cache
468		.get_mut_relay_chain_data(relay_best_block)
469		.await
470		.map(|d| d.relay_parent_header.clone())
471	else {
472		tracing::error!(target: LOG_TARGET, ?relay_best_block, "Unable to fetch best relay chain block header.");
473		return Err(())
474	};
475
476	if relay_parent_offset == 0 {
477		return Ok(RelayParentData::new(relay_header));
478	}
479
480	let mut required_ancestors: VecDeque<RelayHeader> = Default::default();
481	required_ancestors.push_front(relay_header.clone());
482	while required_ancestors.len() < relay_parent_offset as usize {
483		let next_header = relay_chain_data_cache
484			.get_mut_relay_chain_data(*relay_header.parent_hash())
485			.await?
486			.relay_parent_header
487			.clone();
488		required_ancestors.push_front(next_header.clone());
489		relay_header = next_header;
490	}
491
492	let relay_parent = relay_chain_data_cache
493		.get_mut_relay_chain_data(*relay_header.parent_hash())
494		.await?
495		.relay_parent_header
496		.clone();
497
498	tracing::debug!(
499		target: LOG_TARGET,
500		relay_parent_hash = %relay_parent.hash(),
501		relay_parent_num = relay_parent.number(),
502		num_descendants = required_ancestors.len(),
503		"Relay parent descendants."
504	);
505
506	Ok(RelayParentData::new_with_descendants(relay_parent, required_ancestors.into()))
507}
508
/// Return value of [`determine_core`].
pub(crate) struct Core {
	/// The selector that was used to pick `core_index` out of the claimed cores.
	selector: CoreSelector,
	/// The claim queue offset (derived from the relay parent offset) the core was claimed at.
	claim_queue_offset: ClaimQueueOffset,
	/// The relay chain core index the collation should be submitted on.
	core_index: CoreIndex,
	/// Total number of cores claimed for the parachain at this claim queue depth.
	number_of_cores: u16,
}
516
517impl Core {
518	/// Returns the current [`CoreInfo`].
519	fn core_info(&self) -> CoreInfo {
520		CoreInfo {
521			selector: self.selector,
522			claim_queue_offset: self.claim_queue_offset,
523			number_of_cores: self.number_of_cores.into(),
524		}
525	}
526
527	/// Returns the current [`CoreSelector`].
528	pub(crate) fn core_selector(&self) -> CoreSelector {
529		self.selector
530	}
531
532	/// Returns the current [`CoreIndex`].
533	pub(crate) fn core_index(&self) -> CoreIndex {
534		self.core_index
535	}
536
537	/// Returns the total number of cores.
538	pub(crate) fn total_cores(&self) -> u16 {
539		self.number_of_cores
540	}
541}
542
543/// Determine the core for the given `para_id`.
544pub(crate) async fn determine_core<H: HeaderT, RI: RelayChainInterface + 'static>(
545	relay_chain_data_cache: &mut RelayChainDataCache<RI>,
546	relay_parent: &RelayHeader,
547	para_id: ParaId,
548	para_parent: &H,
549	relay_parent_offset: u32,
550) -> Result<Option<Core>, ()> {
551	let cores_at_offset = &relay_chain_data_cache
552		.get_mut_relay_chain_data(relay_parent.hash())
553		.await?
554		.claim_queue
555		.iter_claims_at_depth_for_para(relay_parent_offset as usize, para_id)
556		.collect::<Vec<_>>();
557
558	let is_new_relay_parent = if para_parent.number().is_zero() {
559		true
560	} else {
561		match extract_relay_parent(para_parent.digest()) {
562			Some(last_relay_parent) => last_relay_parent != relay_parent.hash(),
563			None =>
564				rpsr_digest::extract_relay_parent_storage_root(para_parent.digest())
565					.ok_or(())?
566					.0 != *relay_parent.state_root(),
567		}
568	};
569
570	let core_info = CumulusDigestItem::find_core_info(para_parent.digest());
571
572	// If we are using a new relay parent, we can start over from the start.
573	let (selector, core_index) = if is_new_relay_parent {
574		let Some(core_index) = cores_at_offset.get(0) else { return Ok(None) };
575
576		(0, *core_index)
577	} else if let Some(core_info) = core_info {
578		let selector = core_info.selector.0 as usize + 1;
579		let Some(core_index) = cores_at_offset.get(selector) else { return Ok(None) };
580
581		(selector, *core_index)
582	} else {
583		let last_claimed_core_selector = relay_chain_data_cache
584			.get_mut_relay_chain_data(relay_parent.hash())
585			.await?
586			.last_claimed_core_selector;
587
588		let selector = last_claimed_core_selector.map_or(0, |cs| cs.0 as usize) + 1;
589		let Some(core_index) = cores_at_offset.get(selector) else { return Ok(None) };
590
591		(selector, *core_index)
592	};
593
594	Ok(Some(Core {
595		selector: CoreSelector(selector as u8),
596		core_index,
597		claim_queue_offset: ClaimQueueOffset(relay_parent_offset as u8),
598		number_of_cores: cores_at_offset.len() as u16,
599	}))
600}