referrerpolicy=no-referrer-when-downgrade

polkadot_availability_recovery/task/strategy/
full.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17use crate::{
18	task::{RecoveryParams, RecoveryStrategy, State},
19	ErasureTask, PostRecoveryCheck, LOG_TARGET,
20};
21
22use polkadot_node_network_protocol::request_response::{
23	self as req_res, outgoing::RequestError, OutgoingRequest, Recipient, Requests,
24};
25use polkadot_node_primitives::AvailableData;
26use polkadot_node_subsystem::{messages::NetworkBridgeTxMessage, overseer, RecoveryError};
27use polkadot_primitives::ValidatorIndex;
28use sc_network::{IfDisconnected, OutboundFailure, RequestFailure};
29
30use futures::{channel::oneshot, SinkExt};
31use rand::seq::SliceRandom;
32
33/// Parameters specific to the `FetchFull` strategy.
34pub struct FetchFullParams {
35	/// Validators that will be used for fetching the data.
36	pub validators: Vec<ValidatorIndex>,
37}
38
39/// `RecoveryStrategy` that sequentially tries to fetch the full `AvailableData` from
40/// already-connected validators in the configured validator set.
41pub struct FetchFull {
42	params: FetchFullParams,
43}
44
45impl FetchFull {
46	/// Create a new `FetchFull` recovery strategy.
47	pub fn new(mut params: FetchFullParams) -> Self {
48		params.validators.shuffle(&mut rand::thread_rng());
49		Self { params }
50	}
51}
52
53#[async_trait::async_trait]
54impl<Sender: overseer::AvailabilityRecoverySenderTrait> RecoveryStrategy<Sender> for FetchFull {
55	fn display_name(&self) -> &'static str {
56		"Full recovery from backers"
57	}
58
59	fn strategy_type(&self) -> &'static str {
60		"full_from_backers"
61	}
62
63	async fn run(
64		mut self: Box<Self>,
65		_: &mut State,
66		sender: &mut Sender,
67		common_params: &RecoveryParams,
68	) -> Result<AvailableData, RecoveryError> {
69		let strategy_type = RecoveryStrategy::<Sender>::strategy_type(&*self);
70
71		loop {
72			// Pop the next validator.
73			let validator_index =
74				self.params.validators.pop().ok_or_else(|| RecoveryError::Unavailable)?;
75
76			// Request data.
77			let (req, response) = OutgoingRequest::new(
78				Recipient::Authority(
79					common_params.validator_authority_keys[validator_index.0 as usize].clone(),
80				),
81				req_res::v1::AvailableDataFetchingRequest {
82					candidate_hash: common_params.candidate_hash,
83				},
84			);
85
86			sender
87				.send_message(NetworkBridgeTxMessage::SendRequests(
88					vec![Requests::AvailableDataFetchingV1(req)],
89					IfDisconnected::ImmediateError,
90				))
91				.await;
92
93			common_params.metrics.on_full_request_issued();
94
95			match response.await {
96				Ok(req_res::v1::AvailableDataFetchingResponse::AvailableData(data)) => {
97					let recovery_duration =
98						common_params.metrics.time_erasure_recovery(strategy_type);
99					let maybe_data = match common_params.post_recovery_check {
100						PostRecoveryCheck::Reencode => {
101							let (reencode_tx, reencode_rx) = oneshot::channel();
102							let mut erasure_task_tx = common_params.erasure_task_tx.clone();
103
104							erasure_task_tx
105								.send(ErasureTask::Reencode(
106									common_params.n_validators,
107									common_params.erasure_root,
108									data,
109									reencode_tx,
110								))
111								.await
112								.map_err(|_| RecoveryError::ChannelClosed)?;
113
114							reencode_rx.await.map_err(|_| RecoveryError::ChannelClosed)?
115						},
116						PostRecoveryCheck::PovHash =>
117							(data.pov.hash() == common_params.pov_hash).then_some(data),
118					};
119
120					match maybe_data {
121						Some(data) => {
122							gum::trace!(
123								target: LOG_TARGET,
124								candidate_hash = ?common_params.candidate_hash,
125								"Received full data",
126							);
127
128							common_params.metrics.on_full_request_succeeded();
129							return Ok(data)
130						},
131						None => {
132							common_params.metrics.on_full_request_invalid();
133							recovery_duration.map(|rd| rd.stop_and_discard());
134
135							gum::debug!(
136								target: LOG_TARGET,
137								candidate_hash = ?common_params.candidate_hash,
138								?validator_index,
139								"Invalid data response",
140							);
141
142							// it doesn't help to report the peer with req/res.
143							// we'll try the next backer.
144						},
145					}
146				},
147				Ok(req_res::v1::AvailableDataFetchingResponse::NoSuchData) => {
148					common_params.metrics.on_full_request_no_such_data();
149				},
150				Err(e) => {
151					match &e {
152						RequestError::Canceled(_) => common_params.metrics.on_full_request_error(),
153						RequestError::InvalidResponse(_) =>
154							common_params.metrics.on_full_request_invalid(),
155						RequestError::NetworkError(req_failure) => {
156							if let RequestFailure::Network(OutboundFailure::Timeout) = req_failure {
157								common_params.metrics.on_full_request_timeout();
158							} else {
159								common_params.metrics.on_full_request_error();
160							}
161						},
162					};
163					gum::debug!(
164						target: LOG_TARGET,
165						candidate_hash = ?common_params.candidate_hash,
166						?validator_index,
167						err = ?e,
168						"Error fetching full available data."
169					);
170				},
171			}
172		}
173	}
174}