use codec::{Codec, Encode};
use cumulus_client_collator::service::ServiceInterface as CollatorServiceInterface;
use cumulus_client_consensus_common::{self as consensus_common, ParachainBlockImportMarker};
use cumulus_client_consensus_proposer::ProposerInterface;
use cumulus_primitives_aura::AuraUnincludedSegmentApi;
use cumulus_primitives_core::{GetCoreSelectorApi, PersistedValidationData};
use cumulus_relay_chain_interface::RelayChainInterface;
use polkadot_primitives::{Block as RelayBlock, Id as ParaId};
use super::CollatorMessage;
use crate::{
collator::{self as collator_util},
collators::{
check_validation_code_or_log,
slot_based::{
core_selector,
relay_chain_data_cache::{RelayChainData, RelayChainDataCache},
slot_timer::SlotTimer,
},
},
LOG_TARGET,
};
use futures::prelude::*;
use sc_client_api::{backend::AuxStore, BlockBackend, BlockOf, UsageProvider};
use sc_consensus::BlockImport;
use sp_api::ProvideRuntimeApi;
use sp_application_crypto::AppPublic;
use sp_blockchain::HeaderBackend;
use sp_consensus_aura::AuraApi;
use sp_core::crypto::Pair;
use sp_inherents::CreateInherentDataProviders;
use sp_keystore::KeystorePtr;
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Member};
use std::{sync::Arc, time::Duration};
pub struct BuilderTaskParams<
Block: BlockT,
BI,
CIDP,
Client,
Backend,
RelayClient,
CHP,
Proposer,
CS,
> {
pub create_inherent_data_providers: CIDP,
pub block_import: BI,
pub para_client: Arc<Client>,
pub para_backend: Arc<Backend>,
pub relay_client: RelayClient,
pub code_hash_provider: CHP,
pub keystore: KeystorePtr,
pub para_id: ParaId,
pub proposer: Proposer,
pub collator_service: CS,
pub authoring_duration: Duration,
pub collator_sender: sc_utils::mpsc::TracingUnboundedSender<CollatorMessage<Block>>,
pub relay_chain_slot_duration: Duration,
pub slot_offset: Duration,
}
pub fn run_block_builder<Block, P, BI, CIDP, Client, Backend, RelayClient, CHP, Proposer, CS>(
params: BuilderTaskParams<Block, BI, CIDP, Client, Backend, RelayClient, CHP, Proposer, CS>,
) -> impl Future<Output = ()> + Send + 'static
where
Block: BlockT,
Client: ProvideRuntimeApi<Block>
+ UsageProvider<Block>
+ BlockOf
+ AuxStore
+ HeaderBackend<Block>
+ BlockBackend<Block>
+ Send
+ Sync
+ 'static,
Client::Api:
AuraApi<Block, P::Public> + GetCoreSelectorApi<Block> + AuraUnincludedSegmentApi<Block>,
Backend: sc_client_api::Backend<Block> + 'static,
RelayClient: RelayChainInterface + Clone + 'static,
CIDP: CreateInherentDataProviders<Block, ()> + 'static,
CIDP::InherentDataProviders: Send,
BI: BlockImport<Block> + ParachainBlockImportMarker + Send + Sync + 'static,
Proposer: ProposerInterface<Block> + Send + Sync + 'static,
CS: CollatorServiceInterface<Block> + Send + Sync + 'static,
CHP: consensus_common::ValidationCodeHashProvider<Block::Hash> + Send + 'static,
P: Pair,
P::Public: AppPublic + Member + Codec,
P::Signature: TryFrom<Vec<u8>> + Member + Codec,
{
async move {
tracing::info!(target: LOG_TARGET, "Starting slot-based block-builder task.");
let BuilderTaskParams {
relay_client,
create_inherent_data_providers,
para_client,
keystore,
block_import,
para_id,
proposer,
collator_service,
collator_sender,
code_hash_provider,
authoring_duration,
relay_chain_slot_duration,
para_backend,
slot_offset,
} = params;
let mut slot_timer = SlotTimer::<_, _, P>::new_with_offset(
para_client.clone(),
slot_offset,
relay_chain_slot_duration,
);
let mut collator = {
let params = collator_util::Params {
create_inherent_data_providers,
block_import,
relay_client: relay_client.clone(),
keystore: keystore.clone(),
para_id,
proposer,
collator_service,
};
collator_util::Collator::<Block, P, _, _, _, _, _>::new(params)
};
let mut relay_chain_data_cache = RelayChainDataCache::new(relay_client.clone(), para_id);
loop {
let Some(para_slot) = slot_timer.wait_until_next_slot().await else {
return;
};
let Ok(relay_parent) = relay_client.best_block_hash().await else {
tracing::warn!(target: crate::LOG_TARGET, "Unable to fetch latest relay chain block hash.");
continue
};
let Some((included_block, parent)) =
crate::collators::find_parent(relay_parent, para_id, &*para_backend, &relay_client)
.await
else {
continue
};
let parent_hash = parent.hash;
let (core_selector, claim_queue_offset) =
match core_selector(&*para_client, parent.hash, *parent.header.number()) {
Ok(core_selector) => core_selector,
Err(err) => {
tracing::trace!(
target: crate::LOG_TARGET,
"Unable to retrieve the core selector from the runtime API: {}",
err
);
continue
},
};
let Ok(RelayChainData {
relay_parent_header,
max_pov_size,
scheduled_cores,
claimed_cores,
}) = relay_chain_data_cache
.get_mut_relay_chain_data(relay_parent, claim_queue_offset)
.await
else {
continue;
};
if scheduled_cores.is_empty() {
tracing::debug!(target: LOG_TARGET, "Parachain not scheduled, skipping slot.");
continue;
} else {
tracing::debug!(
target: LOG_TARGET,
?relay_parent,
"Parachain is scheduled on cores: {:?}",
scheduled_cores
);
}
slot_timer.update_scheduling(scheduled_cores.len() as u32);
let core_selector = core_selector.0 as usize % scheduled_cores.len();
let Some(core_index) = scheduled_cores.get(core_selector) else {
continue;
};
if !claimed_cores.insert(*core_index) {
tracing::debug!(
target: LOG_TARGET,
"Core {:?} was already claimed at this relay chain slot",
core_index
);
continue
}
let parent_header = parent.header;
collator.collator_service().check_block_status(parent_hash, &parent_header);
let Ok(relay_slot) =
sc_consensus_babe::find_pre_digest::<RelayBlock>(relay_parent_header)
.map(|babe_pre_digest| babe_pre_digest.slot())
else {
tracing::error!(target: crate::LOG_TARGET, "Relay chain does not contain babe slot. This should never happen.");
continue;
};
let slot_claim = match crate::collators::can_build_upon::<_, _, P>(
para_slot.slot,
relay_slot,
para_slot.timestamp,
parent_hash,
included_block,
&*para_client,
&keystore,
)
.await
{
Some(slot) => slot,
None => {
tracing::debug!(
target: crate::LOG_TARGET,
?core_index,
slot_info = ?para_slot,
unincluded_segment_len = parent.depth,
relay_parent = %relay_parent,
included = %included_block,
parent = %parent_hash,
"Not building block."
);
continue
},
};
tracing::debug!(
target: crate::LOG_TARGET,
?core_index,
slot_info = ?para_slot,
unincluded_segment_len = parent.depth,
relay_parent = %relay_parent,
included = %included_block,
parent = %parent_hash,
"Building block."
);
let validation_data = PersistedValidationData {
parent_head: parent_header.encode().into(),
relay_parent_number: *relay_parent_header.number(),
relay_parent_storage_root: *relay_parent_header.state_root(),
max_pov_size: *max_pov_size,
};
let (parachain_inherent_data, other_inherent_data) = match collator
.create_inherent_data(
relay_parent,
&validation_data,
parent_hash,
slot_claim.timestamp(),
)
.await
{
Err(err) => {
tracing::error!(target: crate::LOG_TARGET, ?err);
break
},
Ok(x) => x,
};
let validation_code_hash = match code_hash_provider.code_hash_at(parent_hash) {
None => {
tracing::error!(target: crate::LOG_TARGET, ?parent_hash, "Could not fetch validation code hash");
break
},
Some(v) => v,
};
check_validation_code_or_log(
&validation_code_hash,
para_id,
&relay_client,
relay_parent,
)
.await;
let allowed_pov_size = if cfg!(feature = "full-pov-size") {
validation_data.max_pov_size
} else {
validation_data.max_pov_size / 2
} as usize;
let Ok(Some(candidate)) = collator
.build_block_and_import(
&parent_header,
&slot_claim,
None,
(parachain_inherent_data, other_inherent_data),
authoring_duration,
allowed_pov_size,
)
.await
else {
tracing::error!(target: crate::LOG_TARGET, "Unable to build block at slot.");
continue;
};
let new_block_hash = candidate.block.header().hash();
collator.collator_service().announce_block(new_block_hash, None);
if let Err(err) = collator_sender.unbounded_send(CollatorMessage {
relay_parent,
parent_header,
parachain_candidate: candidate,
validation_code_hash,
core_index: *core_index,
max_pov_size: validation_data.max_pov_size,
}) {
tracing::error!(target: crate::LOG_TARGET, ?err, "Unable to send block to collation task.");
return
}
}
}
}