polkadot_availability_recovery/lib.rs

// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! Availability Recovery Subsystem of Polkadot.

#![warn(missing_docs)]

use std::{
	collections::{BTreeMap, VecDeque},
	iter::Iterator,
	num::NonZeroUsize,
	pin::Pin,
};

use futures::{
	channel::oneshot,
	future::{Future, FutureExt, RemoteHandle},
	pin_mut,
	prelude::*,
	sink::SinkExt,
	stream::{FuturesUnordered, StreamExt},
	task::{Context, Poll},
};
use sc_network::ProtocolName;
use schnellru::{ByLength, LruMap};
use task::{
	FetchChunks, FetchChunksParams, FetchFull, FetchFullParams, FetchSystematicChunks,
	FetchSystematicChunksParams,
};

use polkadot_erasure_coding::{
	branches, obtain_chunks_v1, recovery_threshold, systematic_recovery_threshold,
	Error as ErasureEncodingError,
};
use task::{RecoveryParams, RecoveryStrategy, RecoveryTask};

use error::{log_error, Error, FatalError, Result};
use polkadot_node_network_protocol::{
	request_response::{
		v1 as request_v1, v2 as request_v2, IncomingRequestReceiver, IsRequest, ReqProtocolNames,
	},
	UnifiedReputationChange as Rep,
};
use polkadot_node_primitives::AvailableData;
use polkadot_node_subsystem::{
	errors::RecoveryError,
	messages::{AvailabilityRecoveryMessage, AvailabilityStoreMessage},
	overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem,
	SubsystemContext, SubsystemError,
};
use polkadot_node_subsystem_util::{
	availability_chunks::availability_chunk_indices,
	runtime::{ExtendedSessionInfo, RuntimeInfo},
};
use polkadot_primitives::{
	node_features, BlockNumber, CandidateHash, CandidateReceiptV2 as CandidateReceipt, ChunkIndex,
	CoreIndex, GroupIndex, Hash, SessionIndex, ValidatorIndex,
};

mod error;
mod futures_undead;
mod metrics;
mod task;
pub use metrics::Metrics;

#[cfg(test)]
mod tests;

type RecoveryResult = std::result::Result<AvailableData, RecoveryError>;

const LOG_TARGET: &str = "parachain::availability-recovery";

// Size of the LRU cache where we keep recovered data.
const LRU_SIZE: u32 = 16;

const COST_INVALID_REQUEST: Rep = Rep::CostMajor("Peer sent unparsable request");

/// PoV size limit in bytes below which we prefer fetching from backers (conservative; Polkadot for now).
pub(crate) const CONSERVATIVE_FETCH_CHUNKS_THRESHOLD: usize = 1 * 1024 * 1024;
/// PoV size limit in bytes below which we prefer fetching from backers (Kusama and all testnets).
pub const FETCH_CHUNKS_THRESHOLD: usize = 4 * 1024 * 1024;

#[derive(Clone, PartialEq)]
/// The strategy we use to recover the PoV.
pub enum RecoveryStrategyKind {
	/// We try the backing group first if the PoV size is lower than specified, then fall back to
	/// validator chunks.
	BackersFirstIfSizeLower(usize),
	/// We try the backing group first if the PoV size is lower than specified, then fall back to
	/// systematic chunks. Regular chunk recovery as a last resort.
	BackersFirstIfSizeLowerThenSystematicChunks(usize),

	/// The following variants are only helpful for integration tests.
	///
	/// We always try the backing group first, then fall back to validator chunks.
	#[allow(dead_code)]
	BackersFirstAlways,
	/// We always recover using validator chunks.
	#[allow(dead_code)]
	ChunksAlways,
	/// First try the backing group. Then systematic chunks.
	#[allow(dead_code)]
	BackersThenSystematicChunks,
	/// Always recover using systematic chunks, fall back to regular chunks.
	#[allow(dead_code)]
	SystematicChunks,
}

/// The Availability Recovery Subsystem.
pub struct AvailabilityRecoverySubsystem {
	/// PoV recovery strategy to use.
	recovery_strategy_kind: RecoveryStrategyKind,
	/// If this is true, do not request data from the availability store.
	/// This is useful for nodes where the availability-store subsystem
	/// is not expected to run, such as collators.
	bypass_availability_store: bool,
	/// Receiver for available data requests.
	req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
	/// Metrics for this subsystem.
	metrics: Metrics,
	/// The type of check to perform after available data was recovered.
	post_recovery_check: PostRecoveryCheck,
	/// Full protocol name for ChunkFetchingV1.
	req_v1_protocol_name: ProtocolName,
	/// Full protocol name for ChunkFetchingV2.
	req_v2_protocol_name: ProtocolName,
}

#[derive(Clone, PartialEq, Debug)]
/// The type of check to perform after available data was recovered.
enum PostRecoveryCheck {
	/// Reencode the data and check erasure root. For validators.
	Reencode,
	/// Only check the pov hash. For collators only.
	PovHash,
}

/// Expensive erasure coding computations that we want to run on a blocking thread.
enum ErasureTask {
	/// Reconstructs `AvailableData` from chunks given `n_validators`.
	Reconstruct(
		usize,
		BTreeMap<ChunkIndex, Vec<u8>>,
		oneshot::Sender<std::result::Result<AvailableData, ErasureEncodingError>>,
	),
	/// Re-encode `AvailableData` into erasure chunks in order to verify the provided root hash of
	/// the Merkle tree.
	Reencode(usize, Hash, AvailableData, oneshot::Sender<Option<AvailableData>>),
}

/// Re-encode the data into erasure chunks in order to verify
/// the root hash of the provided Merkle tree, which is built
/// on top of the encoded chunks.
///
/// This (expensive) check is necessary, as otherwise we can't be sure that some chunks won't have
/// been tampered with by the backers, which would result in some validators considering the data
/// valid and others invalid, as they fetched different sets of chunks. Checking the Merkle proof
/// for an individual chunk only guarantees that we have fetched a chunk belonging to the set the
/// backers committed to.
///
/// NOTE: It is fine to do this check with already decoded data, because if the decoding failed for
/// some validators, we can be sure that chunks have been tampered with (by the backers) or the
/// data was invalid to begin with. In the former case, validators fetching valid chunks will see
/// invalid data as well, because the root won't match. In the latter case the situation is the
/// same for anyone anyways.
fn reconstructed_data_matches_root(
	n_validators: usize,
	expected_root: &Hash,
	data: &AvailableData,
	metrics: &Metrics,
) -> bool {
	let _timer = metrics.time_reencode_chunks();

	let chunks = match obtain_chunks_v1(n_validators, data) {
		Ok(chunks) => chunks,
		Err(e) => {
			gum::debug!(
				target: LOG_TARGET,
				err = ?e,
				"Failed to obtain chunks",
			);
			return false
		},
	};

	let branches = branches(&chunks);

	branches.root() == *expected_root
}

/// Accumulate all awaiting sides for some particular `AvailableData`.
struct RecoveryHandle {
	candidate_hash: CandidateHash,
	remote: RemoteHandle<RecoveryResult>,
	awaiting: Vec<oneshot::Sender<RecoveryResult>>,
}

impl Future for RecoveryHandle {
	type Output = Option<(CandidateHash, RecoveryResult)>;

	fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
		let mut indices_to_remove = Vec::new();
		for (i, awaiting) in self.awaiting.iter_mut().enumerate().rev() {
			if let Poll::Ready(()) = awaiting.poll_canceled(cx) {
				indices_to_remove.push(i);
			}
		}

		// The indices are in reverse order, so `swap_remove` doesn't invalidate the remaining ones.
		for index in indices_to_remove {
			gum::debug!(
				target: LOG_TARGET,
				candidate_hash = ?self.candidate_hash,
				"Receiver for available data dropped.",
			);

			self.awaiting.swap_remove(index);
		}

		if self.awaiting.is_empty() {
			gum::debug!(
				target: LOG_TARGET,
				candidate_hash = ?self.candidate_hash,
				"All receivers for available data dropped.",
			);

			return Poll::Ready(None)
		}

		let remote = &mut self.remote;
		futures::pin_mut!(remote);
		let result = futures::ready!(remote.poll(cx));

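		// The background recovery task finished; fan the result out to every remaining waiter.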
		for awaiting in self.awaiting.drain(..) {
			let _ = awaiting.send(result.clone());
		}

		Poll::Ready(Some((self.candidate_hash, result)))
	}
}

/// Cached result of an availability recovery operation.
#[derive(Debug, Clone)]
enum CachedRecovery {
	/// Availability was successfully retrieved before.
	Valid(AvailableData),
	/// Availability was successfully retrieved before, but was found to be invalid.
	Invalid,
}

impl CachedRecovery {
	/// Convert back to `Result` to deliver responses.
	fn into_result(self) -> RecoveryResult {
		match self {
			Self::Valid(d) => Ok(d),
			Self::Invalid => Err(RecoveryError::Invalid),
		}
	}
}

impl TryFrom<RecoveryResult> for CachedRecovery {
	type Error = ();
	fn try_from(o: RecoveryResult) -> std::result::Result<CachedRecovery, Self::Error> {
		match o {
			Ok(d) => Ok(Self::Valid(d)),
			Err(RecoveryError::Invalid) => Ok(Self::Invalid),
			// We don't want to cache unavailable state, as that state might change, so if
			// requested again we want to try again!
			Err(RecoveryError::Unavailable) => Err(()),
			Err(RecoveryError::ChannelClosed) => Err(()),
		}
	}
}

struct State {
	/// Each recovery task is implemented as its own async task,
	/// and these handles are for communicating with them.
	ongoing_recoveries: FuturesUnordered<RecoveryHandle>,

	/// A recent block hash for which state should be available.
	live_block: (BlockNumber, Hash),

	/// An LRU cache of recently recovered data.
	availability_lru: LruMap<CandidateHash, CachedRecovery>,

	/// Cached runtime info.
	runtime_info: RuntimeInfo,
}

impl Default for State {
	fn default() -> Self {
		Self {
			ongoing_recoveries: FuturesUnordered::new(),
			live_block: (0, Hash::default()),
			availability_lru: LruMap::new(ByLength::new(LRU_SIZE)),
			runtime_info: RuntimeInfo::new(None),
		}
	}
}

#[overseer::subsystem(AvailabilityRecovery, error=SubsystemError, prefix=self::overseer)]
impl<Context> AvailabilityRecoverySubsystem {
	fn start(self, ctx: Context) -> SpawnedSubsystem {
		let future = self
			.run(ctx)
			.map_err(|e| SubsystemError::with_origin("availability-recovery", e))
			.boxed();
		SpawnedSubsystem { name: "availability-recovery-subsystem", future }
	}
}

/// Handles a signal from the overseer.
/// Returns true if the subsystem received a deadly signal.
async fn handle_signal(state: &mut State, signal: OverseerSignal) -> bool {
	match signal {
		OverseerSignal::Conclude => true,
		OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, .. }) => {
			// if activated is non-empty, set state.live_block to the highest block in `activated`
			if let Some(activated) = activated {
				if activated.number > state.live_block.0 {
					state.live_block = (activated.number, activated.hash)
				}
			}

			false
		},
		OverseerSignal::BlockFinalized(_, _) => false,
	}
}

/// Machinery around launching recovery tasks into the background.
#[overseer::contextbounds(AvailabilityRecovery, prefix = self::overseer)]
async fn launch_recovery_task<Context>(
	state: &mut State,
	ctx: &mut Context,
	response_sender: oneshot::Sender<RecoveryResult>,
	recovery_strategies: VecDeque<Box<dyn RecoveryStrategy<<Context as SubsystemContext>::Sender>>>,
	params: RecoveryParams,
) -> Result<()> {
	let candidate_hash = params.candidate_hash;
	let recovery_task = RecoveryTask::new(ctx.sender().clone(), params, recovery_strategies);

	let (remote, remote_handle) = recovery_task.run().remote_handle();

	state.ongoing_recoveries.push(RecoveryHandle {
		candidate_hash,
		remote: remote_handle,
		awaiting: vec![response_sender],
	});

	ctx.spawn("recovery-task", Box::pin(remote))
		.map_err(|err| Error::SpawnTask(err))
}

/// Handles an availability recovery request.
#[overseer::contextbounds(AvailabilityRecovery, prefix = self::overseer)]
async fn handle_recover<Context>(
	state: &mut State,
	ctx: &mut Context,
	receipt: CandidateReceipt,
	session_index: SessionIndex,
	backing_group: Option<GroupIndex>,
	response_sender: oneshot::Sender<RecoveryResult>,
	metrics: &Metrics,
	erasure_task_tx: futures::channel::mpsc::Sender<ErasureTask>,
	recovery_strategy_kind: RecoveryStrategyKind,
	bypass_availability_store: bool,
	post_recovery_check: PostRecoveryCheck,
	maybe_core_index: Option<CoreIndex>,
	req_v1_protocol_name: ProtocolName,
	req_v2_protocol_name: ProtocolName,
) -> Result<()> {
	let candidate_hash = receipt.hash();

	if let Some(result) =
		state.availability_lru.get(&candidate_hash).cloned().map(|v| v.into_result())
	{
		return response_sender.send(result).map_err(|_| Error::CanceledResponseSender)
	}

	if let Some(i) =
		state.ongoing_recoveries.iter_mut().find(|i| i.candidate_hash == candidate_hash)
	{
		i.awaiting.push(response_sender);
		return Ok(())
	}

	let session_info_res = state
		.runtime_info
		.get_session_info_by_index(ctx.sender(), state.live_block.1, session_index)
		.await;

	match session_info_res {
		Ok(ExtendedSessionInfo { session_info, node_features, .. }) => {
			let mut backer_group = None;
			let n_validators = session_info.validators.len();
			let systematic_threshold = systematic_recovery_threshold(n_validators)?;
			let mut recovery_strategies: VecDeque<
				Box<dyn RecoveryStrategy<<Context as SubsystemContext>::Sender>>,
			> = VecDeque::with_capacity(3);

			if let Some(backing_group) = backing_group {
				if let Some(backing_validators) = session_info.validator_groups.get(backing_group) {
					let mut small_pov_size = true;

					match recovery_strategy_kind {
						RecoveryStrategyKind::BackersFirstIfSizeLower(fetch_chunks_threshold) |
						RecoveryStrategyKind::BackersFirstIfSizeLowerThenSystematicChunks(
							fetch_chunks_threshold,
						) => {
							// Get our own chunk size to get an estimate of the PoV size.
							let chunk_size: Result<Option<usize>> =
								query_chunk_size(ctx, candidate_hash).await;
							if let Ok(Some(chunk_size)) = chunk_size {
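								// `systematic_threshold` chunks suffice to reconstruct the data,
								// so our chunk size times that count approximates the PoV size.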
								let pov_size_estimate = chunk_size * systematic_threshold;
								small_pov_size = pov_size_estimate < fetch_chunks_threshold;

								if small_pov_size {
									gum::trace!(
										target: LOG_TARGET,
										?candidate_hash,
										pov_size_estimate,
										fetch_chunks_threshold,
										"Prefer fetch from backing group",
									);
								}
							} else {
								// we have a POV limit but were not able to query the chunk size, so
								// don't use the backing group.
								small_pov_size = false;
							}
						},
						_ => {},
					};

					match (&recovery_strategy_kind, small_pov_size) {
						(RecoveryStrategyKind::BackersFirstAlways, _) |
						(RecoveryStrategyKind::BackersFirstIfSizeLower(_), true) |
						(
							RecoveryStrategyKind::BackersFirstIfSizeLowerThenSystematicChunks(_),
							true,
						) |
						(RecoveryStrategyKind::BackersThenSystematicChunks, _) =>
							recovery_strategies.push_back(Box::new(FetchFull::new(
								FetchFullParams { validators: backing_validators.to_vec() },
							))),
						_ => {},
					};

					backer_group = Some(backing_validators);
				}
			}

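			// Availability chunk mapping (a node feature) shuffles the chunk index held by each
			// validator based on the candidate's core index. Systematic recovery is only attempted
			// when it is enabled.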
			let chunk_mapping_enabled = if let Some(&true) = node_features
				.get(usize::from(node_features::FeatureIndex::AvailabilityChunkMapping as u8))
				.as_deref()
			{
				true
			} else {
				false
			};

			// We can only attempt systematic recovery if we received the core index of the
			// candidate and chunk mapping is enabled.
			if let Some(core_index) = maybe_core_index {
				if matches!(
					recovery_strategy_kind,
					RecoveryStrategyKind::BackersThenSystematicChunks |
						RecoveryStrategyKind::SystematicChunks |
						RecoveryStrategyKind::BackersFirstIfSizeLowerThenSystematicChunks(_)
				) && chunk_mapping_enabled
				{
					let chunk_indices =
						availability_chunk_indices(node_features, n_validators, core_index)?;

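					// `availability_chunk_indices` returns the chunk index held by each validator
					// (indexed by validator); pair each chunk index with its validator index.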
					let chunk_indices: VecDeque<_> = chunk_indices
						.iter()
						.enumerate()
						.map(|(v_index, c_index)| {
							(
								*c_index,
								ValidatorIndex(
									u32::try_from(v_index)
										.expect("validator count should not exceed u32"),
								),
							)
						})
						.collect();

					// Only get the validators according to the threshold.
					let validators = chunk_indices
						.clone()
						.into_iter()
						.filter(|(c_index, _)| {
							usize::try_from(c_index.0)
								.expect("usize is at least u32 bytes on all modern targets.") <
								systematic_threshold
						})
						.collect();

					recovery_strategies.push_back(Box::new(FetchSystematicChunks::new(
						FetchSystematicChunksParams {
							validators,
							backers: backer_group.map(|v| v.to_vec()).unwrap_or_else(|| vec![]),
						},
					)));
				}
			}

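			// Regular chunk recovery from all validators is always added as the last-resort
			// strategy.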
			recovery_strategies.push_back(Box::new(FetchChunks::new(FetchChunksParams {
				n_validators: session_info.validators.len(),
			})));

			let session_info = session_info.clone();

			let n_validators = session_info.validators.len();

			launch_recovery_task(
				state,
				ctx,
				response_sender,
				recovery_strategies,
				RecoveryParams {
					validator_authority_keys: session_info.discovery_keys.clone(),
					n_validators,
					threshold: recovery_threshold(n_validators)?,
					systematic_threshold,
					candidate_hash,
					erasure_root: receipt.descriptor.erasure_root(),
					metrics: metrics.clone(),
					bypass_availability_store,
					post_recovery_check,
					pov_hash: receipt.descriptor.pov_hash(),
					req_v1_protocol_name,
					req_v2_protocol_name,
					chunk_mapping_enabled,
					erasure_task_tx,
				},
			)
			.await
		},
		Err(_) => {
			response_sender
				.send(Err(RecoveryError::Unavailable))
				.map_err(|_| Error::CanceledResponseSender)?;

			Err(Error::SessionInfoUnavailable(state.live_block.1))
		},
	}
}

/// Queries the full `AvailableData` from av-store.
#[overseer::contextbounds(AvailabilityRecovery, prefix = self::overseer)]
async fn query_full_data<Context>(
	ctx: &mut Context,
	candidate_hash: CandidateHash,
) -> Result<Option<AvailableData>> {
	let (tx, rx) = oneshot::channel();
	ctx.send_message(AvailabilityStoreMessage::QueryAvailableData(candidate_hash, tx))
		.await;

	rx.await.map_err(Error::CanceledQueryFullData)
}

/// Queries the size of a chunk from av-store.
#[overseer::contextbounds(AvailabilityRecovery, prefix = self::overseer)]
async fn query_chunk_size<Context>(
	ctx: &mut Context,
	candidate_hash: CandidateHash,
) -> Result<Option<usize>> {
	let (tx, rx) = oneshot::channel();
	ctx.send_message(AvailabilityStoreMessage::QueryChunkSize(candidate_hash, tx))
		.await;

	rx.await.map_err(Error::CanceledQueryFullData)
}

#[overseer::contextbounds(AvailabilityRecovery, prefix = self::overseer)]
impl AvailabilityRecoverySubsystem {
	/// Create a new instance of `AvailabilityRecoverySubsystem` suitable for collator nodes,
	/// which never sends requests to the `AvailabilityStoreSubsystem` and only checks the PoV
	/// hash instead of re-encoding the available data.
	pub fn for_collator(
		fetch_chunks_threshold: Option<usize>,
		req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
		req_protocol_names: &ReqProtocolNames,
		metrics: Metrics,
	) -> Self {
		Self {
			recovery_strategy_kind: RecoveryStrategyKind::BackersFirstIfSizeLower(
				fetch_chunks_threshold.unwrap_or(CONSERVATIVE_FETCH_CHUNKS_THRESHOLD),
			),
			bypass_availability_store: true,
			post_recovery_check: PostRecoveryCheck::PovHash,
			req_receiver,
			metrics,
			req_v1_protocol_name: req_protocol_names
				.get_name(request_v1::ChunkFetchingRequest::PROTOCOL),
			req_v2_protocol_name: req_protocol_names
				.get_name(request_v2::ChunkFetchingRequest::PROTOCOL),
		}
	}

	/// Create an optimised new instance of `AvailabilityRecoverySubsystem` suitable for validator
	/// nodes, which:
	/// - for small PoVs (under the `fetch_chunks_threshold` or the
	///   `CONSERVATIVE_FETCH_CHUNKS_THRESHOLD`), attempts full recovery from backers, if a backing
	///   group was supplied.
	/// - for large PoVs, attempts systematic recovery, if a core_index was supplied and the
	///   AvailabilityChunkMapping node feature is enabled.
	/// - as a last resort, attempts regular chunk recovery from all validators.
	pub fn for_validator(
		fetch_chunks_threshold: Option<usize>,
		req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
		req_protocol_names: &ReqProtocolNames,
		metrics: Metrics,
	) -> Self {
		Self {
			recovery_strategy_kind:
				RecoveryStrategyKind::BackersFirstIfSizeLowerThenSystematicChunks(
					fetch_chunks_threshold.unwrap_or(CONSERVATIVE_FETCH_CHUNKS_THRESHOLD),
				),
			bypass_availability_store: false,
			post_recovery_check: PostRecoveryCheck::Reencode,
			req_receiver,
			metrics,
			req_v1_protocol_name: req_protocol_names
				.get_name(request_v1::ChunkFetchingRequest::PROTOCOL),
			req_v2_protocol_name: req_protocol_names
				.get_name(request_v2::ChunkFetchingRequest::PROTOCOL),
		}
	}

	/// Customise the recovery strategy kind.
	/// Currently only useful for tests.
	#[cfg(any(test, feature = "subsystem-benchmarks"))]
	pub fn with_recovery_strategy_kind(
		req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
		req_protocol_names: &ReqProtocolNames,
		metrics: Metrics,
		recovery_strategy_kind: RecoveryStrategyKind,
	) -> Self {
		Self {
			recovery_strategy_kind,
			bypass_availability_store: false,
			post_recovery_check: PostRecoveryCheck::Reencode,
			req_receiver,
			metrics,
			req_v1_protocol_name: req_protocol_names
				.get_name(request_v1::ChunkFetchingRequest::PROTOCOL),
			req_v2_protocol_name: req_protocol_names
				.get_name(request_v2::ChunkFetchingRequest::PROTOCOL),
		}
	}

	/// Starts the inner subsystem loop.
	pub async fn run<Context>(self, mut ctx: Context) -> std::result::Result<(), FatalError> {
		let mut state = State::default();
		let Self {
			mut req_receiver,
			metrics,
			recovery_strategy_kind,
			bypass_availability_store,
			post_recovery_check,
			req_v1_protocol_name,
			req_v2_protocol_name,
		} = self;

		let (erasure_task_tx, erasure_task_rx) = futures::channel::mpsc::channel(16);
		let mut erasure_task_rx = erasure_task_rx.fuse();

		// `ThreadPoolBuilder` spawns the tasks using `spawn_blocking`. For each worker there will
		// be a `mpsc` channel created. Each of these workers takes the `Receiver` and polls it in
		// an infinite loop. All of the sender ends of the channel are sent as a vec which we then
		// use to create a `Cycle` iterator. We use this iterator to assign work in a round-robin
		// fashion to the workers in the pool.
		//
		// How work is dispatched to the pool from the recovery tasks:
		// - Once a recovery task finishes retrieving the availability data, it needs to
		//   reconstruct it from chunks and/or re-encode the data, which are heavy CPU
		//   computations. To do so it sends an `ErasureTask` to the main loop via the
		//   `erasure_task` channel, and waits for the results over a `oneshot` channel.
		// - In the subsystem main loop we poll the `erasure_task_rx` receiver.
		// - We forward the received `ErasureTask` to the `next()` sender yielded by the `Cycle`
		//   iterator.
		// - Some worker thread handles it and sends the response over the `oneshot` channel.

		// Create a thread pool with 2 workers.
		let mut to_pool = ThreadPoolBuilder::build(
			// Pool is guaranteed to have at least 1 worker thread.
			NonZeroUsize::new(2).expect("There are 2 threads; qed"),
			metrics.clone(),
			&mut ctx,
		)
		.into_iter()
		.cycle();

		loop {
			let recv_req = req_receiver.recv(|| vec![COST_INVALID_REQUEST]).fuse();
			pin_mut!(recv_req);
			let res = futures::select! {
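				// An erasure-coding task was produced by a recovery task; dispatch it to the next
				// worker in the round-robin pool.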
				erasure_task = erasure_task_rx.next() => {
					match erasure_task {
						Some(task) => {
							to_pool
								.next()
								.expect("Pool size is `NonZeroUsize`; qed")
								.send(task)
								.await
								.map_err(|_| RecoveryError::ChannelClosed)
						},
						None => {
							Err(RecoveryError::ChannelClosed)
						}
					}.map_err(Into::into)
				}
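				// Overseer signals and `RecoverAvailableData` requests.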
				signal = ctx.recv().fuse() => {
					match signal {
						Ok(signal) => {
							match signal {
								FromOrchestra::Signal(signal) => if handle_signal(
									&mut state,
									signal,
								).await {
									gum::debug!(target: LOG_TARGET, "subsystem concluded");
									return Ok(());
								} else {
									Ok(())
								},
								FromOrchestra::Communication {
									msg: AvailabilityRecoveryMessage::RecoverAvailableData(
										receipt,
										session_index,
										maybe_backing_group,
										maybe_core_index,
										response_sender,
									)
								} => handle_recover(
										&mut state,
										&mut ctx,
										receipt,
										session_index,
										maybe_backing_group,
										response_sender,
										&metrics,
										erasure_task_tx.clone(),
										recovery_strategy_kind.clone(),
										bypass_availability_store,
										post_recovery_check.clone(),
										maybe_core_index,
										req_v1_protocol_name.clone(),
										req_v2_protocol_name.clone(),
									).await
							}
						},
						Err(e) => Err(Error::SubsystemReceive(e))
					}
				}
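				// Incoming network requests for the full available data, served from the
				// availability store unless `bypass_availability_store` is set.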
				in_req = recv_req => {
					match in_req {
						Ok(req) => {
							if bypass_availability_store {
								gum::debug!(
									target: LOG_TARGET,
									"Skipping request to availability-store.",
								);
								let _ = req.send_response(None.into());
								Ok(())
							} else {
								match query_full_data(&mut ctx, req.payload.candidate_hash).await {
									Ok(res) => {
										let _ = req.send_response(res.into());
										Ok(())
									}
									Err(e) => {
										let _ = req.send_response(None.into());
										Err(e)
									}
								}
							}
						}
						Err(e) => Err(Error::IncomingRequest(e))
					}
				}
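				// A recovery task concluded. `RecoveryHandle` has already delivered the result to
				// all waiting senders; here we only cache it and surface any error.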
				output = state.ongoing_recoveries.select_next_some() => {
					let mut res = Ok(());
					if let Some((candidate_hash, result)) = output {
						if let Err(ref e) = result {
							res = Err(Error::Recovery(e.clone()));
						}

						if let Ok(recovery) = CachedRecovery::try_from(result) {
							state.availability_lru.insert(candidate_hash, recovery);
						}
					}

					res
				}
			};

			// Only bubble up fatal errors, but log all of them.
			if let Err(e) = res {
				log_error(Err(e))?;
			}
		}
	}
}

// A simple thread pool implementation using `spawn_blocking` threads.
struct ThreadPoolBuilder;

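// Upper bound on the number of erasure worker threads, regardless of the requested pool size.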
const MAX_THREADS: NonZeroUsize = match NonZeroUsize::new(4) {
	Some(max_threads) => max_threads,
	None => panic!("MAX_THREADS must be non-zero"),
};

impl ThreadPoolBuilder {
	// Creates a pool of `size` workers, where 1 <= `size` <= `MAX_THREADS`.
	//
	// Each worker is created by `spawn_blocking` and takes the receiver side of a channel
	// while all of the senders are returned to the caller. Each worker runs `erasure_task_thread`
	// that polls the `Receiver` for an `ErasureTask` which is expected to be CPU intensive. The
	// larger the input (more or larger chunks/availability data), the more CPU cycles will be
	// spent.
	//
	// For example, for 32KB PoVs we'd expect re-encoding to take as much as 90ms, and 500ms for
	// 2.5MiB PoVs.
	//
	// After executing such a task, the worker sends the response via a provided `oneshot` sender.
	//
	// The caller is responsible for routing work to the workers.
	#[overseer::contextbounds(AvailabilityRecovery, prefix = self::overseer)]
	pub fn build<Context>(
		size: NonZeroUsize,
		metrics: Metrics,
		ctx: &mut Context,
	) -> Vec<futures::channel::mpsc::Sender<ErasureTask>> {
		// At least 1 worker, at most `MAX_THREADS`.
		let size = std::cmp::min(size, MAX_THREADS);
		let mut senders = Vec::new();

		for index in 0..size.into() {
			let (tx, rx) = futures::channel::mpsc::channel(8);
			senders.push(tx);

			if let Err(e) = ctx
				.spawn_blocking("erasure-task", Box::pin(erasure_task_thread(metrics.clone(), rx)))
			{
				gum::warn!(
					target: LOG_TARGET,
					err = ?e,
					index,
					"Failed to spawn an erasure task",
				);
			}
		}
		senders
	}
}

// Handles CPU intensive operation on a dedicated blocking thread.
async fn erasure_task_thread(
	metrics: Metrics,
	mut ingress: futures::channel::mpsc::Receiver<ErasureTask>,
) {
	loop {
		match ingress.next().await {
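			// Reconstruct the full `AvailableData` from the chunk set; each chunk is passed to
			// the erasure coder together with its chunk index.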
			Some(ErasureTask::Reconstruct(n_validators, chunks, sender)) => {
				let _ = sender.send(polkadot_erasure_coding::reconstruct_v1(
					n_validators,
					chunks.iter().map(|(c_index, chunk)| {
						(
							&chunk[..],
							usize::try_from(c_index.0)
								.expect("usize is at least u32 bytes on all modern targets."),
						)
					}),
				));
			},
			Some(ErasureTask::Reencode(n_validators, root, available_data, sender)) => {
				let metrics = metrics.clone();

				let maybe_data = if reconstructed_data_matches_root(
					n_validators,
					&root,
					&available_data,
					&metrics,
				) {
					Some(available_data)
				} else {
					None
				};

				let _ = sender.send(maybe_data);
			},
			None => {
				gum::trace!(
					target: LOG_TARGET,
					"Erasure task channel closed. Node shutting down?",
				);
				break
			},
		}

		// In benchmarks this is a very hot loop not yielding at all.
		// To update CPU metrics for the task we need to yield.
		#[cfg(feature = "subsystem-benchmarks")]
		tokio::task::yield_now().await;
	}
}
925}