referrerpolicy=no-referrer-when-downgrade

polkadot_availability_recovery/task/
mod.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17//! Main recovery task logic. Runs recovery strategies.
18
19#![warn(missing_docs)]
20
21mod strategy;
22
23pub use self::strategy::{
24	FetchChunks, FetchChunksParams, FetchFull, FetchFullParams, FetchSystematicChunks,
25	FetchSystematicChunksParams, RecoveryStrategy, State,
26};
27
28#[cfg(test)]
29pub use self::strategy::{REGULAR_CHUNKS_REQ_RETRY_LIMIT, SYSTEMATIC_CHUNKS_REQ_RETRY_LIMIT};
30
31use crate::{metrics::Metrics, ErasureTask, PostRecoveryCheck, LOG_TARGET};
32
33use codec::Encode;
34use polkadot_node_primitives::AvailableData;
35use polkadot_node_subsystem::{messages::AvailabilityStoreMessage, overseer, RecoveryError};
36use polkadot_primitives::{AuthorityDiscoveryId, CandidateHash, Hash};
37use sc_network::ProtocolName;
38
39use futures::channel::{mpsc, oneshot};
40use std::collections::VecDeque;
41
42/// Recovery parameters common to all strategies in a `RecoveryTask`.
43#[derive(Clone)]
44pub struct RecoveryParams {
45	/// Discovery ids of `validators`.
46	pub validator_authority_keys: Vec<AuthorityDiscoveryId>,
47
48	/// Number of validators.
49	pub n_validators: usize,
50
51	/// The number of regular chunks needed.
52	pub threshold: usize,
53
54	/// The number of systematic chunks needed.
55	pub systematic_threshold: usize,
56
57	/// A hash of the relevant candidate.
58	pub candidate_hash: CandidateHash,
59
60	/// The root of the erasure encoding of the candidate.
61	pub erasure_root: Hash,
62
63	/// Metrics to report.
64	pub metrics: Metrics,
65
66	/// Do not request data from availability-store. Useful for collators.
67	pub bypass_availability_store: bool,
68
69	/// The type of check to perform after available data was recovered.
70	pub post_recovery_check: PostRecoveryCheck,
71
72	/// The blake2-256 hash of the PoV.
73	pub pov_hash: Hash,
74
75	/// Protocol name for ChunkFetchingV1.
76	pub req_v1_protocol_name: ProtocolName,
77
78	/// Protocol name for ChunkFetchingV2.
79	pub req_v2_protocol_name: ProtocolName,
80
81	/// Whether or not chunk mapping is enabled.
82	pub chunk_mapping_enabled: bool,
83
84	/// Channel to the erasure task handler.
85	pub erasure_task_tx: mpsc::Sender<ErasureTask>,
86}
87
88/// A stateful reconstruction of availability data in reference to
89/// a candidate hash.
90pub struct RecoveryTask<Sender: overseer::AvailabilityRecoverySenderTrait> {
91	sender: Sender,
92	params: RecoveryParams,
93	strategies: VecDeque<Box<dyn RecoveryStrategy<Sender>>>,
94	state: State,
95}
96
97impl<Sender> RecoveryTask<Sender>
98where
99	Sender: overseer::AvailabilityRecoverySenderTrait,
100{
101	/// Instantiate a new recovery task.
102	pub fn new(
103		sender: Sender,
104		params: RecoveryParams,
105		strategies: VecDeque<Box<dyn RecoveryStrategy<Sender>>>,
106	) -> Self {
107		Self { sender, params, strategies, state: State::new() }
108	}
109
110	async fn in_availability_store(&mut self) -> Option<AvailableData> {
111		if !self.params.bypass_availability_store {
112			let (tx, rx) = oneshot::channel();
113			self.sender
114				.send_message(AvailabilityStoreMessage::QueryAvailableData(
115					self.params.candidate_hash,
116					tx,
117				))
118				.await;
119
120			match rx.await {
121				Ok(Some(data)) => return Some(data),
122				Ok(None) => {},
123				Err(oneshot::Canceled) => {
124					gum::warn!(
125						target: LOG_TARGET,
126						candidate_hash = ?self.params.candidate_hash,
127						"Failed to reach the availability store",
128					)
129				},
130			}
131		}
132
133		None
134	}
135
136	/// Run this recovery task to completion. It will loop through the configured strategies
137	/// in-order and return whenever the first one recovers the full `AvailableData`.
138	pub async fn run(mut self) -> Result<AvailableData, RecoveryError> {
139		if let Some(data) = self.in_availability_store().await {
140			return Ok(data)
141		}
142
143		self.params.metrics.on_recovery_started();
144
145		let _timer = self.params.metrics.time_full_recovery();
146
147		while let Some(current_strategy) = self.strategies.pop_front() {
148			let display_name = current_strategy.display_name();
149			let strategy_type = current_strategy.strategy_type();
150
151			gum::debug!(
152				target: LOG_TARGET,
153				candidate_hash = ?self.params.candidate_hash,
154				"Starting `{}` strategy",
155				display_name
156			);
157
158			let res = current_strategy.run(&mut self.state, &mut self.sender, &self.params).await;
159
160			match res {
161				Err(RecoveryError::Unavailable) =>
162					if self.strategies.front().is_some() {
163						gum::debug!(
164							target: LOG_TARGET,
165							candidate_hash = ?self.params.candidate_hash,
166							"Recovery strategy `{}` did not conclude. Trying the next one.",
167							display_name
168						);
169						continue
170					},
171				Err(err) => {
172					match &err {
173						RecoveryError::Invalid =>
174							self.params.metrics.on_recovery_invalid(strategy_type),
175						_ => self.params.metrics.on_recovery_failed(strategy_type),
176					}
177					return Err(err)
178				},
179				Ok(data) => {
180					self.params.metrics.on_recovery_succeeded(strategy_type, data.encoded_size());
181					return Ok(data)
182				},
183			}
184		}
185
186		// We have no other strategies to try.
187		gum::warn!(
188			target: LOG_TARGET,
189			candidate_hash = ?self.params.candidate_hash,
190			"Recovery of available data failed.",
191		);
192
193		self.params.metrics.on_recovery_failed("all");
194
195		Err(RecoveryError::Unavailable)
196	}
197}