referrerpolicy=no-referrer-when-downgrade

cumulus_client_pov_recovery/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Cumulus.
3// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5// Cumulus is free software: you can redistribute it and/or modify
6// it under the terms of the GNU General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9
10// Cumulus is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14
15// You should have received a copy of the GNU General Public License
16// along with Cumulus. If not, see <https://www.gnu.org/licenses/>.
17
18//! Parachain PoV recovery
19//!
//! A parachain needs to build PoVs that are sent to the relay chain to progress. These PoVs are
//! erasure encoded and one piece of each is stored by each relay chain validator. Because the relay
//! chain decides which PoV per parachain to include, and thus how the parachain progresses, it can
//! happen that the block corresponding to an included PoV isn't propagated in the parachain
//! network. This can have several reasons: a malicious collator may have managed to include its
//! own PoV and doesn't want to share it with the rest of the network, or a collator may have gone
//! down before it could distribute the block in the network. When something like this happens we
//! can use the PoV recovery algorithm implemented in this crate to recover a PoV and to propagate
//! it to the rest of the network.
29//!
30//! It works in the following way:
31//!
//! 1. For every included relay chain block we note the backed candidate of our parachain. If the
//!    block belonging to the PoV is already known, we do nothing. Otherwise we start a timer that
//!    waits for a randomized time inside a specified interval before starting to recover the
//!    PoV.
36//!
37//! 2. If between starting and firing the timer the block is imported, we skip the recovery of the
38//!    PoV.
39//!
40//! 3. If the timer fired we recover the PoV using the relay chain PoV recovery protocol.
41//!
42//! 4a. After it is recovered, we restore the block and import it.
43//!
//! 4b. Since we are trying to recover pending candidates, availability is not guaranteed. If the
//!     block's PoV is not yet available, we retry once before giving up.
46//!
47//! If we need to recover multiple PoV blocks (which should hopefully not happen in real life), we
48//! make sure that the blocks are imported in the correct order.
49
50use sc_client_api::{BlockBackend, BlockchainEvents, UsageProvider};
51use sc_consensus::import_queue::{ImportQueueService, IncomingBlock};
52use sp_consensus::{BlockOrigin, BlockStatus, SyncOracle};
53use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
54
55use polkadot_node_primitives::{PoV, POV_BOMB_LIMIT};
56use polkadot_node_subsystem::messages::AvailabilityRecoveryMessage;
57use polkadot_overseer::Handle as OverseerHandle;
58use polkadot_primitives::{
59	vstaging::{
60		CandidateReceiptV2 as CandidateReceipt,
61		CommittedCandidateReceiptV2 as CommittedCandidateReceipt,
62	},
63	Id as ParaId, SessionIndex,
64};
65
66use cumulus_primitives_core::ParachainBlockData;
67use cumulus_relay_chain_interface::RelayChainInterface;
68use cumulus_relay_chain_streams::pending_candidates;
69
70use codec::{Decode, DecodeAll};
71use futures::{
72	channel::mpsc::Receiver, select, stream::FuturesUnordered, Future, FutureExt, StreamExt,
73};
74use futures_timer::Delay;
75use rand::{distributions::Uniform, prelude::Distribution, thread_rng};
76
77use std::{
78	collections::{HashMap, HashSet, VecDeque},
79	pin::Pin,
80	sync::Arc,
81	time::Duration,
82};
83
84#[cfg(test)]
85mod tests;
86
87mod active_candidate_recovery;
88use active_candidate_recovery::ActiveCandidateRecovery;
89
/// `tracing` target used by the log statements in this module.
const LOG_TARGET: &str = "cumulus-pov-recovery";
91
92/// Test-friendly wrapper trait for the overseer handle.
93/// Can be used to simulate failing recovery requests.
94#[async_trait::async_trait]
95pub trait RecoveryHandle: Send {
96	async fn send_recovery_msg(
97		&mut self,
98		message: AvailabilityRecoveryMessage,
99		origin: &'static str,
100	);
101}
102
103#[async_trait::async_trait]
104impl RecoveryHandle for OverseerHandle {
105	async fn send_recovery_msg(
106		&mut self,
107		message: AvailabilityRecoveryMessage,
108		origin: &'static str,
109	) {
110		self.send_msg(message, origin).await;
111	}
112}
113
/// Type of recovery to trigger.
///
/// This is a plain fieldless tag, so it is `Copy` and `Eq` and can be compared and passed
/// around freely.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecoveryKind {
	/// Single block recovery: only the requested block is considered.
	Simple,
	/// Full ancestry recovery: the requested block and its unknown ancestors that are tracked
	/// as pending candidates are considered.
	Full,
}
122
123/// Structure used to trigger an explicit recovery request via `PoVRecovery`.
124pub struct RecoveryRequest<Block: BlockT> {
125	/// Hash of the last block to recover.
126	pub hash: Block::Hash,
127	/// Recovery type.
128	pub kind: RecoveryKind,
129}
130
131/// The delay between observing an unknown block and triggering the recovery of a block.
132/// Randomizing the start of the recovery within this interval
133/// can be used to prevent self-DOSing if the recovery request is part of a
134/// distributed protocol and there is the possibility that multiple actors are
135/// requiring to perform the recovery action at approximately the same time.
136#[derive(Clone, Copy)]
137pub struct RecoveryDelayRange {
138	/// Start recovering after `min` delay.
139	pub min: Duration,
140	/// Start recovering before `max` delay.
141	pub max: Duration,
142}
143
144impl RecoveryDelayRange {
145	/// Produce a randomized duration between `min` and `max`.
146	fn duration(&self) -> Duration {
147		Uniform::from(self.min..=self.max).sample(&mut thread_rng())
148	}
149}
150
151/// Represents an outstanding block candidate.
152struct Candidate<Block: BlockT> {
153	receipt: CandidateReceipt,
154	session_index: SessionIndex,
155	block_number: NumberFor<Block>,
156	parent_hash: Block::Hash,
157	// Lazy recovery has been submitted.
158	// Should be true iff a block is either queued to be recovered or
159	// recovery is currently in progress.
160	waiting_recovery: bool,
161}
162
163/// Queue that is used to decide when to start PoV-recovery operations.
164struct RecoveryQueue<Block: BlockT> {
165	recovery_delay_range: RecoveryDelayRange,
166	// Queue that keeps the hashes of blocks to be recovered.
167	recovery_queue: VecDeque<Block::Hash>,
168	// Futures that resolve when a new recovery should be started.
169	signaling_queue: FuturesUnordered<Pin<Box<dyn Future<Output = ()> + Send>>>,
170}
171
172impl<Block: BlockT> RecoveryQueue<Block> {
173	pub fn new(recovery_delay_range: RecoveryDelayRange) -> Self {
174		Self {
175			recovery_delay_range,
176			recovery_queue: Default::default(),
177			signaling_queue: Default::default(),
178		}
179	}
180
181	/// Add hash of a block that should go to the end of the recovery queue.
182	/// A new recovery will be signaled after `delay` has passed.
183	pub fn push_recovery(&mut self, hash: Block::Hash) {
184		let delay = self.recovery_delay_range.duration();
185		tracing::debug!(
186			target: LOG_TARGET,
187			block_hash = ?hash,
188			"Adding block to queue and adding new recovery slot in {:?} sec",
189			delay.as_secs(),
190		);
191		self.recovery_queue.push_back(hash);
192		self.signaling_queue.push(
193			async move {
194				Delay::new(delay).await;
195			}
196			.boxed(),
197		);
198	}
199
200	/// Get the next hash for block recovery.
201	pub async fn next_recovery(&mut self) -> Block::Hash {
202		loop {
203			if self.signaling_queue.next().await.is_some() {
204				if let Some(hash) = self.recovery_queue.pop_front() {
205					return hash
206				} else {
207					tracing::error!(
208						target: LOG_TARGET,
209						"Recovery was signaled, but no candidate hash available. This is a bug."
210					);
211				};
212			}
213			futures::pending!()
214		}
215	}
216}
217
218/// Encapsulates the logic of the pov recovery.
219pub struct PoVRecovery<Block: BlockT, PC, RC> {
220	/// All the pending candidates that we are waiting for to be imported or that need to be
221	/// recovered when `next_candidate_to_recover` tells us to do so.
222	candidates: HashMap<Block::Hash, Candidate<Block>>,
223	/// A stream of futures that resolve to hashes of candidates that need to be recovered.
224	///
225	/// The candidates to the hashes are stored in `candidates`. If a candidate is not
226	/// available anymore in this map, it means that it was already imported.
227	candidate_recovery_queue: RecoveryQueue<Block>,
228	active_candidate_recovery: ActiveCandidateRecovery<Block>,
229	/// Blocks that wait that the parent is imported.
230	///
231	/// Uses parent -> blocks mapping.
232	waiting_for_parent: HashMap<Block::Hash, Vec<Block>>,
233	parachain_client: Arc<PC>,
234	parachain_import_queue: Box<dyn ImportQueueService<Block>>,
235	relay_chain_interface: RC,
236	para_id: ParaId,
237	/// Explicit block recovery requests channel.
238	recovery_chan_rx: Receiver<RecoveryRequest<Block>>,
239	/// Blocks that we are retrying currently
240	candidates_in_retry: HashSet<Block::Hash>,
241	parachain_sync_service: Arc<dyn SyncOracle + Sync + Send>,
242}
243
244impl<Block: BlockT, PC, RCInterface> PoVRecovery<Block, PC, RCInterface>
245where
246	PC: BlockBackend<Block> + BlockchainEvents<Block> + UsageProvider<Block>,
247	RCInterface: RelayChainInterface + Clone,
248{
249	/// Create a new instance.
250	pub fn new(
251		recovery_handle: Box<dyn RecoveryHandle>,
252		recovery_delay_range: RecoveryDelayRange,
253		parachain_client: Arc<PC>,
254		parachain_import_queue: Box<dyn ImportQueueService<Block>>,
255		relay_chain_interface: RCInterface,
256		para_id: ParaId,
257		recovery_chan_rx: Receiver<RecoveryRequest<Block>>,
258		parachain_sync_service: Arc<dyn SyncOracle + Sync + Send>,
259	) -> Self {
260		Self {
261			candidates: HashMap::new(),
262			candidate_recovery_queue: RecoveryQueue::new(recovery_delay_range),
263			active_candidate_recovery: ActiveCandidateRecovery::new(recovery_handle),
264			waiting_for_parent: HashMap::new(),
265			parachain_client,
266			parachain_import_queue,
267			relay_chain_interface,
268			para_id,
269			candidates_in_retry: HashSet::new(),
270			recovery_chan_rx,
271			parachain_sync_service,
272		}
273	}
274
275	/// Handle a new pending candidate.
276	fn handle_pending_candidate(
277		&mut self,
278		receipt: CommittedCandidateReceipt,
279		session_index: SessionIndex,
280	) {
281		let header = match Block::Header::decode(&mut &receipt.commitments.head_data.0[..]) {
282			Ok(header) => header,
283			Err(e) => {
284				tracing::warn!(
285					target: LOG_TARGET,
286					error = ?e,
287					"Failed to decode parachain header from pending candidate",
288				);
289				return
290			},
291		};
292
293		if *header.number() <= self.parachain_client.usage_info().chain.finalized_number {
294			return
295		}
296
297		let hash = header.hash();
298
299		if self.candidates.contains_key(&hash) {
300			return
301		}
302
303		tracing::debug!(target: LOG_TARGET, block_hash = ?hash, "Adding outstanding candidate");
304		self.candidates.insert(
305			hash,
306			Candidate {
307				block_number: *header.number(),
308				receipt: receipt.to_plain(),
309				session_index,
310				parent_hash: *header.parent_hash(),
311				waiting_recovery: false,
312			},
313		);
314
315		// If required, triggers a lazy recovery request that will eventually be blocked
316		// if in the meantime the block is imported.
317		self.recover(RecoveryRequest { hash, kind: RecoveryKind::Simple });
318	}
319
320	/// Block is no longer waiting for recovery
321	fn clear_waiting_recovery(&mut self, block_hash: &Block::Hash) {
322		if let Some(candidate) = self.candidates.get_mut(block_hash) {
323			// Prevents triggering an already enqueued recovery request
324			candidate.waiting_recovery = false;
325		}
326	}
327
328	/// Handle a finalized block with the given `block_number`.
329	fn handle_block_finalized(&mut self, block_number: NumberFor<Block>) {
330		self.candidates.retain(|_, pc| pc.block_number > block_number);
331	}
332
333	/// Recover the candidate for the given `block_hash`.
334	async fn recover_candidate(&mut self, block_hash: Block::Hash) {
335		match self.candidates.get(&block_hash) {
336			Some(candidate) if candidate.waiting_recovery => {
337				tracing::debug!(target: LOG_TARGET, ?block_hash, "Issuing recovery request");
338				self.active_candidate_recovery.recover_candidate(block_hash, candidate).await;
339			},
340			_ => (),
341		}
342	}
343
344	/// Clear `waiting_for_parent` and `waiting_recovery` for the candidate with `hash`.
345	/// Also clears children blocks waiting for this parent.
346	fn reset_candidate(&mut self, hash: Block::Hash) {
347		let mut blocks_to_delete = vec![hash];
348
349		while let Some(delete) = blocks_to_delete.pop() {
350			if let Some(children) = self.waiting_for_parent.remove(&delete) {
351				blocks_to_delete.extend(children.iter().map(BlockT::hash));
352			}
353		}
354		self.clear_waiting_recovery(&hash);
355	}
356
357	/// Try to decode [`ParachainBlockData`] from `data`.
358	///
359	/// Internally it will handle the decoding of the different versions.
360	fn decode_parachain_block_data(
361		data: &[u8],
362		expected_block_hash: Block::Hash,
363	) -> Option<ParachainBlockData<Block>> {
364		match ParachainBlockData::<Block>::decode_all(&mut &data[..]) {
365			Ok(block_data) => {
366				if block_data.blocks().last().map_or(false, |b| b.hash() == expected_block_hash) {
367					return Some(block_data)
368				}
369
370				tracing::debug!(
371					target: LOG_TARGET,
372					?expected_block_hash,
373					"Could not find the expected block hash as latest block in `ParachainBlockData`"
374				);
375			},
376			Err(error) => {
377				tracing::debug!(
378					target: LOG_TARGET,
379					?expected_block_hash,
380					?error,
381					"Could not decode `ParachainBlockData` from recovered PoV",
382				);
383			},
384		}
385
386		None
387	}
388
389	/// Handle a recovered candidate.
390	async fn handle_candidate_recovered(&mut self, block_hash: Block::Hash, pov: Option<&PoV>) {
391		let pov = match pov {
392			Some(pov) => {
393				self.candidates_in_retry.remove(&block_hash);
394				pov
395			},
396			None =>
397				if self.candidates_in_retry.insert(block_hash) {
398					tracing::debug!(target: LOG_TARGET, ?block_hash, "Recovery failed, retrying.");
399					self.candidate_recovery_queue.push_recovery(block_hash);
400					return
401				} else {
402					tracing::warn!(
403						target: LOG_TARGET,
404						?block_hash,
405						"Unable to recover block after retry.",
406					);
407					self.candidates_in_retry.remove(&block_hash);
408					self.reset_candidate(block_hash);
409					return
410				},
411		};
412
413		let raw_block_data =
414			match sp_maybe_compressed_blob::decompress(&pov.block_data.0, POV_BOMB_LIMIT) {
415				Ok(r) => r,
416				Err(error) => {
417					tracing::debug!(target: LOG_TARGET, ?error, "Failed to decompress PoV");
418
419					self.reset_candidate(block_hash);
420					return
421				},
422			};
423
424		let Some(block_data) = Self::decode_parachain_block_data(&raw_block_data, block_hash)
425		else {
426			self.reset_candidate(block_hash);
427			return
428		};
429
430		let blocks = block_data.into_blocks();
431
432		let Some(parent) = blocks.first().map(|b| *b.header().parent_hash()) else {
433			tracing::debug!(
434				target: LOG_TARGET,
435				?block_hash,
436				"Recovered candidate doesn't contain any blocks.",
437			);
438
439			self.reset_candidate(block_hash);
440			return;
441		};
442
443		match self.parachain_client.block_status(parent) {
444			Ok(BlockStatus::Unknown) => {
445				// If the parent block is currently being recovered or is scheduled to be recovered,
446				// we want to wait for the parent.
447				let parent_scheduled_for_recovery =
448					self.candidates.get(&parent).map_or(false, |parent| parent.waiting_recovery);
449				if parent_scheduled_for_recovery {
450					tracing::debug!(
451						target: LOG_TARGET,
452						?block_hash,
453						parent_hash = ?parent,
454						parent_scheduled_for_recovery,
455						waiting_blocks = self.waiting_for_parent.len(),
456						"Waiting for recovery of parent.",
457					);
458
459					blocks.into_iter().for_each(|b| {
460						self.waiting_for_parent
461							.entry(*b.header().parent_hash())
462							.or_default()
463							.push(b);
464					});
465					return
466				} else {
467					tracing::debug!(
468						target: LOG_TARGET,
469						?block_hash,
470						parent_hash = ?parent,
471						"Parent not found while trying to import recovered block.",
472					);
473
474					self.reset_candidate(block_hash);
475					return
476				}
477			},
478			Err(error) => {
479				tracing::debug!(
480					target: LOG_TARGET,
481					block_hash = ?parent,
482					?error,
483					"Error while checking block status",
484				);
485
486				self.reset_candidate(block_hash);
487				return
488			},
489			// Any other status is fine to "ignore/accept"
490			_ => (),
491		}
492
493		self.import_blocks(blocks.into_iter());
494	}
495
496	/// Import the given `blocks`.
497	///
498	/// This will also recursively drain `waiting_for_parent` and import them as well.
499	fn import_blocks(&mut self, blocks: impl Iterator<Item = Block>) {
500		let mut blocks = VecDeque::from_iter(blocks);
501
502		tracing::debug!(
503			target: LOG_TARGET,
504			blocks = ?blocks.iter().map(|b| b.hash()),
505			"Importing blocks retrieved using pov_recovery",
506		);
507
508		let mut incoming_blocks = Vec::new();
509
510		while let Some(block) = blocks.pop_front() {
511			let block_hash = block.hash();
512			let (header, body) = block.deconstruct();
513
514			incoming_blocks.push(IncomingBlock {
515				hash: block_hash,
516				header: Some(header),
517				body: Some(body),
518				import_existing: false,
519				allow_missing_state: false,
520				justifications: None,
521				origin: None,
522				skip_execution: false,
523				state: None,
524				indexed_body: None,
525			});
526
527			if let Some(waiting) = self.waiting_for_parent.remove(&block_hash) {
528				blocks.extend(waiting);
529			}
530		}
531
532		self.parachain_import_queue
533			// Use `ConsensusBroadcast` to inform the import pipeline that this blocks needs to be
534			// imported.
535			.import_blocks(BlockOrigin::ConsensusBroadcast, incoming_blocks);
536	}
537
538	/// Attempts an explicit recovery of one or more blocks.
539	pub fn recover(&mut self, req: RecoveryRequest<Block>) {
540		let RecoveryRequest { mut hash, kind } = req;
541		let mut to_recover = Vec::new();
542
543		loop {
544			let candidate = match self.candidates.get_mut(&hash) {
545				Some(candidate) => candidate,
546				None => {
547					tracing::debug!(
548						target: LOG_TARGET,
549						block_hash = ?hash,
550						"Could not recover. Block was never announced as candidate"
551					);
552					return
553				},
554			};
555
556			match self.parachain_client.block_status(hash) {
557				Ok(BlockStatus::Unknown) if !candidate.waiting_recovery => {
558					candidate.waiting_recovery = true;
559					to_recover.push(hash);
560				},
561				Ok(_) => break,
562				Err(e) => {
563					tracing::error!(
564						target: LOG_TARGET,
565						error = ?e,
566						block_hash = ?hash,
567						"Failed to get block status",
568					);
569					for hash in to_recover {
570						self.clear_waiting_recovery(&hash);
571					}
572					return
573				},
574			}
575
576			if kind == RecoveryKind::Simple {
577				break
578			}
579
580			hash = candidate.parent_hash;
581		}
582
583		for hash in to_recover.into_iter().rev() {
584			self.candidate_recovery_queue.push_recovery(hash);
585		}
586	}
587
588	/// Run the pov-recovery.
589	pub async fn run(mut self) {
590		let mut imported_blocks = self.parachain_client.import_notification_stream().fuse();
591		let mut finalized_blocks = self.parachain_client.finality_notification_stream().fuse();
592		let pending_candidates = match pending_candidates(
593			self.relay_chain_interface.clone(),
594			self.para_id,
595			self.parachain_sync_service.clone(),
596		)
597		.await
598		{
599			Ok(pending_candidates_stream) => pending_candidates_stream.fuse(),
600			Err(err) => {
601				tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve pending candidate stream.");
602				return
603			},
604		};
605
606		futures::pin_mut!(pending_candidates);
607		loop {
608			select! {
609				next_pending_candidates = pending_candidates.next() => {
610					if let Some((candidates, session_index, _)) = next_pending_candidates {
611						for candidate in candidates {
612							self.handle_pending_candidate(candidate, session_index);
613						}
614					} else {
615						tracing::debug!(target: LOG_TARGET, "Pending candidates stream ended");
616						return;
617					}
618				},
619				recovery_req = self.recovery_chan_rx.next() => {
620					if let Some(req) = recovery_req {
621						self.recover(req);
622					} else {
623						tracing::debug!(target: LOG_TARGET, "Recovery channel stream ended");
624						return;
625					}
626				},
627				imported = imported_blocks.next() => {
628					if let Some(imported) = imported {
629						self.clear_waiting_recovery(&imported.hash);
630
631						// We need to double check that no blocks are waiting for this block.
632						// Can happen when a waiting child block is queued to wait for parent while the parent block is still
633						// in the import queue.
634						if let Some(waiting_blocks) = self.waiting_for_parent.remove(&imported.hash) {
635							for block in waiting_blocks {
636								tracing::debug!(target: LOG_TARGET, block_hash = ?block.hash(), resolved_parent = ?imported.hash, "Found new waiting child block during import, queuing.");
637								self.import_blocks(std::iter::once(block));
638							}
639						};
640
641					} else {
642						tracing::debug!(target: LOG_TARGET,	"Imported blocks stream ended");
643						return;
644					}
645				},
646				finalized = finalized_blocks.next() => {
647					if let Some(finalized) = finalized {
648						self.handle_block_finalized(*finalized.header.number());
649					} else {
650						tracing::debug!(target: LOG_TARGET,	"Finalized blocks stream ended");
651						return;
652					}
653				},
654				next_to_recover = self.candidate_recovery_queue.next_recovery().fuse() => {
655						self.recover_candidate(next_to_recover).await;
656				},
657				(block_hash, pov) =
658					self.active_candidate_recovery.wait_for_recovery().fuse() =>
659				{
660					self.handle_candidate_recovered(block_hash, pov.as_deref()).await;
661				},
662			}
663		}
664	}
665}