use sp_runtime::traits::Block as BlockT;
use polkadot_node_primitives::PoV;
use polkadot_node_subsystem::messages::AvailabilityRecoveryMessage;
use futures::{channel::oneshot, stream::FuturesUnordered, Future, FutureExt, StreamExt};
use std::{pin::Pin, sync::Arc};
use crate::RecoveryHandle;
/// Tracks availability recoveries that have been requested but have not resolved yet.
///
/// Requests are sent through `recovery_handle`; each outstanding request is represented by a
/// future stored in `recoveries`.
pub(crate) struct ActiveCandidateRecovery<Block: BlockT> {
    /// One future per requested recovery. Each resolves to the hash of the block being
    /// recovered together with the recovered PoV, or `None` if recovery did not succeed.
    recoveries:
        FuturesUnordered<Pin<Box<dyn Future<Output = (Block::Hash, Option<Arc<PoV>>)> + Send>>>,
    /// Handle used to send availability recovery messages.
    recovery_handle: Box<dyn RecoveryHandle>,
}
impl<Block: BlockT> ActiveCandidateRecovery<Block> {
    /// Creates a tracker with no recoveries in flight that sends its requests through
    /// `recovery_handle`.
    pub fn new(recovery_handle: Box<dyn RecoveryHandle>) -> Self {
        Self { recoveries: Default::default(), recovery_handle }
    }

    /// Requests recovery of the available data of `candidate` and starts tracking the request.
    ///
    /// A `RecoverAvailableData` message built from the candidate's receipt and session index is
    /// sent through the recovery handle (the two optional message arguments are left as `None`).
    /// The response is then awaited by a future queued on this tracker; its outcome is reported,
    /// keyed by `block_hash`, from [`Self::wait_for_recovery`].
    ///
    /// Failures are not returned to the caller: both a recovery error and a dropped response
    /// channel are logged at debug level and surface as `None` for `block_hash`.
    pub async fn recover_candidate(
        &mut self,
        block_hash: Block::Hash,
        candidate: &crate::Candidate<Block>,
    ) {
        let (tx, rx) = oneshot::channel();

        self.recovery_handle
            .send_recovery_msg(
                AvailabilityRecoveryMessage::RecoverAvailableData(
                    candidate.receipt.clone(),
                    candidate.session_index,
                    None,
                    None,
                    tx,
                ),
                "ActiveCandidateRecovery",
            )
            .await;

        self.recoveries.push(
            async move {
                let pov = match rx.await {
                    Ok(Ok(data)) => Some(data.pov),
                    Ok(Err(error)) => {
                        tracing::debug!(
                            target: crate::LOG_TARGET,
                            ?error,
                            ?block_hash,
                            "Availability recovery failed",
                        );
                        None
                    },
                    Err(_) => {
                        // Include the block hash so that a dropped sender can be correlated
                        // with the block whose recovery was abandoned.
                        tracing::debug!(
                            target: crate::LOG_TARGET,
                            ?block_hash,
                            "Availability recovery oneshot channel closed",
                        );
                        None
                    },
                };

                (block_hash, pov)
            }
            .boxed(),
        );
    }

    /// Waits for the next tracked recovery to finish.
    ///
    /// Returns the block hash the recovery was started for and the recovered PoV, or `None` in
    /// place of the PoV if that recovery failed.
    ///
    /// When the set of tracked recoveries yields `None`, this does not return; it yields
    /// `Pending` and checks the set again on the next poll.
    // NOTE(review): `futures::pending!` does not schedule a wake-up by itself, so this is
    // presumably meant to be polled alongside other futures (e.g. in a `select!`) — confirm.
    pub async fn wait_for_recovery(&mut self) -> (Block::Hash, Option<Arc<PoV>>) {
        loop {
            match self.recoveries.next().await {
                Some(finished) => return finished,
                None => futures::pending!(),
            }
        }
    }
}