polkadot_availability_recovery/task/strategy/chunks.rs

// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

use crate::{
	futures_undead::FuturesUndead,
	task::{
		strategy::{
			do_post_recovery_check, is_unavailable, OngoingRequests, N_PARALLEL,
			REGULAR_CHUNKS_REQ_RETRY_LIMIT,
		},
		RecoveryParams, State,
	},
	ErasureTask, RecoveryStrategy, LOG_TARGET,
};

use polkadot_node_primitives::AvailableData;
use polkadot_node_subsystem::{overseer, RecoveryError};
use polkadot_primitives::ValidatorIndex;

use futures::{channel::oneshot, SinkExt};
use rand::seq::SliceRandom;
use std::collections::VecDeque;

/// Parameters specific to the `FetchChunks` strategy.
pub struct FetchChunksParams {
	pub n_validators: usize,
}
41
42/// `RecoveryStrategy` that requests chunks from validators, in parallel.
43pub struct FetchChunks {
44	/// How many requests have been unsuccessful so far.
45	error_count: usize,
46	/// Total number of responses that have been received, including failed ones.
47	total_received_responses: usize,
48	/// A shuffled array of validator indices.
49	validators: VecDeque<ValidatorIndex>,
50	/// Collection of in-flight requests.
51	requesting_chunks: OngoingRequests,
52}
53
54impl FetchChunks {
55	/// Instantiate a new strategy.
56	pub fn new(params: FetchChunksParams) -> Self {
57		// Shuffle the validators to make sure that we don't request chunks from the same
58		// validators over and over.
59		let mut validators: VecDeque<ValidatorIndex> =
60			(0..params.n_validators).map(|i| ValidatorIndex(i as u32)).collect();
61		validators.make_contiguous().shuffle(&mut rand::thread_rng());
62
63		Self {
64			error_count: 0,
65			total_received_responses: 0,
66			validators,
67			requesting_chunks: FuturesUndead::new(),
68		}
69	}
70
71	fn is_unavailable(
72		unrequested_validators: usize,
73		in_flight_requests: usize,
74		chunk_count: usize,
75		threshold: usize,
76	) -> bool {
77		is_unavailable(chunk_count, in_flight_requests, unrequested_validators, threshold)
78	}
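
	// Note: the private wrapper above only reorders the arguments of the shared
	// `strategy::is_unavailable` helper. Assuming that helper flags failure once
	// `chunk_count + in_flight_requests + unrequested_validators < threshold`
	// (i.e. even if every in-flight and unrequested source delivered a chunk, the
	// threshold could no longer be reached), then e.g. 10 received + 5 in flight +
	// 15 unrequested validators can still satisfy any threshold up to 30.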

	/// Desired number of parallel requests.
	///
	/// For the given threshold (total required number of chunks) get the desired number of
	/// requests we want to have running in parallel at this time.
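	///
	/// A worked example, using the same numbers as the unit test at the bottom of
	/// this file: with `threshold = 34`, 9 chunks already received and an observed
	/// error rate of 1/10 (`total_received_responses = 10`, `error_count = 1`),
	/// 25 chunks are still needed, so `25 + 25 / 10 = 27` requests are launched,
	/// capped at `min(N_PARALLEL, threshold)`.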
	fn get_desired_request_count(&self, chunk_count: usize, threshold: usize) -> usize {
		// Upper bound for parallel requests.
		// We want to limit this, so requests can be processed within the timeout and we limit the
		// following feedback loop:
		// 1. Requests fail due to timeout
		// 2. We request more chunks to make up for it
		// 3. Bandwidth is spread out even more, so we get even more timeouts
		// 4. We request more chunks to make up for it ...
		let max_requests_boundary = std::cmp::min(N_PARALLEL, threshold);
		// How many chunks are still needed?
		let remaining_chunks = threshold.saturating_sub(chunk_count);
		// What is the inverse of the current error rate, so we can over-request to make up for
		// expected failures? (zero means no errors have been observed yet)
		let inv_error_rate =
			self.total_received_responses.checked_div(self.error_count).unwrap_or(0);
		// Actual number of requests we want to have in flight in parallel:
		std::cmp::min(
			max_requests_boundary,
			remaining_chunks + remaining_chunks.checked_div(inv_error_rate).unwrap_or(0),
		)
	}

	async fn attempt_recovery<Sender: overseer::AvailabilityRecoverySenderTrait>(
		&mut self,
		state: &mut State,
		common_params: &RecoveryParams,
	) -> Result<AvailableData, RecoveryError> {
		let recovery_duration =
			common_params
				.metrics
				.time_erasure_recovery(RecoveryStrategy::<Sender>::strategy_type(self));

		// Send request to reconstruct available data from chunks.
		let (available_data_tx, available_data_rx) = oneshot::channel();

		let mut erasure_task_tx = common_params.erasure_task_tx.clone();
		erasure_task_tx
			.send(ErasureTask::Reconstruct(
				common_params.n_validators,
				// Safe to leave an empty vec in place, as we're stopping the recovery process if
				// this reconstruct fails.
				std::mem::take(&mut state.received_chunks)
					.into_iter()
					.map(|(c_index, chunk)| (c_index, chunk.chunk))
					.collect(),
				available_data_tx,
			))
			.await
			.map_err(|_| RecoveryError::ChannelClosed)?;

		let available_data_response =
			available_data_rx.await.map_err(|_| RecoveryError::ChannelClosed)?;

		match available_data_response {
			// Attempt post-recovery check.
			Ok(data) => do_post_recovery_check(common_params, data)
				.await
				.inspect_err(|_| {
					recovery_duration.map(|rd| rd.stop_and_discard());
				})
				.inspect(|_| {
					gum::trace!(
						target: LOG_TARGET,
						candidate_hash = ?common_params.candidate_hash,
						erasure_root = ?common_params.erasure_root,
						"Data recovery from chunks complete",
					);
				}),
			Err(err) => {
				recovery_duration.map(|rd| rd.stop_and_discard());
				gum::debug!(
					target: LOG_TARGET,
					candidate_hash = ?common_params.candidate_hash,
					erasure_root = ?common_params.erasure_root,
					?err,
					"Data recovery error",
				);

				Err(RecoveryError::Invalid)
			},
		}
	}
}

#[async_trait::async_trait]
impl<Sender: overseer::AvailabilityRecoverySenderTrait> RecoveryStrategy<Sender> for FetchChunks {
	fn display_name(&self) -> &'static str {
		"Fetch chunks"
	}

	fn strategy_type(&self) -> &'static str {
		"regular_chunks"
	}

	async fn run(
		mut self: Box<Self>,
		state: &mut State,
		sender: &mut Sender,
		common_params: &RecoveryParams,
	) -> Result<AvailableData, RecoveryError> {
		// First query the store for any chunks we've got.
		if !common_params.bypass_availability_store {
			let local_chunk_indices = state.populate_from_av_store(common_params, sender).await;
			self.validators.retain(|validator_index| {
				!local_chunk_indices.iter().any(|(v_index, _)| v_index == validator_index)
			});
		}

		// No need to query the validators that have the chunks we already received or that we know
		// don't have the data from previous strategies.
		self.validators.retain(|v_index| {
			!state.received_chunks.values().any(|c| v_index == &c.validator_index) &&
				state.can_retry_request(
					&(common_params.validator_authority_keys[v_index.0 as usize].clone(), *v_index),
					REGULAR_CHUNKS_REQ_RETRY_LIMIT,
				)
		});

		// Safe to `take` here, as we're consuming `self` anyway and we're not using the
		// `validators` field in other methods.
		let mut validators_queue: VecDeque<_> = std::mem::take(&mut self.validators)
			.into_iter()
			.map(|validator_index| {
				(
					common_params.validator_authority_keys[validator_index.0 as usize].clone(),
					validator_index,
				)
			})
			.collect();

		loop {
			// If `received_chunks` has at least `threshold` entries, attempt to recover the data.
			// If that fails, or a re-encoding of it doesn't match the expected erasure root,
			// return Err(RecoveryError::Invalid).
			// Do this before requesting any chunks because we may have enough of them coming from
			// past RecoveryStrategies.
			if state.chunk_count() >= common_params.threshold {
				return self.attempt_recovery::<Sender>(state, common_params).await
			}

			if Self::is_unavailable(
				validators_queue.len(),
				self.requesting_chunks.total_len(),
				state.chunk_count(),
				common_params.threshold,
			) {
				gum::debug!(
					target: LOG_TARGET,
					candidate_hash = ?common_params.candidate_hash,
					erasure_root = ?common_params.erasure_root,
					received = %state.chunk_count(),
					requesting = %self.requesting_chunks.len(),
					total_requesting = %self.requesting_chunks.total_len(),
					n_validators = %common_params.n_validators,
					"Data recovery from chunks is not possible",
				);

				return Err(RecoveryError::Unavailable)
			}

			let desired_requests_count =
				self.get_desired_request_count(state.chunk_count(), common_params.threshold);
			let already_requesting_count = self.requesting_chunks.len();
			gum::debug!(
				target: LOG_TARGET,
				?common_params.candidate_hash,
				?desired_requests_count,
				error_count = ?self.error_count,
				total_received = ?self.total_received_responses,
				threshold = ?common_params.threshold,
				?already_requesting_count,
				"Requesting availability chunks for a candidate",
			);

			let strategy_type = RecoveryStrategy::<Sender>::strategy_type(&*self);

			state
				.launch_parallel_chunk_requests(
					strategy_type,
					common_params,
					sender,
					desired_requests_count,
					&mut validators_queue,
					&mut self.requesting_chunks,
				)
				.await;

			let (total_responses, error_count) = state
				.wait_for_chunks(
					strategy_type,
					common_params,
					REGULAR_CHUNKS_REQ_RETRY_LIMIT,
					&mut validators_queue,
					&mut self.requesting_chunks,
					&mut vec![],
					|unrequested_validators,
					 in_flight_reqs,
					 chunk_count,
					 _systematic_chunk_count| {
						chunk_count >= common_params.threshold ||
							Self::is_unavailable(
								unrequested_validators,
								in_flight_reqs,
								chunk_count,
								common_params.threshold,
							)
					},
				)
				.await;

			self.total_received_responses += total_responses;
			self.error_count += error_count;
		}
	}
}

#[cfg(test)]
mod tests {
	use super::*;
	use polkadot_erasure_coding::recovery_threshold;

	#[test]
	fn test_get_desired_request_count() {
		let n_validators = 100;
		let threshold = recovery_threshold(n_validators).unwrap();

		let mut fetch_chunks_task = FetchChunks::new(FetchChunksParams { n_validators });
		assert_eq!(fetch_chunks_task.get_desired_request_count(0, threshold), threshold);
		fetch_chunks_task.error_count = 1;
		fetch_chunks_task.total_received_responses = 1;
		// We saturate at threshold (34):
		assert_eq!(fetch_chunks_task.get_desired_request_count(0, threshold), threshold);

		// We saturate at the parallel limit.
		assert_eq!(fetch_chunks_task.get_desired_request_count(0, N_PARALLEL + 2), N_PARALLEL);

		fetch_chunks_task.total_received_responses = 2;
		// With given error rate - still saturating:
		assert_eq!(fetch_chunks_task.get_desired_request_count(1, threshold), threshold);
		fetch_chunks_task.total_received_responses = 10;
		// error rate: 1/10
		// remaining chunks needed: threshold (34) - 9 = 25
		// expected: 25 + 25/10 (integer division) = 25 + 2 = 27
		assert_eq!(fetch_chunks_task.get_desired_request_count(9, threshold), 27);
		// We saturate at the parallel limit.
		assert_eq!(fetch_chunks_task.get_desired_request_count(9, N_PARALLEL + 9), N_PARALLEL);

		fetch_chunks_task.error_count = 0;
		// With error count zero - we should fetch exactly as needed:
		assert_eq!(fetch_chunks_task.get_desired_request_count(10, threshold), threshold - 10);
	}
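
	// A small additional sketch (not part of the original suite): once we already
	// hold `threshold` or more chunks, `threshold.saturating_sub(chunk_count)`
	// bottoms out at zero, so no further requests should be launched.
	#[test]
	fn test_desired_request_count_is_zero_at_threshold() {
		let n_validators = 100;
		let threshold = recovery_threshold(n_validators).unwrap();

		let mut fetch_chunks_task = FetchChunks::new(FetchChunksParams { n_validators });
		// Even with a non-zero observed error rate, nothing remains to be requested.
		fetch_chunks_task.error_count = 1;
		fetch_chunks_task.total_received_responses = 10;
		assert_eq!(fetch_chunks_task.get_desired_request_count(threshold, threshold), 0);
		assert_eq!(fetch_chunks_task.get_desired_request_count(threshold + 5, threshold), 0);
	}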
}