referrerpolicy=no-referrer-when-downgrade

cumulus_client_pov_recovery/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Cumulus.
3// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5// Cumulus is free software: you can redistribute it and/or modify
6// it under the terms of the GNU General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9
10// Cumulus is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14
15// You should have received a copy of the GNU General Public License
16// along with Cumulus. If not, see <https://www.gnu.org/licenses/>.
17
18//! Parachain PoV recovery
19//!
//! A parachain needs to build PoVs that are sent to the relay chain in order to progress. These
//! PoVs are erasure encoded and one piece of each is stored by each relay chain validator. Since
//! the relay chain decides which PoV per parachain to include, and thus how the parachain
//! progresses, it can happen that the block corresponding to an included PoV isn't propagated in
//! the parachain network. This can have several reasons: a malicious collator that managed to
//! include its own PoV but doesn't want to share it with the rest of the network, or a collator
//! that went down before it could distribute the block in the network. When something like this
//! happens, the PoV recovery algorithm implemented in this crate can be used to recover the PoV
//! and to propagate it to the rest of the network.
29//!
30//! It works in the following way:
31//!
//! 1. For every included relay chain block we note the backed candidates of our parachain that
//!    are pending availability. If the block belonging to a candidate's PoV is already known, we
//!    do nothing. Otherwise we start a timer that waits for a randomized time inside a specified
//!    interval before starting to recover the PoV.
36//!
37//! 2. If between starting and firing the timer the block is imported, we skip the recovery of the
38//!    PoV.
39//!
40//! 3. If the timer fired we recover the PoV using the relay chain PoV recovery protocol.
41//!
42//! 4a. After it is recovered, we restore the block and import it.
43//!
//! 4b. Since we are trying to recover pending candidates, availability is not guaranteed. If the
//!     PoV is not yet available, we retry the recovery once before giving up.
46//!
47//! If we need to recover multiple PoV blocks (which should hopefully not happen in real life), we
48//! make sure that the blocks are imported in the correct order.
49
50use sc_client_api::{BlockBackend, BlockchainEvents, UsageProvider};
51use sc_consensus::import_queue::{ImportQueueService, IncomingBlock};
52use sp_consensus::{BlockOrigin, BlockStatus, SyncOracle};
53use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
54
55use polkadot_node_primitives::{PoV, POV_BOMB_LIMIT};
56use polkadot_node_subsystem::messages::AvailabilityRecoveryMessage;
57use polkadot_overseer::Handle as OverseerHandle;
58use polkadot_primitives::{
59	CandidateReceiptV2 as CandidateReceipt,
60	CommittedCandidateReceiptV2 as CommittedCandidateReceipt, Id as ParaId, SessionIndex,
61};
62
63use cumulus_primitives_core::ParachainBlockData;
64use cumulus_relay_chain_interface::RelayChainInterface;
65use cumulus_relay_chain_streams::pending_candidates;
66
67use codec::{Decode, DecodeAll};
68use futures::{
69	channel::mpsc::Receiver, select, stream::FuturesUnordered, Future, FutureExt, StreamExt,
70};
71use futures_timer::Delay;
72use rand::{distributions::Uniform, prelude::Distribution, thread_rng};
73
74use std::{
75	collections::{HashMap, HashSet, VecDeque},
76	pin::Pin,
77	sync::Arc,
78	time::Duration,
79};
80
81#[cfg(test)]
82mod tests;
83
84mod active_candidate_recovery;
85use active_candidate_recovery::ActiveCandidateRecovery;
86
/// Target passed to every `tracing` event emitted by this crate's recovery logic.
const LOG_TARGET: &str = "cumulus-pov-recovery";
88
89/// Test-friendly wrapper trait for the overseer handle.
90/// Can be used to simulate failing recovery requests.
91#[async_trait::async_trait]
92pub trait RecoveryHandle: Send {
93	async fn send_recovery_msg(
94		&mut self,
95		message: AvailabilityRecoveryMessage,
96		origin: &'static str,
97	);
98}
99
100#[async_trait::async_trait]
101impl RecoveryHandle for OverseerHandle {
102	async fn send_recovery_msg(
103		&mut self,
104		message: AvailabilityRecoveryMessage,
105		origin: &'static str,
106	) {
107		self.send_msg(message, origin).await;
108	}
109}
110
/// Type of recovery to trigger.
///
/// This is a plain fieldless enum, so it is cheap to copy and has total equality.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecoveryKind {
	/// Recover only the requested block.
	Simple,
	/// Recover the requested block and walk up its ancestry, also recovering ancestors that
	/// are still unknown.
	Full,
}
119
120/// Structure used to trigger an explicit recovery request via `PoVRecovery`.
121pub struct RecoveryRequest<Block: BlockT> {
122	/// Hash of the last block to recover.
123	pub hash: Block::Hash,
124	/// Recovery type.
125	pub kind: RecoveryKind,
126}
127
/// The delay between observing an unknown block and triggering the recovery of a block.
///
/// Randomizing the start of the recovery within this interval can be used to prevent
/// self-DOSing if the recovery request is part of a distributed protocol and there is the
/// possibility that multiple actors are requiring to perform the recovery action at
/// approximately the same time.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RecoveryDelayRange {
	/// Lower bound (inclusive) of the randomized delay before recovery starts.
	pub min: Duration,
	/// Upper bound (inclusive) of the randomized delay before recovery starts.
	pub max: Duration,
}
140
141impl RecoveryDelayRange {
142	/// Produce a randomized duration between `min` and `max`.
143	fn duration(&self) -> Duration {
144		Uniform::from(self.min..=self.max).sample(&mut thread_rng())
145	}
146}
147
148/// Represents an outstanding block candidate.
149struct Candidate<Block: BlockT> {
150	receipt: CandidateReceipt,
151	session_index: SessionIndex,
152	block_number: NumberFor<Block>,
153	parent_hash: Block::Hash,
154	// Lazy recovery has been submitted.
155	// Should be true iff a block is either queued to be recovered or
156	// recovery is currently in progress.
157	waiting_recovery: bool,
158}
159
160/// Queue that is used to decide when to start PoV-recovery operations.
161struct RecoveryQueue<Block: BlockT> {
162	recovery_delay_range: RecoveryDelayRange,
163	// Queue that keeps the hashes of blocks to be recovered.
164	recovery_queue: VecDeque<Block::Hash>,
165	// Futures that resolve when a new recovery should be started.
166	signaling_queue: FuturesUnordered<Pin<Box<dyn Future<Output = ()> + Send>>>,
167}
168
169impl<Block: BlockT> RecoveryQueue<Block> {
170	pub fn new(recovery_delay_range: RecoveryDelayRange) -> Self {
171		Self {
172			recovery_delay_range,
173			recovery_queue: Default::default(),
174			signaling_queue: Default::default(),
175		}
176	}
177
178	/// Add hash of a block that should go to the end of the recovery queue.
179	/// A new recovery will be signaled after `delay` has passed.
180	pub fn push_recovery(&mut self, hash: Block::Hash) {
181		let delay = self.recovery_delay_range.duration();
182		tracing::debug!(
183			target: LOG_TARGET,
184			block_hash = ?hash,
185			"Adding block to queue and adding new recovery slot in {:?} sec",
186			delay.as_secs(),
187		);
188		self.recovery_queue.push_back(hash);
189		self.signaling_queue.push(
190			async move {
191				Delay::new(delay).await;
192			}
193			.boxed(),
194		);
195	}
196
197	/// Get the next hash for block recovery.
198	pub async fn next_recovery(&mut self) -> Block::Hash {
199		loop {
200			if self.signaling_queue.next().await.is_some() {
201				if let Some(hash) = self.recovery_queue.pop_front() {
202					return hash;
203				} else {
204					tracing::error!(
205						target: LOG_TARGET,
206						"Recovery was signaled, but no candidate hash available. This is a bug."
207					);
208				};
209			}
210			futures::pending!()
211		}
212	}
213}
214
215/// Encapsulates the logic of the pov recovery.
216pub struct PoVRecovery<Block: BlockT, PC, RC> {
217	/// All the pending candidates that we are waiting for to be imported or that need to be
218	/// recovered when `next_candidate_to_recover` tells us to do so.
219	candidates: HashMap<Block::Hash, Candidate<Block>>,
220	/// A stream of futures that resolve to hashes of candidates that need to be recovered.
221	///
222	/// The candidates to the hashes are stored in `candidates`. If a candidate is not
223	/// available anymore in this map, it means that it was already imported.
224	candidate_recovery_queue: RecoveryQueue<Block>,
225	active_candidate_recovery: ActiveCandidateRecovery<Block>,
226	/// Blocks that wait that the parent is imported.
227	///
228	/// Uses parent -> blocks mapping.
229	waiting_for_parent: HashMap<Block::Hash, Vec<Block>>,
230	parachain_client: Arc<PC>,
231	parachain_import_queue: Box<dyn ImportQueueService<Block>>,
232	relay_chain_interface: RC,
233	para_id: ParaId,
234	/// Explicit block recovery requests channel.
235	recovery_chan_rx: Receiver<RecoveryRequest<Block>>,
236	/// Blocks that we are retrying currently
237	candidates_in_retry: HashSet<Block::Hash>,
238	parachain_sync_service: Arc<dyn SyncOracle + Sync + Send>,
239}
240
241impl<Block: BlockT, PC, RCInterface> PoVRecovery<Block, PC, RCInterface>
242where
243	PC: BlockBackend<Block> + BlockchainEvents<Block> + UsageProvider<Block>,
244	RCInterface: RelayChainInterface + Clone,
245{
246	/// Create a new instance.
247	pub fn new(
248		recovery_handle: Box<dyn RecoveryHandle>,
249		recovery_delay_range: RecoveryDelayRange,
250		parachain_client: Arc<PC>,
251		parachain_import_queue: Box<dyn ImportQueueService<Block>>,
252		relay_chain_interface: RCInterface,
253		para_id: ParaId,
254		recovery_chan_rx: Receiver<RecoveryRequest<Block>>,
255		parachain_sync_service: Arc<dyn SyncOracle + Sync + Send>,
256	) -> Self {
257		Self {
258			candidates: HashMap::new(),
259			candidate_recovery_queue: RecoveryQueue::new(recovery_delay_range),
260			active_candidate_recovery: ActiveCandidateRecovery::new(recovery_handle),
261			waiting_for_parent: HashMap::new(),
262			parachain_client,
263			parachain_import_queue,
264			relay_chain_interface,
265			para_id,
266			candidates_in_retry: HashSet::new(),
267			recovery_chan_rx,
268			parachain_sync_service,
269		}
270	}
271
272	/// Handle a new pending candidate.
273	fn handle_pending_candidate(
274		&mut self,
275		receipt: CommittedCandidateReceipt,
276		session_index: SessionIndex,
277	) {
278		let header = match Block::Header::decode(&mut &receipt.commitments.head_data.0[..]) {
279			Ok(header) => header,
280			Err(e) => {
281				tracing::warn!(
282					target: LOG_TARGET,
283					error = ?e,
284					"Failed to decode parachain header from pending candidate",
285				);
286				return;
287			},
288		};
289
290		if *header.number() <= self.parachain_client.usage_info().chain.finalized_number {
291			return;
292		}
293
294		let hash = header.hash();
295
296		if self.candidates.contains_key(&hash) {
297			return;
298		}
299
300		tracing::debug!(target: LOG_TARGET, block_hash = ?hash, "Adding outstanding candidate");
301		self.candidates.insert(
302			hash,
303			Candidate {
304				block_number: *header.number(),
305				receipt: receipt.to_plain(),
306				session_index,
307				parent_hash: *header.parent_hash(),
308				waiting_recovery: false,
309			},
310		);
311
312		// If required, triggers a lazy recovery request that will eventually be blocked
313		// if in the meantime the block is imported.
314		self.recover(RecoveryRequest { hash, kind: RecoveryKind::Simple });
315	}
316
317	/// Block is no longer waiting for recovery
318	fn clear_waiting_recovery(&mut self, block_hash: &Block::Hash) {
319		if let Some(candidate) = self.candidates.get_mut(block_hash) {
320			// Prevents triggering an already enqueued recovery request
321			candidate.waiting_recovery = false;
322		}
323	}
324
325	/// Handle a finalized block with the given `block_number`.
326	fn handle_block_finalized(&mut self, block_number: NumberFor<Block>) {
327		self.candidates.retain(|_, pc| pc.block_number > block_number);
328	}
329
330	/// Recover the candidate for the given `block_hash`.
331	async fn recover_candidate(&mut self, block_hash: Block::Hash) {
332		match self.candidates.get(&block_hash) {
333			Some(candidate) if candidate.waiting_recovery => {
334				tracing::debug!(target: LOG_TARGET, ?block_hash, "Issuing recovery request");
335				self.active_candidate_recovery.recover_candidate(block_hash, candidate).await;
336			},
337			_ => (),
338		}
339	}
340
341	/// Clear `waiting_for_parent` and `waiting_recovery` for the candidate with `hash`.
342	/// Also clears children blocks waiting for this parent.
343	fn reset_candidate(&mut self, hash: Block::Hash) {
344		let mut blocks_to_delete = vec![hash];
345
346		while let Some(delete) = blocks_to_delete.pop() {
347			if let Some(children) = self.waiting_for_parent.remove(&delete) {
348				blocks_to_delete.extend(children.iter().map(BlockT::hash));
349			}
350		}
351		self.clear_waiting_recovery(&hash);
352	}
353
354	/// Try to decode [`ParachainBlockData`] from `data`.
355	///
356	/// Internally it will handle the decoding of the different versions.
357	fn decode_parachain_block_data(
358		data: &[u8],
359		expected_block_hash: Block::Hash,
360	) -> Option<ParachainBlockData<Block>> {
361		match ParachainBlockData::<Block>::decode_all(&mut &data[..]) {
362			Ok(block_data) => {
363				if block_data.blocks().last().map_or(false, |b| b.hash() == expected_block_hash) {
364					return Some(block_data);
365				}
366
367				tracing::debug!(
368					target: LOG_TARGET,
369					?expected_block_hash,
370					"Could not find the expected block hash as latest block in `ParachainBlockData`"
371				);
372			},
373			Err(error) => {
374				tracing::debug!(
375					target: LOG_TARGET,
376					?expected_block_hash,
377					?error,
378					"Could not decode `ParachainBlockData` from recovered PoV",
379				);
380			},
381		}
382
383		None
384	}
385
386	/// Handle a recovered candidate.
387	async fn handle_candidate_recovered(&mut self, block_hash: Block::Hash, pov: Option<&PoV>) {
388		let pov = match pov {
389			Some(pov) => {
390				self.candidates_in_retry.remove(&block_hash);
391				pov
392			},
393			None => {
394				if self.candidates_in_retry.insert(block_hash) {
395					tracing::debug!(target: LOG_TARGET, ?block_hash, "Recovery failed, retrying.");
396					self.candidate_recovery_queue.push_recovery(block_hash);
397					return;
398				} else {
399					tracing::warn!(
400						target: LOG_TARGET,
401						?block_hash,
402						"Unable to recover block after retry.",
403					);
404					self.candidates_in_retry.remove(&block_hash);
405					self.reset_candidate(block_hash);
406					return;
407				}
408			},
409		};
410
411		let raw_block_data =
412			match sp_maybe_compressed_blob::decompress(&pov.block_data.0, POV_BOMB_LIMIT) {
413				Ok(r) => r,
414				Err(error) => {
415					tracing::debug!(target: LOG_TARGET, ?error, "Failed to decompress PoV");
416
417					self.reset_candidate(block_hash);
418					return;
419				},
420			};
421
422		let Some(block_data) = Self::decode_parachain_block_data(&raw_block_data, block_hash)
423		else {
424			self.reset_candidate(block_hash);
425			return;
426		};
427
428		let blocks = block_data.into_blocks();
429
430		let Some(parent) = blocks.first().map(|b| *b.header().parent_hash()) else {
431			tracing::debug!(
432				target: LOG_TARGET,
433				?block_hash,
434				"Recovered candidate doesn't contain any blocks.",
435			);
436
437			self.reset_candidate(block_hash);
438			return;
439		};
440
441		match self.parachain_client.block_status(parent) {
442			Ok(BlockStatus::Unknown) => {
443				// If the parent block is currently being recovered or is scheduled to be recovered,
444				// we want to wait for the parent.
445				let parent_scheduled_for_recovery =
446					self.candidates.get(&parent).map_or(false, |parent| parent.waiting_recovery);
447				if parent_scheduled_for_recovery {
448					tracing::debug!(
449						target: LOG_TARGET,
450						?block_hash,
451						parent_hash = ?parent,
452						parent_scheduled_for_recovery,
453						waiting_blocks = self.waiting_for_parent.len(),
454						"Waiting for recovery of parent.",
455					);
456
457					blocks.into_iter().for_each(|b| {
458						self.waiting_for_parent
459							.entry(*b.header().parent_hash())
460							.or_default()
461							.push(b);
462					});
463					return;
464				} else {
465					tracing::debug!(
466						target: LOG_TARGET,
467						?block_hash,
468						parent_hash = ?parent,
469						"Parent not found while trying to import recovered block.",
470					);
471
472					self.reset_candidate(block_hash);
473					return;
474				}
475			},
476			Err(error) => {
477				tracing::debug!(
478					target: LOG_TARGET,
479					block_hash = ?parent,
480					?error,
481					"Error while checking block status",
482				);
483
484				self.reset_candidate(block_hash);
485				return;
486			},
487			// Any other status is fine to "ignore/accept"
488			_ => (),
489		}
490
491		self.import_blocks(blocks.into_iter());
492	}
493
494	/// Import the given `blocks`.
495	///
496	/// This will also recursively drain `waiting_for_parent` and import them as well.
497	fn import_blocks(&mut self, blocks: impl Iterator<Item = Block>) {
498		let mut blocks = VecDeque::from_iter(blocks);
499
500		tracing::debug!(
501			target: LOG_TARGET,
502			blocks = ?blocks.iter().map(|b| b.hash()),
503			"Importing blocks retrieved using pov_recovery",
504		);
505
506		let mut incoming_blocks = Vec::new();
507
508		while let Some(block) = blocks.pop_front() {
509			let block_hash = block.hash();
510			let (header, body) = block.deconstruct();
511
512			incoming_blocks.push(IncomingBlock {
513				hash: block_hash,
514				header: Some(header),
515				body: Some(body),
516				import_existing: false,
517				allow_missing_state: false,
518				justifications: None,
519				origin: None,
520				skip_execution: false,
521				state: None,
522				indexed_body: None,
523			});
524
525			if let Some(waiting) = self.waiting_for_parent.remove(&block_hash) {
526				blocks.extend(waiting);
527			}
528		}
529
530		self.parachain_import_queue
531			// Use `ConsensusBroadcast` to inform the import pipeline that this blocks needs to be
532			// imported.
533			.import_blocks(BlockOrigin::ConsensusBroadcast, incoming_blocks);
534	}
535
536	/// Attempts an explicit recovery of one or more blocks.
537	pub fn recover(&mut self, req: RecoveryRequest<Block>) {
538		let RecoveryRequest { mut hash, kind } = req;
539		let mut to_recover = Vec::new();
540
541		loop {
542			let candidate = match self.candidates.get_mut(&hash) {
543				Some(candidate) => candidate,
544				None => {
545					tracing::debug!(
546						target: LOG_TARGET,
547						block_hash = ?hash,
548						"Could not recover. Block was never announced as candidate"
549					);
550					return;
551				},
552			};
553
554			match self.parachain_client.block_status(hash) {
555				Ok(BlockStatus::Unknown) if !candidate.waiting_recovery => {
556					candidate.waiting_recovery = true;
557					to_recover.push(hash);
558				},
559				Ok(_) => break,
560				Err(e) => {
561					tracing::error!(
562						target: LOG_TARGET,
563						error = ?e,
564						block_hash = ?hash,
565						"Failed to get block status",
566					);
567					for hash in to_recover {
568						self.clear_waiting_recovery(&hash);
569					}
570					return;
571				},
572			}
573
574			if kind == RecoveryKind::Simple {
575				break;
576			}
577
578			hash = candidate.parent_hash;
579		}
580
581		for hash in to_recover.into_iter().rev() {
582			self.candidate_recovery_queue.push_recovery(hash);
583		}
584	}
585
586	/// Run the pov-recovery.
587	pub async fn run(mut self) {
588		let mut imported_blocks = self.parachain_client.import_notification_stream().fuse();
589		let mut finalized_blocks = self.parachain_client.finality_notification_stream().fuse();
590		let pending_candidates = match pending_candidates(
591			self.relay_chain_interface.clone(),
592			self.para_id,
593			self.parachain_sync_service.clone(),
594		)
595		.await
596		{
597			Ok(pending_candidates_stream) => pending_candidates_stream.fuse(),
598			Err(err) => {
599				tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve pending candidate stream.");
600				return;
601			},
602		};
603
604		futures::pin_mut!(pending_candidates);
605		loop {
606			select! {
607				next_pending_candidates = pending_candidates.next() => {
608					if let Some((candidates, session_index, _)) = next_pending_candidates {
609						for candidate in candidates {
610							self.handle_pending_candidate(candidate, session_index);
611						}
612					} else {
613						tracing::debug!(target: LOG_TARGET, "Pending candidates stream ended");
614						return;
615					}
616				},
617				recovery_req = self.recovery_chan_rx.next() => {
618					if let Some(req) = recovery_req {
619						self.recover(req);
620					} else {
621						tracing::debug!(target: LOG_TARGET, "Recovery channel stream ended");
622						return;
623					}
624				},
625				imported = imported_blocks.next() => {
626					if let Some(imported) = imported {
627						self.clear_waiting_recovery(&imported.hash);
628
629						// We need to double check that no blocks are waiting for this block.
630						// Can happen when a waiting child block is queued to wait for parent while the parent block is still
631						// in the import queue.
632						if let Some(waiting_blocks) = self.waiting_for_parent.remove(&imported.hash) {
633							for block in waiting_blocks {
634								tracing::debug!(target: LOG_TARGET, block_hash = ?block.hash(), resolved_parent = ?imported.hash, "Found new waiting child block during import, queuing.");
635								self.import_blocks(std::iter::once(block));
636							}
637						};
638
639					} else {
640						tracing::debug!(target: LOG_TARGET,	"Imported blocks stream ended");
641						return;
642					}
643				},
644				finalized = finalized_blocks.next() => {
645					if let Some(finalized) = finalized {
646						self.handle_block_finalized(*finalized.header.number());
647					} else {
648						tracing::debug!(target: LOG_TARGET,	"Finalized blocks stream ended");
649						return;
650					}
651				},
652				next_to_recover = self.candidate_recovery_queue.next_recovery().fuse() => {
653						self.recover_candidate(next_to_recover).await;
654				},
655				(block_hash, pov) =
656					self.active_candidate_recovery.wait_for_recovery().fuse() =>
657				{
658					self.handle_candidate_recovered(block_hash, pov.as_deref()).await;
659				},
660			}
661		}
662	}
663}