polkadot_availability_recovery/task/strategy/
systematic.rs1use crate::{
18 futures_undead::FuturesUndead,
19 task::{
20 strategy::{
21 do_post_recovery_check, is_unavailable, OngoingRequests, N_PARALLEL,
22 SYSTEMATIC_CHUNKS_REQ_RETRY_LIMIT,
23 },
24 RecoveryParams, RecoveryStrategy, State,
25 },
26 LOG_TARGET,
27};
28
29use polkadot_node_primitives::AvailableData;
30use polkadot_node_subsystem::{overseer, RecoveryError};
31use polkadot_primitives::{ChunkIndex, ValidatorIndex};
32
33use std::collections::VecDeque;
34
/// Construction parameters for [`FetchSystematicChunks`].
pub struct FetchSystematicChunksParams {
    /// `(chunk index, validator index)` pairs describing which validator is asked for which
    /// systematic chunk. The number of entries becomes the recovery threshold of the
    /// strategy (see `FetchSystematicChunks::new`).
    pub validators: Vec<(ChunkIndex, ValidatorIndex)>,
    /// Additional validators whose authority keys are handed to `State::wait_for_chunks`.
    /// NOTE(review): presumably the backing group, used as fallback sources for systematic
    /// chunks - confirm against `wait_for_chunks`.
    pub backers: Vec<ValidatorIndex>,
}
42
/// Recovery strategy that fetches the systematic chunks of a candidate and reconstructs the
/// available data from them.
pub struct FetchSystematicChunks {
    /// Number of systematic chunks required before reconstruction is attempted. Fixed at
    /// construction time to the number of `(chunk, validator)` pairs supplied.
    threshold: usize,
    /// `(chunk index, validator index)` pairs still eligible to be asked for their chunk.
    /// Emptied (moved into a request queue) once `run` starts issuing requests.
    validators: Vec<(ChunkIndex, ValidatorIndex)>,
    /// Validators whose authority keys are forwarded to `State::wait_for_chunks`.
    /// NOTE(review): presumably fallback sources (backers) - confirm against `wait_for_chunks`.
    backers: Vec<ValidatorIndex>,
    /// Chunk requests that have been launched by this strategy; starts out empty.
    requesting_chunks: OngoingRequests,
}
55
56impl FetchSystematicChunks {
57 pub fn new(params: FetchSystematicChunksParams) -> Self {
59 Self {
60 threshold: params.validators.len(),
61 validators: params.validators,
62 backers: params.backers,
63 requesting_chunks: FuturesUndead::new(),
64 }
65 }
66
67 fn is_unavailable(
68 unrequested_validators: usize,
69 in_flight_requests: usize,
70 systematic_chunk_count: usize,
71 threshold: usize,
72 ) -> bool {
73 is_unavailable(
74 systematic_chunk_count,
75 in_flight_requests,
76 unrequested_validators,
77 threshold,
78 )
79 }
80
81 fn get_desired_request_count(&self, chunk_count: usize, threshold: usize) -> usize {
86 let max_requests_boundary = std::cmp::min(N_PARALLEL, threshold);
88 let remaining_chunks = threshold.saturating_sub(chunk_count);
90 std::cmp::min(max_requests_boundary, remaining_chunks)
94 }
95
96 async fn attempt_systematic_recovery<Sender: overseer::AvailabilityRecoverySenderTrait>(
97 &mut self,
98 state: &mut State,
99 common_params: &RecoveryParams,
100 ) -> Result<AvailableData, RecoveryError> {
101 let strategy_type = RecoveryStrategy::<Sender>::strategy_type(self);
102 let recovery_duration = common_params.metrics.time_erasure_recovery(strategy_type);
103 let reconstruct_duration = common_params.metrics.time_erasure_reconstruct(strategy_type);
104 let chunks = state
105 .received_chunks
106 .range(
107 ChunkIndex(0)..
108 ChunkIndex(
109 u32::try_from(self.threshold)
110 .expect("validator count should not exceed u32"),
111 ),
112 )
113 .map(|(_, chunk)| chunk.chunk.clone())
114 .collect::<Vec<_>>();
115
116 let available_data = polkadot_erasure_coding::reconstruct_from_systematic_v1(
117 common_params.n_validators,
118 chunks,
119 );
120
121 match available_data {
122 Ok(data) => {
123 drop(reconstruct_duration);
124
125 do_post_recovery_check(common_params, data)
127 .await
128 .inspect_err(|_| {
129 recovery_duration.map(|rd| rd.stop_and_discard());
130 })
131 .inspect(|_| {
132 gum::trace!(
133 target: LOG_TARGET,
134 candidate_hash = ?common_params.candidate_hash,
135 erasure_root = ?common_params.erasure_root,
136 "Data recovery from systematic chunks complete",
137 );
138 })
139 },
140 Err(err) => {
141 reconstruct_duration.map(|rd| rd.stop_and_discard());
142 recovery_duration.map(|rd| rd.stop_and_discard());
143
144 gum::debug!(
145 target: LOG_TARGET,
146 candidate_hash = ?common_params.candidate_hash,
147 erasure_root = ?common_params.erasure_root,
148 ?err,
149 "Systematic data recovery error",
150 );
151
152 Err(RecoveryError::Invalid)
153 },
154 }
155 }
156}
157
158#[async_trait::async_trait]
159impl<Sender: overseer::AvailabilityRecoverySenderTrait> RecoveryStrategy<Sender>
160 for FetchSystematicChunks
161{
162 fn display_name(&self) -> &'static str {
163 "Fetch systematic chunks"
164 }
165
166 fn strategy_type(&self) -> &'static str {
167 "systematic_chunks"
168 }
169
170 async fn run(
171 mut self: Box<Self>,
172 state: &mut State,
173 sender: &mut Sender,
174 common_params: &RecoveryParams,
175 ) -> Result<AvailableData, RecoveryError> {
176 if !common_params.bypass_availability_store {
178 let local_chunk_indices = state.populate_from_av_store(common_params, sender).await;
179
180 for (_, our_c_index) in &local_chunk_indices {
181 if self.validators.iter().any(|(c_index, _)| c_index == our_c_index) &&
184 !state.received_chunks.contains_key(our_c_index)
185 {
186 gum::debug!(
187 target: LOG_TARGET,
188 candidate_hash = ?common_params.candidate_hash,
189 erasure_root = ?common_params.erasure_root,
190 requesting = %self.requesting_chunks.len(),
191 total_requesting = %self.requesting_chunks.total_len(),
192 n_validators = %common_params.n_validators,
193 chunk_index = ?our_c_index,
194 "Systematic chunk recovery is not possible. We are among the systematic validators but hold an invalid chunk",
195 );
196 return Err(RecoveryError::Unavailable)
197 }
198 }
199 }
200
201 self.validators.retain(|(c_index, v_index)| {
204 !state.received_chunks.contains_key(c_index) &&
205 state.can_retry_request(
206 &(common_params.validator_authority_keys[v_index.0 as usize].clone(), *v_index),
207 SYSTEMATIC_CHUNKS_REQ_RETRY_LIMIT,
208 )
209 });
210
211 let mut systematic_chunk_count = state
212 .received_chunks
213 .range(ChunkIndex(0)..ChunkIndex(self.threshold as u32))
214 .count();
215
216 let mut validators_queue: VecDeque<_> = std::mem::take(&mut self.validators)
219 .into_iter()
220 .map(|(_, validator_index)| {
221 (
222 common_params.validator_authority_keys[validator_index.0 as usize].clone(),
223 validator_index,
224 )
225 })
226 .collect();
227 let mut backers: Vec<_> = std::mem::take(&mut self.backers)
228 .into_iter()
229 .map(|validator_index| {
230 common_params.validator_authority_keys[validator_index.0 as usize].clone()
231 })
232 .collect();
233
234 loop {
235 if systematic_chunk_count >= self.threshold {
238 return self.attempt_systematic_recovery::<Sender>(state, common_params).await
239 }
240
241 if Self::is_unavailable(
242 validators_queue.len(),
243 self.requesting_chunks.total_len(),
244 systematic_chunk_count,
245 self.threshold,
246 ) {
247 gum::debug!(
248 target: LOG_TARGET,
249 candidate_hash = ?common_params.candidate_hash,
250 erasure_root = ?common_params.erasure_root,
251 %systematic_chunk_count,
252 requesting = %self.requesting_chunks.len(),
253 total_requesting = %self.requesting_chunks.total_len(),
254 n_validators = %common_params.n_validators,
255 systematic_threshold = ?self.threshold,
256 "Data recovery from systematic chunks is not possible",
257 );
258
259 return Err(RecoveryError::Unavailable)
260 }
261
262 let desired_requests_count =
263 self.get_desired_request_count(systematic_chunk_count, self.threshold);
264 let already_requesting_count = self.requesting_chunks.len();
265 gum::debug!(
266 target: LOG_TARGET,
267 ?common_params.candidate_hash,
268 ?desired_requests_count,
269 total_received = ?systematic_chunk_count,
270 systematic_threshold = ?self.threshold,
271 ?already_requesting_count,
272 "Requesting systematic availability chunks for a candidate",
273 );
274
275 let strategy_type = RecoveryStrategy::<Sender>::strategy_type(&*self);
276
277 state
278 .launch_parallel_chunk_requests(
279 strategy_type,
280 common_params,
281 sender,
282 desired_requests_count,
283 &mut validators_queue,
284 &mut self.requesting_chunks,
285 )
286 .await;
287
288 let _ = state
289 .wait_for_chunks(
290 strategy_type,
291 common_params,
292 SYSTEMATIC_CHUNKS_REQ_RETRY_LIMIT,
293 &mut validators_queue,
294 &mut self.requesting_chunks,
295 &mut backers,
296 |unrequested_validators,
297 in_flight_reqs,
298 _chunk_count,
300 new_systematic_chunk_count| {
301 systematic_chunk_count = new_systematic_chunk_count;
302
303 let is_unavailable = Self::is_unavailable(
304 unrequested_validators,
305 in_flight_reqs,
306 systematic_chunk_count,
307 self.threshold,
308 );
309
310 systematic_chunk_count >= self.threshold || is_unavailable
311 },
312 )
313 .await;
314 }
315 }
316}
317
#[cfg(test)]
mod tests {
    use super::*;
    use polkadot_erasure_coding::systematic_recovery_threshold;

    /// Checks that the desired request count is the number of missing chunks, capped by
    /// `N_PARALLEL`, and never underflows when more chunks than needed are held.
    #[test]
    fn test_get_desired_request_count() {
        let n_validators = 100;
        let threshold = systematic_recovery_threshold(n_validators).unwrap();

        let task = FetchSystematicChunks::new(FetchSystematicChunksParams {
            validators: vec![(1.into(), 1.into()); n_validators],
            backers: vec![],
        });

        // (chunks already held, threshold, expected number of requests)
        let cases = [
            (0, threshold, threshold),
            (5, threshold, threshold - 5),
            (n_validators * 2, threshold, 0),
            (0, N_PARALLEL * 2, N_PARALLEL),
            (N_PARALLEL, N_PARALLEL + 2, 2),
        ];

        for (held, case_threshold, expected) in cases {
            assert_eq!(task.get_desired_request_count(held, case_threshold), expected);
        }
    }
}