use sc_client_api::{BlockBackend, BlockchainEvents, UsageProvider};
use sc_consensus::import_queue::{ImportQueueService, IncomingBlock};
use sp_consensus::{BlockOrigin, BlockStatus, SyncOracle};
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
use polkadot_node_primitives::{AvailableData, POV_BOMB_LIMIT};
use polkadot_node_subsystem::messages::AvailabilityRecoveryMessage;
use polkadot_overseer::Handle as OverseerHandle;
use polkadot_primitives::{
CandidateReceipt, CommittedCandidateReceipt, Id as ParaId, SessionIndex,
};
use cumulus_primitives_core::ParachainBlockData;
use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult};
use codec::Decode;
use futures::{
channel::mpsc::Receiver, select, stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt,
};
use futures_timer::Delay;
use rand::{distributions::Uniform, prelude::Distribution, thread_rng};
use std::{
collections::{HashMap, HashSet, VecDeque},
pin::Pin,
sync::Arc,
time::Duration,
};
mod active_candidate_recovery;
use active_candidate_recovery::ActiveCandidateRecovery;
const LOG_TARGET: &str = "cumulus-pov-recovery";
#[async_trait::async_trait]
pub trait RecoveryHandle: Send {
async fn send_recovery_msg(
&mut self,
message: AvailabilityRecoveryMessage,
origin: &'static str,
);
}
#[async_trait::async_trait]
impl RecoveryHandle for OverseerHandle {
async fn send_recovery_msg(
&mut self,
message: AvailabilityRecoveryMessage,
origin: &'static str,
) {
self.send_msg(message, origin).await;
}
}
#[derive(Debug, PartialEq)]
pub enum RecoveryKind {
Simple,
Full,
}
pub struct RecoveryRequest<Block: BlockT> {
pub hash: Block::Hash,
pub kind: RecoveryKind,
}
#[derive(Clone, Copy)]
pub struct RecoveryDelayRange {
pub min: Duration,
pub max: Duration,
}
impl RecoveryDelayRange {
fn duration(&self) -> Duration {
Uniform::from(self.min..=self.max).sample(&mut thread_rng())
}
}
struct Candidate<Block: BlockT> {
receipt: CandidateReceipt,
session_index: SessionIndex,
block_number: NumberFor<Block>,
parent_hash: Block::Hash,
waiting_recovery: bool,
}
struct RecoveryQueue<Block: BlockT> {
recovery_delay_range: RecoveryDelayRange,
recovery_queue: VecDeque<Block::Hash>,
signaling_queue: FuturesUnordered<Pin<Box<dyn Future<Output = ()> + Send>>>,
}
impl<Block: BlockT> RecoveryQueue<Block> {
pub fn new(recovery_delay_range: RecoveryDelayRange) -> Self {
Self {
recovery_delay_range,
recovery_queue: Default::default(),
signaling_queue: Default::default(),
}
}
pub fn push_recovery(&mut self, hash: Block::Hash) {
let delay = self.recovery_delay_range.duration();
tracing::debug!(
target: LOG_TARGET,
block_hash = ?hash,
"Adding block to queue and adding new recovery slot in {:?} sec",
delay.as_secs(),
);
self.recovery_queue.push_back(hash);
self.signaling_queue.push(
async move {
Delay::new(delay).await;
}
.boxed(),
);
}
pub async fn next_recovery(&mut self) -> Block::Hash {
loop {
if self.signaling_queue.next().await.is_some() {
if let Some(hash) = self.recovery_queue.pop_front() {
return hash
} else {
tracing::error!(
target: LOG_TARGET,
"Recovery was signaled, but no candidate hash available. This is a bug."
);
};
}
futures::pending!()
}
}
}
pub struct PoVRecovery<Block: BlockT, PC, RC> {
candidates: HashMap<Block::Hash, Candidate<Block>>,
candidate_recovery_queue: RecoveryQueue<Block>,
active_candidate_recovery: ActiveCandidateRecovery<Block>,
waiting_for_parent: HashMap<Block::Hash, Vec<Block>>,
parachain_client: Arc<PC>,
parachain_import_queue: Box<dyn ImportQueueService<Block>>,
relay_chain_interface: RC,
para_id: ParaId,
recovery_chan_rx: Receiver<RecoveryRequest<Block>>,
candidates_in_retry: HashSet<Block::Hash>,
parachain_sync_service: Arc<dyn SyncOracle + Sync + Send>,
}
impl<Block: BlockT, PC, RCInterface> PoVRecovery<Block, PC, RCInterface>
where
PC: BlockBackend<Block> + BlockchainEvents<Block> + UsageProvider<Block>,
RCInterface: RelayChainInterface + Clone,
{
pub fn new(
recovery_handle: Box<dyn RecoveryHandle>,
recovery_delay_range: RecoveryDelayRange,
parachain_client: Arc<PC>,
parachain_import_queue: Box<dyn ImportQueueService<Block>>,
relay_chain_interface: RCInterface,
para_id: ParaId,
recovery_chan_rx: Receiver<RecoveryRequest<Block>>,
parachain_sync_service: Arc<dyn SyncOracle + Sync + Send>,
) -> Self {
Self {
candidates: HashMap::new(),
candidate_recovery_queue: RecoveryQueue::new(recovery_delay_range),
active_candidate_recovery: ActiveCandidateRecovery::new(recovery_handle),
waiting_for_parent: HashMap::new(),
parachain_client,
parachain_import_queue,
relay_chain_interface,
para_id,
candidates_in_retry: HashSet::new(),
recovery_chan_rx,
parachain_sync_service,
}
}
fn handle_pending_candidate(
&mut self,
receipt: CommittedCandidateReceipt,
session_index: SessionIndex,
) {
let header = match Block::Header::decode(&mut &receipt.commitments.head_data.0[..]) {
Ok(header) => header,
Err(e) => {
tracing::warn!(
target: LOG_TARGET,
error = ?e,
"Failed to decode parachain header from pending candidate",
);
return
},
};
if *header.number() <= self.parachain_client.usage_info().chain.finalized_number {
return
}
let hash = header.hash();
if self.candidates.contains_key(&hash) {
return
}
tracing::debug!(target: LOG_TARGET, block_hash = ?hash, "Adding outstanding candidate");
self.candidates.insert(
hash,
Candidate {
block_number: *header.number(),
receipt: receipt.to_plain(),
session_index,
parent_hash: *header.parent_hash(),
waiting_recovery: false,
},
);
self.recover(RecoveryRequest { hash, kind: RecoveryKind::Simple });
}
fn clear_waiting_recovery(&mut self, block_hash: &Block::Hash) {
if let Some(candidate) = self.candidates.get_mut(block_hash) {
candidate.waiting_recovery = false;
}
}
fn handle_block_finalized(&mut self, block_number: NumberFor<Block>) {
self.candidates.retain(|_, pc| pc.block_number > block_number);
}
async fn recover_candidate(&mut self, block_hash: Block::Hash) {
match self.candidates.get(&block_hash) {
Some(candidate) if candidate.waiting_recovery => {
tracing::debug!(target: LOG_TARGET, ?block_hash, "Issuing recovery request");
self.active_candidate_recovery.recover_candidate(block_hash, candidate).await;
},
_ => (),
}
}
fn reset_candidate(&mut self, hash: Block::Hash) {
let mut blocks_to_delete = vec![hash];
while let Some(delete) = blocks_to_delete.pop() {
if let Some(childs) = self.waiting_for_parent.remove(&delete) {
blocks_to_delete.extend(childs.iter().map(BlockT::hash));
}
}
self.clear_waiting_recovery(&hash);
}
async fn handle_candidate_recovered(
&mut self,
block_hash: Block::Hash,
available_data: Option<AvailableData>,
) {
let available_data = match available_data {
Some(data) => {
self.candidates_in_retry.remove(&block_hash);
data
},
None =>
if self.candidates_in_retry.insert(block_hash) {
tracing::debug!(target: LOG_TARGET, ?block_hash, "Recovery failed, retrying.");
self.candidate_recovery_queue.push_recovery(block_hash);
return
} else {
tracing::warn!(
target: LOG_TARGET,
?block_hash,
"Unable to recover block after retry.",
);
self.candidates_in_retry.remove(&block_hash);
self.reset_candidate(block_hash);
return
},
};
let raw_block_data = match sp_maybe_compressed_blob::decompress(
&available_data.pov.block_data.0,
POV_BOMB_LIMIT,
) {
Ok(r) => r,
Err(error) => {
tracing::debug!(target: LOG_TARGET, ?error, "Failed to decompress PoV");
self.reset_candidate(block_hash);
return
},
};
let block_data = match ParachainBlockData::<Block>::decode(&mut &raw_block_data[..]) {
Ok(d) => d,
Err(error) => {
tracing::warn!(
target: LOG_TARGET,
?error,
"Failed to decode parachain block data from recovered PoV",
);
self.reset_candidate(block_hash);
return
},
};
let block = block_data.into_block();
let parent = *block.header().parent_hash();
match self.parachain_client.block_status(parent) {
Ok(BlockStatus::Unknown) => {
let parent_scheduled_for_recovery =
self.candidates.get(&parent).map_or(false, |parent| parent.waiting_recovery);
if parent_scheduled_for_recovery {
tracing::debug!(
target: LOG_TARGET,
?block_hash,
parent_hash = ?parent,
parent_scheduled_for_recovery,
"Waiting for recovery of parent.",
);
self.waiting_for_parent.entry(parent).or_default().push(block);
return
} else {
tracing::debug!(
target: LOG_TARGET,
?block_hash,
parent_hash = ?parent,
"Parent not found while trying to import recovered block.",
);
self.reset_candidate(block_hash);
return
}
},
Err(error) => {
tracing::debug!(
target: LOG_TARGET,
block_hash = ?parent,
?error,
"Error while checking block status",
);
self.reset_candidate(block_hash);
return
},
_ => (),
}
self.import_block(block).await;
}
async fn import_block(&mut self, block: Block) {
let mut blocks = VecDeque::new();
tracing::debug!(target: LOG_TARGET, block_hash = ?block.hash(), "Importing block retrieved using pov_recovery");
blocks.push_back(block);
let mut incoming_blocks = Vec::new();
while let Some(block) = blocks.pop_front() {
let block_hash = block.hash();
let (header, body) = block.deconstruct();
incoming_blocks.push(IncomingBlock {
hash: block_hash,
header: Some(header),
body: Some(body),
import_existing: false,
allow_missing_state: false,
justifications: None,
origin: None,
skip_execution: false,
state: None,
indexed_body: None,
});
if let Some(waiting) = self.waiting_for_parent.remove(&block_hash) {
blocks.extend(waiting);
}
}
self.parachain_import_queue
.import_blocks(BlockOrigin::ConsensusBroadcast, incoming_blocks);
}
pub fn recover(&mut self, req: RecoveryRequest<Block>) {
let RecoveryRequest { mut hash, kind } = req;
let mut to_recover = Vec::new();
loop {
let candidate = match self.candidates.get_mut(&hash) {
Some(candidate) => candidate,
None => {
tracing::debug!(
target: LOG_TARGET,
block_hash = ?hash,
"Cound not recover. Block was never announced as candidate"
);
return
},
};
match self.parachain_client.block_status(hash) {
Ok(BlockStatus::Unknown) if !candidate.waiting_recovery => {
candidate.waiting_recovery = true;
to_recover.push(hash);
},
Ok(_) => break,
Err(e) => {
tracing::error!(
target: LOG_TARGET,
error = ?e,
block_hash = ?hash,
"Failed to get block status",
);
for hash in to_recover {
self.clear_waiting_recovery(&hash);
}
return
},
}
if kind == RecoveryKind::Simple {
break
}
hash = candidate.parent_hash;
}
for hash in to_recover.into_iter().rev() {
self.candidate_recovery_queue.push_recovery(hash);
}
}
pub async fn run(mut self) {
let mut imported_blocks = self.parachain_client.import_notification_stream().fuse();
let mut finalized_blocks = self.parachain_client.finality_notification_stream().fuse();
let pending_candidates = match pending_candidates(
self.relay_chain_interface.clone(),
self.para_id,
self.parachain_sync_service.clone(),
)
.await
{
Ok(pending_candidate_stream) => pending_candidate_stream.fuse(),
Err(err) => {
tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve pending candidate stream.");
return
},
};
futures::pin_mut!(pending_candidates);
loop {
select! {
pending_candidate = pending_candidates.next() => {
if let Some((receipt, session_index)) = pending_candidate {
self.handle_pending_candidate(receipt, session_index);
} else {
tracing::debug!(target: LOG_TARGET, "Pending candidates stream ended");
return;
}
},
recovery_req = self.recovery_chan_rx.next() => {
if let Some(req) = recovery_req {
self.recover(req);
} else {
tracing::debug!(target: LOG_TARGET, "Recovery channel stream ended");
return;
}
},
imported = imported_blocks.next() => {
if let Some(imported) = imported {
self.clear_waiting_recovery(&imported.hash);
} else {
tracing::debug!(target: LOG_TARGET, "Imported blocks stream ended");
return;
}
},
finalized = finalized_blocks.next() => {
if let Some(finalized) = finalized {
self.handle_block_finalized(*finalized.header.number());
} else {
tracing::debug!(target: LOG_TARGET, "Finalized blocks stream ended");
return;
}
},
next_to_recover = self.candidate_recovery_queue.next_recovery().fuse() => {
self.recover_candidate(next_to_recover).await;
},
(block_hash, available_data) =
self.active_candidate_recovery.wait_for_recovery().fuse() =>
{
self.handle_candidate_recovered(block_hash, available_data).await;
},
}
}
}
}
async fn pending_candidates(
relay_chain_client: impl RelayChainInterface + Clone,
para_id: ParaId,
sync_service: Arc<dyn SyncOracle + Sync + Send>,
) -> RelayChainResult<impl Stream<Item = (CommittedCandidateReceipt, SessionIndex)>> {
let import_notification_stream = relay_chain_client.import_notification_stream().await?;
let filtered_stream = import_notification_stream.filter_map(move |n| {
let client_for_closure = relay_chain_client.clone();
let sync_oracle = sync_service.clone();
async move {
let hash = n.hash();
if sync_oracle.is_major_syncing() {
tracing::debug!(
target: LOG_TARGET,
relay_hash = ?hash,
"Skipping candidate due to sync.",
);
return None
}
let pending_availability_result = client_for_closure
.candidate_pending_availability(hash, para_id)
.await
.map_err(|e| {
tracing::error!(
target: LOG_TARGET,
error = ?e,
"Failed to fetch pending candidates.",
)
});
let session_index_result =
client_for_closure.session_index_for_child(hash).await.map_err(|e| {
tracing::error!(
target: LOG_TARGET,
error = ?e,
"Failed to fetch session index.",
)
});
if let Ok(Some(candidate)) = pending_availability_result {
session_index_result.map(|session_index| (candidate, session_index)).ok()
} else {
None
}
}
});
Ok(filtered_stream)
}