polkadot_availability_recovery/task/strategy/
full.rs1use crate::{
18 task::{RecoveryParams, RecoveryStrategy, State},
19 ErasureTask, PostRecoveryCheck, LOG_TARGET,
20};
21
22use polkadot_node_network_protocol::request_response::{
23 self as req_res, outgoing::RequestError, OutgoingRequest, Recipient, Requests,
24};
25use polkadot_node_primitives::AvailableData;
26use polkadot_node_subsystem::{messages::NetworkBridgeTxMessage, overseer, RecoveryError};
27use polkadot_primitives::ValidatorIndex;
28use sc_network::{IfDisconnected, OutboundFailure, RequestFailure};
29
30use futures::{channel::oneshot, SinkExt};
31use rand::seq::SliceRandom;
32
33pub struct FetchFullParams {
35 pub validators: Vec<ValidatorIndex>,
37}
38
39pub struct FetchFull {
42 params: FetchFullParams,
43}
44
45impl FetchFull {
46 pub fn new(mut params: FetchFullParams) -> Self {
48 params.validators.shuffle(&mut rand::thread_rng());
49 Self { params }
50 }
51}
52
53#[async_trait::async_trait]
54impl<Sender: overseer::AvailabilityRecoverySenderTrait> RecoveryStrategy<Sender> for FetchFull {
55 fn display_name(&self) -> &'static str {
56 "Full recovery from backers"
57 }
58
59 fn strategy_type(&self) -> &'static str {
60 "full_from_backers"
61 }
62
63 async fn run(
64 mut self: Box<Self>,
65 _: &mut State,
66 sender: &mut Sender,
67 common_params: &RecoveryParams,
68 ) -> Result<AvailableData, RecoveryError> {
69 let strategy_type = RecoveryStrategy::<Sender>::strategy_type(&*self);
70
71 loop {
72 let validator_index =
74 self.params.validators.pop().ok_or_else(|| RecoveryError::Unavailable)?;
75
76 let (req, response) = OutgoingRequest::new(
78 Recipient::Authority(
79 common_params.validator_authority_keys[validator_index.0 as usize].clone(),
80 ),
81 req_res::v1::AvailableDataFetchingRequest {
82 candidate_hash: common_params.candidate_hash,
83 },
84 );
85
86 sender
87 .send_message(NetworkBridgeTxMessage::SendRequests(
88 vec![Requests::AvailableDataFetchingV1(req)],
89 IfDisconnected::ImmediateError,
90 ))
91 .await;
92
93 common_params.metrics.on_full_request_issued();
94
95 match response.await {
96 Ok(req_res::v1::AvailableDataFetchingResponse::AvailableData(data)) => {
97 let recovery_duration =
98 common_params.metrics.time_erasure_recovery(strategy_type);
99 let maybe_data = match common_params.post_recovery_check {
100 PostRecoveryCheck::Reencode => {
101 let (reencode_tx, reencode_rx) = oneshot::channel();
102 let mut erasure_task_tx = common_params.erasure_task_tx.clone();
103
104 erasure_task_tx
105 .send(ErasureTask::Reencode(
106 common_params.n_validators,
107 common_params.erasure_root,
108 data,
109 reencode_tx,
110 ))
111 .await
112 .map_err(|_| RecoveryError::ChannelClosed)?;
113
114 reencode_rx.await.map_err(|_| RecoveryError::ChannelClosed)?
115 },
116 PostRecoveryCheck::PovHash =>
117 (data.pov.hash() == common_params.pov_hash).then_some(data),
118 };
119
120 match maybe_data {
121 Some(data) => {
122 gum::trace!(
123 target: LOG_TARGET,
124 candidate_hash = ?common_params.candidate_hash,
125 "Received full data",
126 );
127
128 common_params.metrics.on_full_request_succeeded();
129 return Ok(data)
130 },
131 None => {
132 common_params.metrics.on_full_request_invalid();
133 recovery_duration.map(|rd| rd.stop_and_discard());
134
135 gum::debug!(
136 target: LOG_TARGET,
137 candidate_hash = ?common_params.candidate_hash,
138 ?validator_index,
139 "Invalid data response",
140 );
141
142 },
145 }
146 },
147 Ok(req_res::v1::AvailableDataFetchingResponse::NoSuchData) => {
148 common_params.metrics.on_full_request_no_such_data();
149 },
150 Err(e) => {
151 match &e {
152 RequestError::Canceled(_) => common_params.metrics.on_full_request_error(),
153 RequestError::InvalidResponse(_) =>
154 common_params.metrics.on_full_request_invalid(),
155 RequestError::NetworkError(req_failure) => {
156 if let RequestFailure::Network(OutboundFailure::Timeout) = req_failure {
157 common_params.metrics.on_full_request_timeout();
158 } else {
159 common_params.metrics.on_full_request_error();
160 }
161 },
162 };
163 gum::debug!(
164 target: LOG_TARGET,
165 candidate_hash = ?common_params.candidate_hash,
166 ?validator_index,
167 err = ?e,
168 "Error fetching full available data."
169 );
170 },
171 }
172 }
173 }
174}