polkadot_availability_recovery/task/
mod.rs1#![warn(missing_docs)]
20
21mod strategy;
22
23pub use self::strategy::{
24 FetchChunks, FetchChunksParams, FetchFull, FetchFullParams, FetchSystematicChunks,
25 FetchSystematicChunksParams, RecoveryStrategy, State,
26};
27
28#[cfg(test)]
29pub use self::strategy::{REGULAR_CHUNKS_REQ_RETRY_LIMIT, SYSTEMATIC_CHUNKS_REQ_RETRY_LIMIT};
30
31use crate::{metrics::Metrics, ErasureTask, PostRecoveryCheck, LOG_TARGET};
32
33use codec::Encode;
34use polkadot_node_primitives::AvailableData;
35use polkadot_node_subsystem::{messages::AvailabilityStoreMessage, overseer, RecoveryError};
36use polkadot_primitives::{AuthorityDiscoveryId, CandidateHash, Hash};
37use sc_network::ProtocolName;
38
39use futures::channel::{mpsc, oneshot};
40use std::collections::VecDeque;
41
42#[derive(Clone)]
44pub struct RecoveryParams {
45 pub validator_authority_keys: Vec<AuthorityDiscoveryId>,
47
48 pub n_validators: usize,
50
51 pub threshold: usize,
53
54 pub systematic_threshold: usize,
56
57 pub candidate_hash: CandidateHash,
59
60 pub erasure_root: Hash,
62
63 pub metrics: Metrics,
65
66 pub bypass_availability_store: bool,
68
69 pub post_recovery_check: PostRecoveryCheck,
71
72 pub pov_hash: Hash,
74
75 pub req_v1_protocol_name: ProtocolName,
77
78 pub req_v2_protocol_name: ProtocolName,
80
81 pub chunk_mapping_enabled: bool,
83
84 pub erasure_task_tx: mpsc::Sender<ErasureTask>,
86}
87
88pub struct RecoveryTask<Sender: overseer::AvailabilityRecoverySenderTrait> {
91 sender: Sender,
92 params: RecoveryParams,
93 strategies: VecDeque<Box<dyn RecoveryStrategy<Sender>>>,
94 state: State,
95}
96
97impl<Sender> RecoveryTask<Sender>
98where
99 Sender: overseer::AvailabilityRecoverySenderTrait,
100{
101 pub fn new(
103 sender: Sender,
104 params: RecoveryParams,
105 strategies: VecDeque<Box<dyn RecoveryStrategy<Sender>>>,
106 ) -> Self {
107 Self { sender, params, strategies, state: State::new() }
108 }
109
110 async fn in_availability_store(&mut self) -> Option<AvailableData> {
111 if !self.params.bypass_availability_store {
112 let (tx, rx) = oneshot::channel();
113 self.sender
114 .send_message(AvailabilityStoreMessage::QueryAvailableData(
115 self.params.candidate_hash,
116 tx,
117 ))
118 .await;
119
120 match rx.await {
121 Ok(Some(data)) => return Some(data),
122 Ok(None) => {},
123 Err(oneshot::Canceled) => {
124 gum::warn!(
125 target: LOG_TARGET,
126 candidate_hash = ?self.params.candidate_hash,
127 "Failed to reach the availability store",
128 )
129 },
130 }
131 }
132
133 None
134 }
135
136 pub async fn run(mut self) -> Result<AvailableData, RecoveryError> {
139 if let Some(data) = self.in_availability_store().await {
140 return Ok(data)
141 }
142
143 self.params.metrics.on_recovery_started();
144
145 let _timer = self.params.metrics.time_full_recovery();
146
147 while let Some(current_strategy) = self.strategies.pop_front() {
148 let display_name = current_strategy.display_name();
149 let strategy_type = current_strategy.strategy_type();
150
151 gum::debug!(
152 target: LOG_TARGET,
153 candidate_hash = ?self.params.candidate_hash,
154 "Starting `{}` strategy",
155 display_name
156 );
157
158 let res = current_strategy.run(&mut self.state, &mut self.sender, &self.params).await;
159
160 match res {
161 Err(RecoveryError::Unavailable) =>
162 if self.strategies.front().is_some() {
163 gum::debug!(
164 target: LOG_TARGET,
165 candidate_hash = ?self.params.candidate_hash,
166 "Recovery strategy `{}` did not conclude. Trying the next one.",
167 display_name
168 );
169 continue
170 },
171 Err(err) => {
172 match &err {
173 RecoveryError::Invalid =>
174 self.params.metrics.on_recovery_invalid(strategy_type),
175 _ => self.params.metrics.on_recovery_failed(strategy_type),
176 }
177 return Err(err)
178 },
179 Ok(data) => {
180 self.params.metrics.on_recovery_succeeded(strategy_type, data.encoded_size());
181 return Ok(data)
182 },
183 }
184 }
185
186 gum::warn!(
188 target: LOG_TARGET,
189 candidate_hash = ?self.params.candidate_hash,
190 "Recovery of available data failed.",
191 );
192
193 self.params.metrics.on_recovery_failed("all");
194
195 Err(RecoveryError::Unavailable)
196 }
197}