referrerpolicy=no-referrer-when-downgrade

cumulus_client_pov_recovery/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Cumulus.
3// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5// Cumulus is free software: you can redistribute it and/or modify
6// it under the terms of the GNU General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9
10// Cumulus is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14
15// You should have received a copy of the GNU General Public License
16// along with Cumulus. If not, see <https://www.gnu.org/licenses/>.
17
18//! Parachain PoV recovery
19//!
//! A parachain needs to build PoVs that are sent to the relay chain to progress. These PoVs are
//! erasure encoded and one piece of each is stored by each relay chain validator. Because the
//! relay chain decides which PoV of a parachain gets included, and thereby how the parachain
//! progresses, it can happen that the block corresponding to an included PoV isn't propagated in
//! the parachain network. This can have several reasons: a malicious collator may have managed to
//! include its own PoV without sharing it with the rest of the network, or a collator may have
//! gone down before it could distribute the block. When something like this happens we can use
//! the PoV recovery algorithm implemented in this crate to recover the PoV and to propagate it to
//! the rest of the network.
29//!
30//! It works in the following way:
31//!
//! 1. For every included relay chain block we note the backed candidates of our parachain. If the
//!    block belonging to a PoV is already known, we do nothing. Otherwise we start a timer that
//!    waits for a randomized time inside a specified interval before starting to recover the PoV.
36//!
37//! 2. If between starting and firing the timer the block is imported, we skip the recovery of the
38//!    PoV.
39//!
40//! 3. If the timer fired we recover the PoV using the relay chain PoV recovery protocol.
41//!
42//! 4a. After it is recovered, we restore the block and import it.
43//!
//! 4b. Since we are trying to recover pending candidates, availability is not guaranteed. If the
//!     block PoV is not yet available, we retry once more after another randomized delay before
//!     giving up.
46//!
47//! If we need to recover multiple PoV blocks (which should hopefully not happen in real life), we
48//! make sure that the blocks are imported in the correct order.
49
50use sc_client_api::{BlockBackend, BlockchainEvents, UsageProvider};
51use sc_consensus::import_queue::{ImportQueueService, IncomingBlock};
52use sp_consensus::{BlockOrigin, BlockStatus, SyncOracle};
53use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
54
55use polkadot_node_primitives::{PoV, POV_BOMB_LIMIT};
56use polkadot_node_subsystem::messages::AvailabilityRecoveryMessage;
57use polkadot_overseer::Handle as OverseerHandle;
58use polkadot_primitives::{
59	CandidateReceiptV2 as CandidateReceipt,
60	CommittedCandidateReceiptV2 as CommittedCandidateReceipt, Id as ParaId, SessionIndex,
61};
62
63use cumulus_primitives_core::ParachainBlockData;
64use cumulus_relay_chain_interface::RelayChainInterface;
65use cumulus_relay_chain_streams::pending_candidates;
66
67use codec::{Decode, DecodeAll};
68use futures::{
69	channel::mpsc::Receiver, select, stream::FuturesUnordered, Future, FutureExt, StreamExt,
70};
71use futures_timer::Delay;
72use rand::{distributions::Uniform, prelude::Distribution, thread_rng};
73
74use std::{
75	collections::{HashMap, HashSet, VecDeque},
76	pin::Pin,
77	sync::Arc,
78	time::Duration,
79};
80
81#[cfg(test)]
82mod tests;
83
84mod active_candidate_recovery;
85use active_candidate_recovery::ActiveCandidateRecovery;
86
/// Target passed to every `tracing` macro invocation in this module.
const LOG_TARGET: &str = "cumulus-pov-recovery";
88
/// Test-friendly wrapper trait for the overseer handle.
/// Can be used to simulate failing recovery requests.
#[async_trait::async_trait]
pub trait RecoveryHandle: Send {
	/// Send an availability-recovery `message`.
	///
	/// `origin` is a static label forwarded together with the message (the overseer
	/// implementation passes it straight to `send_msg`).
	async fn send_recovery_msg(
		&mut self,
		message: AvailabilityRecoveryMessage,
		origin: &'static str,
	);
}
99
100#[async_trait::async_trait]
101impl RecoveryHandle for OverseerHandle {
102	async fn send_recovery_msg(
103		&mut self,
104		message: AvailabilityRecoveryMessage,
105		origin: &'static str,
106	) {
107		self.send_msg(message, origin).await;
108	}
109}
110
/// Type of recovery to trigger.
///
/// A plain field-less enum, so it is cheap to copy and fully comparable.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecoveryKind {
	/// Single block recovery: only the requested block is considered.
	Simple,
	/// Full ancestry recovery: the requested block together with its not-yet-imported
	/// ancestors.
	Full,
}
119
/// Structure used to trigger an explicit recovery request via [`PoVRecovery`].
///
/// Requests are delivered through the channel whose receiving end is passed to
/// `PoVRecovery::new` as `recovery_chan_rx`.
pub struct RecoveryRequest<Block: BlockT> {
	/// Hash of the last (newest) block to recover.
	pub hash: Block::Hash,
	/// Recovery type: only `hash` itself ([`RecoveryKind::Simple`]) or also its unknown
	/// ancestors ([`RecoveryKind::Full`]).
	pub kind: RecoveryKind,
}
127
128/// The delay between observing an unknown block and triggering the recovery of a block.
129/// Randomizing the start of the recovery within this interval
130/// can be used to prevent self-DOSing if the recovery request is part of a
131/// distributed protocol and there is the possibility that multiple actors are
132/// requiring to perform the recovery action at approximately the same time.
133#[derive(Clone, Copy)]
134pub struct RecoveryDelayRange {
135	/// Start recovering after `min` delay.
136	pub min: Duration,
137	/// Start recovering before `max` delay.
138	pub max: Duration,
139}
140
141impl RecoveryDelayRange {
142	/// Produce a randomized duration between `min` and `max`.
143	fn duration(&self) -> Duration {
144		Uniform::from(self.min..=self.max).sample(&mut thread_rng())
145	}
146}
147
/// Represents an outstanding block candidate.
struct Candidate<Block: BlockT> {
	/// Plain receipt of the candidate. Presumably consumed by `ActiveCandidateRecovery`
	/// when requesting the PoV; TODO confirm.
	receipt: CandidateReceipt,
	/// Session index reported alongside the candidate by the pending-candidates stream.
	session_index: SessionIndex,
	/// Number of the parachain block decoded from the candidate's head data.
	block_number: NumberFor<Block>,
	/// Hash of the parent of the parachain block decoded from the candidate's head data.
	parent_hash: Block::Hash,
	/// Lazy recovery has been submitted.
	///
	/// Should be true iff a block is either queued to be recovered or
	/// recovery is currently in progress.
	waiting_recovery: bool,
}
159
/// Queue that is used to decide when to start PoV-recovery operations.
///
/// Every push appends a block hash and schedules one randomized delay "slot". Whenever any
/// slot elapses, the oldest queued hash is handed out. Hashes therefore leave in FIFO order,
/// regardless of which delay expires first.
struct RecoveryQueue<Block: BlockT> {
	/// Range from which the delay of each recovery slot is sampled.
	recovery_delay_range: RecoveryDelayRange,
	/// Queue that keeps the hashes of blocks to be recovered.
	recovery_queue: VecDeque<Block::Hash>,
	/// Futures that resolve when a new recovery should be started.
	signaling_queue: FuturesUnordered<Pin<Box<dyn Future<Output = ()> + Send>>>,
}
168
169impl<Block: BlockT> RecoveryQueue<Block> {
170	pub fn new(recovery_delay_range: RecoveryDelayRange) -> Self {
171		Self {
172			recovery_delay_range,
173			recovery_queue: Default::default(),
174			signaling_queue: Default::default(),
175		}
176	}
177
178	/// Add hash of a block that should go to the end of the recovery queue.
179	/// A new recovery will be signaled after `delay` has passed.
180	pub fn push_recovery(&mut self, hash: Block::Hash) {
181		let delay = self.recovery_delay_range.duration();
182		tracing::debug!(
183			target: LOG_TARGET,
184			block_hash = ?hash,
185			"Adding block to queue and adding new recovery slot in {:?} sec",
186			delay.as_secs(),
187		);
188		self.recovery_queue.push_back(hash);
189		self.signaling_queue.push(
190			async move {
191				Delay::new(delay).await;
192			}
193			.boxed(),
194		);
195	}
196
197	/// Get the next hash for block recovery.
198	pub async fn next_recovery(&mut self) -> Block::Hash {
199		loop {
200			if self.signaling_queue.next().await.is_some() {
201				if let Some(hash) = self.recovery_queue.pop_front() {
202					return hash
203				} else {
204					tracing::error!(
205						target: LOG_TARGET,
206						"Recovery was signaled, but no candidate hash available. This is a bug."
207					);
208				};
209			}
210			futures::pending!()
211		}
212	}
213}
214
/// Encapsulates the logic of the pov recovery.
pub struct PoVRecovery<Block: BlockT, PC, RC> {
	/// All the pending candidates that we are waiting for to be imported or that need to be
	/// recovered when `candidate_recovery_queue` tells us to do so.
	candidates: HashMap<Block::Hash, Candidate<Block>>,
	/// Queue that hands out hashes of candidates that need to be recovered.
	///
	/// The candidates to the hashes are stored in `candidates`. If a candidate is not
	/// available anymore in this map, it was pruned because its block number is at or below
	/// the finalized block.
	candidate_recovery_queue: RecoveryQueue<Block>,
	/// Recoveries that have been issued and whose outcome has not been handled yet.
	active_candidate_recovery: ActiveCandidateRecovery<Block>,
	/// Blocks that wait that the parent is imported.
	///
	/// Uses parent -> blocks mapping.
	waiting_for_parent: HashMap<Block::Hash, Vec<Block>>,
	/// Client used for block status, usage info and import/finality notifications.
	parachain_client: Arc<PC>,
	/// Import queue that recovered blocks are submitted to.
	parachain_import_queue: Box<dyn ImportQueueService<Block>>,
	/// Relay chain access, used to subscribe to pending candidates.
	relay_chain_interface: RC,
	/// Id of the parachain whose candidates are tracked.
	para_id: ParaId,
	/// Explicit block recovery requests channel.
	recovery_chan_rx: Receiver<RecoveryRequest<Block>>,
	/// Blocks whose first recovery attempt failed and that were re-queued for one more attempt.
	candidates_in_retry: HashSet<Block::Hash>,
	/// Sync oracle of the parachain, passed on to the pending-candidates stream.
	parachain_sync_service: Arc<dyn SyncOracle + Sync + Send>,
}
240
241impl<Block: BlockT, PC, RCInterface> PoVRecovery<Block, PC, RCInterface>
242where
243	PC: BlockBackend<Block> + BlockchainEvents<Block> + UsageProvider<Block>,
244	RCInterface: RelayChainInterface + Clone,
245{
246	/// Create a new instance.
247	pub fn new(
248		recovery_handle: Box<dyn RecoveryHandle>,
249		recovery_delay_range: RecoveryDelayRange,
250		parachain_client: Arc<PC>,
251		parachain_import_queue: Box<dyn ImportQueueService<Block>>,
252		relay_chain_interface: RCInterface,
253		para_id: ParaId,
254		recovery_chan_rx: Receiver<RecoveryRequest<Block>>,
255		parachain_sync_service: Arc<dyn SyncOracle + Sync + Send>,
256	) -> Self {
257		Self {
258			candidates: HashMap::new(),
259			candidate_recovery_queue: RecoveryQueue::new(recovery_delay_range),
260			active_candidate_recovery: ActiveCandidateRecovery::new(recovery_handle),
261			waiting_for_parent: HashMap::new(),
262			parachain_client,
263			parachain_import_queue,
264			relay_chain_interface,
265			para_id,
266			candidates_in_retry: HashSet::new(),
267			recovery_chan_rx,
268			parachain_sync_service,
269		}
270	}
271
272	/// Handle a new pending candidate.
273	fn handle_pending_candidate(
274		&mut self,
275		receipt: CommittedCandidateReceipt,
276		session_index: SessionIndex,
277	) {
278		let header = match Block::Header::decode(&mut &receipt.commitments.head_data.0[..]) {
279			Ok(header) => header,
280			Err(e) => {
281				tracing::warn!(
282					target: LOG_TARGET,
283					error = ?e,
284					"Failed to decode parachain header from pending candidate",
285				);
286				return
287			},
288		};
289
290		if *header.number() <= self.parachain_client.usage_info().chain.finalized_number {
291			return
292		}
293
294		let hash = header.hash();
295
296		if self.candidates.contains_key(&hash) {
297			return
298		}
299
300		tracing::debug!(target: LOG_TARGET, block_hash = ?hash, "Adding outstanding candidate");
301		self.candidates.insert(
302			hash,
303			Candidate {
304				block_number: *header.number(),
305				receipt: receipt.to_plain(),
306				session_index,
307				parent_hash: *header.parent_hash(),
308				waiting_recovery: false,
309			},
310		);
311
312		// If required, triggers a lazy recovery request that will eventually be blocked
313		// if in the meantime the block is imported.
314		self.recover(RecoveryRequest { hash, kind: RecoveryKind::Simple });
315	}
316
317	/// Block is no longer waiting for recovery
318	fn clear_waiting_recovery(&mut self, block_hash: &Block::Hash) {
319		if let Some(candidate) = self.candidates.get_mut(block_hash) {
320			// Prevents triggering an already enqueued recovery request
321			candidate.waiting_recovery = false;
322		}
323	}
324
325	/// Handle a finalized block with the given `block_number`.
326	fn handle_block_finalized(&mut self, block_number: NumberFor<Block>) {
327		self.candidates.retain(|_, pc| pc.block_number > block_number);
328	}
329
330	/// Recover the candidate for the given `block_hash`.
331	async fn recover_candidate(&mut self, block_hash: Block::Hash) {
332		match self.candidates.get(&block_hash) {
333			Some(candidate) if candidate.waiting_recovery => {
334				tracing::debug!(target: LOG_TARGET, ?block_hash, "Issuing recovery request");
335				self.active_candidate_recovery.recover_candidate(block_hash, candidate).await;
336			},
337			_ => (),
338		}
339	}
340
341	/// Clear `waiting_for_parent` and `waiting_recovery` for the candidate with `hash`.
342	/// Also clears children blocks waiting for this parent.
343	fn reset_candidate(&mut self, hash: Block::Hash) {
344		let mut blocks_to_delete = vec![hash];
345
346		while let Some(delete) = blocks_to_delete.pop() {
347			if let Some(children) = self.waiting_for_parent.remove(&delete) {
348				blocks_to_delete.extend(children.iter().map(BlockT::hash));
349			}
350		}
351		self.clear_waiting_recovery(&hash);
352	}
353
354	/// Try to decode [`ParachainBlockData`] from `data`.
355	///
356	/// Internally it will handle the decoding of the different versions.
357	fn decode_parachain_block_data(
358		data: &[u8],
359		expected_block_hash: Block::Hash,
360	) -> Option<ParachainBlockData<Block>> {
361		match ParachainBlockData::<Block>::decode_all(&mut &data[..]) {
362			Ok(block_data) => {
363				if block_data.blocks().last().map_or(false, |b| b.hash() == expected_block_hash) {
364					return Some(block_data)
365				}
366
367				tracing::debug!(
368					target: LOG_TARGET,
369					?expected_block_hash,
370					"Could not find the expected block hash as latest block in `ParachainBlockData`"
371				);
372			},
373			Err(error) => {
374				tracing::debug!(
375					target: LOG_TARGET,
376					?expected_block_hash,
377					?error,
378					"Could not decode `ParachainBlockData` from recovered PoV",
379				);
380			},
381		}
382
383		None
384	}
385
386	/// Handle a recovered candidate.
387	async fn handle_candidate_recovered(&mut self, block_hash: Block::Hash, pov: Option<&PoV>) {
388		let pov = match pov {
389			Some(pov) => {
390				self.candidates_in_retry.remove(&block_hash);
391				pov
392			},
393			None =>
394				if self.candidates_in_retry.insert(block_hash) {
395					tracing::debug!(target: LOG_TARGET, ?block_hash, "Recovery failed, retrying.");
396					self.candidate_recovery_queue.push_recovery(block_hash);
397					return
398				} else {
399					tracing::warn!(
400						target: LOG_TARGET,
401						?block_hash,
402						"Unable to recover block after retry.",
403					);
404					self.candidates_in_retry.remove(&block_hash);
405					self.reset_candidate(block_hash);
406					return
407				},
408		};
409
410		let raw_block_data =
411			match sp_maybe_compressed_blob::decompress(&pov.block_data.0, POV_BOMB_LIMIT) {
412				Ok(r) => r,
413				Err(error) => {
414					tracing::debug!(target: LOG_TARGET, ?error, "Failed to decompress PoV");
415
416					self.reset_candidate(block_hash);
417					return
418				},
419			};
420
421		let Some(block_data) = Self::decode_parachain_block_data(&raw_block_data, block_hash)
422		else {
423			self.reset_candidate(block_hash);
424			return
425		};
426
427		let blocks = block_data.into_blocks();
428
429		let Some(parent) = blocks.first().map(|b| *b.header().parent_hash()) else {
430			tracing::debug!(
431				target: LOG_TARGET,
432				?block_hash,
433				"Recovered candidate doesn't contain any blocks.",
434			);
435
436			self.reset_candidate(block_hash);
437			return;
438		};
439
440		match self.parachain_client.block_status(parent) {
441			Ok(BlockStatus::Unknown) => {
442				// If the parent block is currently being recovered or is scheduled to be recovered,
443				// we want to wait for the parent.
444				let parent_scheduled_for_recovery =
445					self.candidates.get(&parent).map_or(false, |parent| parent.waiting_recovery);
446				if parent_scheduled_for_recovery {
447					tracing::debug!(
448						target: LOG_TARGET,
449						?block_hash,
450						parent_hash = ?parent,
451						parent_scheduled_for_recovery,
452						waiting_blocks = self.waiting_for_parent.len(),
453						"Waiting for recovery of parent.",
454					);
455
456					blocks.into_iter().for_each(|b| {
457						self.waiting_for_parent
458							.entry(*b.header().parent_hash())
459							.or_default()
460							.push(b);
461					});
462					return
463				} else {
464					tracing::debug!(
465						target: LOG_TARGET,
466						?block_hash,
467						parent_hash = ?parent,
468						"Parent not found while trying to import recovered block.",
469					);
470
471					self.reset_candidate(block_hash);
472					return
473				}
474			},
475			Err(error) => {
476				tracing::debug!(
477					target: LOG_TARGET,
478					block_hash = ?parent,
479					?error,
480					"Error while checking block status",
481				);
482
483				self.reset_candidate(block_hash);
484				return
485			},
486			// Any other status is fine to "ignore/accept"
487			_ => (),
488		}
489
490		self.import_blocks(blocks.into_iter());
491	}
492
493	/// Import the given `blocks`.
494	///
495	/// This will also recursively drain `waiting_for_parent` and import them as well.
496	fn import_blocks(&mut self, blocks: impl Iterator<Item = Block>) {
497		let mut blocks = VecDeque::from_iter(blocks);
498
499		tracing::debug!(
500			target: LOG_TARGET,
501			blocks = ?blocks.iter().map(|b| b.hash()),
502			"Importing blocks retrieved using pov_recovery",
503		);
504
505		let mut incoming_blocks = Vec::new();
506
507		while let Some(block) = blocks.pop_front() {
508			let block_hash = block.hash();
509			let (header, body) = block.deconstruct();
510
511			incoming_blocks.push(IncomingBlock {
512				hash: block_hash,
513				header: Some(header),
514				body: Some(body),
515				import_existing: false,
516				allow_missing_state: false,
517				justifications: None,
518				origin: None,
519				skip_execution: false,
520				state: None,
521				indexed_body: None,
522			});
523
524			if let Some(waiting) = self.waiting_for_parent.remove(&block_hash) {
525				blocks.extend(waiting);
526			}
527		}
528
529		self.parachain_import_queue
530			// Use `ConsensusBroadcast` to inform the import pipeline that this blocks needs to be
531			// imported.
532			.import_blocks(BlockOrigin::ConsensusBroadcast, incoming_blocks);
533	}
534
535	/// Attempts an explicit recovery of one or more blocks.
536	pub fn recover(&mut self, req: RecoveryRequest<Block>) {
537		let RecoveryRequest { mut hash, kind } = req;
538		let mut to_recover = Vec::new();
539
540		loop {
541			let candidate = match self.candidates.get_mut(&hash) {
542				Some(candidate) => candidate,
543				None => {
544					tracing::debug!(
545						target: LOG_TARGET,
546						block_hash = ?hash,
547						"Could not recover. Block was never announced as candidate"
548					);
549					return
550				},
551			};
552
553			match self.parachain_client.block_status(hash) {
554				Ok(BlockStatus::Unknown) if !candidate.waiting_recovery => {
555					candidate.waiting_recovery = true;
556					to_recover.push(hash);
557				},
558				Ok(_) => break,
559				Err(e) => {
560					tracing::error!(
561						target: LOG_TARGET,
562						error = ?e,
563						block_hash = ?hash,
564						"Failed to get block status",
565					);
566					for hash in to_recover {
567						self.clear_waiting_recovery(&hash);
568					}
569					return
570				},
571			}
572
573			if kind == RecoveryKind::Simple {
574				break
575			}
576
577			hash = candidate.parent_hash;
578		}
579
580		for hash in to_recover.into_iter().rev() {
581			self.candidate_recovery_queue.push_recovery(hash);
582		}
583	}
584
585	/// Run the pov-recovery.
586	pub async fn run(mut self) {
587		let mut imported_blocks = self.parachain_client.import_notification_stream().fuse();
588		let mut finalized_blocks = self.parachain_client.finality_notification_stream().fuse();
589		let pending_candidates = match pending_candidates(
590			self.relay_chain_interface.clone(),
591			self.para_id,
592			self.parachain_sync_service.clone(),
593		)
594		.await
595		{
596			Ok(pending_candidates_stream) => pending_candidates_stream.fuse(),
597			Err(err) => {
598				tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve pending candidate stream.");
599				return
600			},
601		};
602
603		futures::pin_mut!(pending_candidates);
604		loop {
605			select! {
606				next_pending_candidates = pending_candidates.next() => {
607					if let Some((candidates, session_index, _)) = next_pending_candidates {
608						for candidate in candidates {
609							self.handle_pending_candidate(candidate, session_index);
610						}
611					} else {
612						tracing::debug!(target: LOG_TARGET, "Pending candidates stream ended");
613						return;
614					}
615				},
616				recovery_req = self.recovery_chan_rx.next() => {
617					if let Some(req) = recovery_req {
618						self.recover(req);
619					} else {
620						tracing::debug!(target: LOG_TARGET, "Recovery channel stream ended");
621						return;
622					}
623				},
624				imported = imported_blocks.next() => {
625					if let Some(imported) = imported {
626						self.clear_waiting_recovery(&imported.hash);
627
628						// We need to double check that no blocks are waiting for this block.
629						// Can happen when a waiting child block is queued to wait for parent while the parent block is still
630						// in the import queue.
631						if let Some(waiting_blocks) = self.waiting_for_parent.remove(&imported.hash) {
632							for block in waiting_blocks {
633								tracing::debug!(target: LOG_TARGET, block_hash = ?block.hash(), resolved_parent = ?imported.hash, "Found new waiting child block during import, queuing.");
634								self.import_blocks(std::iter::once(block));
635							}
636						};
637
638					} else {
639						tracing::debug!(target: LOG_TARGET,	"Imported blocks stream ended");
640						return;
641					}
642				},
643				finalized = finalized_blocks.next() => {
644					if let Some(finalized) = finalized {
645						self.handle_block_finalized(*finalized.header.number());
646					} else {
647						tracing::debug!(target: LOG_TARGET,	"Finalized blocks stream ended");
648						return;
649					}
650				},
651				next_to_recover = self.candidate_recovery_queue.next_recovery().fuse() => {
652						self.recover_candidate(next_to_recover).await;
653				},
654				(block_hash, pov) =
655					self.active_candidate_recovery.wait_for_recovery().fuse() =>
656				{
657					self.handle_candidate_recovered(block_hash, pov.as_deref()).await;
658				},
659			}
660		}
661	}
662}