1use codec::{Codec, Encode};
19
20use super::CollatorMessage;
21use crate::{
22 collator as collator_util,
23 collators::{
24 check_validation_code_or_log,
25 slot_based::{
26 relay_chain_data_cache::{RelayChainData, RelayChainDataCache},
27 slot_timer::{SlotInfo, SlotTimer},
28 },
29 RelayParentData,
30 },
31 LOG_TARGET,
32};
33use cumulus_client_collator::service::ServiceInterface as CollatorServiceInterface;
34use cumulus_client_consensus_common::{self as consensus_common, ParachainBlockImportMarker};
35use cumulus_client_consensus_proposer::ProposerInterface;
36use cumulus_primitives_aura::{AuraUnincludedSegmentApi, Slot};
37use cumulus_primitives_core::{
38 extract_relay_parent, rpsr_digest, ClaimQueueOffset, CoreInfo, CoreSelector, CumulusDigestItem,
39 PersistedValidationData, RelayParentOffsetApi,
40};
41use cumulus_relay_chain_interface::RelayChainInterface;
42use futures::prelude::*;
43use polkadot_primitives::{
44 Block as RelayBlock, CoreIndex, Hash as RelayHash, Header as RelayHeader, Id as ParaId,
45};
46use sc_client_api::{backend::AuxStore, BlockBackend, BlockOf, UsageProvider};
47use sc_consensus::BlockImport;
48use sc_consensus_aura::SlotDuration;
49use sp_api::ProvideRuntimeApi;
50use sp_application_crypto::AppPublic;
51use sp_blockchain::HeaderBackend;
52use sp_consensus_aura::AuraApi;
53use sp_core::crypto::Pair;
54use sp_inherents::CreateInherentDataProviders;
55use sp_keystore::KeystorePtr;
56use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Member, Zero};
57use std::{collections::VecDeque, sync::Arc, time::Duration};
58
59pub struct BuilderTaskParams<
61 Block: BlockT,
62 BI,
63 CIDP,
64 Client,
65 Backend,
66 RelayClient,
67 CHP,
68 Proposer,
69 CS,
70> {
71 pub create_inherent_data_providers: CIDP,
75 pub block_import: BI,
77 pub para_client: Arc<Client>,
79 pub para_backend: Arc<Backend>,
81 pub relay_client: RelayClient,
83 pub code_hash_provider: CHP,
85 pub keystore: KeystorePtr,
87 pub para_id: ParaId,
89 pub proposer: Proposer,
91 pub collator_service: CS,
93 pub authoring_duration: Duration,
95 pub collator_sender: sc_utils::mpsc::TracingUnboundedSender<CollatorMessage<Block>>,
97 pub relay_chain_slot_duration: Duration,
99 pub slot_offset: Duration,
107 pub max_pov_percentage: Option<u32>,
110}
111
112pub fn run_block_builder<Block, P, BI, CIDP, Client, Backend, RelayClient, CHP, Proposer, CS>(
114 params: BuilderTaskParams<Block, BI, CIDP, Client, Backend, RelayClient, CHP, Proposer, CS>,
115) -> impl Future<Output = ()> + Send + 'static
116where
117 Block: BlockT,
118 Client: ProvideRuntimeApi<Block>
119 + UsageProvider<Block>
120 + BlockOf
121 + AuxStore
122 + HeaderBackend<Block>
123 + BlockBackend<Block>
124 + Send
125 + Sync
126 + 'static,
127 Client::Api:
128 AuraApi<Block, P::Public> + RelayParentOffsetApi<Block> + AuraUnincludedSegmentApi<Block>,
129 Backend: sc_client_api::Backend<Block> + 'static,
130 RelayClient: RelayChainInterface + Clone + 'static,
131 CIDP: CreateInherentDataProviders<Block, ()> + 'static,
132 CIDP::InherentDataProviders: Send,
133 BI: BlockImport<Block> + ParachainBlockImportMarker + Send + Sync + 'static,
134 Proposer: ProposerInterface<Block> + Send + Sync + 'static,
135 CS: CollatorServiceInterface<Block> + Send + Sync + 'static,
136 CHP: consensus_common::ValidationCodeHashProvider<Block::Hash> + Send + 'static,
137 P: Pair,
138 P::Public: AppPublic + Member + Codec,
139 P::Signature: TryFrom<Vec<u8>> + Member + Codec,
140{
141 async move {
142 tracing::info!(target: LOG_TARGET, "Starting slot-based block-builder task.");
143 let BuilderTaskParams {
144 relay_client,
145 create_inherent_data_providers,
146 para_client,
147 keystore,
148 block_import,
149 para_id,
150 proposer,
151 collator_service,
152 collator_sender,
153 code_hash_provider,
154 authoring_duration,
155 relay_chain_slot_duration,
156 para_backend,
157 slot_offset,
158 max_pov_percentage,
159 } = params;
160
161 let mut slot_timer = SlotTimer::<_, _, P>::new_with_offset(
162 para_client.clone(),
163 slot_offset,
164 relay_chain_slot_duration,
165 );
166
167 let mut collator = {
168 let params = collator_util::Params {
169 create_inherent_data_providers,
170 block_import,
171 relay_client: relay_client.clone(),
172 keystore: keystore.clone(),
173 para_id,
174 proposer,
175 collator_service,
176 };
177
178 collator_util::Collator::<Block, P, _, _, _, _, _>::new(params)
179 };
180
181 let mut relay_chain_data_cache = RelayChainDataCache::new(relay_client.clone(), para_id);
182
183 loop {
184 if slot_timer.wait_until_next_slot().await.is_err() {
186 tracing::error!(target: LOG_TARGET, "Unable to wait for next slot.");
187 return;
188 };
189
190 let Ok(relay_best_hash) = relay_client.best_block_hash().await else {
191 tracing::warn!(target: crate::LOG_TARGET, "Unable to fetch latest relay chain block hash.");
192 continue
193 };
194
195 let best_hash = para_client.info().best_hash;
196 let relay_parent_offset =
197 para_client.runtime_api().relay_parent_offset(best_hash).unwrap_or_default();
198
199 let Ok(para_slot_duration) = crate::slot_duration(&*para_client) else {
200 tracing::error!(target: LOG_TARGET, "Failed to fetch slot duration from runtime.");
201 continue;
202 };
203
204 let Ok(rp_data) = offset_relay_parent_find_descendants(
205 &mut relay_chain_data_cache,
206 relay_best_hash,
207 relay_parent_offset,
208 )
209 .await
210 else {
211 continue
212 };
213
214 let Some(para_slot) = adjust_para_to_relay_parent_slot(
215 rp_data.relay_parent(),
216 relay_chain_slot_duration,
217 para_slot_duration,
218 ) else {
219 continue;
220 };
221
222 let relay_parent = rp_data.relay_parent().hash();
223 let relay_parent_header = rp_data.relay_parent().clone();
224
225 let Some((included_header, parent)) =
226 crate::collators::find_parent(relay_parent, para_id, &*para_backend, &relay_client)
227 .await
228 else {
229 continue
230 };
231
232 let parent_hash = parent.hash;
233 let parent_header = &parent.header;
234
235 let core = match determine_core(
237 &mut relay_chain_data_cache,
238 &relay_parent_header,
239 para_id,
240 parent_header,
241 relay_parent_offset,
242 )
243 .await
244 {
245 Err(()) => {
246 tracing::debug!(
247 target: LOG_TARGET,
248 ?relay_parent,
249 "Failed to determine core"
250 );
251
252 continue
253 },
254 Ok(Some(cores)) => {
255 tracing::debug!(
256 target: LOG_TARGET,
257 ?relay_parent,
258 core_selector = ?cores.selector,
259 claim_queue_offset = ?cores.claim_queue_offset,
260 "Going to claim core",
261 );
262
263 cores
264 },
265 Ok(None) => {
266 tracing::debug!(
267 target: LOG_TARGET,
268 ?relay_parent,
269 "No core scheduled"
270 );
271
272 continue
273 },
274 };
275
276 let Ok(RelayChainData { max_pov_size, last_claimed_core_selector, .. }) =
277 relay_chain_data_cache.get_mut_relay_chain_data(relay_parent).await
278 else {
279 continue;
280 };
281
282 slot_timer.update_scheduling(core.total_cores().into());
283
284 collator.collator_service().check_block_status(parent_hash, parent_header);
287
288 let Ok(relay_slot) =
289 sc_consensus_babe::find_pre_digest::<RelayBlock>(&relay_parent_header)
290 .map(|babe_pre_digest| babe_pre_digest.slot())
291 else {
292 tracing::error!(target: crate::LOG_TARGET, "Relay chain does not contain babe slot. This should never happen.");
293 continue;
294 };
295
296 let included_header_hash = included_header.hash();
297
298 let slot_claim = match crate::collators::can_build_upon::<_, _, P>(
299 para_slot.slot,
300 relay_slot,
301 para_slot.timestamp,
302 parent_hash,
303 included_header_hash,
304 &*para_client,
305 &keystore,
306 )
307 .await
308 {
309 Some(slot) => slot,
310 None => {
311 tracing::debug!(
312 target: crate::LOG_TARGET,
313 unincluded_segment_len = parent.depth,
314 relay_parent = ?relay_parent,
315 relay_parent_num = %relay_parent_header.number(),
316 included_hash = ?included_header_hash,
317 included_num = %included_header.number(),
318 parent = ?parent_hash,
319 slot = ?para_slot.slot,
320 "Not building block."
321 );
322 continue
323 },
324 };
325
326 tracing::debug!(
327 target: crate::LOG_TARGET,
328 unincluded_segment_len = parent.depth,
329 relay_parent = %relay_parent,
330 relay_parent_num = %relay_parent_header.number(),
331 relay_parent_offset,
332 included_hash = %included_header_hash,
333 included_num = %included_header.number(),
334 parent = %parent_hash,
335 slot = ?para_slot.slot,
336 "Building block."
337 );
338
339 let validation_data = PersistedValidationData {
340 parent_head: parent_header.encode().into(),
341 relay_parent_number: *relay_parent_header.number(),
342 relay_parent_storage_root: *relay_parent_header.state_root(),
343 max_pov_size: *max_pov_size,
344 };
345
346 let (parachain_inherent_data, other_inherent_data) = match collator
347 .create_inherent_data_with_rp_offset(
348 relay_parent,
349 &validation_data,
350 parent_hash,
351 slot_claim.timestamp(),
352 Some(rp_data),
353 )
354 .await
355 {
356 Err(err) => {
357 tracing::error!(target: crate::LOG_TARGET, ?err);
358 break
359 },
360 Ok(x) => x,
361 };
362
363 let validation_code_hash = match code_hash_provider.code_hash_at(parent_hash) {
364 None => {
365 tracing::error!(target: crate::LOG_TARGET, ?parent_hash, "Could not fetch validation code hash");
366 break
367 },
368 Some(v) => v,
369 };
370
371 check_validation_code_or_log(
372 &validation_code_hash,
373 para_id,
374 &relay_client,
375 relay_parent,
376 )
377 .await;
378
379 let allowed_pov_size = if let Some(max_pov_percentage) = max_pov_percentage {
380 validation_data.max_pov_size * max_pov_percentage / 100
381 } else {
382 validation_data.max_pov_size * 85 / 100
387 } as usize;
388
389 let Ok(Some(candidate)) = collator
390 .build_block_and_import(
391 &parent_header,
392 &slot_claim,
393 Some(vec![CumulusDigestItem::CoreInfo(core.core_info()).to_digest_item()]),
394 (parachain_inherent_data, other_inherent_data),
395 authoring_duration,
396 allowed_pov_size,
397 )
398 .await
399 else {
400 tracing::error!(target: crate::LOG_TARGET, "Unable to build block at slot.");
401 continue;
402 };
403
404 let new_block_hash = candidate.block.header().hash();
405
406 collator.collator_service().announce_block(new_block_hash, None);
408
409 *last_claimed_core_selector = Some(core.core_selector());
410
411 if let Err(err) = collator_sender.unbounded_send(CollatorMessage {
412 relay_parent,
413 parent_header: parent_header.clone(),
414 parachain_candidate: candidate,
415 validation_code_hash,
416 core_index: core.core_index(),
417 max_pov_size: validation_data.max_pov_size,
418 }) {
419 tracing::error!(target: crate::LOG_TARGET, ?err, "Unable to send block to collation task.");
420 return
421 }
422 }
423 }
424}
425
426fn adjust_para_to_relay_parent_slot(
428 relay_header: &RelayHeader,
429 relay_chain_slot_duration: Duration,
430 para_slot_duration: SlotDuration,
431) -> Option<SlotInfo> {
432 let relay_slot = sc_consensus_babe::find_pre_digest::<RelayBlock>(&relay_header)
433 .map(|babe_pre_digest| babe_pre_digest.slot())
434 .ok()?;
435 let new_slot = Slot::from_timestamp(
436 relay_slot
437 .timestamp(SlotDuration::from_millis(relay_chain_slot_duration.as_millis() as u64))?,
438 para_slot_duration,
439 );
440 let para_slot = SlotInfo { slot: new_slot, timestamp: new_slot.timestamp(para_slot_duration)? };
441 tracing::debug!(
442 target: LOG_TARGET,
443 timestamp = ?para_slot.timestamp,
444 slot = ?para_slot.slot,
445 "Parachain slot adjusted to relay chain.",
446 );
447 Some(para_slot)
448}
449
450pub(crate) async fn offset_relay_parent_find_descendants<RelayClient>(
460 relay_chain_data_cache: &mut RelayChainDataCache<RelayClient>,
461 relay_best_block: RelayHash,
462 relay_parent_offset: u32,
463) -> Result<RelayParentData, ()>
464where
465 RelayClient: RelayChainInterface + Clone + 'static,
466{
467 let Ok(mut relay_header) = relay_chain_data_cache
468 .get_mut_relay_chain_data(relay_best_block)
469 .await
470 .map(|d| d.relay_parent_header.clone())
471 else {
472 tracing::error!(target: LOG_TARGET, ?relay_best_block, "Unable to fetch best relay chain block header.");
473 return Err(())
474 };
475
476 if relay_parent_offset == 0 {
477 return Ok(RelayParentData::new(relay_header));
478 }
479
480 let mut required_ancestors: VecDeque<RelayHeader> = Default::default();
481 required_ancestors.push_front(relay_header.clone());
482 while required_ancestors.len() < relay_parent_offset as usize {
483 let next_header = relay_chain_data_cache
484 .get_mut_relay_chain_data(*relay_header.parent_hash())
485 .await?
486 .relay_parent_header
487 .clone();
488 required_ancestors.push_front(next_header.clone());
489 relay_header = next_header;
490 }
491
492 let relay_parent = relay_chain_data_cache
493 .get_mut_relay_chain_data(*relay_header.parent_hash())
494 .await?
495 .relay_parent_header
496 .clone();
497
498 tracing::debug!(
499 target: LOG_TARGET,
500 relay_parent_hash = %relay_parent.hash(),
501 relay_parent_num = relay_parent.number(),
502 num_descendants = required_ancestors.len(),
503 "Relay parent descendants."
504 );
505
506 Ok(RelayParentData::new_with_descendants(relay_parent, required_ancestors.into()))
507}
508
509pub(crate) struct Core {
511 selector: CoreSelector,
512 claim_queue_offset: ClaimQueueOffset,
513 core_index: CoreIndex,
514 number_of_cores: u16,
515}
516
517impl Core {
518 fn core_info(&self) -> CoreInfo {
520 CoreInfo {
521 selector: self.selector,
522 claim_queue_offset: self.claim_queue_offset,
523 number_of_cores: self.number_of_cores.into(),
524 }
525 }
526
527 pub(crate) fn core_selector(&self) -> CoreSelector {
529 self.selector
530 }
531
532 pub(crate) fn core_index(&self) -> CoreIndex {
534 self.core_index
535 }
536
537 pub(crate) fn total_cores(&self) -> u16 {
539 self.number_of_cores
540 }
541}
542
543pub(crate) async fn determine_core<H: HeaderT, RI: RelayChainInterface + 'static>(
545 relay_chain_data_cache: &mut RelayChainDataCache<RI>,
546 relay_parent: &RelayHeader,
547 para_id: ParaId,
548 para_parent: &H,
549 relay_parent_offset: u32,
550) -> Result<Option<Core>, ()> {
551 let cores_at_offset = &relay_chain_data_cache
552 .get_mut_relay_chain_data(relay_parent.hash())
553 .await?
554 .claim_queue
555 .iter_claims_at_depth_for_para(relay_parent_offset as usize, para_id)
556 .collect::<Vec<_>>();
557
558 let is_new_relay_parent = if para_parent.number().is_zero() {
559 true
560 } else {
561 match extract_relay_parent(para_parent.digest()) {
562 Some(last_relay_parent) => last_relay_parent != relay_parent.hash(),
563 None =>
564 rpsr_digest::extract_relay_parent_storage_root(para_parent.digest())
565 .ok_or(())?
566 .0 != *relay_parent.state_root(),
567 }
568 };
569
570 let core_info = CumulusDigestItem::find_core_info(para_parent.digest());
571
572 let (selector, core_index) = if is_new_relay_parent {
574 let Some(core_index) = cores_at_offset.get(0) else { return Ok(None) };
575
576 (0, *core_index)
577 } else if let Some(core_info) = core_info {
578 let selector = core_info.selector.0 as usize + 1;
579 let Some(core_index) = cores_at_offset.get(selector) else { return Ok(None) };
580
581 (selector, *core_index)
582 } else {
583 let last_claimed_core_selector = relay_chain_data_cache
584 .get_mut_relay_chain_data(relay_parent.hash())
585 .await?
586 .last_claimed_core_selector;
587
588 let selector = last_claimed_core_selector.map_or(0, |cs| cs.0 as usize) + 1;
589 let Some(core_index) = cores_at_offset.get(selector) else { return Ok(None) };
590
591 (selector, *core_index)
592 };
593
594 Ok(Some(Core {
595 selector: CoreSelector(selector as u8),
596 core_index,
597 claim_queue_offset: ClaimQueueOffset(relay_parent_offset as u8),
598 number_of_cores: cores_at_offset.len() as u16,
599 }))
600}