// polkadot_availability_recovery/task/strategy/chunks.rs

use crate::{
    futures_undead::FuturesUndead,
    task::{
        strategy::{
            do_post_recovery_check, is_unavailable, OngoingRequests, N_PARALLEL,
            REGULAR_CHUNKS_REQ_RETRY_LIMIT,
        },
        RecoveryParams, State,
    },
    ErasureTask, RecoveryStrategy, LOG_TARGET,
};

use polkadot_node_primitives::AvailableData;
use polkadot_node_subsystem::{overseer, RecoveryError};
use polkadot_primitives::ValidatorIndex;

use futures::{channel::oneshot, SinkExt};
use rand::seq::SliceRandom;
use std::collections::VecDeque;

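/// Parameters needed to create a new `FetchChunks` strategy.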
pub struct FetchChunksParams {
    pub n_validators: usize,
}

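/// `RecoveryStrategy` that requests availability chunks from validators in parallel, until
/// enough chunks have been gathered to reconstruct the full `AvailableData`.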
pub struct FetchChunks {
    /// How many chunk requests have been unsuccessful so far.
    error_count: usize,
    /// Total number of responses received so far, including failed ones.
    total_received_responses: usize,
    /// A shuffled queue of validators that have not been queried yet.
    validators: VecDeque<ValidatorIndex>,
    /// Collection of in-flight chunk requests.
    requesting_chunks: OngoingRequests,
}

impl FetchChunks {
    /// Instantiate a new strategy.
    pub fn new(params: FetchChunksParams) -> Self {
        // Shuffle the validator indices so that chunks are not always requested from the same
        // validators first.
        let mut validators: VecDeque<ValidatorIndex> =
            (0..params.n_validators).map(|i| ValidatorIndex(i as u32)).collect();
        validators.make_contiguous().shuffle(&mut rand::thread_rng());

        Self {
            error_count: 0,
            total_received_responses: 0,
            validators,
            requesting_chunks: FuturesUndead::new(),
        }
    }

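    /// Whether data recovery has become impossible. Delegates to the shared `is_unavailable`
    /// helper, reordering the arguments to match its signature.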
    fn is_unavailable(
        unrequested_validators: usize,
        in_flight_requests: usize,
        chunk_count: usize,
        threshold: usize,
    ) -> bool {
        is_unavailable(chunk_count, in_flight_requests, unrequested_validators, threshold)
    }

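    /// Desired number of parallel chunk requests to have in flight, based on how many chunks
    /// are still missing and the error rate observed so far.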
    fn get_desired_request_count(&self, chunk_count: usize, threshold: usize) -> usize {
        // Upper bound for the number of parallel requests.
        let max_requests_boundary = std::cmp::min(N_PARALLEL, threshold);
        // How many chunks are still needed to reach the threshold?
        let remaining_chunks = threshold.saturating_sub(chunk_count);
        // Inverse of the error rate observed so far (zero if no errors have occurred yet).
        let inv_error_rate =
            self.total_received_responses.checked_div(self.error_count).unwrap_or(0);
        // Request the missing chunks plus an extra allowance proportional to the error rate,
        // capped at the upper bound.
        std::cmp::min(
            max_requests_boundary,
            remaining_chunks + remaining_chunks.checked_div(inv_error_rate).unwrap_or(0),
        )
    }

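    /// Attempt to reconstruct the `AvailableData` from the chunks received so far and run the
    /// post-recovery check on the result.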
    async fn attempt_recovery<Sender: overseer::AvailabilityRecoverySenderTrait>(
        &mut self,
        state: &mut State,
        common_params: &RecoveryParams,
    ) -> Result<AvailableData, RecoveryError> {
        let recovery_duration =
            common_params
                .metrics
                .time_erasure_recovery(RecoveryStrategy::<Sender>::strategy_type(self));

        let (available_data_tx, available_data_rx) = oneshot::channel();

        let mut erasure_task_tx = common_params.erasure_task_tx.clone();
        erasure_task_tx
            .send(ErasureTask::Reconstruct(
                common_params.n_validators,
                std::mem::take(&mut state.received_chunks)
                    .into_iter()
                    .map(|(c_index, chunk)| (c_index, chunk.chunk))
                    .collect(),
                available_data_tx,
            ))
            .await
            .map_err(|_| RecoveryError::ChannelClosed)?;

        let available_data_response =
            available_data_rx.await.map_err(|_| RecoveryError::ChannelClosed)?;

        match available_data_response {
            Ok(data) => do_post_recovery_check(common_params, data)
                .await
                .inspect_err(|_| {
                    recovery_duration.map(|rd| rd.stop_and_discard());
                })
                .inspect(|_| {
                    gum::trace!(
                        target: LOG_TARGET,
                        candidate_hash = ?common_params.candidate_hash,
                        erasure_root = ?common_params.erasure_root,
                        "Data recovery from chunks complete",
                    );
                }),
            Err(err) => {
                recovery_duration.map(|rd| rd.stop_and_discard());
                gum::debug!(
                    target: LOG_TARGET,
                    candidate_hash = ?common_params.candidate_hash,
                    erasure_root = ?common_params.erasure_root,
                    ?err,
                    "Data recovery error",
                );

                Err(RecoveryError::Invalid)
            },
        }
    }
}

#[async_trait::async_trait]
impl<Sender: overseer::AvailabilityRecoverySenderTrait> RecoveryStrategy<Sender> for FetchChunks {
    fn display_name(&self) -> &'static str {
        "Fetch chunks"
    }

    fn strategy_type(&self) -> &'static str {
        "regular_chunks"
    }

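    /// Run the strategy: repeatedly launch parallel chunk requests until either enough chunks
    /// have been received to attempt reconstruction or recovery becomes impossible.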
    async fn run(
        mut self: Box<Self>,
        state: &mut State,
        sender: &mut Sender,
        common_params: &RecoveryParams,
    ) -> Result<AvailableData, RecoveryError> {
        // First, query the availability store for chunks we already hold locally and drop the
        // corresponding validators from the request queue.
        if !common_params.bypass_availability_store {
            let local_chunk_indices = state.populate_from_av_store(common_params, sender).await;
            self.validators.retain(|validator_index| {
                !local_chunk_indices.iter().any(|(v_index, _)| v_index == validator_index)
            });
        }

        // No need to query validators whose chunk we already received or whose requests can no
        // longer be retried.
        self.validators.retain(|v_index| {
            !state.received_chunks.values().any(|c| v_index == &c.validator_index) &&
                state.can_retry_request(
                    &(common_params.validator_authority_keys[v_index.0 as usize].clone(), *v_index),
                    REGULAR_CHUNKS_REQ_RETRY_LIMIT,
                )
        });

        let mut validators_queue: VecDeque<_> = std::mem::take(&mut self.validators)
            .into_iter()
            .map(|validator_index| {
                (
                    common_params.validator_authority_keys[validator_index.0 as usize].clone(),
                    validator_index,
                )
            })
            .collect();

        loop {
            // If we have received enough chunks, attempt to reconstruct the data.
            if state.chunk_count() >= common_params.threshold {
                return self.attempt_recovery::<Sender>(state, common_params).await
            }

            if Self::is_unavailable(
                validators_queue.len(),
                self.requesting_chunks.total_len(),
                state.chunk_count(),
                common_params.threshold,
            ) {
                gum::debug!(
                    target: LOG_TARGET,
                    candidate_hash = ?common_params.candidate_hash,
                    erasure_root = ?common_params.erasure_root,
                    received = %state.chunk_count(),
                    requesting = %self.requesting_chunks.len(),
                    total_requesting = %self.requesting_chunks.total_len(),
                    n_validators = %common_params.n_validators,
                    "Data recovery from chunks is not possible",
                );

                return Err(RecoveryError::Unavailable)
            }

            let desired_requests_count =
                self.get_desired_request_count(state.chunk_count(), common_params.threshold);
            let already_requesting_count = self.requesting_chunks.len();
            gum::debug!(
                target: LOG_TARGET,
                ?common_params.candidate_hash,
                ?desired_requests_count,
                error_count = ?self.error_count,
                total_received = ?self.total_received_responses,
                threshold = ?common_params.threshold,
                ?already_requesting_count,
                "Requesting availability chunks for a candidate",
            );

            let strategy_type = RecoveryStrategy::<Sender>::strategy_type(&*self);

            state
                .launch_parallel_chunk_requests(
                    strategy_type,
                    common_params,
                    sender,
                    desired_requests_count,
                    &mut validators_queue,
                    &mut self.requesting_chunks,
                )
                .await;

            let (total_responses, error_count) = state
                .wait_for_chunks(
                    strategy_type,
                    common_params,
                    REGULAR_CHUNKS_REQ_RETRY_LIMIT,
                    &mut validators_queue,
                    &mut self.requesting_chunks,
                    &mut vec![],
                    |unrequested_validators,
                     in_flight_reqs,
                     chunk_count,
                     _systematic_chunk_count| {
                        chunk_count >= common_params.threshold ||
                            Self::is_unavailable(
                                unrequested_validators,
                                in_flight_reqs,
                                chunk_count,
                                common_params.threshold,
                            )
                    },
                )
                .await;

            self.total_received_responses += total_responses;
            self.error_count += error_count;
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use polkadot_erasure_coding::recovery_threshold;

    #[test]
    fn test_get_desired_request_count() {
        let n_validators = 100;
        let threshold = recovery_threshold(n_validators).unwrap();

        let mut fetch_chunks_task = FetchChunks::new(FetchChunksParams { n_validators });
        assert_eq!(fetch_chunks_task.get_desired_request_count(0, threshold), threshold);
        fetch_chunks_task.error_count = 1;
        fetch_chunks_task.total_received_responses = 1;
        // Saturates at the threshold.
        assert_eq!(fetch_chunks_task.get_desired_request_count(0, threshold), threshold);

        // Saturates at the parallel request limit.
        assert_eq!(fetch_chunks_task.get_desired_request_count(0, N_PARALLEL + 2), N_PARALLEL);

        fetch_chunks_task.total_received_responses = 2;
        assert_eq!(fetch_chunks_task.get_desired_request_count(1, threshold), threshold);
        fetch_chunks_task.total_received_responses = 10;
        // threshold is 34 for 100 validators: remaining = 25, over-request = 25 / 10 = 2, so 27.
        assert_eq!(fetch_chunks_task.get_desired_request_count(9, threshold), 27);
        assert_eq!(fetch_chunks_task.get_desired_request_count(9, N_PARALLEL + 9), N_PARALLEL);

        fetch_chunks_task.error_count = 0;
        // With no errors, request exactly the number of missing chunks.
        assert_eq!(fetch_chunks_task.get_desired_request_count(10, threshold), threshold - 10);
    }
}