#![warn(missing_docs)]
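
//! The availability recovery subsystem.
//!
//! Given a candidate receipt and a session index, recovers the corresponding `AvailableData`,
//! either by fetching it in full from the candidate's backing group or by reassembling it from
//! erasure-coded chunks fetched from validators.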

use std::{
    collections::{BTreeMap, VecDeque},
    iter::Iterator,
    num::NonZeroUsize,
    pin::Pin,
};

use futures::{
    channel::oneshot,
    future::{Future, FutureExt, RemoteHandle},
    pin_mut,
    prelude::*,
    sink::SinkExt,
    stream::{FuturesUnordered, StreamExt},
    task::{Context, Poll},
};
use sc_network::ProtocolName;
use schnellru::{ByLength, LruMap};
use task::{
    FetchChunks, FetchChunksParams, FetchFull, FetchFullParams, FetchSystematicChunks,
    FetchSystematicChunksParams,
};

use polkadot_erasure_coding::{
    branches, obtain_chunks_v1, recovery_threshold, systematic_recovery_threshold,
    Error as ErasureEncodingError,
};
use task::{RecoveryParams, RecoveryStrategy, RecoveryTask};

use error::{log_error, Error, FatalError, Result};
use polkadot_node_network_protocol::{
    request_response::{
        v1 as request_v1, v2 as request_v2, IncomingRequestReceiver, IsRequest, ReqProtocolNames,
    },
    UnifiedReputationChange as Rep,
};
use polkadot_node_primitives::AvailableData;
use polkadot_node_subsystem::{
    errors::RecoveryError,
    messages::{AvailabilityRecoveryMessage, AvailabilityStoreMessage},
    overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem,
    SubsystemContext, SubsystemError,
};
use polkadot_node_subsystem_util::{
    availability_chunks::availability_chunk_indices,
    runtime::{ExtendedSessionInfo, RuntimeInfo},
};
use polkadot_primitives::{
    node_features, BlockNumber, CandidateHash, CandidateReceiptV2 as CandidateReceipt, ChunkIndex,
    CoreIndex, GroupIndex, Hash, SessionIndex, ValidatorIndex,
};

mod error;
mod futures_undead;
mod metrics;
mod task;
pub use metrics::Metrics;

#[cfg(test)]
mod tests;

/// The result of a recovery attempt: the available data or a `RecoveryError`.
type RecoveryResult = std::result::Result<AvailableData, RecoveryError>;

const LOG_TARGET: &str = "parachain::availability-recovery";

/// Capacity of the LRU cache of recently completed recoveries.
const LRU_SIZE: u32 = 16;

/// Reputation cost applied to peers that send an unparsable request.
const COST_INVALID_REQUEST: Rep = Rep::CostMajor("Peer sent unparsable request");

/// Conservative PoV size threshold (in bytes) below which fetching the full data from the
/// backing group is preferred over chunk recovery. Used as the default when no threshold is
/// supplied to the constructors.
pub(crate) const CONSERVATIVE_FETCH_CHUNKS_THRESHOLD: usize = 1 * 1024 * 1024;
/// A larger PoV size threshold (in bytes) that callers may pass in instead of the
/// conservative default.
pub const FETCH_CHUNKS_THRESHOLD: usize = 4 * 1024 * 1024;

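/// Which sequence of recovery strategies to use for a recovery request.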
#[derive(Clone, PartialEq)]
pub enum RecoveryStrategyKind {
    /// Try fetching the full data from the backing group first if the estimated PoV size is
    /// below the given threshold, then fall back to fetching chunks.
    BackersFirstIfSizeLower(usize),
    /// Like `BackersFirstIfSizeLower`, but attempt systematic chunk recovery before falling
    /// back to regular chunk recovery.
    BackersFirstIfSizeLowerThenSystematicChunks(usize),

    /// Always try the backing group first, then fall back to fetching chunks.
    #[allow(dead_code)]
    BackersFirstAlways,
    /// Always recover from regular chunks only.
    #[allow(dead_code)]
    ChunksAlways,
    /// Try the backing group first, then systematic chunks, then regular chunks.
    #[allow(dead_code)]
    BackersThenSystematicChunks,
    /// Try systematic chunks first, then regular chunks.
    #[allow(dead_code)]
    SystematicChunks,
}

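/// The availability recovery subsystem.
///
/// Construct it with [`Self::for_validator`], [`Self::for_collator`] or (in tests and
/// benchmarks) [`Self::with_recovery_strategy_kind`], then drive it via [`Self::run`].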
pub struct AvailabilityRecoverySubsystem {
    /// The sequence of recovery strategies to use.
    recovery_strategy_kind: RecoveryStrategyKind,
    /// Whether to bypass the local availability store (set by [`Self::for_collator`]);
    /// incoming full-data requests are then answered with `None`.
    bypass_availability_store: bool,
    /// Receiver for incoming `AvailableData` fetching requests from other nodes.
    req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
    /// Metrics for this subsystem.
    metrics: Metrics,
    /// Which check to run on recovered data before returning it.
    post_recovery_check: PostRecoveryCheck,
    /// Full protocol name of the v1 chunk fetching request-response protocol.
    req_v1_protocol_name: ProtocolName,
    /// Full protocol name of the v2 chunk fetching request-response protocol.
    req_v2_protocol_name: ProtocolName,
}

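/// The check performed on recovered data before it is handed back to the requester.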
#[derive(Clone, PartialEq, Debug)]
enum PostRecoveryCheck {
    /// Re-encode the data into chunks and check that the erasure root matches.
    Reencode,
    /// Only check that the recovered PoV hash matches the one in the candidate descriptor.
    PovHash,
}

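/// A unit of work sent to the blocking erasure-coding thread pool.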
enum ErasureTask {
    /// Reconstruct `AvailableData` from the given chunks, for the given number of validators.
    Reconstruct(
        usize,
        BTreeMap<ChunkIndex, Vec<u8>>,
        oneshot::Sender<std::result::Result<AvailableData, ErasureEncodingError>>,
    ),
    /// Re-encode `AvailableData` into chunks and verify that the erasure root matches the given
    /// hash, sending back the data on success and `None` otherwise.
    Reencode(usize, Hash, AvailableData, oneshot::Sender<Option<AvailableData>>),
}

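/// Re-encode the data into chunks and check that the resulting erasure root matches
/// `expected_root`.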
fn reconstructed_data_matches_root(
    n_validators: usize,
    expected_root: &Hash,
    data: &AvailableData,
    metrics: &Metrics,
) -> bool {
    let _timer = metrics.time_reencode_chunks();

    let chunks = match obtain_chunks_v1(n_validators, data) {
        Ok(chunks) => chunks,
        Err(e) => {
            gum::debug!(
                target: LOG_TARGET,
                err = ?e,
                "Failed to obtain chunks",
            );
            return false
        },
    };

    let branches = branches(&chunks);

    branches.root() == *expected_root
}

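/// Handle to an ongoing recovery, forwarding the result to all awaiting response senders.
///
/// Resolves to `None` if every requester dropped its receiver before the recovery finished.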
struct RecoveryHandle {
    /// The candidate being recovered.
    candidate_hash: CandidateHash,
    /// Handle to the future of the spawned recovery task.
    remote: RemoteHandle<RecoveryResult>,
    /// Response senders of all requesters awaiting this recovery.
    awaiting: Vec<oneshot::Sender<RecoveryResult>>,
}

impl Future for RecoveryHandle {
    type Output = Option<(CandidateHash, RecoveryResult)>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Collect senders whose receivers have gone away. Indices are gathered in descending
        // order so that `swap_remove` below does not invalidate the remaining ones.
        let mut indices_to_remove = Vec::new();
        for (i, awaiting) in self.awaiting.iter_mut().enumerate().rev() {
            if let Poll::Ready(()) = awaiting.poll_canceled(cx) {
                indices_to_remove.push(i);
            }
        }

        for index in indices_to_remove {
            gum::debug!(
                target: LOG_TARGET,
                candidate_hash = ?self.candidate_hash,
                "Receiver for available data dropped.",
            );

            self.awaiting.swap_remove(index);
        }

        if self.awaiting.is_empty() {
            gum::debug!(
                target: LOG_TARGET,
                candidate_hash = ?self.candidate_hash,
                "All receivers for available data dropped.",
            );

            return Poll::Ready(None)
        }

        let remote = &mut self.remote;
        futures::pin_mut!(remote);
        let result = futures::ready!(remote.poll(cx));

        for awaiting in self.awaiting.drain(..) {
            let _ = awaiting.send(result.clone());
        }

        Poll::Ready(Some((self.candidate_hash, result)))
    }
}

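/// Cached result of an availability recovery, kept in the LRU cache.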
#[derive(Debug, Clone)]
enum CachedRecovery {
    /// Availability was successfully recovered.
    Valid(AvailableData),
    /// Recovery produced data that failed validation, so the failure is cached.
    Invalid,
}

impl CachedRecovery {
    fn into_result(self) -> RecoveryResult {
        match self {
            Self::Valid(d) => Ok(d),
            Self::Invalid => Err(RecoveryError::Invalid),
        }
    }
}

impl TryFrom<RecoveryResult> for CachedRecovery {
    type Error = ();
    fn try_from(o: RecoveryResult) -> std::result::Result<CachedRecovery, Self::Error> {
        match o {
            Ok(d) => Ok(Self::Valid(d)),
            Err(RecoveryError::Invalid) => Ok(Self::Invalid),
            // Unavailability and closed channels may be transient, so they are not cached.
            Err(RecoveryError::Unavailable) => Err(()),
            Err(RecoveryError::ChannelClosed) => Err(()),
        }
    }
}

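/// State of the availability recovery subsystem.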
struct State {
    /// All recoveries that are currently in progress.
    ongoing_recoveries: FuturesUnordered<RecoveryHandle>,

    /// Number and hash of the highest activated leaf; the hash is used as the relay parent for
    /// session info queries.
    live_block: (BlockNumber, Hash),

    /// An LRU cache of recently recovered (or known-invalid) data.
    availability_lru: LruMap<CandidateHash, CachedRecovery>,

    /// Cached runtime info, including session information.
    runtime_info: RuntimeInfo,
}

impl Default for State {
    fn default() -> Self {
        Self {
            ongoing_recoveries: FuturesUnordered::new(),
            live_block: (0, Hash::default()),
            availability_lru: LruMap::new(ByLength::new(LRU_SIZE)),
            runtime_info: RuntimeInfo::new(None),
        }
    }
}

#[overseer::subsystem(AvailabilityRecovery, error=SubsystemError, prefix=self::overseer)]
impl<Context> AvailabilityRecoverySubsystem {
    fn start(self, ctx: Context) -> SpawnedSubsystem {
        let future = self
            .run(ctx)
            .map_err(|e| SubsystemError::with_origin("availability-recovery", e))
            .boxed();
        SpawnedSubsystem { name: "availability-recovery-subsystem", future }
    }
}

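/// Handle an incoming overseer signal, returning `true` if the subsystem should conclude.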
async fn handle_signal(state: &mut State, signal: OverseerSignal) -> bool {
    match signal {
        OverseerSignal::Conclude => true,
        OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, .. }) => {
            if let Some(activated) = activated {
                if activated.number > state.live_block.0 {
                    state.live_block = (activated.number, activated.hash)
                }
            }

            false
        },
        OverseerSignal::BlockFinalized(_, _) => false,
    }
}

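/// Spawn a background `RecoveryTask` for the given parameters and register a
/// [`RecoveryHandle`] for it in the subsystem state.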
#[overseer::contextbounds(AvailabilityRecovery, prefix = self::overseer)]
async fn launch_recovery_task<Context>(
    state: &mut State,
    ctx: &mut Context,
    response_sender: oneshot::Sender<RecoveryResult>,
    recovery_strategies: VecDeque<Box<dyn RecoveryStrategy<<Context as SubsystemContext>::Sender>>>,
    params: RecoveryParams,
) -> Result<()> {
    let candidate_hash = params.candidate_hash;
    let recovery_task = RecoveryTask::new(ctx.sender().clone(), params, recovery_strategies);

    let (remote, remote_handle) = recovery_task.run().remote_handle();

    state.ongoing_recoveries.push(RecoveryHandle {
        candidate_hash,
        remote: remote_handle,
        awaiting: vec![response_sender],
    });

    ctx.spawn("recovery-task", Box::pin(remote)).map_err(Error::SpawnTask)
}

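/// Handle a `RecoverAvailableData` request.
///
/// Answers from the LRU cache or an already ongoing recovery where possible; otherwise
/// assembles the list of recovery strategies and launches a new recovery task.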
#[overseer::contextbounds(AvailabilityRecovery, prefix = self::overseer)]
async fn handle_recover<Context>(
    state: &mut State,
    ctx: &mut Context,
    receipt: CandidateReceipt,
    session_index: SessionIndex,
    backing_group: Option<GroupIndex>,
    response_sender: oneshot::Sender<RecoveryResult>,
    metrics: &Metrics,
    erasure_task_tx: futures::channel::mpsc::Sender<ErasureTask>,
    recovery_strategy_kind: RecoveryStrategyKind,
    bypass_availability_store: bool,
    post_recovery_check: PostRecoveryCheck,
    maybe_core_index: Option<CoreIndex>,
    req_v1_protocol_name: ProtocolName,
    req_v2_protocol_name: ProtocolName,
) -> Result<()> {
    let candidate_hash = receipt.hash();

    // Answer from the cache if this candidate was already recovered (or found invalid).
    if let Some(result) =
        state.availability_lru.get(&candidate_hash).cloned().map(|v| v.into_result())
    {
        return response_sender.send(result).map_err(|_| Error::CanceledResponseSender)
    }

    // If a recovery for this candidate is already in flight, just wait for its result.
    if let Some(i) =
        state.ongoing_recoveries.iter_mut().find(|i| i.candidate_hash == candidate_hash)
    {
        i.awaiting.push(response_sender);
        return Ok(())
    }

    let session_info_res = state
        .runtime_info
        .get_session_info_by_index(ctx.sender(), state.live_block.1, session_index)
        .await;

    match session_info_res {
        Ok(ExtendedSessionInfo { session_info, node_features, .. }) => {
            let mut backer_group = None;
            let n_validators = session_info.validators.len();
            let systematic_threshold = systematic_recovery_threshold(n_validators)?;
            let mut recovery_strategies: VecDeque<
                Box<dyn RecoveryStrategy<<Context as SubsystemContext>::Sender>>,
            > = VecDeque::with_capacity(3);

            if let Some(backing_group) = backing_group {
                if let Some(backing_validators) = session_info.validator_groups.get(backing_group) {
                    let mut small_pov_size = true;

                    match recovery_strategy_kind {
                        RecoveryStrategyKind::BackersFirstIfSizeLower(fetch_chunks_threshold) |
                        RecoveryStrategyKind::BackersFirstIfSizeLowerThenSystematicChunks(
                            fetch_chunks_threshold,
                        ) => {
                            // Estimate the PoV size from a locally stored chunk, if any.
                            let chunk_size: Result<Option<usize>> =
                                query_chunk_size(ctx, candidate_hash).await;
                            if let Ok(Some(chunk_size)) = chunk_size {
                                let pov_size_estimate = chunk_size * systematic_threshold;
                                small_pov_size = pov_size_estimate < fetch_chunks_threshold;

                                if small_pov_size {
                                    gum::trace!(
                                        target: LOG_TARGET,
                                        ?candidate_hash,
                                        pov_size_estimate,
                                        fetch_chunks_threshold,
                                        "Prefer fetch from backing group",
                                    );
                                }
                            } else {
                                // Unknown PoV size: do not prefer fetching the full data.
                                small_pov_size = false;
                            }
                        },
                        _ => {},
                    };

                    match (&recovery_strategy_kind, small_pov_size) {
                        (RecoveryStrategyKind::BackersFirstAlways, _) |
                        (RecoveryStrategyKind::BackersFirstIfSizeLower(_), true) |
                        (
                            RecoveryStrategyKind::BackersFirstIfSizeLowerThenSystematicChunks(_),
                            true,
                        ) |
                        (RecoveryStrategyKind::BackersThenSystematicChunks, _) =>
                            recovery_strategies.push_back(Box::new(FetchFull::new(
                                FetchFullParams { validators: backing_validators.to_vec() },
                            ))),
                        _ => {},
                    };

                    backer_group = Some(backing_validators);
                }
            }

            let chunk_mapping_enabled = if let Some(&true) = node_features
                .get(usize::from(node_features::FeatureIndex::AvailabilityChunkMapping as u8))
                .as_deref()
            {
                true
            } else {
                false
            };

            // Systematic chunk recovery is only possible when the core index is known and the
            // availability chunk mapping feature is enabled.
            if let Some(core_index) = maybe_core_index {
                if matches!(
                    recovery_strategy_kind,
                    RecoveryStrategyKind::BackersThenSystematicChunks |
                        RecoveryStrategyKind::SystematicChunks |
                        RecoveryStrategyKind::BackersFirstIfSizeLowerThenSystematicChunks(_)
                ) && chunk_mapping_enabled
                {
                    let chunk_indices =
                        availability_chunk_indices(node_features, n_validators, core_index)?;

                    let chunk_indices: VecDeque<_> = chunk_indices
                        .iter()
                        .enumerate()
                        .map(|(v_index, c_index)| {
                            (
                                *c_index,
                                ValidatorIndex(
                                    u32::try_from(v_index)
                                        .expect("validator count should not exceed u32"),
                                ),
                            )
                        })
                        .collect();

                    // Only the first `systematic_threshold` chunks are needed for systematic
                    // recovery.
                    let validators = chunk_indices
                        .clone()
                        .into_iter()
                        .filter(|(c_index, _)| {
                            usize::try_from(c_index.0)
                                .expect("usize is at least u32 bytes on all modern targets.") <
                                systematic_threshold
                        })
                        .collect();

                    recovery_strategies.push_back(Box::new(FetchSystematicChunks::new(
                        FetchSystematicChunksParams {
                            validators,
                            backers: backer_group.map(|v| v.to_vec()).unwrap_or_else(|| vec![]),
                        },
                    )));
                }
            }

            // Regular chunk recovery always remains as the last resort.
            recovery_strategies.push_back(Box::new(FetchChunks::new(FetchChunksParams {
                n_validators: session_info.validators.len(),
            })));

            let session_info = session_info.clone();

            let n_validators = session_info.validators.len();

            launch_recovery_task(
                state,
                ctx,
                response_sender,
                recovery_strategies,
                RecoveryParams {
                    validator_authority_keys: session_info.discovery_keys.clone(),
                    n_validators,
                    threshold: recovery_threshold(n_validators)?,
                    systematic_threshold,
                    candidate_hash,
                    erasure_root: receipt.descriptor.erasure_root(),
                    metrics: metrics.clone(),
                    bypass_availability_store,
                    post_recovery_check,
                    pov_hash: receipt.descriptor.pov_hash(),
                    req_v1_protocol_name,
                    req_v2_protocol_name,
                    chunk_mapping_enabled,
                    erasure_task_tx,
                },
            )
            .await
        },
        Err(_) => {
            response_sender
                .send(Err(RecoveryError::Unavailable))
                .map_err(|_| Error::CanceledResponseSender)?;

            Err(Error::SessionInfoUnavailable(state.live_block.1))
        },
    }
}

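/// Query the local availability store for the full `AvailableData` of the given candidate.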
#[overseer::contextbounds(AvailabilityRecovery, prefix = self::overseer)]
async fn query_full_data<Context>(
    ctx: &mut Context,
    candidate_hash: CandidateHash,
) -> Result<Option<AvailableData>> {
    let (tx, rx) = oneshot::channel();
    ctx.send_message(AvailabilityStoreMessage::QueryAvailableData(candidate_hash, tx))
        .await;

    rx.await.map_err(Error::CanceledQueryFullData)
}

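/// Query the local availability store for the size of a stored chunk of the given candidate.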
#[overseer::contextbounds(AvailabilityRecovery, prefix = self::overseer)]
async fn query_chunk_size<Context>(
    ctx: &mut Context,
    candidate_hash: CandidateHash,
) -> Result<Option<usize>> {
    let (tx, rx) = oneshot::channel();
    ctx.send_message(AvailabilityStoreMessage::QueryChunkSize(candidate_hash, tx))
        .await;

    rx.await.map_err(Error::CanceledQueryFullData)
}

#[overseer::contextbounds(AvailabilityRecovery, prefix = self::overseer)]
impl AvailabilityRecoverySubsystem {
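    /// Create an [`AvailabilityRecoverySubsystem`] suited to collators.
    ///
    /// Bypasses the availability store and only checks the PoV hash after recovery. Prefers
    /// fetching the full data from the backing group when the estimated PoV size is below
    /// `fetch_chunks_threshold` (defaulting to `CONSERVATIVE_FETCH_CHUNKS_THRESHOLD`).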
    pub fn for_collator(
        fetch_chunks_threshold: Option<usize>,
        req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
        req_protocol_names: &ReqProtocolNames,
        metrics: Metrics,
    ) -> Self {
        Self {
            recovery_strategy_kind: RecoveryStrategyKind::BackersFirstIfSizeLower(
                fetch_chunks_threshold.unwrap_or(CONSERVATIVE_FETCH_CHUNKS_THRESHOLD),
            ),
            bypass_availability_store: true,
            post_recovery_check: PostRecoveryCheck::PovHash,
            req_receiver,
            metrics,
            req_v1_protocol_name: req_protocol_names
                .get_name(request_v1::ChunkFetchingRequest::PROTOCOL),
            req_v2_protocol_name: req_protocol_names
                .get_name(request_v2::ChunkFetchingRequest::PROTOCOL),
        }
    }

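    /// Create an [`AvailabilityRecoverySubsystem`] suited to validators.
    ///
    /// Prefers fetching the full data from the backing group for PoVs estimated to be smaller
    /// than `fetch_chunks_threshold` (defaulting to `CONSERVATIVE_FETCH_CHUNKS_THRESHOLD`),
    /// then tries systematic chunk recovery, and finally regular chunk recovery. Recovered data
    /// is re-encoded and checked against the erasure root.
    ///
    /// A minimal construction sketch; the surrounding variables are illustrative:
    ///
    /// ```ignore
    /// let subsystem = AvailabilityRecoverySubsystem::for_validator(
    ///     None, // fall back to the conservative fetch-chunks threshold
    ///     req_receiver,
    ///     &req_protocol_names,
    ///     metrics,
    /// );
    /// ```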
    pub fn for_validator(
        fetch_chunks_threshold: Option<usize>,
        req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
        req_protocol_names: &ReqProtocolNames,
        metrics: Metrics,
    ) -> Self {
        Self {
            recovery_strategy_kind:
                RecoveryStrategyKind::BackersFirstIfSizeLowerThenSystematicChunks(
                    fetch_chunks_threshold.unwrap_or(CONSERVATIVE_FETCH_CHUNKS_THRESHOLD),
                ),
            bypass_availability_store: false,
            post_recovery_check: PostRecoveryCheck::Reencode,
            req_receiver,
            metrics,
            req_v1_protocol_name: req_protocol_names
                .get_name(request_v1::ChunkFetchingRequest::PROTOCOL),
            req_v2_protocol_name: req_protocol_names
                .get_name(request_v2::ChunkFetchingRequest::PROTOCOL),
        }
    }

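    /// Create an [`AvailabilityRecoverySubsystem`] with an explicitly chosen
    /// [`RecoveryStrategyKind`]. Only available to tests and subsystem benchmarks.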
    #[cfg(any(test, feature = "subsystem-benchmarks"))]
    pub fn with_recovery_strategy_kind(
        req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
        req_protocol_names: &ReqProtocolNames,
        metrics: Metrics,
        recovery_strategy_kind: RecoveryStrategyKind,
    ) -> Self {
        Self {
            recovery_strategy_kind,
            bypass_availability_store: false,
            post_recovery_check: PostRecoveryCheck::Reencode,
            req_receiver,
            metrics,
            req_v1_protocol_name: req_protocol_names
                .get_name(request_v1::ChunkFetchingRequest::PROTOCOL),
            req_v2_protocol_name: req_protocol_names
                .get_name(request_v2::ChunkFetchingRequest::PROTOCOL),
        }
    }

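    /// Run the subsystem: process overseer signals and `RecoverAvailableData` messages, serve
    /// incoming full-data requests, and dispatch erasure-coding work to a small pool of
    /// blocking threads.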
    pub async fn run<Context>(self, mut ctx: Context) -> std::result::Result<(), FatalError> {
        let mut state = State::default();
        let Self {
            mut req_receiver,
            metrics,
            recovery_strategy_kind,
            bypass_availability_store,
            post_recovery_check,
            req_v1_protocol_name,
            req_v2_protocol_name,
        } = self;

        let (erasure_task_tx, erasure_task_rx) = futures::channel::mpsc::channel(16);
        let mut erasure_task_rx = erasure_task_rx.fuse();

        // Erasure-coding work is CPU bound, so it is offloaded to a small pool of blocking
        // threads; `cycle()` distributes tasks across the pool's senders round-robin.
        let mut to_pool = ThreadPoolBuilder::build(
            NonZeroUsize::new(2).expect("There are 2 threads; qed"),
            metrics.clone(),
            &mut ctx,
        )
        .into_iter()
        .cycle();

        loop {
            let recv_req = req_receiver.recv(|| vec![COST_INVALID_REQUEST]).fuse();
            pin_mut!(recv_req);
            let res = futures::select! {
                erasure_task = erasure_task_rx.next() => {
                    match erasure_task {
                        Some(task) => {
                            to_pool
                                .next()
                                .expect("Pool size is `NonZeroUsize`; qed")
                                .send(task)
                                .await
                                .map_err(|_| RecoveryError::ChannelClosed)
                        },
                        None => {
                            Err(RecoveryError::ChannelClosed)
                        }
                    }.map_err(Into::into)
                }
                signal = ctx.recv().fuse() => {
                    match signal {
                        Ok(signal) => {
                            match signal {
                                FromOrchestra::Signal(signal) => if handle_signal(
                                    &mut state,
                                    signal,
                                ).await {
                                    gum::debug!(target: LOG_TARGET, "subsystem concluded");
                                    return Ok(());
                                } else {
                                    Ok(())
                                },
                                FromOrchestra::Communication {
                                    msg: AvailabilityRecoveryMessage::RecoverAvailableData(
                                        receipt,
                                        session_index,
                                        maybe_backing_group,
                                        maybe_core_index,
                                        response_sender,
                                    )
                                } => handle_recover(
                                    &mut state,
                                    &mut ctx,
                                    receipt,
                                    session_index,
                                    maybe_backing_group,
                                    response_sender,
                                    &metrics,
                                    erasure_task_tx.clone(),
                                    recovery_strategy_kind.clone(),
                                    bypass_availability_store,
                                    post_recovery_check.clone(),
                                    maybe_core_index,
                                    req_v1_protocol_name.clone(),
                                    req_v2_protocol_name.clone(),
                                ).await
                            }
                        },
                        Err(e) => Err(Error::SubsystemReceive(e))
                    }
                }
                in_req = recv_req => {
                    match in_req {
                        Ok(req) => {
                            if bypass_availability_store {
                                gum::debug!(
                                    target: LOG_TARGET,
                                    "Skipping request to availability-store.",
                                );
                                let _ = req.send_response(None.into());
                                Ok(())
                            } else {
                                match query_full_data(&mut ctx, req.payload.candidate_hash).await {
                                    Ok(res) => {
                                        let _ = req.send_response(res.into());
                                        Ok(())
                                    }
                                    Err(e) => {
                                        let _ = req.send_response(None.into());
                                        Err(e)
                                    }
                                }
                            }
                        }
                        Err(e) => Err(Error::IncomingRequest(e))
                    }
                }
                output = state.ongoing_recoveries.select_next_some() => {
                    let mut res = Ok(());
                    if let Some((candidate_hash, result)) = output {
                        if let Err(ref e) = result {
                            res = Err(Error::Recovery(e.clone()));
                        }

                        if let Ok(recovery) = CachedRecovery::try_from(result) {
                            state.availability_lru.insert(candidate_hash, recovery);
                        }
                    }

                    res
                }
            };

            if let Err(e) = res {
                log_error(Err(e))?;
            }
        }
    }
}

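/// Builder for the small pool of blocking threads that execute erasure-coding work.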
struct ThreadPoolBuilder;

/// Upper bound on the number of erasure-coding threads.
const MAX_THREADS: NonZeroUsize = match NonZeroUsize::new(4) {
    Some(max_threads) => max_threads,
    None => panic!("MAX_THREADS must be non-zero"),
};

impl ThreadPoolBuilder {
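    /// Build a pool of `size` (capped at `MAX_THREADS`) blocking erasure-coding threads,
    /// returning one `ErasureTask` sender per spawned thread.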
    #[overseer::contextbounds(AvailabilityRecovery, prefix = self::overseer)]
    pub fn build<Context>(
        size: NonZeroUsize,
        metrics: Metrics,
        ctx: &mut Context,
    ) -> Vec<futures::channel::mpsc::Sender<ErasureTask>> {
        let size = std::cmp::min(size, MAX_THREADS);
        let mut senders = Vec::new();

        for index in 0..size.into() {
            let (tx, rx) = futures::channel::mpsc::channel(8);
            senders.push(tx);

            if let Err(e) = ctx
                .spawn_blocking("erasure-task", Box::pin(erasure_task_thread(metrics.clone(), rx)))
            {
                gum::warn!(
                    target: LOG_TARGET,
                    err = ?e,
                    index,
                    "Failed to spawn an erasure task",
                );
            }
        }
        senders
    }
}

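/// Body of a single erasure-coding worker thread: processes `ErasureTask`s until its channel
/// is closed.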
async fn erasure_task_thread(
    metrics: Metrics,
    mut ingress: futures::channel::mpsc::Receiver<ErasureTask>,
) {
    loop {
        match ingress.next().await {
            Some(ErasureTask::Reconstruct(n_validators, chunks, sender)) => {
                let _ = sender.send(polkadot_erasure_coding::reconstruct_v1(
                    n_validators,
                    chunks.iter().map(|(c_index, chunk)| {
                        (
                            &chunk[..],
                            usize::try_from(c_index.0)
                                .expect("usize is at least u32 bytes on all modern targets."),
                        )
                    }),
                ));
            },
            Some(ErasureTask::Reencode(n_validators, root, available_data, sender)) => {
                let metrics = metrics.clone();

                let maybe_data = if reconstructed_data_matches_root(
                    n_validators,
                    &root,
                    &available_data,
                    &metrics,
                ) {
                    Some(available_data)
                } else {
                    None
                };

                let _ = sender.send(maybe_data);
            },
            None => {
                gum::trace!(
                    target: LOG_TARGET,
                    "Erasure task channel closed. Node shutting down?",
                );
                break
            },
        }

        #[cfg(feature = "subsystem-benchmarks")]
        tokio::task::yield_now().await;
    }
}