1use codec::{Codec, Encode};
19
20use super::CollatorMessage;
21use crate::{
22 collator as collator_util,
23 collators::{
24 check_validation_code_or_log,
25 slot_based::{
26 relay_chain_data_cache::{RelayChainData, RelayChainDataCache},
27 slot_timer::{SlotInfo, SlotTimer},
28 },
29 RelayParentData,
30 },
31 LOG_TARGET,
32};
33use cumulus_client_collator::service::ServiceInterface as CollatorServiceInterface;
34use cumulus_client_consensus_common::{self as consensus_common, ParachainBlockImportMarker};
35use cumulus_client_consensus_proposer::ProposerInterface;
36use cumulus_primitives_aura::{AuraUnincludedSegmentApi, Slot};
37use cumulus_primitives_core::{
38 extract_relay_parent, rpsr_digest, ClaimQueueOffset, CoreInfo, CoreSelector, CumulusDigestItem,
39 PersistedValidationData, RelayParentOffsetApi,
40};
41use cumulus_relay_chain_interface::RelayChainInterface;
42use futures::prelude::*;
43use polkadot_primitives::{
44 Block as RelayBlock, CoreIndex, Hash as RelayHash, Header as RelayHeader, Id as ParaId,
45};
46use sc_client_api::{backend::AuxStore, BlockBackend, BlockOf, UsageProvider};
47use sc_consensus::BlockImport;
48use sc_consensus_aura::SlotDuration;
49use sp_api::ProvideRuntimeApi;
50use sp_application_crypto::AppPublic;
51use sp_blockchain::HeaderBackend;
52use sp_consensus_aura::AuraApi;
53use sp_core::crypto::Pair;
54use sp_inherents::CreateInherentDataProviders;
55use sp_keystore::KeystorePtr;
56use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Member, Zero};
57use std::{collections::VecDeque, sync::Arc, time::Duration};
58
59pub struct BuilderTaskParams<
61 Block: BlockT,
62 BI,
63 CIDP,
64 Client,
65 Backend,
66 RelayClient,
67 CHP,
68 Proposer,
69 CS,
70> {
71 pub create_inherent_data_providers: CIDP,
75 pub block_import: BI,
77 pub para_client: Arc<Client>,
79 pub para_backend: Arc<Backend>,
81 pub relay_client: RelayClient,
83 pub code_hash_provider: CHP,
85 pub keystore: KeystorePtr,
87 pub para_id: ParaId,
89 pub proposer: Proposer,
91 pub collator_service: CS,
93 pub authoring_duration: Duration,
95 pub collator_sender: sc_utils::mpsc::TracingUnboundedSender<CollatorMessage<Block>>,
97 pub relay_chain_slot_duration: Duration,
99 pub slot_offset: Duration,
107 pub max_pov_percentage: Option<u32>,
110}
111
112pub fn run_block_builder<Block, P, BI, CIDP, Client, Backend, RelayClient, CHP, Proposer, CS>(
114 params: BuilderTaskParams<Block, BI, CIDP, Client, Backend, RelayClient, CHP, Proposer, CS>,
115) -> impl Future<Output = ()> + Send + 'static
116where
117 Block: BlockT,
118 Client: ProvideRuntimeApi<Block>
119 + UsageProvider<Block>
120 + BlockOf
121 + AuxStore
122 + HeaderBackend<Block>
123 + BlockBackend<Block>
124 + Send
125 + Sync
126 + 'static,
127 Client::Api:
128 AuraApi<Block, P::Public> + RelayParentOffsetApi<Block> + AuraUnincludedSegmentApi<Block>,
129 Backend: sc_client_api::Backend<Block> + 'static,
130 RelayClient: RelayChainInterface + Clone + 'static,
131 CIDP: CreateInherentDataProviders<Block, ()> + 'static,
132 CIDP::InherentDataProviders: Send,
133 BI: BlockImport<Block> + ParachainBlockImportMarker + Send + Sync + 'static,
134 Proposer: ProposerInterface<Block> + Send + Sync + 'static,
135 CS: CollatorServiceInterface<Block> + Send + Sync + 'static,
136 CHP: consensus_common::ValidationCodeHashProvider<Block::Hash> + Send + 'static,
137 P: Pair,
138 P::Public: AppPublic + Member + Codec,
139 P::Signature: TryFrom<Vec<u8>> + Member + Codec,
140{
141 async move {
142 tracing::info!(target: LOG_TARGET, "Starting slot-based block-builder task.");
143 let BuilderTaskParams {
144 relay_client,
145 create_inherent_data_providers,
146 para_client,
147 keystore,
148 block_import,
149 para_id,
150 proposer,
151 collator_service,
152 collator_sender,
153 code_hash_provider,
154 authoring_duration,
155 relay_chain_slot_duration,
156 para_backend,
157 slot_offset,
158 max_pov_percentage,
159 } = params;
160
161 let mut slot_timer = SlotTimer::<_, _, P>::new_with_offset(
162 para_client.clone(),
163 slot_offset,
164 relay_chain_slot_duration,
165 );
166
167 let mut collator = {
168 let params = collator_util::Params {
169 create_inherent_data_providers,
170 block_import,
171 relay_client: relay_client.clone(),
172 keystore: keystore.clone(),
173 para_id,
174 proposer,
175 collator_service,
176 };
177
178 collator_util::Collator::<Block, P, _, _, _, _, _>::new(params)
179 };
180
181 let mut relay_chain_data_cache = RelayChainDataCache::new(relay_client.clone(), para_id);
182
183 loop {
184 if slot_timer.wait_until_next_slot().await.is_err() {
186 tracing::error!(target: LOG_TARGET, "Unable to wait for next slot.");
187 return;
188 };
189
190 let Ok(relay_best_hash) = relay_client.best_block_hash().await else {
191 tracing::warn!(target: crate::LOG_TARGET, "Unable to fetch latest relay chain block hash.");
192 continue
193 };
194
195 let best_hash = para_client.info().best_hash;
196 let relay_parent_offset =
197 para_client.runtime_api().relay_parent_offset(best_hash).unwrap_or_default();
198
199 let Ok(para_slot_duration) = crate::slot_duration(&*para_client) else {
200 tracing::error!(target: LOG_TARGET, "Failed to fetch slot duration from runtime.");
201 continue;
202 };
203
204 let Ok(rp_data) = offset_relay_parent_find_descendants(
205 &mut relay_chain_data_cache,
206 relay_best_hash,
207 relay_parent_offset,
208 )
209 .await
210 else {
211 continue
212 };
213
214 let Some(para_slot) = adjust_para_to_relay_parent_slot(
215 rp_data.relay_parent(),
216 relay_chain_slot_duration,
217 para_slot_duration,
218 ) else {
219 continue;
220 };
221
222 let relay_parent = rp_data.relay_parent().hash();
223 let relay_parent_header = rp_data.relay_parent().clone();
224
225 let Some((included_header, parent)) =
226 crate::collators::find_parent(relay_parent, para_id, &*para_backend, &relay_client)
227 .await
228 else {
229 continue
230 };
231
232 let parent_hash = parent.hash;
233 let parent_header = &parent.header;
234
235 let core = match determine_core(
237 &mut relay_chain_data_cache,
238 &relay_parent_header,
239 para_id,
240 parent_header,
241 relay_parent_offset,
242 )
243 .await
244 {
245 Err(()) => {
246 tracing::debug!(
247 target: LOG_TARGET,
248 ?relay_parent,
249 "Failed to determine core"
250 );
251
252 continue
253 },
254 Ok(Some(cores)) => {
255 tracing::debug!(
256 target: LOG_TARGET,
257 ?relay_parent,
258 core_selector = ?cores.selector,
259 claim_queue_offset = ?cores.claim_queue_offset,
260 "Going to claim core",
261 );
262
263 cores
264 },
265 Ok(None) => {
266 tracing::debug!(
267 target: LOG_TARGET,
268 ?relay_parent,
269 "No core scheduled"
270 );
271
272 continue
273 },
274 };
275
276 let Ok(RelayChainData { max_pov_size, last_claimed_core_selector, .. }) =
277 relay_chain_data_cache.get_mut_relay_chain_data(relay_parent).await
278 else {
279 continue;
280 };
281
282 slot_timer.update_scheduling(core.total_cores().into());
283
284 collator.collator_service().check_block_status(parent_hash, parent_header);
287
288 let Ok(relay_slot) =
289 sc_consensus_babe::find_pre_digest::<RelayBlock>(&relay_parent_header)
290 .map(|babe_pre_digest| babe_pre_digest.slot())
291 else {
292 tracing::error!(target: crate::LOG_TARGET, "Relay chain does not contain babe slot. This should never happen.");
293 continue;
294 };
295
296 let included_header_hash = included_header.hash();
297
298 let slot_claim = match crate::collators::can_build_upon::<_, _, P>(
299 para_slot.slot,
300 relay_slot,
301 para_slot.timestamp,
302 parent_hash,
303 included_header_hash,
304 &*para_client,
305 &keystore,
306 )
307 .await
308 {
309 Some(slot) => slot,
310 None => {
311 tracing::debug!(
312 target: crate::LOG_TARGET,
313 unincluded_segment_len = parent.depth,
314 relay_parent = ?relay_parent,
315 relay_parent_num = %relay_parent_header.number(),
316 included_hash = ?included_header_hash,
317 included_num = %included_header.number(),
318 parent = ?parent_hash,
319 slot = ?para_slot.slot,
320 "Not building block."
321 );
322 continue
323 },
324 };
325
326 tracing::debug!(
327 target: crate::LOG_TARGET,
328 unincluded_segment_len = parent.depth,
329 relay_parent = %relay_parent,
330 relay_parent_num = %relay_parent_header.number(),
331 relay_parent_offset,
332 included_hash = %included_header_hash,
333 included_num = %included_header.number(),
334 parent = %parent_hash,
335 slot = ?para_slot.slot,
336 "Building block."
337 );
338
339 let validation_data = PersistedValidationData {
340 parent_head: parent_header.encode().into(),
341 relay_parent_number: *relay_parent_header.number(),
342 relay_parent_storage_root: *relay_parent_header.state_root(),
343 max_pov_size: *max_pov_size,
344 };
345
346 let (parachain_inherent_data, other_inherent_data) = match collator
347 .create_inherent_data_with_rp_offset(
348 relay_parent,
349 &validation_data,
350 parent_hash,
351 slot_claim.timestamp(),
352 Some(rp_data),
353 )
354 .await
355 {
356 Err(err) => {
357 tracing::error!(target: crate::LOG_TARGET, ?err);
358 break
359 },
360 Ok(x) => x,
361 };
362
363 let validation_code_hash = match code_hash_provider.code_hash_at(parent_hash) {
364 None => {
365 tracing::error!(target: crate::LOG_TARGET, ?parent_hash, "Could not fetch validation code hash");
366 break
367 },
368 Some(v) => v,
369 };
370
371 check_validation_code_or_log(
372 &validation_code_hash,
373 para_id,
374 &relay_client,
375 relay_parent,
376 )
377 .await;
378
379 let allowed_pov_size = if let Some(max_pov_percentage) = max_pov_percentage {
380 validation_data.max_pov_size * max_pov_percentage / 100
381 } else {
382 validation_data.max_pov_size * 85 / 100
387 } as usize;
388
389 let adjusted_authoring_duration = match slot_timer.time_until_next_slot() {
390 Ok((duration, _slot)) => std::cmp::min(authoring_duration, duration),
391 Err(_) => authoring_duration,
392 };
393
394 tracing::debug!(target: crate::LOG_TARGET, duration = ?adjusted_authoring_duration, "Adjusted proposal duration.");
395
396 let Ok(Some(candidate)) = collator
397 .build_block_and_import(
398 &parent_header,
399 &slot_claim,
400 Some(vec![CumulusDigestItem::CoreInfo(core.core_info()).to_digest_item()]),
401 (parachain_inherent_data, other_inherent_data),
402 adjusted_authoring_duration,
403 allowed_pov_size,
404 )
405 .await
406 else {
407 tracing::error!(target: crate::LOG_TARGET, "Unable to build block at slot.");
408 continue;
409 };
410
411 let new_block_hash = candidate.block.header().hash();
412
413 collator.collator_service().announce_block(new_block_hash, None);
415
416 *last_claimed_core_selector = Some(core.core_selector());
417
418 if let Err(err) = collator_sender.unbounded_send(CollatorMessage {
419 relay_parent,
420 parent_header: parent_header.clone(),
421 parachain_candidate: candidate,
422 validation_code_hash,
423 core_index: core.core_index(),
424 max_pov_size: validation_data.max_pov_size,
425 }) {
426 tracing::error!(target: crate::LOG_TARGET, ?err, "Unable to send block to collation task.");
427 return
428 }
429 }
430 }
431}
432
433fn adjust_para_to_relay_parent_slot(
435 relay_header: &RelayHeader,
436 relay_chain_slot_duration: Duration,
437 para_slot_duration: SlotDuration,
438) -> Option<SlotInfo> {
439 let relay_slot = sc_consensus_babe::find_pre_digest::<RelayBlock>(&relay_header)
440 .map(|babe_pre_digest| babe_pre_digest.slot())
441 .ok()?;
442 let new_slot = Slot::from_timestamp(
443 relay_slot
444 .timestamp(SlotDuration::from_millis(relay_chain_slot_duration.as_millis() as u64))?,
445 para_slot_duration,
446 );
447 let para_slot = SlotInfo { slot: new_slot, timestamp: new_slot.timestamp(para_slot_duration)? };
448 tracing::debug!(
449 target: LOG_TARGET,
450 timestamp = ?para_slot.timestamp,
451 slot = ?para_slot.slot,
452 "Parachain slot adjusted to relay chain.",
453 );
454 Some(para_slot)
455}
456
457pub(crate) async fn offset_relay_parent_find_descendants<RelayClient>(
467 relay_chain_data_cache: &mut RelayChainDataCache<RelayClient>,
468 relay_best_block: RelayHash,
469 relay_parent_offset: u32,
470) -> Result<RelayParentData, ()>
471where
472 RelayClient: RelayChainInterface + Clone + 'static,
473{
474 let Ok(mut relay_header) = relay_chain_data_cache
475 .get_mut_relay_chain_data(relay_best_block)
476 .await
477 .map(|d| d.relay_parent_header.clone())
478 else {
479 tracing::error!(target: LOG_TARGET, ?relay_best_block, "Unable to fetch best relay chain block header.");
480 return Err(())
481 };
482
483 if relay_parent_offset == 0 {
484 return Ok(RelayParentData::new(relay_header));
485 }
486
487 let mut required_ancestors: VecDeque<RelayHeader> = Default::default();
488 required_ancestors.push_front(relay_header.clone());
489 while required_ancestors.len() < relay_parent_offset as usize {
490 let next_header = relay_chain_data_cache
491 .get_mut_relay_chain_data(*relay_header.parent_hash())
492 .await?
493 .relay_parent_header
494 .clone();
495 required_ancestors.push_front(next_header.clone());
496 relay_header = next_header;
497 }
498
499 let relay_parent = relay_chain_data_cache
500 .get_mut_relay_chain_data(*relay_header.parent_hash())
501 .await?
502 .relay_parent_header
503 .clone();
504
505 tracing::debug!(
506 target: LOG_TARGET,
507 relay_parent_hash = %relay_parent.hash(),
508 relay_parent_num = relay_parent.number(),
509 num_descendants = required_ancestors.len(),
510 "Relay parent descendants."
511 );
512
513 Ok(RelayParentData::new_with_descendants(relay_parent, required_ancestors.into()))
514}
515
516pub(crate) struct Core {
518 selector: CoreSelector,
519 claim_queue_offset: ClaimQueueOffset,
520 core_index: CoreIndex,
521 number_of_cores: u16,
522}
523
524impl Core {
525 fn core_info(&self) -> CoreInfo {
527 CoreInfo {
528 selector: self.selector,
529 claim_queue_offset: self.claim_queue_offset,
530 number_of_cores: self.number_of_cores.into(),
531 }
532 }
533
534 pub(crate) fn core_selector(&self) -> CoreSelector {
536 self.selector
537 }
538
539 pub(crate) fn core_index(&self) -> CoreIndex {
541 self.core_index
542 }
543
544 pub(crate) fn total_cores(&self) -> u16 {
546 self.number_of_cores
547 }
548}
549
550pub(crate) async fn determine_core<H: HeaderT, RI: RelayChainInterface + 'static>(
552 relay_chain_data_cache: &mut RelayChainDataCache<RI>,
553 relay_parent: &RelayHeader,
554 para_id: ParaId,
555 para_parent: &H,
556 relay_parent_offset: u32,
557) -> Result<Option<Core>, ()> {
558 let cores_at_offset = &relay_chain_data_cache
559 .get_mut_relay_chain_data(relay_parent.hash())
560 .await?
561 .claim_queue
562 .iter_claims_at_depth_for_para(relay_parent_offset as usize, para_id)
563 .collect::<Vec<_>>();
564
565 let is_new_relay_parent = if para_parent.number().is_zero() {
566 true
567 } else {
568 match extract_relay_parent(para_parent.digest()) {
569 Some(last_relay_parent) => last_relay_parent != relay_parent.hash(),
570 None =>
571 rpsr_digest::extract_relay_parent_storage_root(para_parent.digest())
572 .ok_or(())?
573 .0 != *relay_parent.state_root(),
574 }
575 };
576
577 let core_info = CumulusDigestItem::find_core_info(para_parent.digest());
578
579 let (selector, core_index) = if is_new_relay_parent {
581 let Some(core_index) = cores_at_offset.get(0) else { return Ok(None) };
582
583 (0, *core_index)
584 } else if let Some(core_info) = core_info {
585 let selector = core_info.selector.0 as usize + 1;
586 let Some(core_index) = cores_at_offset.get(selector) else { return Ok(None) };
587
588 (selector, *core_index)
589 } else {
590 let last_claimed_core_selector = relay_chain_data_cache
591 .get_mut_relay_chain_data(relay_parent.hash())
592 .await?
593 .last_claimed_core_selector;
594
595 let selector = last_claimed_core_selector.map_or(0, |cs| cs.0 as usize) + 1;
596 let Some(core_index) = cores_at_offset.get(selector) else { return Ok(None) };
597
598 (selector, *core_index)
599 };
600
601 Ok(Some(Core {
602 selector: CoreSelector(selector as u8),
603 core_index,
604 claim_queue_offset: ClaimQueueOffset(relay_parent_offset as u8),
605 number_of_cores: cores_at_offset.len() as u16,
606 }))
607}