1mod chunks;
20mod full;
21mod systematic;
22
23pub use self::{
24 chunks::{FetchChunks, FetchChunksParams},
25 full::{FetchFull, FetchFullParams},
26 systematic::{FetchSystematicChunks, FetchSystematicChunksParams},
27};
28use crate::{
29 futures_undead::FuturesUndead, ErasureTask, PostRecoveryCheck, RecoveryParams, LOG_TARGET,
30};
31
32use codec::Decode;
33use futures::{channel::oneshot, SinkExt};
34use polkadot_erasure_coding::branch_hash;
35#[cfg(not(test))]
36use polkadot_node_network_protocol::request_response::CHUNK_REQUEST_TIMEOUT;
37use polkadot_node_network_protocol::request_response::{
38 self as req_res, outgoing::RequestError, OutgoingRequest, Recipient, Requests,
39};
40use polkadot_node_primitives::{AvailableData, ErasureChunk};
41use polkadot_node_subsystem::{
42 messages::{AvailabilityStoreMessage, NetworkBridgeTxMessage},
43 overseer, RecoveryError,
44};
45use polkadot_primitives::{AuthorityDiscoveryId, BlakeTwo256, ChunkIndex, HashT, ValidatorIndex};
46use sc_network::{IfDisconnected, OutboundFailure, ProtocolName, RequestFailure};
47use std::{
48 collections::{BTreeMap, HashMap, VecDeque},
49 time::Duration,
50};
51
/// NOTE(review): not referenced in this part of the file. Judging by the name this is presumably
/// the cap on chunk requests kept in flight at once - confirm against the strategy modules.
const N_PARALLEL: usize = 50;

/// Timeout handed to `next_with_timeout` in `State::wait_for_chunks` while waiting for the next
/// chunk response.
///
/// Non-test builds reuse the network-level `CHUNK_REQUEST_TIMEOUT`.
#[cfg(not(test))]
const TIMEOUT_START_NEW_REQUESTS: Duration = CHUNK_REQUEST_TIMEOUT;
/// Test builds use a fixed 100ms timeout instead of `CHUNK_REQUEST_TIMEOUT`.
#[cfg(test)]
const TIMEOUT_START_NEW_REQUESTS: Duration = Duration::from_millis(100);

/// NOTE(review): not referenced in this part of the file. Presumably the `retry_threshold`
/// passed to `State::wait_for_chunks` by the systematic chunks strategy - confirm.
pub const SYSTEMATIC_CHUNKS_REQ_RETRY_LIMIT: u32 = 2;
/// NOTE(review): not referenced in this part of the file. Presumably the `retry_threshold`
/// passed to `State::wait_for_chunks` by the regular chunks strategy - confirm.
pub const REGULAR_CHUNKS_REQ_RETRY_LIMIT: u32 = 5;
77
/// The set of in-flight chunk requests, held in a `FuturesUndead`.
///
/// Every future resolves to the authority id and validator index the request was issued for,
/// plus either the optional chunk together with the name of the protocol that answered, or the
/// request error.
type OngoingRequests = FuturesUndead<(
    AuthorityDiscoveryId,
    ValidatorIndex,
    Result<(Option<ErasureChunk>, ProtocolName), RequestError>,
)>;
84
/// Returns `true` when `threshold` can no longer be reached.
///
/// That is the case when the chunks already received, the requests still in flight and the
/// validators not yet asked add up to fewer than `threshold`.
///
/// The sum saturates at `usize::MAX` instead of overflowing, so extreme inputs can neither panic
/// (debug builds) nor wrap around to a small value and report a false `true` (release builds).
const fn is_unavailable(
    received_chunks: usize,
    requesting_chunks: usize,
    unrequested_validators: usize,
    threshold: usize,
) -> bool {
    let reachable = received_chunks
        .saturating_add(requesting_chunks)
        .saturating_add(unrequested_validators);

    reachable < threshold
}
93
94fn is_chunk_valid(params: &RecoveryParams, chunk: &ErasureChunk) -> bool {
96 let anticipated_hash =
97 match branch_hash(¶ms.erasure_root, chunk.proof(), chunk.index.0 as usize) {
98 Ok(hash) => hash,
99 Err(e) => {
100 gum::debug!(
101 target: LOG_TARGET,
102 candidate_hash = ?params.candidate_hash,
103 chunk_index = ?chunk.index,
104 error = ?e,
105 "Invalid Merkle proof",
106 );
107 return false
108 },
109 };
110 let erasure_chunk_hash = BlakeTwo256::hash(&chunk.chunk);
111 if anticipated_hash != erasure_chunk_hash {
112 gum::debug!(
113 target: LOG_TARGET,
114 candidate_hash = ?params.candidate_hash,
115 chunk_index = ?chunk.index,
116 "Merkle proof mismatch"
117 );
118 return false
119 }
120 true
121}
122
123async fn do_post_recovery_check(
125 params: &RecoveryParams,
126 data: AvailableData,
127) -> Result<AvailableData, RecoveryError> {
128 let mut erasure_task_tx = params.erasure_task_tx.clone();
129 match params.post_recovery_check {
130 PostRecoveryCheck::Reencode => {
131 let (reencode_tx, reencode_rx) = oneshot::channel();
133 erasure_task_tx
134 .send(ErasureTask::Reencode(
135 params.n_validators,
136 params.erasure_root,
137 data,
138 reencode_tx,
139 ))
140 .await
141 .map_err(|_| RecoveryError::ChannelClosed)?;
142
143 reencode_rx.await.map_err(|_| RecoveryError::ChannelClosed)?.ok_or_else(|| {
144 gum::trace!(
145 target: LOG_TARGET,
146 candidate_hash = ?params.candidate_hash,
147 erasure_root = ?params.erasure_root,
148 "Data recovery error - root mismatch",
149 );
150 RecoveryError::Invalid
151 })
152 },
153 PostRecoveryCheck::PovHash => {
154 let pov = data.pov.clone();
155 (pov.hash() == params.pov_hash).then_some(data).ok_or_else(|| {
156 gum::trace!(
157 target: LOG_TARGET,
158 candidate_hash = ?params.candidate_hash,
159 expected_pov_hash = ?params.pov_hash,
160 actual_pov_hash = ?pov.hash(),
161 "Data recovery error - PoV hash mismatch",
162 );
163 RecoveryError::Invalid
164 })
165 },
166 }
167}
168
/// A strategy for recovering the `AvailableData` of a candidate.
#[async_trait::async_trait]
pub trait RecoveryStrategy<Sender: overseer::AvailabilityRecoverySenderTrait>: Send {
    /// Runs the strategy, consuming it.
    ///
    /// `state` is passed by mutable reference and `common_params` by shared reference.
    /// NOTE(review): presumably so that chunks and recorded errors carry over between
    /// strategies run one after another - confirm against the caller.
    ///
    /// Returns the recovered data or a `RecoveryError`.
    async fn run(
        mut self: Box<Self>,
        state: &mut State,
        sender: &mut Sender,
        common_params: &RecoveryParams,
    ) -> Result<AvailableData, RecoveryError>;

    /// A static name for the strategy. NOTE(review): presumably meant for log output - confirm.
    fn display_name(&self) -> &'static str;

    /// A static type label for the strategy. NOTE(review): presumably the `strategy_type` value
    /// that the `State` helpers forward to the metrics calls - confirm against the implementors.
    fn strategy_type(&self) -> &'static str;
}
186
/// The failure history recorded in `State` for one `(authority id, validator index)` pair.
enum ErrorRecord {
    /// Number of non-fatal failures so far; `State::can_retry_request` compares it with the
    /// retry threshold.
    NonFatal(u32),
    /// A fatal failure: `State::can_retry_request` always answers `false` for this pair.
    Fatal,
}
192
/// A chunk as kept in `State::received_chunks`.
struct Chunk {
    /// The raw chunk payload.
    chunk: Vec<u8>,
    /// The validator index the chunk was stored with: the index reported by the availability
    /// store, or the index of the validator the request was sent to.
    validator_index: ValidatorIndex,
}
202
/// Bookkeeping used by the recovery helpers in this module: the chunks gathered so far and the
/// request failures recorded per validator.
pub struct State {
    /// Valid chunks gathered so far, keyed (and therefore ordered) by chunk index.
    received_chunks: BTreeMap<ChunkIndex, Chunk>,

    /// Failures recorded per `(authority id, validator index)` pair; consulted by
    /// `can_retry_request` and when picking a backup validator in `wait_for_chunks`.
    recorded_errors: HashMap<(AuthorityDiscoveryId, ValidatorIndex), ErrorRecord>,
}
215
216impl State {
217 pub fn new() -> Self {
218 Self { received_chunks: BTreeMap::new(), recorded_errors: HashMap::new() }
219 }
220
221 fn insert_chunk(&mut self, chunk_index: ChunkIndex, chunk: Chunk) {
222 self.received_chunks.insert(chunk_index, chunk);
223 }
224
225 fn chunk_count(&self) -> usize {
226 self.received_chunks.len()
227 }
228
229 fn systematic_chunk_count(&self, systematic_threshold: usize) -> usize {
230 self.received_chunks
231 .range(ChunkIndex(0)..ChunkIndex(systematic_threshold as u32))
232 .count()
233 }
234
235 fn record_error_fatal(
236 &mut self,
237 authority_id: AuthorityDiscoveryId,
238 validator_index: ValidatorIndex,
239 ) {
240 self.recorded_errors.insert((authority_id, validator_index), ErrorRecord::Fatal);
241 }
242
243 fn record_error_non_fatal(
244 &mut self,
245 authority_id: AuthorityDiscoveryId,
246 validator_index: ValidatorIndex,
247 ) {
248 self.recorded_errors
249 .entry((authority_id, validator_index))
250 .and_modify(|record| {
251 if let ErrorRecord::NonFatal(ref mut count) = record {
252 *count = count.saturating_add(1);
253 }
254 })
255 .or_insert(ErrorRecord::NonFatal(1));
256 }
257
258 fn can_retry_request(
259 &self,
260 key: &(AuthorityDiscoveryId, ValidatorIndex),
261 retry_threshold: u32,
262 ) -> bool {
263 match self.recorded_errors.get(key) {
264 None => true,
265 Some(entry) => match entry {
266 ErrorRecord::Fatal => false,
267 ErrorRecord::NonFatal(count) if *count < retry_threshold => true,
268 ErrorRecord::NonFatal(_) => false,
269 },
270 }
271 }
272
273 async fn populate_from_av_store<Sender: overseer::AvailabilityRecoverySenderTrait>(
275 &mut self,
276 params: &RecoveryParams,
277 sender: &mut Sender,
278 ) -> Vec<(ValidatorIndex, ChunkIndex)> {
279 let (tx, rx) = oneshot::channel();
280 sender
281 .send_message(AvailabilityStoreMessage::QueryAllChunks(params.candidate_hash, tx))
282 .await;
283
284 match rx.await {
285 Ok(chunks) => {
286 let chunk_indices: Vec<_> = chunks
289 .iter()
290 .map(|(validator_index, chunk)| (*validator_index, chunk.index))
291 .collect();
292
293 for (validator_index, chunk) in chunks {
294 if is_chunk_valid(params, &chunk) {
295 gum::trace!(
296 target: LOG_TARGET,
297 candidate_hash = ?params.candidate_hash,
298 chunk_index = ?chunk.index,
299 "Found valid chunk on disk"
300 );
301 self.insert_chunk(
302 chunk.index,
303 Chunk { chunk: chunk.chunk, validator_index },
304 );
305 } else {
306 gum::error!(
307 target: LOG_TARGET,
308 "Loaded invalid chunk from disk! Disk/Db corruption _very_ likely - please fix ASAP!"
309 );
310 };
311 }
312
313 chunk_indices
314 },
315 Err(oneshot::Canceled) => {
316 gum::warn!(
317 target: LOG_TARGET,
318 candidate_hash = ?params.candidate_hash,
319 "Failed to reach the availability store"
320 );
321
322 vec![]
323 },
324 }
325 }
326
327 async fn launch_parallel_chunk_requests<Sender>(
329 &mut self,
330 strategy_type: &str,
331 params: &RecoveryParams,
332 sender: &mut Sender,
333 desired_requests_count: usize,
334 validators: &mut VecDeque<(AuthorityDiscoveryId, ValidatorIndex)>,
335 requesting_chunks: &mut OngoingRequests,
336 ) where
337 Sender: overseer::AvailabilityRecoverySenderTrait,
338 {
339 let candidate_hash = params.candidate_hash;
340 let already_requesting_count = requesting_chunks.len();
341
342 let to_launch = desired_requests_count - already_requesting_count;
343 let mut requests = Vec::with_capacity(to_launch);
344
345 gum::trace!(
346 target: LOG_TARGET,
347 ?candidate_hash,
348 "Attempting to launch {} requests",
349 to_launch
350 );
351
352 while requesting_chunks.len() < desired_requests_count {
353 if let Some((authority_id, validator_index)) = validators.pop_back() {
354 gum::trace!(
355 target: LOG_TARGET,
356 ?authority_id,
357 ?validator_index,
358 ?candidate_hash,
359 "Requesting chunk",
360 );
361
362 let raw_request_v2 =
364 req_res::v2::ChunkFetchingRequest { candidate_hash, index: validator_index };
365 let raw_request_v1 = req_res::v1::ChunkFetchingRequest::from(raw_request_v2);
366
367 let (req, res) = OutgoingRequest::new_with_fallback(
368 Recipient::Authority(authority_id.clone()),
369 raw_request_v2,
370 raw_request_v1,
371 );
372 requests.push(Requests::ChunkFetching(req));
373
374 params.metrics.on_chunk_request_issued(strategy_type);
375 let timer = params.metrics.time_chunk_request(strategy_type);
376 let v1_protocol_name = params.req_v1_protocol_name.clone();
377 let v2_protocol_name = params.req_v2_protocol_name.clone();
378
379 let chunk_mapping_enabled = params.chunk_mapping_enabled;
380 let authority_id_clone = authority_id.clone();
381
382 requesting_chunks.push(Box::pin(async move {
383 let _timer = timer;
384 let res = match res.await {
385 Ok((bytes, protocol)) =>
386 if v2_protocol_name == protocol {
387 match req_res::v2::ChunkFetchingResponse::decode(&mut &bytes[..]) {
388 Ok(req_res::v2::ChunkFetchingResponse::Chunk(chunk)) =>
389 Ok((Some(chunk.into()), protocol)),
390 Ok(req_res::v2::ChunkFetchingResponse::NoSuchChunk) =>
391 Ok((None, protocol)),
392 Err(e) => Err(RequestError::InvalidResponse(e)),
393 }
394 } else if v1_protocol_name == protocol {
395 if chunk_mapping_enabled {
402 gum::info!(
403 target: LOG_TARGET,
404 ?candidate_hash,
405 authority_id = ?authority_id_clone,
406 "Another validator is responding on /req_chunk/1 protocol while the availability chunk \
407 mapping feature is enabled in the runtime. All validators must switch to /req_chunk/2."
408 );
409 }
410
411 match req_res::v1::ChunkFetchingResponse::decode(&mut &bytes[..]) {
412 Ok(req_res::v1::ChunkFetchingResponse::Chunk(chunk)) => Ok((
413 Some(chunk.recombine_into_chunk(&raw_request_v1)),
414 protocol,
415 )),
416 Ok(req_res::v1::ChunkFetchingResponse::NoSuchChunk) =>
417 Ok((None, protocol)),
418 Err(e) => Err(RequestError::InvalidResponse(e)),
419 }
420 } else {
421 Err(RequestError::NetworkError(RequestFailure::UnknownProtocol))
422 },
423
424 Err(e) => Err(e),
425 };
426
427 (authority_id, validator_index, res)
428 }));
429 } else {
430 break
431 }
432 }
433
434 if requests.len() != 0 {
435 sender
436 .send_message(NetworkBridgeTxMessage::SendRequests(
437 requests,
438 IfDisconnected::TryConnect,
439 ))
440 .await;
441 }
442 }
443
444 async fn wait_for_chunks(
446 &mut self,
447 strategy_type: &str,
448 params: &RecoveryParams,
449 retry_threshold: u32,
450 validators: &mut VecDeque<(AuthorityDiscoveryId, ValidatorIndex)>,
451 requesting_chunks: &mut OngoingRequests,
452 backup_validators: &mut Vec<AuthorityDiscoveryId>,
455 mut can_conclude: impl FnMut(
458 usize,
460 usize,
462 usize,
464 usize,
466 ) -> bool,
467 ) -> (usize, usize) {
468 let metrics = ¶ms.metrics;
469
470 let mut total_received_responses = 0;
471 let mut error_count = 0;
472
473 while let Some(res) = requesting_chunks.next_with_timeout(TIMEOUT_START_NEW_REQUESTS).await
477 {
478 total_received_responses += 1;
479
480 let (authority_id, validator_index, request_result) = res;
481
482 let mut is_error = false;
483
484 match request_result {
485 Ok((maybe_chunk, protocol)) => {
486 match protocol {
487 name if name == params.req_v1_protocol_name =>
488 params.metrics.on_chunk_response_v1(),
489 name if name == params.req_v2_protocol_name =>
490 params.metrics.on_chunk_response_v2(),
491 _ => {},
492 }
493
494 match maybe_chunk {
495 Some(chunk) =>
496 if is_chunk_valid(params, &chunk) {
497 metrics.on_chunk_request_succeeded(strategy_type);
498 gum::trace!(
499 target: LOG_TARGET,
500 candidate_hash = ?params.candidate_hash,
501 ?authority_id,
502 ?validator_index,
503 "Received valid chunk",
504 );
505 self.insert_chunk(
506 chunk.index,
507 Chunk { chunk: chunk.chunk, validator_index },
508 );
509 } else {
510 metrics.on_chunk_request_invalid(strategy_type);
511 error_count += 1;
512 self.record_error_fatal(authority_id.clone(), validator_index);
515 is_error = true;
516 },
517 None => {
518 metrics.on_chunk_request_no_such_chunk(strategy_type);
519 gum::trace!(
520 target: LOG_TARGET,
521 candidate_hash = ?params.candidate_hash,
522 ?authority_id,
523 ?validator_index,
524 "Validator did not have the chunk",
525 );
526 error_count += 1;
527 self.record_error_fatal(authority_id.clone(), validator_index);
530 is_error = true;
531 },
532 }
533 },
534 Err(err) => {
535 error_count += 1;
536
537 gum::trace!(
538 target: LOG_TARGET,
539 candidate_hash= ?params.candidate_hash,
540 ?err,
541 ?authority_id,
542 ?validator_index,
543 "Failure requesting chunk",
544 );
545
546 is_error = true;
547
548 match err {
549 RequestError::InvalidResponse(_) => {
550 metrics.on_chunk_request_invalid(strategy_type);
551
552 gum::debug!(
553 target: LOG_TARGET,
554 candidate_hash = ?params.candidate_hash,
555 ?err,
556 ?authority_id,
557 ?validator_index,
558 "Chunk fetching response was invalid",
559 );
560
561 self.record_error_fatal(authority_id.clone(), validator_index);
564 },
565 RequestError::NetworkError(err) => {
566 if let RequestFailure::Network(OutboundFailure::Timeout) = err {
569 metrics.on_chunk_request_timeout(strategy_type);
570 } else {
571 metrics.on_chunk_request_error(strategy_type);
572 }
573
574 self.record_error_non_fatal(authority_id.clone(), validator_index);
578 },
579 RequestError::Canceled(_) => {
580 metrics.on_chunk_request_error(strategy_type);
581
582 self.record_error_non_fatal(authority_id.clone(), validator_index);
586 },
587 }
588 },
589 }
590
591 if is_error {
592 if self.can_retry_request(&(authority_id.clone(), validator_index), retry_threshold)
594 {
595 validators.push_front((authority_id, validator_index));
596 } else {
597 let position = backup_validators.iter().position(|v| {
601 !self.recorded_errors.contains_key(&(v.clone(), validator_index))
602 });
603 if let Some(position) = position {
604 let backer = backup_validators.swap_remove(position);
606 validators.push_front((backer, validator_index));
607 }
608 }
609 }
610
611 if can_conclude(
612 validators.len(),
613 requesting_chunks.total_len(),
614 self.chunk_count(),
615 self.systematic_chunk_count(params.systematic_threshold),
616 ) {
617 gum::debug!(
618 target: LOG_TARGET,
619 validators_len = validators.len(),
620 candidate_hash = ?params.candidate_hash,
621 received_chunks_count = ?self.chunk_count(),
622 requested_chunks_count = ?requesting_chunks.len(),
623 threshold = ?params.threshold,
624 "Can conclude availability recovery strategy",
625 );
626 break
627 }
628 }
629
630 (total_received_responses, error_count)
631 }
632}
633
634#[cfg(test)]
635mod tests {
636 use super::*;
637 use crate::{tests::*, Metrics, RecoveryStrategy, RecoveryTask};
638 use assert_matches::assert_matches;
639 use codec::Error as DecodingError;
640 use futures::{
641 channel::mpsc::{self, UnboundedReceiver},
642 executor, future, Future, FutureExt, StreamExt,
643 };
644 use polkadot_erasure_coding::{recovery_threshold, systematic_recovery_threshold};
645 use polkadot_node_network_protocol::request_response::Protocol;
646 use polkadot_node_primitives::{BlockData, PoV};
647 use polkadot_node_subsystem::{AllMessages, TimeoutExt};
648 use polkadot_node_subsystem_test_helpers::{
649 derive_erasure_chunks_with_proofs_and_root, sender_receiver, TestSubsystemSender,
650 };
651 use polkadot_primitives::{CandidateHash, HeadData, PersistedValidationData};
652 use polkadot_primitives_test_helpers::dummy_hash;
653 use sp_keyring::Sr25519Keyring;
654 use std::sync::Arc;
655
656 const TIMEOUT: Duration = Duration::from_secs(1);
657
658 impl Default for RecoveryParams {
659 fn default() -> Self {
660 let validators = vec![
661 Sr25519Keyring::Ferdie,
662 Sr25519Keyring::Alice.into(),
663 Sr25519Keyring::Bob.into(),
664 Sr25519Keyring::Charlie,
665 Sr25519Keyring::Dave,
666 Sr25519Keyring::One,
667 Sr25519Keyring::Two,
668 ];
669 let (erasure_task_tx, _erasure_task_rx) = mpsc::channel(10);
670
671 Self {
672 validator_authority_keys: validator_authority_id(&validators),
673 n_validators: validators.len(),
674 threshold: recovery_threshold(validators.len()).unwrap(),
675 systematic_threshold: systematic_recovery_threshold(validators.len()).unwrap(),
676 candidate_hash: CandidateHash(dummy_hash()),
677 erasure_root: dummy_hash(),
678 metrics: Metrics::new_dummy(),
679 bypass_availability_store: false,
680 post_recovery_check: PostRecoveryCheck::Reencode,
681 pov_hash: dummy_hash(),
682 req_v1_protocol_name: "/req_chunk/1".into(),
683 req_v2_protocol_name: "/req_chunk/2".into(),
684 chunk_mapping_enabled: true,
685 erasure_task_tx,
686 }
687 }
688 }
689
690 impl RecoveryParams {
691 fn create_chunks(&mut self) -> Vec<ErasureChunk> {
692 let available_data = dummy_available_data();
693 let (chunks, erasure_root) = derive_erasure_chunks_with_proofs_and_root(
694 self.n_validators,
695 &available_data,
696 |_, _| {},
697 );
698
699 self.erasure_root = erasure_root;
700 self.pov_hash = available_data.pov.hash();
701
702 chunks
703 }
704 }
705
706 fn dummy_available_data() -> AvailableData {
707 let validation_data = PersistedValidationData {
708 parent_head: HeadData(vec![7, 8, 9]),
709 relay_parent_number: Default::default(),
710 max_pov_size: 1024,
711 relay_parent_storage_root: Default::default(),
712 };
713
714 AvailableData {
715 validation_data,
716 pov: Arc::new(PoV { block_data: BlockData(vec![42; 64]) }),
717 }
718 }
719
720 fn test_harness<RecvFut: Future<Output = ()>, TestFut: Future<Output = ()>>(
721 receiver_future: impl FnOnce(UnboundedReceiver<AllMessages>) -> RecvFut,
722 test: impl FnOnce(TestSubsystemSender) -> TestFut,
723 ) {
724 let (sender, receiver) = sender_receiver();
725
726 let test_fut = test(sender);
727 let receiver_future = receiver_future(receiver);
728
729 futures::pin_mut!(test_fut);
730 futures::pin_mut!(receiver_future);
731
732 executor::block_on(future::join(test_fut, receiver_future)).1
733 }
734
735 #[test]
736 fn test_recorded_errors() {
737 let retry_threshold = 2;
738 let mut state = State::new();
739
740 let alice = Sr25519Keyring::Alice.public();
741 let bob = Sr25519Keyring::Bob.public();
742 let eve = Sr25519Keyring::Eve.public();
743
744 assert!(state.can_retry_request(&(alice.into(), 0.into()), retry_threshold));
745 assert!(state.can_retry_request(&(alice.into(), 0.into()), 0));
746 state.record_error_non_fatal(alice.into(), 0.into());
747 assert!(state.can_retry_request(&(alice.into(), 0.into()), retry_threshold));
748 state.record_error_non_fatal(alice.into(), 0.into());
749 assert!(!state.can_retry_request(&(alice.into(), 0.into()), retry_threshold));
750 state.record_error_non_fatal(alice.into(), 0.into());
751 assert!(!state.can_retry_request(&(alice.into(), 0.into()), retry_threshold));
752
753 assert!(state.can_retry_request(&(alice.into(), 0.into()), 5));
754
755 state.record_error_fatal(bob.into(), 1.into());
756 assert!(!state.can_retry_request(&(bob.into(), 1.into()), retry_threshold));
757 state.record_error_non_fatal(bob.into(), 1.into());
758 assert!(!state.can_retry_request(&(bob.into(), 1.into()), retry_threshold));
759
760 assert!(state.can_retry_request(&(eve.into(), 4.into()), 0));
761 assert!(state.can_retry_request(&(eve.into(), 4.into()), retry_threshold));
762 }
763
764 #[test]
765 fn test_populate_from_av_store() {
766 let params = RecoveryParams::default();
767
768 {
770 let params = params.clone();
771 let candidate_hash = params.candidate_hash;
772 let mut state = State::new();
773
774 test_harness(
775 |mut receiver: UnboundedReceiver<AllMessages>| async move {
776 assert_matches!(
777 receiver.next().timeout(TIMEOUT).await.unwrap().unwrap(),
778 AllMessages::AvailabilityStore(AvailabilityStoreMessage::QueryAllChunks(hash, tx)) => {
779 assert_eq!(hash, candidate_hash);
780 drop(tx);
781 });
782 },
783 |mut sender| async move {
784 let local_chunk_indices =
785 state.populate_from_av_store(¶ms, &mut sender).await;
786
787 assert_eq!(state.chunk_count(), 0);
788 assert_eq!(local_chunk_indices.len(), 0);
789 },
790 );
791 }
792
793 {
795 let mut params = params.clone();
796 let candidate_hash = params.candidate_hash;
797 let mut state = State::new();
798 let chunks = params.create_chunks();
799
800 test_harness(
801 |mut receiver: UnboundedReceiver<AllMessages>| async move {
802 assert_matches!(
803 receiver.next().timeout(TIMEOUT).await.unwrap().unwrap(),
804 AllMessages::AvailabilityStore(AvailabilityStoreMessage::QueryAllChunks(hash, tx)) => {
805 assert_eq!(hash, candidate_hash);
806 let mut chunk = chunks[0].clone();
807 chunk.index = 3.into();
808 tx.send(vec![(2.into(), chunk)]).unwrap();
809 });
810 },
811 |mut sender| async move {
812 let local_chunk_indices =
813 state.populate_from_av_store(¶ms, &mut sender).await;
814
815 assert_eq!(state.chunk_count(), 0);
816 assert_eq!(local_chunk_indices.len(), 1);
817 },
818 );
819 }
820
821 {
823 let mut params = params.clone();
824 let candidate_hash = params.candidate_hash;
825 let mut state = State::new();
826 let chunks = params.create_chunks();
827
828 test_harness(
829 |mut receiver: UnboundedReceiver<AllMessages>| async move {
830 assert_matches!(
831 receiver.next().timeout(TIMEOUT).await.unwrap().unwrap(),
832 AllMessages::AvailabilityStore(AvailabilityStoreMessage::QueryAllChunks(hash, tx)) => {
833 assert_eq!(hash, candidate_hash);
834 tx.send(vec![(4.into(), chunks[1].clone())]).unwrap();
835 });
836 },
837 |mut sender| async move {
838 let local_chunk_indices =
839 state.populate_from_av_store(¶ms, &mut sender).await;
840
841 assert_eq!(state.chunk_count(), 1);
842 assert_eq!(local_chunk_indices.len(), 1);
843 },
844 );
845 }
846 }
847
848 #[test]
849 fn test_launch_parallel_chunk_requests() {
850 let params = RecoveryParams::default();
851 let alice: AuthorityDiscoveryId = Sr25519Keyring::Alice.public().into();
852 let bob: AuthorityDiscoveryId = Sr25519Keyring::Bob.public().into();
853 let eve: AuthorityDiscoveryId = Sr25519Keyring::Eve.public().into();
854
855 {
857 let params = params.clone();
858 let mut state = State::new();
859 let mut ongoing_reqs = OngoingRequests::new();
860 let mut validators = VecDeque::new();
861
862 test_harness(
863 |mut receiver: UnboundedReceiver<AllMessages>| async move {
864 assert!(receiver.next().timeout(TIMEOUT).await.unwrap().is_none());
866 },
867 |mut sender| async move {
868 state
869 .launch_parallel_chunk_requests(
870 "regular",
871 ¶ms,
872 &mut sender,
873 3,
874 &mut validators,
875 &mut ongoing_reqs,
876 )
877 .await;
878
879 assert_eq!(ongoing_reqs.total_len(), 0);
880 },
881 );
882 }
883
884 {
886 let params = params.clone();
887 let mut state = State::new();
888 let mut ongoing_reqs = OngoingRequests::new();
889 let mut validators = VecDeque::new();
890 validators.push_back((alice.clone(), ValidatorIndex(1)));
891
892 test_harness(
893 |mut receiver: UnboundedReceiver<AllMessages>| async move {
894 assert!(receiver.next().timeout(TIMEOUT).await.unwrap().is_none());
896 },
897 |mut sender| async move {
898 state
899 .launch_parallel_chunk_requests(
900 "regular",
901 ¶ms,
902 &mut sender,
903 0,
904 &mut validators,
905 &mut ongoing_reqs,
906 )
907 .await;
908
909 assert_eq!(ongoing_reqs.total_len(), 0);
910 },
911 );
912 }
913
914 {
916 let params = params.clone();
917 let mut state = State::new();
918 let mut ongoing_reqs = OngoingRequests::new();
919 ongoing_reqs.push(async { todo!() }.boxed());
920 ongoing_reqs.soft_cancel();
921 let mut validators = VecDeque::new();
922 validators.push_back((alice.clone(), ValidatorIndex(1)));
923
924 test_harness(
925 |mut receiver: UnboundedReceiver<AllMessages>| async move {
926 assert!(receiver.next().timeout(TIMEOUT).await.unwrap().is_none());
928 },
929 |mut sender| async move {
930 state
931 .launch_parallel_chunk_requests(
932 "regular",
933 ¶ms,
934 &mut sender,
935 0,
936 &mut validators,
937 &mut ongoing_reqs,
938 )
939 .await;
940
941 assert_eq!(ongoing_reqs.total_len(), 1);
942 assert_eq!(ongoing_reqs.len(), 0);
943 },
944 );
945 }
946
947 {
949 let params = params.clone();
950 let mut state = State::new();
951 let mut ongoing_reqs = OngoingRequests::new();
952 ongoing_reqs.push(async { todo!() }.boxed());
953 ongoing_reqs.soft_cancel();
954 ongoing_reqs.push(async { todo!() }.boxed());
955 let mut validators = VecDeque::new();
956 validators.push_back((alice.clone(), 0.into()));
957 validators.push_back((bob, 1.into()));
958 validators.push_back((eve, 2.into()));
959
960 test_harness(
961 |mut receiver: UnboundedReceiver<AllMessages>| async move {
962 assert_matches!(
963 receiver.next().timeout(TIMEOUT).await.unwrap().unwrap(),
964 AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendRequests(requests, _)) if requests.len()
965== 3 );
966 },
967 |mut sender| async move {
968 state
969 .launch_parallel_chunk_requests(
970 "regular",
971 ¶ms,
972 &mut sender,
973 10,
974 &mut validators,
975 &mut ongoing_reqs,
976 )
977 .await;
978
979 assert_eq!(ongoing_reqs.total_len(), 5);
980 assert_eq!(ongoing_reqs.len(), 4);
981 },
982 );
983 }
984
985 {
987 let params = params.clone();
988 let mut state = State::new();
989 let mut ongoing_reqs = OngoingRequests::new();
990 let mut validators = VecDeque::new();
991 validators.push_back((alice, 0.into()));
992
993 test_harness(
994 |mut receiver: UnboundedReceiver<AllMessages>| async move {
995 match receiver.next().timeout(TIMEOUT).await.unwrap().unwrap() {
996 AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendRequests(
997 mut requests,
998 _,
999 )) => {
1000 assert_eq!(requests.len(), 1);
1001 let (protocol, request) = requests.remove(0).encode_request();
1004 assert_eq!(protocol, Protocol::ChunkFetchingV2);
1005 assert_eq!(
1006 request.fallback_request.unwrap().1,
1007 Protocol::ChunkFetchingV1
1008 );
1009 },
1010 _ => unreachable!(),
1011 }
1012 },
1013 |mut sender| async move {
1014 state
1015 .launch_parallel_chunk_requests(
1016 "regular",
1017 ¶ms,
1018 &mut sender,
1019 10,
1020 &mut validators,
1021 &mut ongoing_reqs,
1022 )
1023 .await;
1024
1025 assert_eq!(ongoing_reqs.total_len(), 1);
1026 assert_eq!(ongoing_reqs.len(), 1);
1027 },
1028 );
1029 }
1030 }
1031
1032 #[test]
1033 fn test_wait_for_chunks() {
1034 let params = RecoveryParams::default();
1035 let retry_threshold = 2;
1036
1037 {
1039 let params = params.clone();
1040 let mut state = State::new();
1041 let mut ongoing_reqs = OngoingRequests::new();
1042 let mut validators = VecDeque::new();
1043
1044 test_harness(
1045 |mut receiver: UnboundedReceiver<AllMessages>| async move {
1046 assert!(receiver.next().timeout(TIMEOUT).await.unwrap().is_none());
1048 },
1049 |_| async move {
1050 let (total_responses, error_count) = state
1051 .wait_for_chunks(
1052 "regular",
1053 ¶ms,
1054 retry_threshold,
1055 &mut validators,
1056 &mut ongoing_reqs,
1057 &mut vec![],
1058 |_, _, _, _| false,
1059 )
1060 .await;
1061 assert_eq!(total_responses, 0);
1062 assert_eq!(error_count, 0);
1063 assert_eq!(state.chunk_count(), 0);
1064 },
1065 );
1066 }
1067
1068 {
1070 let mut params = params.clone();
1071 let chunks = params.create_chunks();
1072 let mut state = State::new();
1073 let mut ongoing_reqs = OngoingRequests::new();
1074 ongoing_reqs.push(
1075 future::ready((
1076 params.validator_authority_keys[0].clone(),
1077 0.into(),
1078 Ok((Some(chunks[0].clone()), "".into())),
1079 ))
1080 .boxed(),
1081 );
1082 ongoing_reqs.soft_cancel();
1083 ongoing_reqs.push(
1084 future::ready((
1085 params.validator_authority_keys[1].clone(),
1086 1.into(),
1087 Ok((Some(chunks[1].clone()), "".into())),
1088 ))
1089 .boxed(),
1090 );
1091 ongoing_reqs.push(
1092 future::ready((
1093 params.validator_authority_keys[2].clone(),
1094 2.into(),
1095 Ok((None, "".into())),
1096 ))
1097 .boxed(),
1098 );
1099 ongoing_reqs.push(
1100 future::ready((
1101 params.validator_authority_keys[3].clone(),
1102 3.into(),
1103 Err(RequestError::from(DecodingError::from("err"))),
1104 ))
1105 .boxed(),
1106 );
1107 ongoing_reqs.push(
1108 future::ready((
1109 params.validator_authority_keys[4].clone(),
1110 4.into(),
1111 Err(RequestError::NetworkError(RequestFailure::NotConnected)),
1112 ))
1113 .boxed(),
1114 );
1115
1116 let mut validators: VecDeque<_> = (5..params.n_validators as u32)
1117 .map(|i| (params.validator_authority_keys[i as usize].clone(), i.into()))
1118 .collect();
1119 validators.push_back((
1120 Sr25519Keyring::AliceStash.public().into(),
1121 ValidatorIndex(params.n_validators as u32),
1122 ));
1123
1124 test_harness(
1125 |mut receiver: UnboundedReceiver<AllMessages>| async move {
1126 assert!(receiver.next().timeout(TIMEOUT).await.unwrap().is_none());
1128 },
1129 |_| async move {
1130 let (total_responses, error_count) = state
1131 .wait_for_chunks(
1132 "regular",
1133 ¶ms,
1134 retry_threshold,
1135 &mut validators,
1136 &mut ongoing_reqs,
1137 &mut vec![],
1138 |_, _, _, _| false,
1139 )
1140 .await;
1141 assert_eq!(total_responses, 5);
1142 assert_eq!(error_count, 3);
1143 assert_eq!(state.chunk_count(), 2);
1144
1145 let mut expected_validators: VecDeque<_> = (4..params.n_validators as u32)
1146 .map(|i| (params.validator_authority_keys[i as usize].clone(), i.into()))
1147 .collect();
1148 expected_validators.push_back((
1149 Sr25519Keyring::AliceStash.public().into(),
1150 ValidatorIndex(params.n_validators as u32),
1151 ));
1152
1153 assert_eq!(validators, expected_validators);
1154
1155 ongoing_reqs.push(
1157 future::ready((
1158 params.validator_authority_keys[4].clone(),
1159 4.into(),
1160 Err(RequestError::NetworkError(RequestFailure::NotConnected)),
1161 ))
1162 .boxed(),
1163 );
1164
1165 let (total_responses, error_count) = state
1166 .wait_for_chunks(
1167 "regular",
1168 ¶ms,
1169 retry_threshold,
1170 &mut validators,
1171 &mut ongoing_reqs,
1172 &mut vec![],
1173 |_, _, _, _| false,
1174 )
1175 .await;
1176 assert_eq!(total_responses, 1);
1177 assert_eq!(error_count, 1);
1178 assert_eq!(state.chunk_count(), 2);
1179
1180 validators.pop_front();
1181 let mut expected_validators: VecDeque<_> = (5..params.n_validators as u32)
1182 .map(|i| (params.validator_authority_keys[i as usize].clone(), i.into()))
1183 .collect();
1184 expected_validators.push_back((
1185 Sr25519Keyring::AliceStash.public().into(),
1186 ValidatorIndex(params.n_validators as u32),
1187 ));
1188
1189 assert_eq!(validators, expected_validators);
1190
1191 let (total_responses, error_count) = state
1193 .wait_for_chunks(
1194 "regular",
1195 ¶ms,
1196 retry_threshold,
1197 &mut validators,
1198 &mut ongoing_reqs,
1199 &mut vec![],
1200 |_, _, _, _| true,
1201 )
1202 .await;
1203 assert_eq!(total_responses, 0);
1204 assert_eq!(error_count, 0);
1205 assert_eq!(state.chunk_count(), 2);
1206
1207 assert_eq!(validators, expected_validators);
1208 },
1209 );
1210 }
1211
1212 {
1214 let mut params = params.clone();
1215 let chunks = params.create_chunks();
1216 let mut state = State::new();
1217 let mut ongoing_reqs = OngoingRequests::new();
1218 ongoing_reqs.push(
1219 future::ready((
1220 params.validator_authority_keys[0].clone(),
1221 0.into(),
1222 Ok((Some(chunks[0].clone()), "".into())),
1223 ))
1224 .boxed(),
1225 );
1226 ongoing_reqs.soft_cancel();
1227 ongoing_reqs.push(
1228 future::ready((
1229 params.validator_authority_keys[1].clone(),
1230 1.into(),
1231 Ok((Some(chunks[1].clone()), "".into())),
1232 ))
1233 .boxed(),
1234 );
1235 ongoing_reqs.push(
1236 future::ready((
1237 params.validator_authority_keys[2].clone(),
1238 2.into(),
1239 Ok((None, "".into())),
1240 ))
1241 .boxed(),
1242 );
1243 ongoing_reqs.push(
1244 future::ready((
1245 params.validator_authority_keys[3].clone(),
1246 3.into(),
1247 Err(RequestError::from(DecodingError::from("err"))),
1248 ))
1249 .boxed(),
1250 );
1251 ongoing_reqs.push(
1252 future::ready((
1253 params.validator_authority_keys[4].clone(),
1254 4.into(),
1255 Err(RequestError::NetworkError(RequestFailure::NotConnected)),
1256 ))
1257 .boxed(),
1258 );
1259
1260 let mut validators: VecDeque<_> = (5..params.n_validators as u32)
1261 .map(|i| (params.validator_authority_keys[i as usize].clone(), i.into()))
1262 .collect();
1263 validators.push_back((
1264 Sr25519Keyring::Eve.public().into(),
1265 ValidatorIndex(params.n_validators as u32),
1266 ));
1267
1268 let mut backup_backers = vec![
1269 params.validator_authority_keys[2].clone(),
1270 params.validator_authority_keys[0].clone(),
1271 params.validator_authority_keys[4].clone(),
1272 params.validator_authority_keys[3].clone(),
1273 Sr25519Keyring::AliceStash.public().into(),
1274 Sr25519Keyring::BobStash.public().into(),
1275 ];
1276
1277 test_harness(
1278 |mut receiver: UnboundedReceiver<AllMessages>| async move {
1279 assert!(receiver.next().timeout(TIMEOUT).await.unwrap().is_none());
1281 },
1282 |_| async move {
1283 let (total_responses, error_count) = state
1284 .wait_for_chunks(
1285 "regular",
1286 ¶ms,
1287 retry_threshold,
1288 &mut validators,
1289 &mut ongoing_reqs,
1290 &mut backup_backers,
1291 |_, _, _, _| false,
1292 )
1293 .await;
1294 assert_eq!(total_responses, 5);
1295 assert_eq!(error_count, 3);
1296 assert_eq!(state.chunk_count(), 2);
1297
1298 let mut expected_validators: VecDeque<_> = (5..params.n_validators as u32)
1299 .map(|i| (params.validator_authority_keys[i as usize].clone(), i.into()))
1300 .collect();
1301 expected_validators.push_back((
1302 Sr25519Keyring::Eve.public().into(),
1303 ValidatorIndex(params.n_validators as u32),
1304 ));
1305 expected_validators
1307 .push_front((params.validator_authority_keys[0].clone(), 2.into()));
1308 expected_validators
1309 .push_front((params.validator_authority_keys[2].clone(), 3.into()));
1310 expected_validators
1311 .push_front((params.validator_authority_keys[4].clone(), 4.into()));
1312
1313 assert_eq!(validators, expected_validators);
1314
1315 ongoing_reqs.push(
1317 future::ready((
1318 params.validator_authority_keys[4].clone(),
1319 4.into(),
1320 Err(RequestError::NetworkError(RequestFailure::NotConnected)),
1321 ))
1322 .boxed(),
1323 );
1324
1325 validators.pop_front();
1326
1327 let (total_responses, error_count) = state
1328 .wait_for_chunks(
1329 "regular",
1330 ¶ms,
1331 retry_threshold,
1332 &mut validators,
1333 &mut ongoing_reqs,
1334 &mut backup_backers,
1335 |_, _, _, _| false,
1336 )
1337 .await;
1338 assert_eq!(total_responses, 1);
1339 assert_eq!(error_count, 1);
1340 assert_eq!(state.chunk_count(), 2);
1341
1342 expected_validators.pop_front();
1343 expected_validators
1344 .push_front((Sr25519Keyring::AliceStash.public().into(), 4.into()));
1345
1346 assert_eq!(validators, expected_validators);
1347 },
1348 );
1349 }
1350 }
1351
/// Drives `RecoveryTask::run` with stub strategies and checks the result the task reports for
/// different combinations of strategy queue and availability-store reply.
#[test]
fn test_recovery_strategy_run() {
    // Type-erased form in which the stub strategies are queued for the task.
    type BoxedStrategy = Box<dyn RecoveryStrategy<TestSubsystemSender>>;

    let base_params = RecoveryParams::default();

    // Stub strategy whose `run` always yields `dummy_available_data()`.
    struct GoodStrategy;
    #[async_trait::async_trait]
    impl<Sender: overseer::AvailabilityRecoverySenderTrait> RecoveryStrategy<Sender> for GoodStrategy {
        fn display_name(&self) -> &'static str {
            "GoodStrategy"
        }

        fn strategy_type(&self) -> &'static str {
            "good_strategy"
        }

        async fn run(
            self: Box<Self>,
            _state: &mut State,
            _sender: &mut Sender,
            _common_params: &RecoveryParams,
        ) -> Result<AvailableData, RecoveryError> {
            Ok(dummy_available_data())
        }
    }

    // Stub strategy whose `run` always fails with `RecoveryError::Unavailable`.
    struct UnavailableStrategy;
    #[async_trait::async_trait]
    impl<Sender: overseer::AvailabilityRecoverySenderTrait> RecoveryStrategy<Sender>
        for UnavailableStrategy
    {
        fn display_name(&self) -> &'static str {
            "UnavailableStrategy"
        }

        fn strategy_type(&self) -> &'static str {
            "unavailable_strategy"
        }

        async fn run(
            self: Box<Self>,
            _state: &mut State,
            _sender: &mut Sender,
            _common_params: &RecoveryParams,
        ) -> Result<AvailableData, RecoveryError> {
            Err(RecoveryError::Unavailable)
        }
    }

    // Stub strategy whose `run` always fails with `RecoveryError::Invalid`.
    struct InvalidStrategy;
    #[async_trait::async_trait]
    impl<Sender: overseer::AvailabilityRecoverySenderTrait> RecoveryStrategy<Sender>
        for InvalidStrategy
    {
        fn display_name(&self) -> &'static str {
            "InvalidStrategy"
        }

        fn strategy_type(&self) -> &'static str {
            "invalid_strategy"
        }

        async fn run(
            self: Box<Self>,
            _state: &mut State,
            _sender: &mut Sender,
            _common_params: &RecoveryParams,
        ) -> Result<AvailableData, RecoveryError> {
            Err(RecoveryError::Invalid)
        }
    }

    // Empty strategy queue with the availability store bypassed: the task reports `Unavailable`.
    {
        let mut task_params = base_params.clone();
        task_params.bypass_availability_store = true;
        let no_strategies = VecDeque::new();

        test_harness(
            |mut overseer_rx: UnboundedReceiver<AllMessages>| async move {
                // The first item the receiver yields must be `None`: no message was sent.
                let first_item = overseer_rx.next().timeout(TIMEOUT).await.unwrap();
                assert!(first_item.is_none());
            },
            |sender| async move {
                let outcome = RecoveryTask::new(sender, task_params, no_strategies).run().await;

                assert_eq!(outcome, Err(RecoveryError::Unavailable));
            },
        );
    }

    // Empty strategy queue, store not bypassed: the store is queried for the candidate and the
    // data it replies with is what the task returns.
    {
        let task_params = base_params.clone();
        let expected_hash = task_params.candidate_hash;
        let no_strategies = VecDeque::new();

        test_harness(
            |mut overseer_rx: UnboundedReceiver<AllMessages>| async move {
                assert_matches!(
                    overseer_rx.next().timeout(TIMEOUT).await.unwrap().unwrap(),
                    AllMessages::AvailabilityStore(
                        AvailabilityStoreMessage::QueryAvailableData(queried_hash, reply_tx)
                    ) => {
                        assert_eq!(queried_hash, expected_hash);
                        reply_tx.send(Some(dummy_available_data())).unwrap();
                    }
                );
            },
            |sender| async move {
                let outcome = RecoveryTask::new(sender, task_params, no_strategies).run().await;

                assert_eq!(outcome, Ok(dummy_available_data()));
            },
        );
    }

    // Store bypassed, `InvalidStrategy` queued ahead of `GoodStrategy`: the task reports
    // `Invalid` even though a succeeding strategy is queued behind the failing one.
    {
        let mut task_params = base_params.clone();
        task_params.bypass_availability_store = true;
        let strategies: VecDeque<BoxedStrategy> = VecDeque::from(vec![
            Box::new(InvalidStrategy) as BoxedStrategy,
            Box::new(GoodStrategy),
        ]);

        test_harness(
            |mut overseer_rx: UnboundedReceiver<AllMessages>| async move {
                // The first item the receiver yields must be `None`: no message was sent.
                let first_item = overseer_rx.next().timeout(TIMEOUT).await.unwrap();
                assert!(first_item.is_none());
            },
            |sender| async move {
                let outcome = RecoveryTask::new(sender, task_params, strategies).run().await;

                assert_eq!(outcome, Err(RecoveryError::Invalid));
            },
        );
    }

    // `UnavailableStrategy` queued ahead of `GoodStrategy`, store not bypassed: the task returns
    // the dummy data.
    // NOTE(review): the store replies with data here, just like in the empty-queue scenario
    // above, so this assertion alone may not prove that the strategies were run at all --
    // confirm whether a `None` reply was intended.
    {
        let task_params = base_params.clone();
        let expected_hash = task_params.candidate_hash;
        let strategies: VecDeque<BoxedStrategy> = VecDeque::from(vec![
            Box::new(UnavailableStrategy) as BoxedStrategy,
            Box::new(GoodStrategy),
        ]);

        test_harness(
            |mut overseer_rx: UnboundedReceiver<AllMessages>| async move {
                assert_matches!(
                    overseer_rx.next().timeout(TIMEOUT).await.unwrap().unwrap(),
                    AllMessages::AvailabilityStore(
                        AvailabilityStoreMessage::QueryAvailableData(queried_hash, reply_tx)
                    ) => {
                        assert_eq!(queried_hash, expected_hash);
                        reply_tx.send(Some(dummy_available_data())).unwrap();
                    }
                );
            },
            |sender| async move {
                let outcome = RecoveryTask::new(sender, task_params, strategies).run().await;

                assert_eq!(outcome, Ok(dummy_available_data()));
            },
        );
    }

    // Longer queue (two `UnavailableStrategy`, then `GoodStrategy`, then `InvalidStrategy`),
    // store not bypassed: the task returns the dummy data.
    // NOTE(review): same caveat as the previous scenario -- the store reply already carries the
    // data, so the outcome may be independent of the queued strategies; confirm.
    {
        let task_params = base_params.clone();
        let expected_hash = task_params.candidate_hash;
        let strategies: VecDeque<BoxedStrategy> = VecDeque::from(vec![
            Box::new(UnavailableStrategy) as BoxedStrategy,
            Box::new(UnavailableStrategy),
            Box::new(GoodStrategy),
            Box::new(InvalidStrategy),
        ]);

        test_harness(
            |mut overseer_rx: UnboundedReceiver<AllMessages>| async move {
                assert_matches!(
                    overseer_rx.next().timeout(TIMEOUT).await.unwrap().unwrap(),
                    AllMessages::AvailabilityStore(
                        AvailabilityStoreMessage::QueryAvailableData(queried_hash, reply_tx)
                    ) => {
                        assert_eq!(queried_hash, expected_hash);
                        reply_tx.send(Some(dummy_available_data())).unwrap();
                    }
                );
            },
            |sender| async move {
                let outcome = RecoveryTask::new(sender, task_params, strategies).run().await;

                assert_eq!(outcome, Ok(dummy_available_data()));
            },
        );
    }
}
1541
/// Checks `is_unavailable` on both sides of its threshold comparison.
///
/// Every case is a tuple of
/// `(received_chunks, requesting_chunks, unrequested_validators, threshold)`.
#[test]
fn test_is_unavailable() {
    // The three counts add up to at least the threshold, so recovery is not ruled out.
    let at_or_above_threshold = [
        (0, 0, 0, 0),
        (2, 2, 2, 0),
        (3, 0, 10, 3),
        (3, 2, 0, 3),
        (3, 2, 10, 3),
        (0, 0, 10, 3),
        // Exactly at the threshold still counts as not unavailable.
        (0, 0, 3, 3),
        (1, 1, 1, 3),
    ];
    for &(received, requesting, unrequested, threshold) in &at_or_above_threshold {
        assert!(
            !is_unavailable(received, requesting, unrequested, threshold),
            "({}, {}, {}, {}) must not be reported as unavailable",
            received,
            requesting,
            unrequested,
            threshold
        );
    }

    // The three counts add up to less than the threshold, so the data is unavailable.
    let below_threshold = [(0, 0, 0, 3), (2, 3, 2, 10)];
    for &(received, requesting, unrequested, threshold) in &below_threshold {
        assert!(
            is_unavailable(received, requesting, unrequested, threshold),
            "({}, {}, {}, {}) must be reported as unavailable",
            received,
            requesting,
            unrequested,
            threshold
        );
    }
}
1558}