use codec::{Codec, Decode};
use cumulus_client_collator::{
relay_chain_driven::CollationRequest, service::ServiceInterface as CollatorServiceInterface,
};
use cumulus_client_consensus_common::ParachainBlockImportMarker;
use cumulus_client_consensus_proposer::ProposerInterface;
use cumulus_primitives_core::{relay_chain::BlockId as RBlockId, CollectCollationInfo};
use cumulus_relay_chain_interface::RelayChainInterface;
use polkadot_node_primitives::CollationResult;
use polkadot_overseer::Handle as OverseerHandle;
use polkadot_primitives::{CollatorPair, Id as ParaId, ValidationCode};
use futures::{channel::mpsc::Receiver, prelude::*};
use sc_client_api::{backend::AuxStore, BlockBackend, BlockOf};
use sc_consensus::BlockImport;
use sp_api::{CallApiAt, ProvideRuntimeApi};
use sp_application_crypto::AppPublic;
use sp_blockchain::HeaderBackend;
use sp_consensus_aura::AuraApi;
use sp_core::crypto::Pair;
use sp_inherents::CreateInherentDataProviders;
use sp_keystore::KeystorePtr;
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Member};
use sp_state_machine::Backend as _;
use std::{sync::Arc, time::Duration};
use crate::collator as collator_util;
/// Parameters for [`run`].
///
/// Each field is consumed by `run`; the notes below describe how `run` uses it.
pub struct Params<BI, CIDP, Client, RClient, Proposer, CS> {
/// Inherent data provider factory, forwarded to the internal `collator_util::Collator`.
pub create_inherent_data_providers: CIDP,
/// Block import handle, forwarded to the internal `collator_util::Collator`.
pub block_import: BI,
/// Parachain client. Queried for the state at the parent block (to read the validation
/// code) and for the Aura slot duration via the runtime API.
pub para_client: Arc<Client>,
/// Relay-chain client. Used to fetch the relay parent header and to check the
/// validation code; a clone is also handed to the internal collator.
pub relay_client: RClient,
/// Keystore used when trying to claim a slot; a clone is also handed to the internal
/// collator.
pub keystore: KeystorePtr,
/// Collator key. Only used to initialise the collation request stream when
/// `collation_request_receiver` is `None`.
pub collator_key: CollatorPair,
/// The parachain ID.
pub para_id: ParaId,
/// Overseer handle. Only used to initialise the collation request stream when
/// `collation_request_receiver` is `None`.
pub overseer_handle: OverseerHandle,
/// Relay-chain slot duration, passed to `collator_util::claim_slot`.
pub relay_chain_slot_duration: Duration,
/// Block proposer, forwarded to the internal `collator_util::Collator`.
pub proposer: Proposer,
/// Collator service, forwarded to the internal `collator_util::Collator`.
pub collator_service: CS,
/// Duration passed to `Collator::collate` for each collation attempt
/// (presumably the time budget for authoring a block — confirm against `collate`).
pub authoring_duration: Duration,
/// Pre-existing receiver of collation requests. If `None`, `run` creates a new one via
/// `cumulus_client_collator::relay_chain_driven::init`.
pub collation_request_receiver: Option<Receiver<CollationRequest>>,
}
pub fn run<Block, P, BI, CIDP, Client, RClient, Proposer, CS>(
params: Params<BI, CIDP, Client, RClient, Proposer, CS>,
) -> impl Future<Output = ()> + Send + 'static
where
Block: BlockT + Send,
Client: ProvideRuntimeApi<Block>
+ BlockOf
+ AuxStore
+ HeaderBackend<Block>
+ BlockBackend<Block>
+ CallApiAt<Block>
+ Send
+ Sync
+ 'static,
Client::Api: AuraApi<Block, P::Public> + CollectCollationInfo<Block>,
RClient: RelayChainInterface + Send + Clone + 'static,
CIDP: CreateInherentDataProviders<Block, ()> + Send + 'static,
CIDP::InherentDataProviders: Send,
BI: BlockImport<Block> + ParachainBlockImportMarker + Send + Sync + 'static,
Proposer: ProposerInterface<Block> + Send + Sync + 'static,
CS: CollatorServiceInterface<Block> + Send + Sync + 'static,
P: Pair,
P::Public: AppPublic + Member + Codec,
P::Signature: TryFrom<Vec<u8>> + Member + Codec,
{
async move {
let mut collation_requests = match params.collation_request_receiver {
Some(receiver) => receiver,
None =>
cumulus_client_collator::relay_chain_driven::init(
params.collator_key,
params.para_id,
params.overseer_handle,
)
.await,
};
let mut collator = {
let params = collator_util::Params {
create_inherent_data_providers: params.create_inherent_data_providers,
block_import: params.block_import,
relay_client: params.relay_client.clone(),
keystore: params.keystore.clone(),
para_id: params.para_id,
proposer: params.proposer,
collator_service: params.collator_service,
};
collator_util::Collator::<Block, P, _, _, _, _, _>::new(params)
};
let mut last_processed_slot = 0;
let mut last_relay_chain_block = Default::default();
while let Some(request) = collation_requests.next().await {
macro_rules! reject_with_error {
($err:expr) => {{
request.complete(None);
tracing::error!(target: crate::LOG_TARGET, err = ?{ $err });
continue;
}};
}
macro_rules! try_request {
($x:expr) => {{
match $x {
Ok(x) => x,
Err(e) => reject_with_error!(e),
}
}};
}
let validation_data = request.persisted_validation_data();
let parent_header =
try_request!(Block::Header::decode(&mut &validation_data.parent_head.0[..]));
let parent_hash = parent_header.hash();
if !collator.collator_service().check_block_status(parent_hash, &parent_header) {
continue
}
let Ok(Some(code)) =
params.para_client.state_at(parent_hash).map_err(drop).and_then(|s| {
s.storage(&sp_core::storage::well_known_keys::CODE).map_err(drop)
})
else {
continue;
};
super::check_validation_code_or_log(
&ValidationCode::from(code).hash(),
params.para_id,
¶ms.relay_client,
*request.relay_parent(),
)
.await;
let relay_parent_header =
match params.relay_client.header(RBlockId::hash(*request.relay_parent())).await {
Err(e) => reject_with_error!(e),
Ok(None) => continue, Ok(Some(h)) => h,
};
let slot_duration = match params.para_client.runtime_api().slot_duration(parent_hash) {
Ok(d) => d,
Err(e) => reject_with_error!(e),
};
let claim = match collator_util::claim_slot::<_, _, P>(
&*params.para_client,
parent_hash,
&relay_parent_header,
slot_duration,
params.relay_chain_slot_duration,
¶ms.keystore,
)
.await
{
Ok(None) => continue,
Ok(Some(c)) => c,
Err(e) => reject_with_error!(e),
};
if last_processed_slot >= *claim.slot() &&
last_relay_chain_block < *relay_parent_header.number()
{
continue
}
let (parachain_inherent_data, other_inherent_data) = try_request!(
collator
.create_inherent_data(
*request.relay_parent(),
&validation_data,
parent_hash,
claim.timestamp(),
)
.await
);
let allowed_pov_size = if cfg!(feature = "full-pov-size") {
validation_data.max_pov_size
} else {
validation_data.max_pov_size / 2
} as usize;
let maybe_collation = try_request!(
collator
.collate(
&parent_header,
&claim,
None,
(parachain_inherent_data, other_inherent_data),
params.authoring_duration,
allowed_pov_size,
)
.await
);
if let Some((collation, _, post_hash)) = maybe_collation {
let result_sender =
Some(collator.collator_service().announce_with_barrier(post_hash));
request.complete(Some(CollationResult { collation, result_sender }));
} else {
request.complete(None);
tracing::debug!(target: crate::LOG_TARGET, "No block proposal");
}
last_processed_slot = *claim.slot();
last_relay_chain_block = *relay_parent_header.number();
}
}
}