referrerpolicy=no-referrer-when-downgrade

cumulus_client_pov_recovery/
active_candidate_recovery.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Cumulus.
3// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5// Cumulus is free software: you can redistribute it and/or modify
6// it under the terms of the GNU General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9
10// Cumulus is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14
15// You should have received a copy of the GNU General Public License
16// along with Cumulus. If not, see <https://www.gnu.org/licenses/>.
17
18use sp_runtime::traits::Block as BlockT;
19
20use polkadot_node_primitives::PoV;
21use polkadot_node_subsystem::messages::AvailabilityRecoveryMessage;
22
23use futures::{channel::oneshot, stream::FuturesUnordered, Future, FutureExt, StreamExt};
24
25use std::{pin::Pin, sync::Arc};
26
27use crate::RecoveryHandle;
28
29/// The active candidate recovery.
30///
31/// This handles the candidate recovery and tracks the activate recoveries.
32pub(crate) struct ActiveCandidateRecovery<Block: BlockT> {
33	/// The recoveries that are currently being executed.
34	recoveries:
35		FuturesUnordered<Pin<Box<dyn Future<Output = (Block::Hash, Option<Arc<PoV>>)> + Send>>>,
36	recovery_handle: Box<dyn RecoveryHandle>,
37}
38
39impl<Block: BlockT> ActiveCandidateRecovery<Block> {
40	pub fn new(recovery_handle: Box<dyn RecoveryHandle>) -> Self {
41		Self { recoveries: Default::default(), recovery_handle }
42	}
43
44	/// Recover the given `candidate`.
45	pub async fn recover_candidate(
46		&mut self,
47		block_hash: Block::Hash,
48		candidate: &crate::Candidate<Block>,
49	) {
50		let (tx, rx) = oneshot::channel();
51
52		self.recovery_handle
53			.send_recovery_msg(
54				AvailabilityRecoveryMessage::RecoverAvailableData(
55					candidate.receipt.clone(),
56					candidate.session_index,
57					None,
58					None,
59					tx,
60				),
61				"ActiveCandidateRecovery",
62			)
63			.await;
64
65		self.recoveries.push(
66			async move {
67				match rx.await {
68					Ok(Ok(res)) => (block_hash, Some(res.pov)),
69					Ok(Err(error)) => {
70						tracing::debug!(
71							target: crate::LOG_TARGET,
72							?error,
73							?block_hash,
74							"Availability recovery failed",
75						);
76						(block_hash, None)
77					},
78					Err(_) => {
79						tracing::debug!(
80							target: crate::LOG_TARGET,
81							"Availability recovery oneshot channel closed",
82						);
83						(block_hash, None)
84					},
85				}
86			}
87			.boxed(),
88		);
89	}
90
91	/// Waits for the next recovery.
92	///
93	/// If the returned [`PoV`] is `None`, it means that the recovery failed.
94	pub async fn wait_for_recovery(&mut self) -> (Block::Hash, Option<Arc<PoV>>) {
95		loop {
96			if let Some(res) = self.recoveries.next().await {
97				return res
98			} else {
99				futures::pending!()
100			}
101		}
102	}
103}