#![warn(missing_docs)]
mod strategy;
pub use self::strategy::{
FetchChunks, FetchChunksParams, FetchFull, FetchFullParams, FetchSystematicChunks,
FetchSystematicChunksParams, RecoveryStrategy, State,
};
#[cfg(test)]
pub use self::strategy::{REGULAR_CHUNKS_REQ_RETRY_LIMIT, SYSTEMATIC_CHUNKS_REQ_RETRY_LIMIT};
use crate::{metrics::Metrics, ErasureTask, PostRecoveryCheck, LOG_TARGET};
use codec::Encode;
use polkadot_node_primitives::AvailableData;
use polkadot_node_subsystem::{messages::AvailabilityStoreMessage, overseer, RecoveryError};
use polkadot_primitives::{AuthorityDiscoveryId, CandidateHash, Hash};
use sc_network::ProtocolName;
use futures::channel::{mpsc, oneshot};
use std::collections::VecDeque;
#[derive(Clone)]
pub struct RecoveryParams {
pub validator_authority_keys: Vec<AuthorityDiscoveryId>,
pub n_validators: usize,
pub threshold: usize,
pub systematic_threshold: usize,
pub candidate_hash: CandidateHash,
pub erasure_root: Hash,
pub metrics: Metrics,
pub bypass_availability_store: bool,
pub post_recovery_check: PostRecoveryCheck,
pub pov_hash: Hash,
pub req_v1_protocol_name: ProtocolName,
pub req_v2_protocol_name: ProtocolName,
pub chunk_mapping_enabled: bool,
pub erasure_task_tx: mpsc::Sender<ErasureTask>,
}
pub struct RecoveryTask<Sender: overseer::AvailabilityRecoverySenderTrait> {
sender: Sender,
params: RecoveryParams,
strategies: VecDeque<Box<dyn RecoveryStrategy<Sender>>>,
state: State,
}
impl<Sender> RecoveryTask<Sender>
where
Sender: overseer::AvailabilityRecoverySenderTrait,
{
pub fn new(
sender: Sender,
params: RecoveryParams,
strategies: VecDeque<Box<dyn RecoveryStrategy<Sender>>>,
) -> Self {
Self { sender, params, strategies, state: State::new() }
}
async fn in_availability_store(&mut self) -> Option<AvailableData> {
if !self.params.bypass_availability_store {
let (tx, rx) = oneshot::channel();
self.sender
.send_message(AvailabilityStoreMessage::QueryAvailableData(
self.params.candidate_hash,
tx,
))
.await;
match rx.await {
Ok(Some(data)) => return Some(data),
Ok(None) => {},
Err(oneshot::Canceled) => {
gum::warn!(
target: LOG_TARGET,
candidate_hash = ?self.params.candidate_hash,
"Failed to reach the availability store",
)
},
}
}
None
}
pub async fn run(mut self) -> Result<AvailableData, RecoveryError> {
if let Some(data) = self.in_availability_store().await {
return Ok(data)
}
self.params.metrics.on_recovery_started();
let _timer = self.params.metrics.time_full_recovery();
while let Some(current_strategy) = self.strategies.pop_front() {
let display_name = current_strategy.display_name();
let strategy_type = current_strategy.strategy_type();
gum::debug!(
target: LOG_TARGET,
candidate_hash = ?self.params.candidate_hash,
"Starting `{}` strategy",
display_name
);
let res = current_strategy.run(&mut self.state, &mut self.sender, &self.params).await;
match res {
Err(RecoveryError::Unavailable) =>
if self.strategies.front().is_some() {
gum::debug!(
target: LOG_TARGET,
candidate_hash = ?self.params.candidate_hash,
"Recovery strategy `{}` did not conclude. Trying the next one.",
display_name
);
continue
},
Err(err) => {
match &err {
RecoveryError::Invalid =>
self.params.metrics.on_recovery_invalid(strategy_type),
_ => self.params.metrics.on_recovery_failed(strategy_type),
}
return Err(err)
},
Ok(data) => {
self.params.metrics.on_recovery_succeeded(strategy_type, data.encoded_size());
return Ok(data)
},
}
}
gum::warn!(
target: LOG_TARGET,
candidate_hash = ?self.params.candidate_hash,
"Recovery of available data failed.",
);
self.params.metrics.on_recovery_failed("all");
Err(RecoveryError::Unavailable)
}
}