use sc_client_api::{
Backend, BlockBackend, BlockImportNotification, BlockchainEvents, Finalizer, UsageProvider,
};
use sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy};
use schnellru::{ByLength, LruMap};
use sp_blockchain::Error as ClientError;
use sp_consensus::{BlockOrigin, BlockStatus};
use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
use cumulus_client_pov_recovery::{RecoveryKind, RecoveryRequest};
use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult};
use polkadot_primitives::{Hash as PHash, Id as ParaId, OccupiedCoreAssumption};
use codec::Decode;
use futures::{channel::mpsc::Sender, pin_mut, select, FutureExt, Stream, StreamExt};
use std::sync::Arc;
/// Log target passed to the `tracing` macros below.
const LOG_TARGET: &str = "cumulus-consensus";
/// Capacity of the LRU map of recently seen finalized parachain block hashes that
/// `follow_finalized_head` maintains.
const FINALIZATION_CACHE_SIZE: u32 = 40;
/// Handles one encoded parachain head that was reported as finalized.
///
/// `finalized_head` is decoded into a `Block::Header`. When decoding fails the error is logged
/// and the head is dropped. Otherwise the header hash is recorded in
/// `last_seen_finalized_hashes` and, if the header number is above the finalized number
/// currently reported by `parachain`, `finalize_block` is called for it. A failing
/// finalization is only logged, never propagated.
fn handle_new_finalized_head<P, Block, B>(
    parachain: &Arc<P>,
    finalized_head: Vec<u8>,
    last_seen_finalized_hashes: &mut LruMap<Block::Hash, ()>,
) where
    Block: BlockT,
    B: Backend<Block>,
    P: Finalizer<Block, B> + UsageProvider<Block> + BlockchainEvents<Block>,
{
    let decoded = Block::Header::decode(&mut &finalized_head[..]);
    let header = match decoded {
        Err(err) => {
            tracing::debug!(
                target: LOG_TARGET,
                error = ?err,
                "Could not decode parachain header while following finalized heads.",
            );
            return
        },
        Ok(decoded_header) => decoded_header,
    };

    let hash = header.hash();
    // The hash is recorded unconditionally, before any finalization attempt.
    last_seen_finalized_hashes.insert(hash, ());

    // Nothing to do when the client has already finalized this height or a higher one.
    let finalized_number = parachain.usage_info().chain.finalized_number;
    if finalized_number >= *header.number() {
        return
    }

    tracing::debug!(
        target: LOG_TARGET,
        block_hash = ?hash,
        "Attempting to finalize header.",
    );

    match parachain.finalize_block(hash, None, true) {
        Ok(_) => {},
        Err(ClientError::UnknownBlock(_)) => tracing::debug!(
            target: LOG_TARGET,
            block_hash = ?hash,
            "Could not finalize block because it is unknown.",
        ),
        Err(other) => tracing::warn!(
            target: LOG_TARGET,
            error = ?other,
            block_hash = ?hash,
            "Failed to finalize block",
        ),
    }
}
/// Follows finalized parachain heads and finalizes the matching local blocks.
///
/// Two event sources are multiplexed:
///
/// * the stream returned by `finalized_heads(relay_chain, para_id)`: every item is handed to
///   `handle_new_finalized_head`, which also records its hash in an LRU map of at most
///   `FINALIZATION_CACHE_SIZE` entries;
/// * the import notification stream of `parachain`: when an imported block's hash is present
///   in that LRU map, the block is finalized right after import.
///
/// The function returns when the finalized heads stream cannot be obtained or when either
/// stream ends. Finalization errors are logged and otherwise ignored.
async fn follow_finalized_head<P, Block, B, R>(para_id: ParaId, parachain: Arc<P>, relay_chain: R)
where
    Block: BlockT,
    P: Finalizer<Block, B> + UsageProvider<Block> + BlockchainEvents<Block>,
    R: RelayChainInterface + Clone,
    B: Backend<Block>,
{
    let finalized_heads = match finalized_heads(relay_chain, para_id).await {
        Ok(finalized_heads_stream) => finalized_heads_stream.fuse(),
        Err(err) => {
            tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve finalized heads stream.");
            return
        },
    };

    let mut imported_blocks = parachain.import_notification_stream().fuse();

    pin_mut!(finalized_heads);

    // Hashes of finalized heads seen so far; consulted when a block is imported later.
    let mut last_seen_finalized_hashes = LruMap::new(ByLength::new(FINALIZATION_CACHE_SIZE));

    loop {
        select! {
            fin = finalized_heads.next() => {
                match fin {
                    Some(finalized_head) =>
                        handle_new_finalized_head(&parachain, finalized_head, &mut last_seen_finalized_hashes),
                    None => {
                        tracing::debug!(target: LOG_TARGET, "Stopping following finalized head.");
                        return
                    }
                }
            },
            imported = imported_blocks.next() => {
                match imported {
                    Some(imported_block) => {
                        // `peek` is a lookup only; the hash stays in the map.
                        if last_seen_finalized_hashes.peek(&imported_block.hash).is_some() {
                            tracing::debug!(
                                target: LOG_TARGET,
                                block_hash = ?imported_block.hash,
                                "Setting newly imported block as finalized.",
                            );

                            if let Err(e) = parachain.finalize_block(imported_block.hash, None, true) {
                                match e {
                                    ClientError::UnknownBlock(_) => tracing::debug!(
                                        target: LOG_TARGET,
                                        block_hash = ?imported_block.hash,
                                        "Could not finalize block because it is unknown.",
                                    ),
                                    _ => tracing::warn!(
                                        target: LOG_TARGET,
                                        error = ?e,
                                        block_hash = ?imported_block.hash,
                                        "Failed to finalize block",
                                    ),
                                }
                            }
                        }
                    },
                    None => {
                        tracing::debug!(
                            target: LOG_TARGET,
                            "Stopping following imported blocks.",
                        );
                        return
                    }
                }
            }
        }
    }
}
pub async fn run_parachain_consensus<P, R, Block, B>(
para_id: ParaId,
parachain: Arc<P>,
relay_chain: R,
announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
recovery_chan_tx: Option<Sender<RecoveryRequest<Block>>>,
) where
Block: BlockT,
P: Finalizer<Block, B>
+ UsageProvider<Block>
+ Send
+ Sync
+ BlockBackend<Block>
+ BlockchainEvents<Block>,
for<'a> &'a P: BlockImport<Block>,
R: RelayChainInterface + Clone,
B: Backend<Block>,
{
let follow_new_best = follow_new_best(
para_id,
parachain.clone(),
relay_chain.clone(),
announce_block,
recovery_chan_tx,
);
let follow_finalized_head = follow_finalized_head(para_id, parachain, relay_chain);
select! {
_ = follow_new_best.fuse() => {},
_ = follow_finalized_head.fuse() => {},
}
}
/// Follows the best parachain heads and enacts them as best block locally.
///
/// Items of the stream returned by `new_best_heads(relay_chain, para_id)` are passed to
/// `handle_new_best_parachain_head`; local import notifications are passed to
/// `handle_new_block_imported`. Both handlers share `unset_best_header`, a slot holding a
/// header that still has to be set as best.
///
/// Returns when the best heads stream cannot be obtained or when either stream ends.
async fn follow_new_best<P, R, Block, B>(
    para_id: ParaId,
    parachain: Arc<P>,
    relay_chain: R,
    announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
    mut recovery_chan_tx: Option<Sender<RecoveryRequest<Block>>>,
) where
    Block: BlockT,
    P: Finalizer<Block, B>
        + UsageProvider<Block>
        + Send
        + Sync
        + BlockBackend<Block>
        + BlockchainEvents<Block>,
    for<'a> &'a P: BlockImport<Block>,
    R: RelayChainInterface + Clone,
    B: Backend<Block>,
{
    let best_heads_result = new_best_heads(relay_chain, para_id).await;
    let new_best_heads = match best_heads_result {
        Err(err) => {
            tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve best heads stream.");
            return
        },
        Ok(stream) => stream.fuse(),
    };
    pin_mut!(new_best_heads);

    let mut imported_blocks = parachain.import_notification_stream().fuse();
    // Header that should become best but could not be enacted yet.
    let mut unset_best_header = None;

    loop {
        select! {
            maybe_head = new_best_heads.next() => {
                if let Some(head) = maybe_head {
                    handle_new_best_parachain_head(
                        head,
                        &*parachain,
                        &mut unset_best_header,
                        recovery_chan_tx.as_mut(),
                    )
                    .await
                } else {
                    tracing::debug!(
                        target: LOG_TARGET,
                        "Stopping following new best.",
                    );
                    return
                }
            },
            maybe_import = imported_blocks.next() => {
                if let Some(import) = maybe_import {
                    handle_new_block_imported(
                        import,
                        &mut unset_best_header,
                        &*parachain,
                        &*announce_block,
                    )
                    .await
                } else {
                    tracing::debug!(
                        target: LOG_TARGET,
                        "Stopping following imported blocks.",
                    );
                    return
                }
            },
        }
    }
}
/// Reacts to a block import notification.
///
/// Blocks whose origin is not `BlockOrigin::Own` are announced through `announce_block`.
/// After that, if the notification is not a new best block and a header is waiting in
/// `unset_best_header_opt`, the waiting header is imported as new best provided that:
///
/// * the imported block's number is not below the waiting header's number,
/// * at equal numbers the imported block is the waiting header itself, and
/// * the waiting header's block status is `InChainWithState`.
///
/// The slot is emptied only when the import as new best is actually attempted.
async fn handle_new_block_imported<Block, P>(
    notification: BlockImportNotification<Block>,
    unset_best_header_opt: &mut Option<Block::Header>,
    parachain: &P,
    announce_block: &(dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync),
) where
    Block: BlockT,
    P: UsageProvider<Block> + Send + Sync + BlockBackend<Block>,
    for<'a> &'a P: BlockImport<Block>,
{
    if notification.origin != BlockOrigin::Own {
        announce_block(notification.hash, None);
    }

    // A new best block needs no further handling here.
    if notification.is_new_best {
        return
    }

    let pending_header = match &*unset_best_header_opt {
        Some(waiting) => waiting,
        None => return,
    };

    let imported_number = notification.header.number();
    let pending_number = pending_header.number();

    // The imported block is still below the pending one.
    if imported_number < pending_number {
        return
    }

    let unset_hash = pending_header.hash();

    // Same height but a different block: not the one that is pending.
    if imported_number == pending_number && unset_hash != notification.hash {
        return
    }

    match parachain.block_status(unset_hash) {
        Ok(BlockStatus::InChainWithState) => {
            let header_to_enact = unset_best_header_opt
                .take()
                .expect("We checked above that the value is set; qed");

            tracing::debug!(
                target: LOG_TARGET,
                ?unset_hash,
                "Importing block as new best for parachain.",
            );
            import_block_as_new_best(unset_hash, header_to_enact, parachain).await;
        },
        status => tracing::debug!(
            target: LOG_TARGET,
            unset_best_header = ?pending_header,
            ?notification.header,
            state = ?status,
            "Unexpected state for unset best header.",
        ),
    }
}
/// Handles an encoded parachain head that should become the new best block.
///
/// After decoding `head` into a header, the function returns early if that block is already
/// the best block reported by `parachain`. Otherwise it acts on the block status:
///
/// * `InChainWithState`: clears `unset_best_header` and imports the block as new best;
/// * `InChainPruned`: logs an error;
/// * `Unknown`: stores the header in `unset_best_header` and, if a recovery channel was
///   given, tries to send a `RecoveryKind::Full` request for the block (a failed send is only
///   logged);
/// * status lookup error: logs an error;
/// * any other status: ignored.
async fn handle_new_best_parachain_head<Block, P>(
    head: Vec<u8>,
    parachain: &P,
    unset_best_header: &mut Option<Block::Header>,
    mut recovery_chan_tx: Option<&mut Sender<RecoveryRequest<Block>>>,
) where
    Block: BlockT,
    P: UsageProvider<Block> + Send + Sync + BlockBackend<Block>,
    for<'a> &'a P: BlockImport<Block>,
{
    let decoded = <<Block as BlockT>::Header>::decode(&mut &head[..]);
    let parachain_head = match decoded {
        Err(err) => {
            tracing::debug!(
                target: LOG_TARGET,
                error = ?err,
                "Could not decode Parachain header while following best heads.",
            );
            return
        },
        Ok(decoded_header) => decoded_header,
    };

    let hash = parachain_head.hash();

    let current_best = parachain.usage_info().chain.best_hash;
    if hash == current_best {
        tracing::debug!(
            target: LOG_TARGET,
            block_hash = ?hash,
            "Skipping set new best block, because block is already the best.",
        );
        return
    }

    let status = parachain.block_status(hash);
    match status {
        Err(err) => {
            tracing::error!(
                target: LOG_TARGET,
                block_hash = ?hash,
                error = ?err,
                "Failed to get block status of block.",
            );
        },
        Ok(BlockStatus::InChainWithState) => {
            // Any previously pending header is superseded by this one.
            *unset_best_header = None;

            tracing::debug!(
                target: LOG_TARGET,
                included = ?hash,
                "Importing block as new best for parachain.",
            );
            import_block_as_new_best(hash, parachain_head, parachain).await;
        },
        Ok(BlockStatus::InChainPruned) => {
            tracing::error!(
                target: LOG_TARGET,
                block_hash = ?hash,
                "Trying to set pruned block as new best!",
            );
        },
        Ok(BlockStatus::Unknown) => {
            *unset_best_header = Some(parachain_head);

            tracing::debug!(
                target: LOG_TARGET,
                block_hash = ?hash,
                "Parachain block not yet imported, waiting for import to enact as best block.",
            );

            // `try_send` does not wait; a failed send is reported and otherwise ignored.
            if let Some(sender) = recovery_chan_tx.as_mut() {
                let request = RecoveryRequest { hash, kind: RecoveryKind::Full };
                if let Err(err) = sender.try_send(request) {
                    tracing::warn!(
                        target: LOG_TARGET,
                        block_hash = ?hash,
                        error = ?err,
                        "Unable to notify block recovery subsystem"
                    )
                }
            }
        },
        _ => {},
    }
}
/// Re-imports `header` so that it becomes the best block of `parachain`.
///
/// Nothing is imported when the header number is below the client's current best number.
/// Otherwise the header is imported with origin `ConsensusBroadcast`, fork choice
/// `Custom(true)` and `import_existing` set. `hash` is only used for logging. An import
/// failure is logged as a warning and not returned.
async fn import_block_as_new_best<Block, P>(hash: Block::Hash, header: Block::Header, parachain: &P)
where
    Block: BlockT,
    P: UsageProvider<Block> + Send + Sync + BlockBackend<Block>,
    for<'a> &'a P: BlockImport<Block>,
{
    let best_number = parachain.usage_info().chain.best_number;
    let is_below_best = *header.number() < best_number;
    if is_below_best {
        tracing::debug!(
            target: LOG_TARGET,
            %best_number,
            block_number = %header.number(),
            "Skipping importing block as new best block, because there already exists a \
             best block with an higher number",
        );
        return
    }

    // Header-only import of an already known block that forces it to be the best block.
    let import_params = {
        let mut params = BlockImportParams::new(BlockOrigin::ConsensusBroadcast, header);
        params.fork_choice = Some(ForkChoiceStrategy::Custom(true));
        params.import_existing = true;
        params
    };

    let import_outcome = parachain.import_block(import_params).await;
    if let Err(err) = import_outcome {
        tracing::warn!(
            target: LOG_TARGET,
            block_hash = ?hash,
            error = ?err,
            "Failed to set new best block.",
        );
    }
}
/// Returns a stream of encoded heads of parachain `para_id`, one per new best relay chain
/// block notification.
///
/// For every notification the head is looked up with `parachain_head_at` at the notified
/// block's hash. Notifications for which that lookup fails or yields `None` produce no item.
/// An error is returned only when the notification stream itself cannot be obtained.
async fn new_best_heads(
    relay_chain: impl RelayChainInterface + Clone,
    para_id: ParaId,
) -> RelayChainResult<impl Stream<Item = Vec<u8>>> {
    let notifications = relay_chain.new_best_notification_stream().await?;

    let heads = notifications.filter_map(move |relay_header| {
        // Each lookup future owns its own handle to the relay chain.
        let relay_chain = relay_chain.clone();
        async move {
            let relay_hash = relay_header.hash();
            match parachain_head_at(&relay_chain, relay_hash, para_id).await {
                Ok(maybe_head) => maybe_head,
                Err(_) => None,
            }
        }
    });

    Ok(heads)
}
/// Returns a stream of encoded heads of parachain `para_id`, one per relay chain finality
/// notification.
///
/// For every notification the head is looked up with `parachain_head_at` at the notified
/// block's hash. Notifications for which that lookup fails or yields `None` produce no item.
/// An error is returned only when the notification stream itself cannot be obtained.
async fn finalized_heads(
    relay_chain: impl RelayChainInterface + Clone,
    para_id: ParaId,
) -> RelayChainResult<impl Stream<Item = Vec<u8>>> {
    let notifications = relay_chain.finality_notification_stream().await?;

    let heads = notifications.filter_map(move |relay_header| {
        // Each lookup future owns its own handle to the relay chain.
        let relay_chain = relay_chain.clone();
        async move {
            let relay_hash = relay_header.hash();
            match parachain_head_at(&relay_chain, relay_hash, para_id).await {
                Ok(maybe_head) => maybe_head,
                Err(_) => None,
            }
        }
    });

    Ok(heads)
}
/// Fetches the encoded head of parachain `para_id` as seen at relay chain block `at`.
///
/// The head is taken from the `parent_head` field of the persisted validation data, which is
/// requested with `OccupiedCoreAssumption::TimedOut`. Returns `Ok(None)` when the relay chain
/// reports no validation data, and forwards any relay chain error.
async fn parachain_head_at(
    relay_chain: &impl RelayChainInterface,
    at: PHash,
    para_id: ParaId,
) -> RelayChainResult<Option<Vec<u8>>> {
    let validation_data = relay_chain
        .persisted_validation_data(at, para_id, OccupiedCoreAssumption::TimedOut)
        .await?;

    Ok(validation_data.map(|data| data.parent_head.0))
}