polkadot_availability_recovery/task/strategy/mod.rs

// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! Recovery strategies.

mod chunks;
mod full;
mod systematic;

pub use self::{
	chunks::{FetchChunks, FetchChunksParams},
	full::{FetchFull, FetchFullParams},
	systematic::{FetchSystematicChunks, FetchSystematicChunksParams},
};
use crate::{
	futures_undead::FuturesUndead, ErasureTask, PostRecoveryCheck, RecoveryParams, LOG_TARGET,
};

use codec::Decode;
use futures::{channel::oneshot, SinkExt};
use polkadot_erasure_coding::branch_hash;
#[cfg(not(test))]
use polkadot_node_network_protocol::request_response::CHUNK_REQUEST_TIMEOUT;
use polkadot_node_network_protocol::request_response::{
	self as req_res, outgoing::RequestError, OutgoingRequest, Recipient, Requests,
};
use polkadot_node_primitives::{AvailableData, ErasureChunk};
use polkadot_node_subsystem::{
	messages::{AvailabilityStoreMessage, NetworkBridgeTxMessage},
	overseer, RecoveryError,
};
use polkadot_primitives::{AuthorityDiscoveryId, BlakeTwo256, ChunkIndex, HashT, ValidatorIndex};
use sc_network::{IfDisconnected, OutboundFailure, ProtocolName, RequestFailure};
use std::{
	collections::{BTreeMap, HashMap, VecDeque},
	time::Duration,
};

// How many parallel chunk fetching requests should be running at once.
const N_PARALLEL: usize = 50;

/// Time after which we consider a request to have failed
///
/// and we should try more peers. In theory the request times out at the network level, but
/// measurements have shown that in practice requests can take longer to fail on some
/// occasions (at the very least, authority discovery is not covered by the timeout).
///
/// For the time being this value is the same as the timeout on the networking layer, but since
/// this timeout is softer than the networking one, it may make sense to pick a different value.
#[cfg(not(test))]
const TIMEOUT_START_NEW_REQUESTS: Duration = CHUNK_REQUEST_TIMEOUT;
#[cfg(test)]
const TIMEOUT_START_NEW_REQUESTS: Duration = Duration::from_millis(100);

/// The maximum number of times systematic chunk recovery will try making a request for a given
/// (validator,chunk) pair, if the error was not fatal. Added so that we don't get stuck in an
/// infinite retry loop.
pub const SYSTEMATIC_CHUNKS_REQ_RETRY_LIMIT: u32 = 2;
/// The maximum number of times regular chunk recovery will try making a request for a given
/// (validator,chunk) pair, if the error was not fatal. Added so that we don't get stuck in an
/// infinite retry loop.
pub const REGULAR_CHUNKS_REQ_RETRY_LIMIT: u32 = 5;

// Helpful type alias for tracking ongoing chunk requests.
type OngoingRequests = FuturesUndead<(
	AuthorityDiscoveryId,
	ValidatorIndex,
	Result<(Option<ErasureChunk>, ProtocolName), RequestError>,
)>;

const fn is_unavailable(
	received_chunks: usize,
	requesting_chunks: usize,
	unrequested_validators: usize,
	threshold: usize,
) -> bool {
	received_chunks + requesting_chunks + unrequested_validators < threshold
}
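
// A worked example (illustrative numbers): with 10 validators the regular recovery threshold
// is `f + 1 = 4`. If we have received 1 valid chunk, have 1 request in flight and only 1
// unrequested validator left, then `1 + 1 + 1 = 3 < 4`, so the threshold can no longer be
// reached and the candidate is unavailable for this strategy:
//
// assert!(is_unavailable(1, 1, 1, 4));
// assert!(!is_unavailable(2, 1, 1, 4));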

/// Check validity of a chunk.
fn is_chunk_valid(params: &RecoveryParams, chunk: &ErasureChunk) -> bool {
	let anticipated_hash =
		match branch_hash(&params.erasure_root, chunk.proof(), chunk.index.0 as usize) {
			Ok(hash) => hash,
			Err(e) => {
				gum::debug!(
					target: LOG_TARGET,
					candidate_hash = ?params.candidate_hash,
					chunk_index = ?chunk.index,
					error = ?e,
					"Invalid Merkle proof",
				);
				return false
			},
		};
	let erasure_chunk_hash = BlakeTwo256::hash(&chunk.chunk);
	if anticipated_hash != erasure_chunk_hash {
		gum::debug!(
			target: LOG_TARGET,
			candidate_hash = ?params.candidate_hash,
			chunk_index = ?chunk.index,
			"Merkle proof mismatch"
		);
		return false
	}
	true
}

/// Perform the validity checks after recovery.
async fn do_post_recovery_check(
	params: &RecoveryParams,
	data: AvailableData,
) -> Result<AvailableData, RecoveryError> {
	let mut erasure_task_tx = params.erasure_task_tx.clone();
	match params.post_recovery_check {
		PostRecoveryCheck::Reencode => {
			// Send request to re-encode the chunks and check merkle root.
			let (reencode_tx, reencode_rx) = oneshot::channel();
			erasure_task_tx
				.send(ErasureTask::Reencode(
					params.n_validators,
					params.erasure_root,
					data,
					reencode_tx,
				))
				.await
				.map_err(|_| RecoveryError::ChannelClosed)?;

			reencode_rx.await.map_err(|_| RecoveryError::ChannelClosed)?.ok_or_else(|| {
				gum::trace!(
					target: LOG_TARGET,
					candidate_hash = ?params.candidate_hash,
					erasure_root = ?params.erasure_root,
					"Data recovery error - root mismatch",
				);
				RecoveryError::Invalid
			})
		},
		PostRecoveryCheck::PovHash => {
			let pov = data.pov.clone();
			(pov.hash() == params.pov_hash).then_some(data).ok_or_else(|| {
				gum::trace!(
					target: LOG_TARGET,
					candidate_hash = ?params.candidate_hash,
					expected_pov_hash = ?params.pov_hash,
					actual_pov_hash = ?pov.hash(),
					"Data recovery error - PoV hash mismatch",
				);
				RecoveryError::Invalid
			})
		},
	}
}

/// Common trait for runnable recovery strategies.
#[async_trait::async_trait]
pub trait RecoveryStrategy<Sender: overseer::AvailabilityRecoverySenderTrait>: Send {
	/// Main entry point of the strategy.
	async fn run(
		mut self: Box<Self>,
		state: &mut State,
		sender: &mut Sender,
		common_params: &RecoveryParams,
	) -> Result<AvailableData, RecoveryError>;

	/// Return the name of the strategy for logging purposes.
	fn display_name(&self) -> &'static str;

	/// Return the strategy type for use as a metric label.
	fn strategy_type(&self) -> &'static str;
}
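
// A minimal sketch of implementing the trait (illustrative only; `FetchNothing` is a
// hypothetical type, not part of this crate). A `RecoveryTask` runs its strategies in order
// and falls through to the next one when a strategy returns `RecoveryError::Unavailable`:
//
// struct FetchNothing;
//
// #[async_trait::async_trait]
// impl<Sender: overseer::AvailabilityRecoverySenderTrait> RecoveryStrategy<Sender> for FetchNothing {
// 	async fn run(
// 		mut self: Box<Self>,
// 		_state: &mut State,
// 		_sender: &mut Sender,
// 		_common_params: &RecoveryParams,
// 	) -> Result<AvailableData, RecoveryError> {
// 		Err(RecoveryError::Unavailable)
// 	}
//
// 	fn display_name(&self) -> &'static str {
// 		"Fetch nothing"
// 	}
//
// 	fn strategy_type(&self) -> &'static str {
// 		"fetch_nothing"
// 	}
// }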

/// Utility type used for recording the result of requesting a chunk from a validator.
enum ErrorRecord {
	NonFatal(u32),
	Fatal,
}

/// Helper struct used for the `received_chunks` mapping.
/// Compared to `ErasureChunk`, it doesn't need to hold the `ChunkIndex` (because it's the key used
/// for the map) and proof, but needs to hold the `ValidatorIndex` instead.
struct Chunk {
	/// The erasure-encoded chunk of data belonging to the candidate block.
	chunk: Vec<u8>,
	/// The validator index that corresponds to this chunk. Not always the same as the chunk index.
	validator_index: ValidatorIndex,
}

/// Intermediate/common data that must be passed between `RecoveryStrategy`s belonging to the
/// same `RecoveryTask`.
pub struct State {
	/// Chunks received so far.
	/// This MUST be a `BTreeMap` in order for systematic recovery to work (the algorithm assumes
	/// that chunks are ordered by their index). If we ever switch this to some non-ordered
	/// collection, we need to add a sort step to the systematic recovery.
	received_chunks: BTreeMap<ChunkIndex, Chunk>,

	/// A record of errors returned when requesting a chunk from a validator.
	recorded_errors: HashMap<(AuthorityDiscoveryId, ValidatorIndex), ErrorRecord>,
}

impl State {
	pub fn new() -> Self {
		Self { received_chunks: BTreeMap::new(), recorded_errors: HashMap::new() }
	}

	fn insert_chunk(&mut self, chunk_index: ChunkIndex, chunk: Chunk) {
		self.received_chunks.insert(chunk_index, chunk);
	}

	fn chunk_count(&self) -> usize {
		self.received_chunks.len()
	}

	fn systematic_chunk_count(&self, systematic_threshold: usize) -> usize {
		self.received_chunks
			.range(ChunkIndex(0)..ChunkIndex(systematic_threshold as u32))
			.count()
	}

	fn record_error_fatal(
		&mut self,
		authority_id: AuthorityDiscoveryId,
		validator_index: ValidatorIndex,
	) {
		self.recorded_errors.insert((authority_id, validator_index), ErrorRecord::Fatal);
	}

	fn record_error_non_fatal(
		&mut self,
		authority_id: AuthorityDiscoveryId,
		validator_index: ValidatorIndex,
	) {
		self.recorded_errors
			.entry((authority_id, validator_index))
			.and_modify(|record| {
				if let ErrorRecord::NonFatal(ref mut count) = record {
					*count = count.saturating_add(1);
				}
			})
			.or_insert(ErrorRecord::NonFatal(1));
	}

	fn can_retry_request(
		&self,
		key: &(AuthorityDiscoveryId, ValidatorIndex),
		retry_threshold: u32,
	) -> bool {
		match self.recorded_errors.get(key) {
			None => true,
			Some(entry) => match entry {
				ErrorRecord::Fatal => false,
				ErrorRecord::NonFatal(count) if *count < retry_threshold => true,
				ErrorRecord::NonFatal(_) => false,
			},
		}
	}
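
	// Retry semantics in brief (illustrative; `authority` stands for some in-scope
	// `AuthorityDiscoveryId`): a fatal record (invalid chunk or "no such chunk") permanently
	// excludes the (authority, validator index) pair, while non-fatal records (network errors)
	// only exclude it once the error count reaches `retry_threshold`:
	//
	// let mut state = State::new();
	// state.record_error_non_fatal(authority.clone(), 0.into());
	// assert!(state.can_retry_request(&(authority.clone(), 0.into()), 2)); // 1 < 2
	// state.record_error_non_fatal(authority.clone(), 0.into());
	// assert!(!state.can_retry_request(&(authority, 0.into()), 2)); // 2 >= 2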

	/// Retrieve the local chunks held in the av-store (should be either 0 or 1).
	async fn populate_from_av_store<Sender: overseer::AvailabilityRecoverySenderTrait>(
		&mut self,
		params: &RecoveryParams,
		sender: &mut Sender,
	) -> Vec<(ValidatorIndex, ChunkIndex)> {
		let (tx, rx) = oneshot::channel();
		sender
			.send_message(AvailabilityStoreMessage::QueryAllChunks(params.candidate_hash, tx))
			.await;

		match rx.await {
			Ok(chunks) => {
				// This should either be length 1 or 0. If we had the whole data,
				// we wouldn't have reached this stage.
				let chunk_indices: Vec<_> = chunks
					.iter()
					.map(|(validator_index, chunk)| (*validator_index, chunk.index))
					.collect();

				for (validator_index, chunk) in chunks {
					if is_chunk_valid(params, &chunk) {
						gum::trace!(
							target: LOG_TARGET,
							candidate_hash = ?params.candidate_hash,
							chunk_index = ?chunk.index,
							"Found valid chunk on disk"
						);
						self.insert_chunk(
							chunk.index,
							Chunk { chunk: chunk.chunk, validator_index },
						);
					} else {
						gum::error!(
							target: LOG_TARGET,
							"Loaded invalid chunk from disk! Disk/Db corruption _very_ likely - please fix ASAP!"
						);
					};
				}

				chunk_indices
			},
			Err(oneshot::Canceled) => {
				gum::warn!(
					target: LOG_TARGET,
					candidate_hash = ?params.candidate_hash,
					"Failed to reach the availability store"
				);

				vec![]
			},
		}
	}

	/// Launch chunk requests in parallel, according to the parameters.
	async fn launch_parallel_chunk_requests<Sender>(
		&mut self,
		strategy_type: &str,
		params: &RecoveryParams,
		sender: &mut Sender,
		desired_requests_count: usize,
		validators: &mut VecDeque<(AuthorityDiscoveryId, ValidatorIndex)>,
		requesting_chunks: &mut OngoingRequests,
	) where
		Sender: overseer::AvailabilityRecoverySenderTrait,
	{
		let candidate_hash = params.candidate_hash;
		let already_requesting_count = requesting_chunks.len();

		let to_launch = desired_requests_count - already_requesting_count;
		let mut requests = Vec::with_capacity(to_launch);

		gum::trace!(
			target: LOG_TARGET,
			?candidate_hash,
			"Attempting to launch {} requests",
			to_launch
		);

		while requesting_chunks.len() < desired_requests_count {
			if let Some((authority_id, validator_index)) = validators.pop_back() {
				gum::trace!(
					target: LOG_TARGET,
					?authority_id,
					?validator_index,
					?candidate_hash,
					"Requesting chunk",
				);

				// Request data.
				let raw_request_v2 =
					req_res::v2::ChunkFetchingRequest { candidate_hash, index: validator_index };
				let raw_request_v1 = req_res::v1::ChunkFetchingRequest::from(raw_request_v2);

				let (req, res) = OutgoingRequest::new_with_fallback(
					Recipient::Authority(authority_id.clone()),
					raw_request_v2,
					raw_request_v1,
				);
				requests.push(Requests::ChunkFetching(req));

				params.metrics.on_chunk_request_issued(strategy_type);
				let timer = params.metrics.time_chunk_request(strategy_type);
				let v1_protocol_name = params.req_v1_protocol_name.clone();
				let v2_protocol_name = params.req_v2_protocol_name.clone();

				let chunk_mapping_enabled = params.chunk_mapping_enabled;
				let authority_id_clone = authority_id.clone();

				requesting_chunks.push(Box::pin(async move {
					let _timer = timer;
					let res = match res.await {
						Ok((bytes, protocol)) =>
							if v2_protocol_name == protocol {
								match req_res::v2::ChunkFetchingResponse::decode(&mut &bytes[..]) {
									Ok(req_res::v2::ChunkFetchingResponse::Chunk(chunk)) =>
										Ok((Some(chunk.into()), protocol)),
									Ok(req_res::v2::ChunkFetchingResponse::NoSuchChunk) =>
										Ok((None, protocol)),
									Err(e) => Err(RequestError::InvalidResponse(e)),
								}
							} else if v1_protocol_name == protocol {
								// V1 protocol version must not be used when chunk mapping node
								// feature is enabled, because we can't know the real index of the
								// returned chunk.
								// This case should never be reached as long as the
								// `AvailabilityChunkMapping` feature is only enabled after the
								// v1 version is removed. Still, log this.
								if chunk_mapping_enabled {
									gum::info!(
										target: LOG_TARGET,
										?candidate_hash,
										authority_id = ?authority_id_clone,
										"Another validator is responding on /req_chunk/1 protocol while the availability chunk \
										mapping feature is enabled in the runtime. All validators must switch to /req_chunk/2."
									);
								}

								match req_res::v1::ChunkFetchingResponse::decode(&mut &bytes[..]) {
									Ok(req_res::v1::ChunkFetchingResponse::Chunk(chunk)) => Ok((
										Some(chunk.recombine_into_chunk(&raw_request_v1)),
										protocol,
									)),
									Ok(req_res::v1::ChunkFetchingResponse::NoSuchChunk) =>
										Ok((None, protocol)),
									Err(e) => Err(RequestError::InvalidResponse(e)),
								}
							} else {
								Err(RequestError::NetworkError(RequestFailure::UnknownProtocol))
							},

						Err(e) => Err(e),
					};

					(authority_id, validator_index, res)
				}));
			} else {
				break
			}
		}

		if requests.len() != 0 {
			sender
				.send_message(NetworkBridgeTxMessage::SendRequests(
					requests,
					IfDisconnected::TryConnect,
				))
				.await;
		}
	}
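
	// Note on protocol versioning (a brief, non-normative summary): requests go out on the
	// `/req_chunk/2` protocol with a `/req_chunk/1` fallback for peers that have not upgraded
	// yet. The v1 response does not carry the chunk index, which is why it is recombined from
	// the original request above and why v1 cannot be relied upon once the
	// `AvailabilityChunkMapping` feature is enabled.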

	/// Wait for a sufficient amount of chunks to reconstruct according to the provided `params`.
	async fn wait_for_chunks(
		&mut self,
		strategy_type: &str,
		params: &RecoveryParams,
		retry_threshold: u32,
		validators: &mut VecDeque<(AuthorityDiscoveryId, ValidatorIndex)>,
		requesting_chunks: &mut OngoingRequests,
		// If supplied, these validators will be used as a backup for requesting chunks. They
		// should hold all chunks. Each of them will only be used to query one chunk.
		backup_validators: &mut Vec<AuthorityDiscoveryId>,
		// Function that returns `true` when this strategy can conclude. Either if we got enough
		// chunks or if it's impossible.
		mut can_conclude: impl FnMut(
			// Number of validators left in the queue
			usize,
			// Number of in flight requests
			usize,
			// Number of valid chunks received so far
			usize,
			// Number of valid systematic chunks received so far
			usize,
		) -> bool,
	) -> (usize, usize) {
		let metrics = &params.metrics;

		let mut total_received_responses = 0;
		let mut error_count = 0;

		// Wait for all current requests to conclude or time-out, or until we reach enough chunks.
		// We also declare requests undead, once `TIMEOUT_START_NEW_REQUESTS` is reached and will
		// return in that case for `launch_parallel_requests` to fill up slots again.
		while let Some(res) = requesting_chunks.next_with_timeout(TIMEOUT_START_NEW_REQUESTS).await
		{
			total_received_responses += 1;

			let (authority_id, validator_index, request_result) = res;

			let mut is_error = false;

			match request_result {
				Ok((maybe_chunk, protocol)) => {
					match protocol {
						name if name == params.req_v1_protocol_name =>
							params.metrics.on_chunk_response_v1(),
						name if name == params.req_v2_protocol_name =>
							params.metrics.on_chunk_response_v2(),
						_ => {},
					}

					match maybe_chunk {
						Some(chunk) =>
							if is_chunk_valid(params, &chunk) {
								metrics.on_chunk_request_succeeded(strategy_type);
								gum::trace!(
									target: LOG_TARGET,
									candidate_hash = ?params.candidate_hash,
									?authority_id,
									?validator_index,
									"Received valid chunk",
								);
								self.insert_chunk(
									chunk.index,
									Chunk { chunk: chunk.chunk, validator_index },
								);
							} else {
								metrics.on_chunk_request_invalid(strategy_type);
								error_count += 1;
								// Record that we got an invalid chunk so that subsequent strategies
								// don't try requesting this again.
								self.record_error_fatal(authority_id.clone(), validator_index);
								is_error = true;
							},
						None => {
							metrics.on_chunk_request_no_such_chunk(strategy_type);
							gum::trace!(
								target: LOG_TARGET,
								candidate_hash = ?params.candidate_hash,
								?authority_id,
								?validator_index,
								"Validator did not have the chunk",
							);
							error_count += 1;
							// Record that the validator did not have this chunk so that subsequent
							// strategies don't try requesting this again.
							self.record_error_fatal(authority_id.clone(), validator_index);
							is_error = true;
						},
					}
				},
				Err(err) => {
					error_count += 1;

					gum::trace!(
						target: LOG_TARGET,
						candidate_hash = ?params.candidate_hash,
						?err,
						?authority_id,
						?validator_index,
						"Failure requesting chunk",
					);

					is_error = true;

					match err {
						RequestError::InvalidResponse(_) => {
							metrics.on_chunk_request_invalid(strategy_type);

							gum::debug!(
								target: LOG_TARGET,
								candidate_hash = ?params.candidate_hash,
								?err,
								?authority_id,
								?validator_index,
								"Chunk fetching response was invalid",
							);

							// Record that we got an invalid chunk so that this or
							// subsequent strategies don't try requesting this again.
							self.record_error_fatal(authority_id.clone(), validator_index);
						},
						RequestError::NetworkError(err) => {
							// No debug logs on general network errors - that became very
							// spammy occasionally.
							if let RequestFailure::Network(OutboundFailure::Timeout) = err {
								metrics.on_chunk_request_timeout(strategy_type);
							} else {
								metrics.on_chunk_request_error(strategy_type);
							}

							// Record that we got a non-fatal error so that this or
							// subsequent strategies will retry requesting this only a
							// limited number of times.
							self.record_error_non_fatal(authority_id.clone(), validator_index);
						},
						RequestError::Canceled(_) => {
							metrics.on_chunk_request_error(strategy_type);

							// Record that we got a non-fatal error so that this or
							// subsequent strategies will retry requesting this only a
							// limited number of times.
							self.record_error_non_fatal(authority_id.clone(), validator_index);
						},
					}
				},
			}

			if is_error {
				// First, see if we can retry the request.
				if self.can_retry_request(&(authority_id.clone(), validator_index), retry_threshold)
				{
					validators.push_front((authority_id, validator_index));
				} else {
					// Otherwise, try requesting from a backer as a backup, if we've not already
					// requested the same chunk from it.

					let position = backup_validators.iter().position(|v| {
						!self.recorded_errors.contains_key(&(v.clone(), validator_index))
					});
					if let Some(position) = position {
						// Use swap_remove because it's faster and we don't care about order here.
						let backer = backup_validators.swap_remove(position);
						validators.push_front((backer, validator_index));
					}
				}
			}

			if can_conclude(
				validators.len(),
				requesting_chunks.total_len(),
				self.chunk_count(),
				self.systematic_chunk_count(params.systematic_threshold),
			) {
				gum::debug!(
					target: LOG_TARGET,
					validators_len = validators.len(),
					candidate_hash = ?params.candidate_hash,
					received_chunks_count = ?self.chunk_count(),
					requested_chunks_count = ?requesting_chunks.len(),
					threshold = ?params.threshold,
					"Can conclude availability recovery strategy",
				);
				break
			}
		}

		(total_received_responses, error_count)
	}
}
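
// How the pieces above fit together: a hedged sketch of the launch/wait loop a chunk-fetching
// strategy is expected to run (the real loops live in `chunks.rs` and `systematic.rs` and may
// differ in details; `validators`, `backup_validators`, `desired_requests_count` and
// `retry_threshold` are assumed local state of the strategy):
//
// loop {
// 	// Top up the set of in-flight requests.
// 	state
// 		.launch_parallel_chunk_requests(
// 			strategy_type,
// 			common_params,
// 			sender,
// 			desired_requests_count,
// 			&mut validators,
// 			&mut requesting_chunks,
// 		)
// 		.await;
//
// 	// Wait until enough chunks arrived, recovery became impossible, or the
// 	// `TIMEOUT_START_NEW_REQUESTS` soft timeout fired so that slots can be refilled.
// 	let (_received, _errors) = state
// 		.wait_for_chunks(
// 			strategy_type,
// 			common_params,
// 			retry_threshold,
// 			&mut validators,
// 			&mut requesting_chunks,
// 			&mut backup_validators,
// 			|unrequested, in_flight, chunks, _systematic| {
// 				chunks >= common_params.threshold ||
// 					is_unavailable(chunks, in_flight, unrequested, common_params.threshold)
// 			},
// 		)
// 		.await;
//
// 	// ...then either reconstruct and run `do_post_recovery_check`, or bail out with
// 	// `RecoveryError::Unavailable`.
// }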

#[cfg(test)]
mod tests {
	use super::*;
	use crate::{tests::*, Metrics, RecoveryStrategy, RecoveryTask};
	use assert_matches::assert_matches;
	use codec::Error as DecodingError;
	use futures::{
		channel::mpsc::{self, UnboundedReceiver},
		executor, future, Future, FutureExt, StreamExt,
	};
	use polkadot_erasure_coding::{recovery_threshold, systematic_recovery_threshold};
	use polkadot_node_network_protocol::request_response::Protocol;
	use polkadot_node_primitives::{BlockData, PoV};
	use polkadot_node_subsystem::{AllMessages, TimeoutExt};
	use polkadot_node_subsystem_test_helpers::{
		derive_erasure_chunks_with_proofs_and_root, sender_receiver, TestSubsystemSender,
	};
	use polkadot_primitives::{CandidateHash, HeadData, PersistedValidationData};
	use polkadot_primitives_test_helpers::dummy_hash;
	use sp_keyring::Sr25519Keyring;
	use std::sync::Arc;

	const TIMEOUT: Duration = Duration::from_secs(1);

	impl Default for RecoveryParams {
		fn default() -> Self {
			let validators = vec![
				Sr25519Keyring::Ferdie,
				Sr25519Keyring::Alice,
				Sr25519Keyring::Bob,
				Sr25519Keyring::Charlie,
				Sr25519Keyring::Dave,
				Sr25519Keyring::One,
				Sr25519Keyring::Two,
			];
			let (erasure_task_tx, _erasure_task_rx) = mpsc::channel(10);

			Self {
				validator_authority_keys: validator_authority_id(&validators),
				n_validators: validators.len(),
				threshold: recovery_threshold(validators.len()).unwrap(),
				systematic_threshold: systematic_recovery_threshold(validators.len()).unwrap(),
				candidate_hash: CandidateHash(dummy_hash()),
				erasure_root: dummy_hash(),
				metrics: Metrics::new_dummy(),
				bypass_availability_store: false,
				post_recovery_check: PostRecoveryCheck::Reencode,
				pov_hash: dummy_hash(),
				req_v1_protocol_name: "/req_chunk/1".into(),
				req_v2_protocol_name: "/req_chunk/2".into(),
				chunk_mapping_enabled: true,
				erasure_task_tx,
			}
		}
	}

	impl RecoveryParams {
		fn create_chunks(&mut self) -> Vec<ErasureChunk> {
			let available_data = dummy_available_data();
			let (chunks, erasure_root) = derive_erasure_chunks_with_proofs_and_root(
				self.n_validators,
				&available_data,
				|_, _| {},
			);

			self.erasure_root = erasure_root;
			self.pov_hash = available_data.pov.hash();

			chunks
		}
	}

	fn dummy_available_data() -> AvailableData {
		let validation_data = PersistedValidationData {
			parent_head: HeadData(vec![7, 8, 9]),
			relay_parent_number: Default::default(),
			max_pov_size: 1024,
			relay_parent_storage_root: Default::default(),
		};

		AvailableData {
			validation_data,
			pov: Arc::new(PoV { block_data: BlockData(vec![42; 64]) }),
		}
	}

	fn test_harness<RecvFut: Future<Output = ()>, TestFut: Future<Output = ()>>(
		receiver_future: impl FnOnce(UnboundedReceiver<AllMessages>) -> RecvFut,
		test: impl FnOnce(TestSubsystemSender) -> TestFut,
	) {
		let (sender, receiver) = sender_receiver();

		let test_fut = test(sender);
		let receiver_future = receiver_future(receiver);

		futures::pin_mut!(test_fut);
		futures::pin_mut!(receiver_future);

		executor::block_on(future::join(test_fut, receiver_future)).1
	}

	#[test]
	fn test_recorded_errors() {
		let retry_threshold = 2;
		let mut state = State::new();

		let alice = Sr25519Keyring::Alice.public();
		let bob = Sr25519Keyring::Bob.public();
		let eve = Sr25519Keyring::Eve.public();

		assert!(state.can_retry_request(&(alice.into(), 0.into()), retry_threshold));
		assert!(state.can_retry_request(&(alice.into(), 0.into()), 0));
		state.record_error_non_fatal(alice.into(), 0.into());
		assert!(state.can_retry_request(&(alice.into(), 0.into()), retry_threshold));
		state.record_error_non_fatal(alice.into(), 0.into());
		assert!(!state.can_retry_request(&(alice.into(), 0.into()), retry_threshold));
		state.record_error_non_fatal(alice.into(), 0.into());
		assert!(!state.can_retry_request(&(alice.into(), 0.into()), retry_threshold));

		assert!(state.can_retry_request(&(alice.into(), 0.into()), 5));

		state.record_error_fatal(bob.into(), 1.into());
		assert!(!state.can_retry_request(&(bob.into(), 1.into()), retry_threshold));
		state.record_error_non_fatal(bob.into(), 1.into());
		assert!(!state.can_retry_request(&(bob.into(), 1.into()), retry_threshold));

		assert!(state.can_retry_request(&(eve.into(), 4.into()), 0));
		assert!(state.can_retry_request(&(eve.into(), 4.into()), retry_threshold));
	}

	#[test]
	fn test_populate_from_av_store() {
		let params = RecoveryParams::default();

		// Failed to reach the av store
		{
			let params = params.clone();
			let candidate_hash = params.candidate_hash;
			let mut state = State::new();

			test_harness(
				|mut receiver: UnboundedReceiver<AllMessages>| async move {
					assert_matches!(
					receiver.next().timeout(TIMEOUT).await.unwrap().unwrap(),
					AllMessages::AvailabilityStore(AvailabilityStoreMessage::QueryAllChunks(hash, tx)) => {
						assert_eq!(hash, candidate_hash);
						drop(tx);
					});
				},
				|mut sender| async move {
					let local_chunk_indices =
						state.populate_from_av_store(&params, &mut sender).await;

					assert_eq!(state.chunk_count(), 0);
					assert_eq!(local_chunk_indices.len(), 0);
				},
			);
		}

		// Found invalid chunk
		{
			let mut params = params.clone();
			let candidate_hash = params.candidate_hash;
			let mut state = State::new();
			let chunks = params.create_chunks();

			test_harness(
				|mut receiver: UnboundedReceiver<AllMessages>| async move {
					assert_matches!(
					receiver.next().timeout(TIMEOUT).await.unwrap().unwrap(),
					AllMessages::AvailabilityStore(AvailabilityStoreMessage::QueryAllChunks(hash, tx)) => {
						assert_eq!(hash, candidate_hash);
						let mut chunk = chunks[0].clone();
						chunk.index = 3.into();
						tx.send(vec![(2.into(), chunk)]).unwrap();
					});
				},
				|mut sender| async move {
					let local_chunk_indices =
						state.populate_from_av_store(&params, &mut sender).await;

					assert_eq!(state.chunk_count(), 0);
					assert_eq!(local_chunk_indices.len(), 1);
				},
			);
		}

		// Found valid chunk
		{
			let mut params = params.clone();
			let candidate_hash = params.candidate_hash;
			let mut state = State::new();
			let chunks = params.create_chunks();

			test_harness(
				|mut receiver: UnboundedReceiver<AllMessages>| async move {
					assert_matches!(
					receiver.next().timeout(TIMEOUT).await.unwrap().unwrap(),
					AllMessages::AvailabilityStore(AvailabilityStoreMessage::QueryAllChunks(hash, tx)) => {
						assert_eq!(hash, candidate_hash);
						tx.send(vec![(4.into(), chunks[1].clone())]).unwrap();
					});
				},
				|mut sender| async move {
					let local_chunk_indices =
						state.populate_from_av_store(&params, &mut sender).await;

					assert_eq!(state.chunk_count(), 1);
					assert_eq!(local_chunk_indices.len(), 1);
				},
			);
		}
	}

	#[test]
	fn test_launch_parallel_chunk_requests() {
		let params = RecoveryParams::default();
		let alice: AuthorityDiscoveryId = Sr25519Keyring::Alice.public().into();
		let bob: AuthorityDiscoveryId = Sr25519Keyring::Bob.public().into();
		let eve: AuthorityDiscoveryId = Sr25519Keyring::Eve.public().into();

		// No validators to request from.
		{
			let params = params.clone();
			let mut state = State::new();
			let mut ongoing_reqs = OngoingRequests::new();
			let mut validators = VecDeque::new();

			test_harness(
				|mut receiver: UnboundedReceiver<AllMessages>| async move {
					// Shouldn't send any requests.
					assert!(receiver.next().timeout(TIMEOUT).await.unwrap().is_none());
				},
				|mut sender| async move {
					state
						.launch_parallel_chunk_requests(
							"regular",
							&params,
							&mut sender,
							3,
							&mut validators,
							&mut ongoing_reqs,
						)
						.await;

					assert_eq!(ongoing_reqs.total_len(), 0);
				},
			);
		}

		// Has validators but no need to request more.
		{
			let params = params.clone();
			let mut state = State::new();
			let mut ongoing_reqs = OngoingRequests::new();
			let mut validators = VecDeque::new();
			validators.push_back((alice.clone(), ValidatorIndex(1)));

			test_harness(
				|mut receiver: UnboundedReceiver<AllMessages>| async move {
					// Shouldn't send any requests.
					assert!(receiver.next().timeout(TIMEOUT).await.unwrap().is_none());
				},
				|mut sender| async move {
					state
						.launch_parallel_chunk_requests(
							"regular",
							&params,
							&mut sender,
							0,
							&mut validators,
							&mut ongoing_reqs,
						)
						.await;

					assert_eq!(ongoing_reqs.total_len(), 0);
				},
			);
		}

		// Has validators but no need to request more.
		{
			let params = params.clone();
			let mut state = State::new();
			let mut ongoing_reqs = OngoingRequests::new();
			ongoing_reqs.push(async { todo!() }.boxed());
			ongoing_reqs.soft_cancel();
			let mut validators = VecDeque::new();
			validators.push_back((alice.clone(), ValidatorIndex(1)));

			test_harness(
				|mut receiver: UnboundedReceiver<AllMessages>| async move {
					// Shouldn't send any requests.
					assert!(receiver.next().timeout(TIMEOUT).await.unwrap().is_none());
				},
				|mut sender| async move {
					state
						.launch_parallel_chunk_requests(
							"regular",
							&params,
							&mut sender,
							0,
							&mut validators,
							&mut ongoing_reqs,
						)
						.await;

					assert_eq!(ongoing_reqs.total_len(), 1);
					assert_eq!(ongoing_reqs.len(), 0);
				},
			);
		}

		// Needs to request more.
		{
			let params = params.clone();
			let mut state = State::new();
			let mut ongoing_reqs = OngoingRequests::new();
			ongoing_reqs.push(async { todo!() }.boxed());
			ongoing_reqs.soft_cancel();
			ongoing_reqs.push(async { todo!() }.boxed());
			let mut validators = VecDeque::new();
			validators.push_back((alice.clone(), 0.into()));
			validators.push_back((bob, 1.into()));
			validators.push_back((eve, 2.into()));

			test_harness(
				|mut receiver: UnboundedReceiver<AllMessages>| async move {
					assert_matches!(
						receiver.next().timeout(TIMEOUT).await.unwrap().unwrap(),
						AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendRequests(requests, _)) if requests.len() == 3
					);
				},
				|mut sender| async move {
					state
						.launch_parallel_chunk_requests(
							"regular",
							&params,
							&mut sender,
							10,
							&mut validators,
							&mut ongoing_reqs,
						)
						.await;

					assert_eq!(ongoing_reqs.total_len(), 5);
					assert_eq!(ongoing_reqs.len(), 4);
				},
			);
		}

		// Check network protocol versioning.
		{
			let params = params.clone();
			let mut state = State::new();
			let mut ongoing_reqs = OngoingRequests::new();
			let mut validators = VecDeque::new();
			validators.push_back((alice, 0.into()));

			test_harness(
				|mut receiver: UnboundedReceiver<AllMessages>| async move {
					match receiver.next().timeout(TIMEOUT).await.unwrap().unwrap() {
						AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendRequests(
							mut requests,
							_,
						)) => {
							assert_eq!(requests.len(), 1);
							// By default, we should use the new protocol version with a fallback on
							// the older one.
							let (protocol, request) = requests.remove(0).encode_request();
							assert_eq!(protocol, Protocol::ChunkFetchingV2);
							assert_eq!(
								request.fallback_request.unwrap().1,
								Protocol::ChunkFetchingV1
							);
						},
						_ => unreachable!(),
					}
				},
				|mut sender| async move {
					state
						.launch_parallel_chunk_requests(
							"regular",
							&params,
							&mut sender,
							10,
							&mut validators,
							&mut ongoing_reqs,
						)
						.await;

					assert_eq!(ongoing_reqs.total_len(), 1);
					assert_eq!(ongoing_reqs.len(), 1);
				},
			);
		}
	}

	#[test]
	fn test_wait_for_chunks() {
		let params = RecoveryParams::default();
		let retry_threshold = 2;

		// No ongoing requests.
		{
			let params = params.clone();
			let mut state = State::new();
			let mut ongoing_reqs = OngoingRequests::new();
			let mut validators = VecDeque::new();

			test_harness(
				|mut receiver: UnboundedReceiver<AllMessages>| async move {
					// Shouldn't send any requests.
					assert!(receiver.next().timeout(TIMEOUT).await.unwrap().is_none());
				},
				|_| async move {
					let (total_responses, error_count) = state
						.wait_for_chunks(
							"regular",
							&params,
							retry_threshold,
							&mut validators,
							&mut ongoing_reqs,
							&mut vec![],
							|_, _, _, _| false,
						)
						.await;
					assert_eq!(total_responses, 0);
					assert_eq!(error_count, 0);
					assert_eq!(state.chunk_count(), 0);
				},
			);
		}

		// Complex scenario.
		{
			let mut params = params.clone();
			let chunks = params.create_chunks();
			let mut state = State::new();
			let mut ongoing_reqs = OngoingRequests::new();
			ongoing_reqs.push(
				future::ready((
					params.validator_authority_keys[0].clone(),
					0.into(),
					Ok((Some(chunks[0].clone()), "".into())),
				))
				.boxed(),
			);
			ongoing_reqs.soft_cancel();
			ongoing_reqs.push(
				future::ready((
					params.validator_authority_keys[1].clone(),
					1.into(),
					Ok((Some(chunks[1].clone()), "".into())),
				))
				.boxed(),
			);
			ongoing_reqs.push(
				future::ready((
					params.validator_authority_keys[2].clone(),
					2.into(),
					Ok((None, "".into())),
				))
				.boxed(),
			);
			ongoing_reqs.push(
				future::ready((
					params.validator_authority_keys[3].clone(),
					3.into(),
					Err(RequestError::from(DecodingError::from("err"))),
				))
				.boxed(),
			);
			ongoing_reqs.push(
				future::ready((
					params.validator_authority_keys[4].clone(),
					4.into(),
					Err(RequestError::NetworkError(RequestFailure::NotConnected)),
				))
				.boxed(),
			);

			let mut validators: VecDeque<_> = (5..params.n_validators as u32)
				.map(|i| (params.validator_authority_keys[i as usize].clone(), i.into()))
				.collect();
			validators.push_back((
				Sr25519Keyring::AliceStash.public().into(),
				ValidatorIndex(params.n_validators as u32),
			));

			test_harness(
				|mut receiver: UnboundedReceiver<AllMessages>| async move {
					// Shouldn't send any requests.
					assert!(receiver.next().timeout(TIMEOUT).await.unwrap().is_none());
				},
				|_| async move {
					let (total_responses, error_count) = state
						.wait_for_chunks(
							"regular",
							&params,
							retry_threshold,
							&mut validators,
							&mut ongoing_reqs,
							&mut vec![],
							|_, _, _, _| false,
						)
						.await;
					assert_eq!(total_responses, 5);
					assert_eq!(error_count, 3);
					assert_eq!(state.chunk_count(), 2);

					let mut expected_validators: VecDeque<_> = (4..params.n_validators as u32)
						.map(|i| (params.validator_authority_keys[i as usize].clone(), i.into()))
						.collect();
					expected_validators.push_back((
						Sr25519Keyring::AliceStash.public().into(),
						ValidatorIndex(params.n_validators as u32),
					));

					assert_eq!(validators, expected_validators);

					// This time we'll go over the recoverable error threshold.
					ongoing_reqs.push(
						future::ready((
							params.validator_authority_keys[4].clone(),
							4.into(),
							Err(RequestError::NetworkError(RequestFailure::NotConnected)),
						))
						.boxed(),
					);

					let (total_responses, error_count) = state
						.wait_for_chunks(
							"regular",
							&params,
							retry_threshold,
							&mut validators,
							&mut ongoing_reqs,
							&mut vec![],
							|_, _, _, _| false,
						)
						.await;
					assert_eq!(total_responses, 1);
					assert_eq!(error_count, 1);
					assert_eq!(state.chunk_count(), 2);

					validators.pop_front();
					let mut expected_validators: VecDeque<_> = (5..params.n_validators as u32)
						.map(|i| (params.validator_authority_keys[i as usize].clone(), i.into()))
						.collect();
					expected_validators.push_back((
						Sr25519Keyring::AliceStash.public().into(),
						ValidatorIndex(params.n_validators as u32),
					));

					assert_eq!(validators, expected_validators);

					// Check that can_conclude returning true terminates the loop.
					let (total_responses, error_count) = state
						.wait_for_chunks(
							"regular",
							&params,
							retry_threshold,
							&mut validators,
							&mut ongoing_reqs,
							&mut vec![],
							|_, _, _, _| true,
						)
						.await;
					assert_eq!(total_responses, 0);
					assert_eq!(error_count, 0);
					assert_eq!(state.chunk_count(), 2);

					assert_eq!(validators, expected_validators);
				},
			);
		}

		// Complex scenario with backups in the backing group.
		{
			let mut params = params.clone();
			let chunks = params.create_chunks();
			let mut state = State::new();
			let mut ongoing_reqs = OngoingRequests::new();
			ongoing_reqs.push(
				future::ready((
					params.validator_authority_keys[0].clone(),
					0.into(),
					Ok((Some(chunks[0].clone()), "".into())),
				))
				.boxed(),
			);
			ongoing_reqs.soft_cancel();
			ongoing_reqs.push(
				future::ready((
					params.validator_authority_keys[1].clone(),
					1.into(),
					Ok((Some(chunks[1].clone()), "".into())),
				))
				.boxed(),
			);
			ongoing_reqs.push(
				future::ready((
					params.validator_authority_keys[2].clone(),
					2.into(),
					Ok((None, "".into())),
				))
				.boxed(),
			);
			ongoing_reqs.push(
				future::ready((
					params.validator_authority_keys[3].clone(),
					3.into(),
					Err(RequestError::from(DecodingError::from("err"))),
				))
				.boxed(),
			);
			ongoing_reqs.push(
				future::ready((
					params.validator_authority_keys[4].clone(),
					4.into(),
					Err(RequestError::NetworkError(RequestFailure::NotConnected)),
				))
				.boxed(),
			);

			let mut validators: VecDeque<_> = (5..params.n_validators as u32)
				.map(|i| (params.validator_authority_keys[i as usize].clone(), i.into()))
				.collect();
			validators.push_back((
				Sr25519Keyring::Eve.public().into(),
				ValidatorIndex(params.n_validators as u32),
			));

			let mut backup_backers = vec![
				params.validator_authority_keys[2].clone(),
				params.validator_authority_keys[0].clone(),
				params.validator_authority_keys[4].clone(),
				params.validator_authority_keys[3].clone(),
				Sr25519Keyring::AliceStash.public().into(),
				Sr25519Keyring::BobStash.public().into(),
			];

			test_harness(
				|mut receiver: UnboundedReceiver<AllMessages>| async move {
					// Shouldn't send any requests.
					assert!(receiver.next().timeout(TIMEOUT).await.unwrap().is_none());
				},
				|_| async move {
					let (total_responses, error_count) = state
						.wait_for_chunks(
							"regular",
							&params,
							retry_threshold,
							&mut validators,
							&mut ongoing_reqs,
							&mut backup_backers,
							|_, _, _, _| false,
						)
						.await;
					assert_eq!(total_responses, 5);
					assert_eq!(error_count, 3);
					assert_eq!(state.chunk_count(), 2);

					let mut expected_validators: VecDeque<_> = (5..params.n_validators as u32)
						.map(|i| (params.validator_authority_keys[i as usize].clone(), i.into()))
						.collect();
					expected_validators.push_back((
						Sr25519Keyring::Eve.public().into(),
						ValidatorIndex(params.n_validators as u32),
					));
					// We picked a backer as a backup for chunks 2 and 3.
					expected_validators
						.push_front((params.validator_authority_keys[0].clone(), 2.into()));
					expected_validators
						.push_front((params.validator_authority_keys[2].clone(), 3.into()));
					expected_validators
						.push_front((params.validator_authority_keys[4].clone(), 4.into()));

					assert_eq!(validators, expected_validators);

					// This time we'll go over the recoverable error threshold for chunk 4.
					ongoing_reqs.push(
						future::ready((
							params.validator_authority_keys[4].clone(),
							4.into(),
							Err(RequestError::NetworkError(RequestFailure::NotConnected)),
						))
						.boxed(),
					);

					validators.pop_front();

					let (total_responses, error_count) = state
						.wait_for_chunks(
							"regular",
							&params,
							retry_threshold,
							&mut validators,
							&mut ongoing_reqs,
							&mut backup_backers,
							|_, _, _, _| false,
						)
						.await;
					assert_eq!(total_responses, 1);
					assert_eq!(error_count, 1);
					assert_eq!(state.chunk_count(), 2);

					expected_validators.pop_front();
					expected_validators
						.push_front((Sr25519Keyring::AliceStash.public().into(), 4.into()));

					assert_eq!(validators, expected_validators);
				},
			);
		}
	}

	#[test]
	fn test_recovery_strategy_run() {
		let params = RecoveryParams::default();

		struct GoodStrategy;
		#[async_trait::async_trait]
		impl<Sender: overseer::AvailabilityRecoverySenderTrait> RecoveryStrategy<Sender> for GoodStrategy {
			fn display_name(&self) -> &'static str {
				"GoodStrategy"
			}

			fn strategy_type(&self) -> &'static str {
				"good_strategy"
			}

			async fn run(
				mut self: Box<Self>,
				_state: &mut State,
				_sender: &mut Sender,
				_common_params: &RecoveryParams,
			) -> Result<AvailableData, RecoveryError> {
				Ok(dummy_available_data())
			}
		}

		struct UnavailableStrategy;
		#[async_trait::async_trait]
		impl<Sender: overseer::AvailabilityRecoverySenderTrait> RecoveryStrategy<Sender>
			for UnavailableStrategy
		{
			fn display_name(&self) -> &'static str {
				"UnavailableStrategy"
			}

			fn strategy_type(&self) -> &'static str {
				"unavailable_strategy"
			}

			async fn run(
				mut self: Box<Self>,
				_state: &mut State,
				_sender: &mut Sender,
				_common_params: &RecoveryParams,
			) -> Result<AvailableData, RecoveryError> {
				Err(RecoveryError::Unavailable)
			}
		}

		struct InvalidStrategy;
		#[async_trait::async_trait]
		impl<Sender: overseer::AvailabilityRecoverySenderTrait> RecoveryStrategy<Sender>
			for InvalidStrategy
		{
			fn display_name(&self) -> &'static str {
				"InvalidStrategy"
			}

			fn strategy_type(&self) -> &'static str {
				"invalid_strategy"
			}

			async fn run(
				mut self: Box<Self>,
				_state: &mut State,
				_sender: &mut Sender,
				_common_params: &RecoveryParams,
			) -> Result<AvailableData, RecoveryError> {
				Err(RecoveryError::Invalid)
			}
		}

		// No recovery strategies.
		{
			let mut params = params.clone();
			let strategies = VecDeque::new();
			params.bypass_availability_store = true;

			test_harness(
				|mut receiver: UnboundedReceiver<AllMessages>| async move {
					// Shouldn't send any requests.
					assert!(receiver.next().timeout(TIMEOUT).await.unwrap().is_none());
				},
				|sender| async move {
					let task = RecoveryTask::new(sender, params, strategies);

					assert_eq!(task.run().await.unwrap_err(), RecoveryError::Unavailable);
				},
			);
		}

		// If we have the data in av-store, returns early.
		{
			let params = params.clone();
			let strategies = VecDeque::new();
			let candidate_hash = params.candidate_hash;

			test_harness(
				|mut receiver: UnboundedReceiver<AllMessages>| async move {
					assert_matches!(
					receiver.next().timeout(TIMEOUT).await.unwrap().unwrap(),
					AllMessages::AvailabilityStore(AvailabilityStoreMessage::QueryAvailableData(hash, tx)) => {
						assert_eq!(hash, candidate_hash);
						tx.send(Some(dummy_available_data())).unwrap();
					});
				},
				|sender| async move {
					let task = RecoveryTask::new(sender, params, strategies);

					assert_eq!(task.run().await.unwrap(), dummy_available_data());
				},
			);
		}

		// Strategy returning `RecoveryError::Invalid` will short-circuit the entire task.
		{
			let mut params = params.clone();
			params.bypass_availability_store = true;
			let mut strategies: VecDeque<Box<dyn RecoveryStrategy<TestSubsystemSender>>> =
				VecDeque::new();
			strategies.push_back(Box::new(InvalidStrategy));
			strategies.push_back(Box::new(GoodStrategy));

			test_harness(
				|mut receiver: UnboundedReceiver<AllMessages>| async move {
					// Shouldn't send any requests.
					assert!(receiver.next().timeout(TIMEOUT).await.unwrap().is_none());
				},
				|sender| async move {
					let task = RecoveryTask::new(sender, params, strategies);

					assert_eq!(task.run().await.unwrap_err(), RecoveryError::Invalid);
				},
			);
		}

		// Strategy returning `Unavailable` will fall back to the next one.
		{
			let params = params.clone();
			let candidate_hash = params.candidate_hash;
			let mut strategies: VecDeque<Box<dyn RecoveryStrategy<TestSubsystemSender>>> =
				VecDeque::new();
			strategies.push_back(Box::new(UnavailableStrategy));
			strategies.push_back(Box::new(GoodStrategy));

			test_harness(
				|mut receiver: UnboundedReceiver<AllMessages>| async move {
					assert_matches!(
						receiver.next().timeout(TIMEOUT).await.unwrap().unwrap(),
						AllMessages::AvailabilityStore(AvailabilityStoreMessage::QueryAvailableData(hash, tx)) => {
							assert_eq!(hash, candidate_hash);
							tx.send(Some(dummy_available_data())).unwrap();
					});
				},
				|sender| async move {
					let task = RecoveryTask::new(sender, params, strategies);

					assert_eq!(task.run().await.unwrap(), dummy_available_data());
				},
			);
		}

		// More complex scenario.
		{
			let params = params.clone();
			let candidate_hash = params.candidate_hash;
			let mut strategies: VecDeque<Box<dyn RecoveryStrategy<TestSubsystemSender>>> =
				VecDeque::new();
			strategies.push_back(Box::new(UnavailableStrategy));
			strategies.push_back(Box::new(UnavailableStrategy));
			strategies.push_back(Box::new(GoodStrategy));
			strategies.push_back(Box::new(InvalidStrategy));

			test_harness(
				|mut receiver: UnboundedReceiver<AllMessages>| async move {
					assert_matches!(
						receiver.next().timeout(TIMEOUT).await.unwrap().unwrap(),
						AllMessages::AvailabilityStore(AvailabilityStoreMessage::QueryAvailableData(hash, tx)) => {
							assert_eq!(hash, candidate_hash);
							tx.send(Some(dummy_available_data())).unwrap();
					});
				},
				|sender| async move {
					let task = RecoveryTask::new(sender, params, strategies);

					assert_eq!(task.run().await.unwrap(), dummy_available_data());
				},
			);
		}
	}

	#[test]
	fn test_is_unavailable() {
		assert_eq!(is_unavailable(0, 0, 0, 0), false);
		assert_eq!(is_unavailable(2, 2, 2, 0), false);
		// Already reached the threshold.
		assert_eq!(is_unavailable(3, 0, 10, 3), false);
		assert_eq!(is_unavailable(3, 2, 0, 3), false);
		assert_eq!(is_unavailable(3, 2, 10, 3), false);
		// It's still possible to reach the threshold
		assert_eq!(is_unavailable(0, 0, 10, 3), false);
		assert_eq!(is_unavailable(0, 0, 3, 3), false);
		assert_eq!(is_unavailable(1, 1, 1, 3), false);
		// Not possible to reach the threshold
		assert_eq!(is_unavailable(0, 0, 0, 3), true);
		assert_eq!(is_unavailable(2, 3, 2, 10), true);
	}
}