referrerpolicy=no-referrer-when-downgrade

polkadot_availability_recovery/task/strategy/
systematic.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17use crate::{
18	futures_undead::FuturesUndead,
19	task::{
20		strategy::{
21			do_post_recovery_check, is_unavailable, OngoingRequests, N_PARALLEL,
22			SYSTEMATIC_CHUNKS_REQ_RETRY_LIMIT,
23		},
24		RecoveryParams, RecoveryStrategy, State,
25	},
26	LOG_TARGET,
27};
28
29use polkadot_node_primitives::AvailableData;
30use polkadot_node_subsystem::{overseer, RecoveryError};
31use polkadot_primitives::{ChunkIndex, ValidatorIndex};
32
33use std::collections::VecDeque;
34
35/// Parameters needed for fetching systematic chunks.
36pub struct FetchSystematicChunksParams {
37	/// Validators that hold the systematic chunks.
38	pub validators: Vec<(ChunkIndex, ValidatorIndex)>,
39	/// Validators in the backing group, to be used as a backup for requesting systematic chunks.
40	pub backers: Vec<ValidatorIndex>,
41}
42
43/// `RecoveryStrategy` that attempts to recover the systematic chunks from the validators that
44/// hold them, in order to bypass the erasure code reconstruction step, which is costly.
45pub struct FetchSystematicChunks {
46	/// Systematic recovery threshold.
47	threshold: usize,
48	/// Validators that hold the systematic chunks.
49	validators: Vec<(ChunkIndex, ValidatorIndex)>,
50	/// Backers to be used as a backup.
51	backers: Vec<ValidatorIndex>,
52	/// Collection of in-flight requests.
53	requesting_chunks: OngoingRequests,
54}
55
56impl FetchSystematicChunks {
57	/// Instantiate a new systematic chunks strategy.
58	pub fn new(params: FetchSystematicChunksParams) -> Self {
59		Self {
60			threshold: params.validators.len(),
61			validators: params.validators,
62			backers: params.backers,
63			requesting_chunks: FuturesUndead::new(),
64		}
65	}
66
67	fn is_unavailable(
68		unrequested_validators: usize,
69		in_flight_requests: usize,
70		systematic_chunk_count: usize,
71		threshold: usize,
72	) -> bool {
73		is_unavailable(
74			systematic_chunk_count,
75			in_flight_requests,
76			unrequested_validators,
77			threshold,
78		)
79	}
80
81	/// Desired number of parallel requests.
82	///
83	/// For the given threshold (total required number of chunks) get the desired number of
84	/// requests we want to have running in parallel at this time.
85	fn get_desired_request_count(&self, chunk_count: usize, threshold: usize) -> usize {
86		// Upper bound for parallel requests.
87		let max_requests_boundary = std::cmp::min(N_PARALLEL, threshold);
88		// How many chunks are still needed?
89		let remaining_chunks = threshold.saturating_sub(chunk_count);
90		// Actual number of requests we want to have in flight in parallel:
91		// We don't have to make up for any error rate, as an error fetching a systematic chunk
92		// results in failure of the entire strategy.
93		std::cmp::min(max_requests_boundary, remaining_chunks)
94	}
95
96	async fn attempt_systematic_recovery<Sender: overseer::AvailabilityRecoverySenderTrait>(
97		&mut self,
98		state: &mut State,
99		common_params: &RecoveryParams,
100	) -> Result<AvailableData, RecoveryError> {
101		let strategy_type = RecoveryStrategy::<Sender>::strategy_type(self);
102		let recovery_duration = common_params.metrics.time_erasure_recovery(strategy_type);
103		let reconstruct_duration = common_params.metrics.time_erasure_reconstruct(strategy_type);
104		let chunks = state
105			.received_chunks
106			.range(
107				ChunkIndex(0)..
108					ChunkIndex(
109						u32::try_from(self.threshold)
110							.expect("validator count should not exceed u32"),
111					),
112			)
113			.map(|(_, chunk)| chunk.chunk.clone())
114			.collect::<Vec<_>>();
115
116		let available_data = polkadot_erasure_coding::reconstruct_from_systematic_v1(
117			common_params.n_validators,
118			chunks,
119		);
120
121		match available_data {
122			Ok(data) => {
123				drop(reconstruct_duration);
124
125				// Attempt post-recovery check.
126				do_post_recovery_check(common_params, data)
127					.await
128					.inspect_err(|_| {
129						recovery_duration.map(|rd| rd.stop_and_discard());
130					})
131					.inspect(|_| {
132						gum::trace!(
133							target: LOG_TARGET,
134							candidate_hash = ?common_params.candidate_hash,
135							erasure_root = ?common_params.erasure_root,
136							"Data recovery from systematic chunks complete",
137						);
138					})
139			},
140			Err(err) => {
141				reconstruct_duration.map(|rd| rd.stop_and_discard());
142				recovery_duration.map(|rd| rd.stop_and_discard());
143
144				gum::debug!(
145					target: LOG_TARGET,
146					candidate_hash = ?common_params.candidate_hash,
147					erasure_root = ?common_params.erasure_root,
148					?err,
149					"Systematic data recovery error",
150				);
151
152				Err(RecoveryError::Invalid)
153			},
154		}
155	}
156}
157
158#[async_trait::async_trait]
159impl<Sender: overseer::AvailabilityRecoverySenderTrait> RecoveryStrategy<Sender>
160	for FetchSystematicChunks
161{
162	fn display_name(&self) -> &'static str {
163		"Fetch systematic chunks"
164	}
165
166	fn strategy_type(&self) -> &'static str {
167		"systematic_chunks"
168	}
169
170	async fn run(
171		mut self: Box<Self>,
172		state: &mut State,
173		sender: &mut Sender,
174		common_params: &RecoveryParams,
175	) -> Result<AvailableData, RecoveryError> {
176		// First query the store for any chunks we've got.
177		if !common_params.bypass_availability_store {
178			let local_chunk_indices = state.populate_from_av_store(common_params, sender).await;
179
180			for (_, our_c_index) in &local_chunk_indices {
181				// If we are among the systematic validators but hold an invalid chunk, we cannot
182				// perform the systematic recovery. Fall through to the next strategy.
183				if self.validators.iter().any(|(c_index, _)| c_index == our_c_index) &&
184					!state.received_chunks.contains_key(our_c_index)
185				{
186					gum::debug!(
187						target: LOG_TARGET,
188						candidate_hash = ?common_params.candidate_hash,
189						erasure_root = ?common_params.erasure_root,
190						requesting = %self.requesting_chunks.len(),
191						total_requesting = %self.requesting_chunks.total_len(),
192						n_validators = %common_params.n_validators,
193						chunk_index = ?our_c_index,
194						"Systematic chunk recovery is not possible. We are among the systematic validators but hold an invalid chunk",
195					);
196					return Err(RecoveryError::Unavailable)
197				}
198			}
199		}
200
201		// No need to query the validators that have the chunks we already received or that we know
202		// don't have the data from previous strategies.
203		self.validators.retain(|(c_index, v_index)| {
204			!state.received_chunks.contains_key(c_index) &&
205				state.can_retry_request(
206					&(common_params.validator_authority_keys[v_index.0 as usize].clone(), *v_index),
207					SYSTEMATIC_CHUNKS_REQ_RETRY_LIMIT,
208				)
209		});
210
211		let mut systematic_chunk_count = state
212			.received_chunks
213			.range(ChunkIndex(0)..ChunkIndex(self.threshold as u32))
214			.count();
215
216		// Safe to `take` here, as we're consuming `self` anyway and we're not using the
217		// `validators` or `backers` fields in other methods.
218		let mut validators_queue: VecDeque<_> = std::mem::take(&mut self.validators)
219			.into_iter()
220			.map(|(_, validator_index)| {
221				(
222					common_params.validator_authority_keys[validator_index.0 as usize].clone(),
223					validator_index,
224				)
225			})
226			.collect();
227		let mut backers: Vec<_> = std::mem::take(&mut self.backers)
228			.into_iter()
229			.map(|validator_index| {
230				common_params.validator_authority_keys[validator_index.0 as usize].clone()
231			})
232			.collect();
233
234		loop {
235			// If received_chunks has `systematic_chunk_threshold` entries, attempt to recover the
236			// data.
237			if systematic_chunk_count >= self.threshold {
238				return self.attempt_systematic_recovery::<Sender>(state, common_params).await
239			}
240
241			if Self::is_unavailable(
242				validators_queue.len(),
243				self.requesting_chunks.total_len(),
244				systematic_chunk_count,
245				self.threshold,
246			) {
247				gum::debug!(
248					target: LOG_TARGET,
249					candidate_hash = ?common_params.candidate_hash,
250					erasure_root = ?common_params.erasure_root,
251					%systematic_chunk_count,
252					requesting = %self.requesting_chunks.len(),
253					total_requesting = %self.requesting_chunks.total_len(),
254					n_validators = %common_params.n_validators,
255					systematic_threshold = ?self.threshold,
256					"Data recovery from systematic chunks is not possible",
257				);
258
259				return Err(RecoveryError::Unavailable)
260			}
261
262			let desired_requests_count =
263				self.get_desired_request_count(systematic_chunk_count, self.threshold);
264			let already_requesting_count = self.requesting_chunks.len();
265			gum::debug!(
266				target: LOG_TARGET,
267				?common_params.candidate_hash,
268				?desired_requests_count,
269				total_received = ?systematic_chunk_count,
270				systematic_threshold = ?self.threshold,
271				?already_requesting_count,
272				"Requesting systematic availability chunks for a candidate",
273			);
274
275			let strategy_type = RecoveryStrategy::<Sender>::strategy_type(&*self);
276
277			state
278				.launch_parallel_chunk_requests(
279					strategy_type,
280					common_params,
281					sender,
282					desired_requests_count,
283					&mut validators_queue,
284					&mut self.requesting_chunks,
285				)
286				.await;
287
288			let _ = state
289				.wait_for_chunks(
290					strategy_type,
291					common_params,
292					SYSTEMATIC_CHUNKS_REQ_RETRY_LIMIT,
293					&mut validators_queue,
294					&mut self.requesting_chunks,
295					&mut backers,
296					|unrequested_validators,
297					 in_flight_reqs,
298					 // Don't use this chunk count, as it may contain non-systematic chunks.
299					 _chunk_count,
300					 new_systematic_chunk_count| {
301						systematic_chunk_count = new_systematic_chunk_count;
302
303						let is_unavailable = Self::is_unavailable(
304							unrequested_validators,
305							in_flight_reqs,
306							systematic_chunk_count,
307							self.threshold,
308						);
309
310						systematic_chunk_count >= self.threshold || is_unavailable
311					},
312				)
313				.await;
314		}
315	}
316}
317
#[cfg(test)]
mod tests {
	use super::*;
	use polkadot_erasure_coding::systematic_recovery_threshold;

	/// Checks the desired parallel request count for several combinations of already received
	/// chunks and thresholds.
	#[test]
	fn test_get_desired_request_count() {
		let num_validators = 100;
		let threshold = systematic_recovery_threshold(num_validators).unwrap();

		let task = FetchSystematicChunks::new(FetchSystematicChunksParams {
			validators: vec![(1.into(), 1.into()); num_validators],
			backers: vec![],
		});

		// Each case is `(chunks already received, threshold, expected request count)`.
		let cases = [
			(0, threshold, threshold),
			(5, threshold, threshold - 5),
			(num_validators * 2, threshold, 0),
			(0, N_PARALLEL * 2, N_PARALLEL),
			(N_PARALLEL, N_PARALLEL + 2, 2),
		];

		for (chunk_count, case_threshold, expected) in cases {
			assert_eq!(task.get_desired_request_count(chunk_count, case_threshold), expected);
		}
	}
}