use crate::{
futures_undead::FuturesUndead,
task::{
strategy::{
do_post_recovery_check, is_unavailable, OngoingRequests, N_PARALLEL,
SYSTEMATIC_CHUNKS_REQ_RETRY_LIMIT,
},
RecoveryParams, RecoveryStrategy, State,
},
LOG_TARGET,
};
use polkadot_node_primitives::AvailableData;
use polkadot_node_subsystem::{overseer, RecoveryError};
use polkadot_primitives::{ChunkIndex, ValidatorIndex};
use std::collections::VecDeque;
/// Construction parameters for [`FetchSystematicChunks`].
pub struct FetchSystematicChunksParams {
	/// Validator indices handed to the strategy as `backers`; they are mapped to authority keys
	/// and passed to `State::wait_for_chunks`. NOTE(review): presumably the candidate's backing
	/// group — confirm at the construction site.
	pub backers: Vec<ValidatorIndex>,
	/// `(chunk index, validator index)` pairs to request systematic chunks from. Their count is
	/// used as the systematic recovery threshold.
	pub validators: Vec<(ChunkIndex, ValidatorIndex)>,
}
/// Recovery strategy that fetches only the systematic chunks and reconstructs the data from them.
pub struct FetchSystematicChunks {
	/// Chunk requests currently in flight.
	requesting_chunks: OngoingRequests,
	/// `(chunk index, validator index)` pairs still to be requested. Drained into a queue by `run`.
	validators: Vec<(ChunkIndex, ValidatorIndex)>,
	/// Extra validators forwarded to `State::wait_for_chunks`. Drained by `run`.
	backers: Vec<ValidatorIndex>,
	/// Number of systematic chunks (indices `0..threshold`) required for reconstruction.
	threshold: usize,
}
impl FetchSystematicChunks {
pub fn new(params: FetchSystematicChunksParams) -> Self {
Self {
threshold: params.validators.len(),
validators: params.validators,
backers: params.backers,
requesting_chunks: FuturesUndead::new(),
}
}
fn is_unavailable(
unrequested_validators: usize,
in_flight_requests: usize,
systematic_chunk_count: usize,
threshold: usize,
) -> bool {
is_unavailable(
systematic_chunk_count,
in_flight_requests,
unrequested_validators,
threshold,
)
}
fn get_desired_request_count(&self, chunk_count: usize, threshold: usize) -> usize {
let max_requests_boundary = std::cmp::min(N_PARALLEL, threshold);
let remaining_chunks = threshold.saturating_sub(chunk_count);
std::cmp::min(max_requests_boundary, remaining_chunks)
}
async fn attempt_systematic_recovery<Sender: overseer::AvailabilityRecoverySenderTrait>(
&mut self,
state: &mut State,
common_params: &RecoveryParams,
) -> Result<AvailableData, RecoveryError> {
let strategy_type = RecoveryStrategy::<Sender>::strategy_type(self);
let recovery_duration = common_params.metrics.time_erasure_recovery(strategy_type);
let reconstruct_duration = common_params.metrics.time_erasure_reconstruct(strategy_type);
let chunks = state
.received_chunks
.range(
ChunkIndex(0)..
ChunkIndex(
u32::try_from(self.threshold)
.expect("validator count should not exceed u32"),
),
)
.map(|(_, chunk)| chunk.chunk.clone())
.collect::<Vec<_>>();
let available_data = polkadot_erasure_coding::reconstruct_from_systematic_v1(
common_params.n_validators,
chunks,
);
match available_data {
Ok(data) => {
drop(reconstruct_duration);
do_post_recovery_check(common_params, data)
.await
.inspect_err(|_| {
recovery_duration.map(|rd| rd.stop_and_discard());
})
.inspect(|_| {
gum::trace!(
target: LOG_TARGET,
candidate_hash = ?common_params.candidate_hash,
erasure_root = ?common_params.erasure_root,
"Data recovery from systematic chunks complete",
);
})
},
Err(err) => {
reconstruct_duration.map(|rd| rd.stop_and_discard());
recovery_duration.map(|rd| rd.stop_and_discard());
gum::debug!(
target: LOG_TARGET,
candidate_hash = ?common_params.candidate_hash,
erasure_root = ?common_params.erasure_root,
?err,
"Systematic data recovery error",
);
Err(RecoveryError::Invalid)
},
}
}
}
#[async_trait::async_trait]
impl<Sender: overseer::AvailabilityRecoverySenderTrait> RecoveryStrategy<Sender>
	for FetchSystematicChunks
{
	/// Human-readable name of this strategy.
	fn display_name(&self) -> &'static str {
		"Fetch systematic chunks"
	}

	/// Label used for this strategy's metrics and request bookkeeping.
	fn strategy_type(&self) -> &'static str {
		"systematic_chunks"
	}

	/// Fetch systematic chunks until `self.threshold` of them are held, then reconstruct.
	///
	/// Steps:
	/// 1. Unless `bypass_availability_store` is set, load local chunks from the availability
	///    store. Bail out if one of our local chunk indices is in the systematic set but did not
	///    end up in `state.received_chunks` (logged as "hold an invalid chunk").
	/// 2. Drop validators whose chunk is already held, or that `State::can_retry_request`
	///    rejects under `SYSTEMATIC_CHUNKS_REQ_RETRY_LIMIT`.
	/// 3. Loop:
	///    - Recover once enough systematic chunks are held.
	///    - Give up when `is_unavailable` reports that is no longer reachable.
	///    - Otherwise launch more parallel requests and wait for responses.
	///
	/// # Errors
	/// Returns `RecoveryError::Unavailable` in the two bail-out cases above. Otherwise it returns
	/// the result of `attempt_systematic_recovery`.
	///
	/// # Panics
	/// Panics if `self.threshold` does not fit into a `u32`.
	async fn run(
		mut self: Box<Self>,
		state: &mut State,
		sender: &mut Sender,
		common_params: &RecoveryParams,
	) -> Result<AvailableData, RecoveryError> {
		// First query the availability store for any chunks we hold locally.
		if !common_params.bypass_availability_store {
			let local_chunk_indices = state.populate_from_av_store(common_params, sender).await;
			for (_, our_c_index) in &local_chunk_indices {
				// One of our local chunks belongs to the systematic set but was not accepted into
				// `received_chunks`, so systematic recovery cannot proceed.
				if self.validators.iter().any(|(c_index, _)| c_index == our_c_index) &&
					!state.received_chunks.contains_key(our_c_index)
				{
					gum::debug!(
						target: LOG_TARGET,
						candidate_hash = ?common_params.candidate_hash,
						erasure_root = ?common_params.erasure_root,
						requesting = %self.requesting_chunks.len(),
						total_requesting = %self.requesting_chunks.total_len(),
						n_validators = %common_params.n_validators,
						chunk_index = ?our_c_index,
						"Systematic chunk recovery is not possible. We are among the systematic validators but hold an invalid chunk",
					);
					return Err(RecoveryError::Unavailable)
				}
			}
		}

		// Skip validators whose chunk we already have or that may not be retried any more.
		self.validators.retain(|(c_index, v_index)| {
			!state.received_chunks.contains_key(c_index) &&
				state.can_retry_request(
					&(common_params.validator_authority_keys[v_index.0 as usize].clone(), *v_index),
					SYSTEMATIC_CHUNKS_REQ_RETRY_LIMIT,
				)
		});

		// Use the same checked conversion as `attempt_systematic_recovery`, instead of silently
		// truncating with `as`.
		let systematic_end = ChunkIndex(
			u32::try_from(self.threshold).expect("validator count should not exceed u32"),
		);
		let mut systematic_chunk_count =
			state.received_chunks.range(ChunkIndex(0)..systematic_end).count();

		// `self` is consumed by this call, so the remaining validators and backers can be moved
		// out of it.
		let mut validators_queue: VecDeque<_> = std::mem::take(&mut self.validators)
			.into_iter()
			.map(|(_, validator_index)| {
				(
					common_params.validator_authority_keys[validator_index.0 as usize].clone(),
					validator_index,
				)
			})
			.collect();
		let mut backers: Vec<_> = std::mem::take(&mut self.backers)
			.into_iter()
			.map(|validator_index| {
				common_params.validator_authority_keys[validator_index.0 as usize].clone()
			})
			.collect();

		loop {
			if systematic_chunk_count >= self.threshold {
				return self.attempt_systematic_recovery::<Sender>(state, common_params).await
			}

			if Self::is_unavailable(
				validators_queue.len(),
				self.requesting_chunks.total_len(),
				systematic_chunk_count,
				self.threshold,
			) {
				gum::debug!(
					target: LOG_TARGET,
					candidate_hash = ?common_params.candidate_hash,
					erasure_root = ?common_params.erasure_root,
					%systematic_chunk_count,
					requesting = %self.requesting_chunks.len(),
					total_requesting = %self.requesting_chunks.total_len(),
					n_validators = %common_params.n_validators,
					systematic_threshold = ?self.threshold,
					"Data recovery from systematic chunks is not possible",
				);
				return Err(RecoveryError::Unavailable)
			}

			let desired_requests_count =
				self.get_desired_request_count(systematic_chunk_count, self.threshold);
			let already_requesting_count = self.requesting_chunks.len();
			gum::debug!(
				target: LOG_TARGET,
				?common_params.candidate_hash,
				?desired_requests_count,
				total_received = ?systematic_chunk_count,
				systematic_threshold = ?self.threshold,
				?already_requesting_count,
				"Requesting systematic availability chunks for a candidate",
			);

			let strategy_type = RecoveryStrategy::<Sender>::strategy_type(&*self);
			state
				.launch_parallel_chunk_requests(
					strategy_type,
					common_params,
					sender,
					desired_requests_count,
					&mut validators_queue,
					&mut self.requesting_chunks,
				)
				.await;

			// The callback refreshes `systematic_chunk_count`. Returning `true` stops the wait:
			// either the threshold is reached or it has become unreachable. The return value of
			// `wait_for_chunks` is not needed here, because the loop re-checks both conditions.
			let _ = state
				.wait_for_chunks(
					strategy_type,
					common_params,
					SYSTEMATIC_CHUNKS_REQ_RETRY_LIMIT,
					&mut validators_queue,
					&mut self.requesting_chunks,
					&mut backers,
					|unrequested_validators,
					 in_flight_reqs,
					 // Total chunk count is ignored; presumably it may include
					 // non-systematic chunks.
					 _chunk_count,
					 new_systematic_chunk_count| {
						systematic_chunk_count = new_systematic_chunk_count;
						let is_unavailable = Self::is_unavailable(
							unrequested_validators,
							in_flight_reqs,
							systematic_chunk_count,
							self.threshold,
						);
						systematic_chunk_count >= self.threshold || is_unavailable
					},
				)
				.await;
		}
	}
}
#[cfg(test)]
mod tests {
	use super::*;
	use polkadot_erasure_coding::systematic_recovery_threshold;

	/// `get_desired_request_count` is capped by `N_PARALLEL`, the threshold, and the number of
	/// chunks still missing.
	#[test]
	fn test_get_desired_request_count() {
		let n_validators = 100;
		let systematic_threshold = systematic_recovery_threshold(n_validators).unwrap();
		let strategy = FetchSystematicChunks::new(FetchSystematicChunksParams {
			validators: vec![(1.into(), 1.into()); n_validators],
			backers: vec![],
		});

		// (received chunk count, threshold, expected desired request count)
		let cases = [
			(0, systematic_threshold, systematic_threshold),
			(5, systematic_threshold, systematic_threshold - 5),
			(n_validators * 2, systematic_threshold, 0),
			(0, N_PARALLEL * 2, N_PARALLEL),
			(N_PARALLEL, N_PARALLEL + 2, 2),
		];
		for (chunk_count, threshold, expected) in cases {
			assert_eq!(
				strategy.get_desired_request_count(chunk_count, threshold),
				expected,
				"chunk_count = {chunk_count}, threshold = {threshold}",
			);
		}
	}
}