cumulus_client_pov_recovery/
active_candidate_recovery.rs1use sp_runtime::traits::Block as BlockT;
19
20use polkadot_node_primitives::PoV;
21use polkadot_node_subsystem::messages::AvailabilityRecoveryMessage;
22
23use futures::{channel::oneshot, stream::FuturesUnordered, Future, FutureExt, StreamExt};
24
25use std::{pin::Pin, sync::Arc};
26
27use crate::RecoveryHandle;
28
29pub(crate) struct ActiveCandidateRecovery<Block: BlockT> {
33 recoveries:
35 FuturesUnordered<Pin<Box<dyn Future<Output = (Block::Hash, Option<Arc<PoV>>)> + Send>>>,
36 recovery_handle: Box<dyn RecoveryHandle>,
37}
38
39impl<Block: BlockT> ActiveCandidateRecovery<Block> {
40 pub fn new(recovery_handle: Box<dyn RecoveryHandle>) -> Self {
41 Self { recoveries: Default::default(), recovery_handle }
42 }
43
44 pub async fn recover_candidate(
46 &mut self,
47 block_hash: Block::Hash,
48 candidate: &crate::Candidate<Block>,
49 ) {
50 let (tx, rx) = oneshot::channel();
51
52 self.recovery_handle
53 .send_recovery_msg(
54 AvailabilityRecoveryMessage::RecoverAvailableData(
55 candidate.receipt.clone(),
56 candidate.session_index,
57 None,
58 None,
59 tx,
60 ),
61 "ActiveCandidateRecovery",
62 )
63 .await;
64
65 self.recoveries.push(
66 async move {
67 match rx.await {
68 Ok(Ok(res)) => (block_hash, Some(res.pov)),
69 Ok(Err(error)) => {
70 tracing::debug!(
71 target: crate::LOG_TARGET,
72 ?error,
73 ?block_hash,
74 "Availability recovery failed",
75 );
76 (block_hash, None)
77 },
78 Err(_) => {
79 tracing::debug!(
80 target: crate::LOG_TARGET,
81 "Availability recovery oneshot channel closed",
82 );
83 (block_hash, None)
84 },
85 }
86 }
87 .boxed(),
88 );
89 }
90
91 pub async fn wait_for_recovery(&mut self) -> (Block::Hash, Option<Arc<PoV>>) {
95 loop {
96 if let Some(res) = self.recoveries.next().await {
97 return res
98 } else {
99 futures::pending!()
100 }
101 }
102 }
103}