1use sc_client_api::{BlockBackend, BlockchainEvents, UsageProvider};
51use sc_consensus::import_queue::{ImportQueueService, IncomingBlock};
52use sp_consensus::{BlockOrigin, BlockStatus, SyncOracle};
53use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
54
55use polkadot_node_primitives::{PoV, POV_BOMB_LIMIT};
56use polkadot_node_subsystem::messages::AvailabilityRecoveryMessage;
57use polkadot_overseer::Handle as OverseerHandle;
58use polkadot_primitives::{
59 vstaging::{
60 CandidateReceiptV2 as CandidateReceipt,
61 CommittedCandidateReceiptV2 as CommittedCandidateReceipt,
62 },
63 Id as ParaId, SessionIndex,
64};
65
66use cumulus_primitives_core::ParachainBlockData;
67use cumulus_relay_chain_interface::RelayChainInterface;
68use cumulus_relay_chain_streams::pending_candidates;
69
70use codec::{Decode, DecodeAll};
71use futures::{
72 channel::mpsc::Receiver, select, stream::FuturesUnordered, Future, FutureExt, StreamExt,
73};
74use futures_timer::Delay;
75use rand::{distributions::Uniform, prelude::Distribution, thread_rng};
76
77use std::{
78 collections::{HashMap, HashSet, VecDeque},
79 pin::Pin,
80 sync::Arc,
81 time::Duration,
82};
83
84#[cfg(test)]
85mod tests;
86
87mod active_candidate_recovery;
88use active_candidate_recovery::ActiveCandidateRecovery;
89
/// `tracing` target used by every log line emitted by the PoV recovery.
const LOG_TARGET: &str = "cumulus-pov-recovery";
91
/// Abstraction over the sender of availability-recovery messages.
///
/// Implemented for [`OverseerHandle`]; presumably kept as a trait so that tests can substitute
/// their own sender (TODO confirm against the `tests` module).
#[async_trait::async_trait]
pub trait RecoveryHandle: Send {
	/// Send `message`, tagged with the static `origin` string.
	async fn send_recovery_msg(
		&mut self,
		message: AvailabilityRecoveryMessage,
		origin: &'static str,
	);
}
102
103#[async_trait::async_trait]
104impl RecoveryHandle for OverseerHandle {
105 async fn send_recovery_msg(
106 &mut self,
107 message: AvailabilityRecoveryMessage,
108 origin: &'static str,
109 ) {
110 self.send_msg(message, origin).await;
111 }
112}
113
/// How much of the chain a [`RecoveryRequest`] should recover.
///
/// This is a plain fieldless tag, so it is `Copy` and `Eq` in addition to being comparable.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecoveryKind {
	/// Recover only the requested block.
	Simple,
	/// Recover the requested block and walk up its ancestry, also recovering every ancestor
	/// that is still unknown locally.
	Full,
}
122
123pub struct RecoveryRequest<Block: BlockT> {
125 pub hash: Block::Hash,
127 pub kind: RecoveryKind,
129}
130
/// Bounds of the random delay that is waited before a scheduled recovery is started.
///
/// Each delay is drawn uniformly from the inclusive range `[min, max]`.
#[derive(Copy, Clone)]
pub struct RecoveryDelayRange {
	/// Shortest possible delay.
	pub min: Duration,
	/// Longest possible delay.
	pub max: Duration,
}
143
144impl RecoveryDelayRange {
145 fn duration(&self) -> Duration {
147 Uniform::from(self.min..=self.max).sample(&mut thread_rng())
148 }
149}
150
151struct Candidate<Block: BlockT> {
153 receipt: CandidateReceipt,
154 session_index: SessionIndex,
155 block_number: NumberFor<Block>,
156 parent_hash: Block::Hash,
157 waiting_recovery: bool,
161}
162
163struct RecoveryQueue<Block: BlockT> {
165 recovery_delay_range: RecoveryDelayRange,
166 recovery_queue: VecDeque<Block::Hash>,
168 signaling_queue: FuturesUnordered<Pin<Box<dyn Future<Output = ()> + Send>>>,
170}
171
172impl<Block: BlockT> RecoveryQueue<Block> {
173 pub fn new(recovery_delay_range: RecoveryDelayRange) -> Self {
174 Self {
175 recovery_delay_range,
176 recovery_queue: Default::default(),
177 signaling_queue: Default::default(),
178 }
179 }
180
181 pub fn push_recovery(&mut self, hash: Block::Hash) {
184 let delay = self.recovery_delay_range.duration();
185 tracing::debug!(
186 target: LOG_TARGET,
187 block_hash = ?hash,
188 "Adding block to queue and adding new recovery slot in {:?} sec",
189 delay.as_secs(),
190 );
191 self.recovery_queue.push_back(hash);
192 self.signaling_queue.push(
193 async move {
194 Delay::new(delay).await;
195 }
196 .boxed(),
197 );
198 }
199
200 pub async fn next_recovery(&mut self) -> Block::Hash {
202 loop {
203 if self.signaling_queue.next().await.is_some() {
204 if let Some(hash) = self.recovery_queue.pop_front() {
205 return hash
206 } else {
207 tracing::error!(
208 target: LOG_TARGET,
209 "Recovery was signaled, but no candidate hash available. This is a bug."
210 );
211 };
212 }
213 futures::pending!()
214 }
215 }
216}
217
218pub struct PoVRecovery<Block: BlockT, PC, RC> {
220 candidates: HashMap<Block::Hash, Candidate<Block>>,
223 candidate_recovery_queue: RecoveryQueue<Block>,
228 active_candidate_recovery: ActiveCandidateRecovery<Block>,
229 waiting_for_parent: HashMap<Block::Hash, Vec<Block>>,
233 parachain_client: Arc<PC>,
234 parachain_import_queue: Box<dyn ImportQueueService<Block>>,
235 relay_chain_interface: RC,
236 para_id: ParaId,
237 recovery_chan_rx: Receiver<RecoveryRequest<Block>>,
239 candidates_in_retry: HashSet<Block::Hash>,
241 parachain_sync_service: Arc<dyn SyncOracle + Sync + Send>,
242}
243
244impl<Block: BlockT, PC, RCInterface> PoVRecovery<Block, PC, RCInterface>
245where
246 PC: BlockBackend<Block> + BlockchainEvents<Block> + UsageProvider<Block>,
247 RCInterface: RelayChainInterface + Clone,
248{
249 pub fn new(
251 recovery_handle: Box<dyn RecoveryHandle>,
252 recovery_delay_range: RecoveryDelayRange,
253 parachain_client: Arc<PC>,
254 parachain_import_queue: Box<dyn ImportQueueService<Block>>,
255 relay_chain_interface: RCInterface,
256 para_id: ParaId,
257 recovery_chan_rx: Receiver<RecoveryRequest<Block>>,
258 parachain_sync_service: Arc<dyn SyncOracle + Sync + Send>,
259 ) -> Self {
260 Self {
261 candidates: HashMap::new(),
262 candidate_recovery_queue: RecoveryQueue::new(recovery_delay_range),
263 active_candidate_recovery: ActiveCandidateRecovery::new(recovery_handle),
264 waiting_for_parent: HashMap::new(),
265 parachain_client,
266 parachain_import_queue,
267 relay_chain_interface,
268 para_id,
269 candidates_in_retry: HashSet::new(),
270 recovery_chan_rx,
271 parachain_sync_service,
272 }
273 }
274
275 fn handle_pending_candidate(
277 &mut self,
278 receipt: CommittedCandidateReceipt,
279 session_index: SessionIndex,
280 ) {
281 let header = match Block::Header::decode(&mut &receipt.commitments.head_data.0[..]) {
282 Ok(header) => header,
283 Err(e) => {
284 tracing::warn!(
285 target: LOG_TARGET,
286 error = ?e,
287 "Failed to decode parachain header from pending candidate",
288 );
289 return
290 },
291 };
292
293 if *header.number() <= self.parachain_client.usage_info().chain.finalized_number {
294 return
295 }
296
297 let hash = header.hash();
298
299 if self.candidates.contains_key(&hash) {
300 return
301 }
302
303 tracing::debug!(target: LOG_TARGET, block_hash = ?hash, "Adding outstanding candidate");
304 self.candidates.insert(
305 hash,
306 Candidate {
307 block_number: *header.number(),
308 receipt: receipt.to_plain(),
309 session_index,
310 parent_hash: *header.parent_hash(),
311 waiting_recovery: false,
312 },
313 );
314
315 self.recover(RecoveryRequest { hash, kind: RecoveryKind::Simple });
318 }
319
320 fn clear_waiting_recovery(&mut self, block_hash: &Block::Hash) {
322 if let Some(candidate) = self.candidates.get_mut(block_hash) {
323 candidate.waiting_recovery = false;
325 }
326 }
327
328 fn handle_block_finalized(&mut self, block_number: NumberFor<Block>) {
330 self.candidates.retain(|_, pc| pc.block_number > block_number);
331 }
332
333 async fn recover_candidate(&mut self, block_hash: Block::Hash) {
335 match self.candidates.get(&block_hash) {
336 Some(candidate) if candidate.waiting_recovery => {
337 tracing::debug!(target: LOG_TARGET, ?block_hash, "Issuing recovery request");
338 self.active_candidate_recovery.recover_candidate(block_hash, candidate).await;
339 },
340 _ => (),
341 }
342 }
343
344 fn reset_candidate(&mut self, hash: Block::Hash) {
347 let mut blocks_to_delete = vec![hash];
348
349 while let Some(delete) = blocks_to_delete.pop() {
350 if let Some(children) = self.waiting_for_parent.remove(&delete) {
351 blocks_to_delete.extend(children.iter().map(BlockT::hash));
352 }
353 }
354 self.clear_waiting_recovery(&hash);
355 }
356
357 fn decode_parachain_block_data(
361 data: &[u8],
362 expected_block_hash: Block::Hash,
363 ) -> Option<ParachainBlockData<Block>> {
364 match ParachainBlockData::<Block>::decode_all(&mut &data[..]) {
365 Ok(block_data) => {
366 if block_data.blocks().last().map_or(false, |b| b.hash() == expected_block_hash) {
367 return Some(block_data)
368 }
369
370 tracing::debug!(
371 target: LOG_TARGET,
372 ?expected_block_hash,
373 "Could not find the expected block hash as latest block in `ParachainBlockData`"
374 );
375 },
376 Err(error) => {
377 tracing::debug!(
378 target: LOG_TARGET,
379 ?expected_block_hash,
380 ?error,
381 "Could not decode `ParachainBlockData` from recovered PoV",
382 );
383 },
384 }
385
386 None
387 }
388
389 async fn handle_candidate_recovered(&mut self, block_hash: Block::Hash, pov: Option<&PoV>) {
391 let pov = match pov {
392 Some(pov) => {
393 self.candidates_in_retry.remove(&block_hash);
394 pov
395 },
396 None =>
397 if self.candidates_in_retry.insert(block_hash) {
398 tracing::debug!(target: LOG_TARGET, ?block_hash, "Recovery failed, retrying.");
399 self.candidate_recovery_queue.push_recovery(block_hash);
400 return
401 } else {
402 tracing::warn!(
403 target: LOG_TARGET,
404 ?block_hash,
405 "Unable to recover block after retry.",
406 );
407 self.candidates_in_retry.remove(&block_hash);
408 self.reset_candidate(block_hash);
409 return
410 },
411 };
412
413 let raw_block_data =
414 match sp_maybe_compressed_blob::decompress(&pov.block_data.0, POV_BOMB_LIMIT) {
415 Ok(r) => r,
416 Err(error) => {
417 tracing::debug!(target: LOG_TARGET, ?error, "Failed to decompress PoV");
418
419 self.reset_candidate(block_hash);
420 return
421 },
422 };
423
424 let Some(block_data) = Self::decode_parachain_block_data(&raw_block_data, block_hash)
425 else {
426 self.reset_candidate(block_hash);
427 return
428 };
429
430 let blocks = block_data.into_blocks();
431
432 let Some(parent) = blocks.first().map(|b| *b.header().parent_hash()) else {
433 tracing::debug!(
434 target: LOG_TARGET,
435 ?block_hash,
436 "Recovered candidate doesn't contain any blocks.",
437 );
438
439 self.reset_candidate(block_hash);
440 return;
441 };
442
443 match self.parachain_client.block_status(parent) {
444 Ok(BlockStatus::Unknown) => {
445 let parent_scheduled_for_recovery =
448 self.candidates.get(&parent).map_or(false, |parent| parent.waiting_recovery);
449 if parent_scheduled_for_recovery {
450 tracing::debug!(
451 target: LOG_TARGET,
452 ?block_hash,
453 parent_hash = ?parent,
454 parent_scheduled_for_recovery,
455 waiting_blocks = self.waiting_for_parent.len(),
456 "Waiting for recovery of parent.",
457 );
458
459 blocks.into_iter().for_each(|b| {
460 self.waiting_for_parent
461 .entry(*b.header().parent_hash())
462 .or_default()
463 .push(b);
464 });
465 return
466 } else {
467 tracing::debug!(
468 target: LOG_TARGET,
469 ?block_hash,
470 parent_hash = ?parent,
471 "Parent not found while trying to import recovered block.",
472 );
473
474 self.reset_candidate(block_hash);
475 return
476 }
477 },
478 Err(error) => {
479 tracing::debug!(
480 target: LOG_TARGET,
481 block_hash = ?parent,
482 ?error,
483 "Error while checking block status",
484 );
485
486 self.reset_candidate(block_hash);
487 return
488 },
489 _ => (),
491 }
492
493 self.import_blocks(blocks.into_iter());
494 }
495
496 fn import_blocks(&mut self, blocks: impl Iterator<Item = Block>) {
500 let mut blocks = VecDeque::from_iter(blocks);
501
502 tracing::debug!(
503 target: LOG_TARGET,
504 blocks = ?blocks.iter().map(|b| b.hash()),
505 "Importing blocks retrieved using pov_recovery",
506 );
507
508 let mut incoming_blocks = Vec::new();
509
510 while let Some(block) = blocks.pop_front() {
511 let block_hash = block.hash();
512 let (header, body) = block.deconstruct();
513
514 incoming_blocks.push(IncomingBlock {
515 hash: block_hash,
516 header: Some(header),
517 body: Some(body),
518 import_existing: false,
519 allow_missing_state: false,
520 justifications: None,
521 origin: None,
522 skip_execution: false,
523 state: None,
524 indexed_body: None,
525 });
526
527 if let Some(waiting) = self.waiting_for_parent.remove(&block_hash) {
528 blocks.extend(waiting);
529 }
530 }
531
532 self.parachain_import_queue
533 .import_blocks(BlockOrigin::ConsensusBroadcast, incoming_blocks);
536 }
537
538 pub fn recover(&mut self, req: RecoveryRequest<Block>) {
540 let RecoveryRequest { mut hash, kind } = req;
541 let mut to_recover = Vec::new();
542
543 loop {
544 let candidate = match self.candidates.get_mut(&hash) {
545 Some(candidate) => candidate,
546 None => {
547 tracing::debug!(
548 target: LOG_TARGET,
549 block_hash = ?hash,
550 "Could not recover. Block was never announced as candidate"
551 );
552 return
553 },
554 };
555
556 match self.parachain_client.block_status(hash) {
557 Ok(BlockStatus::Unknown) if !candidate.waiting_recovery => {
558 candidate.waiting_recovery = true;
559 to_recover.push(hash);
560 },
561 Ok(_) => break,
562 Err(e) => {
563 tracing::error!(
564 target: LOG_TARGET,
565 error = ?e,
566 block_hash = ?hash,
567 "Failed to get block status",
568 );
569 for hash in to_recover {
570 self.clear_waiting_recovery(&hash);
571 }
572 return
573 },
574 }
575
576 if kind == RecoveryKind::Simple {
577 break
578 }
579
580 hash = candidate.parent_hash;
581 }
582
583 for hash in to_recover.into_iter().rev() {
584 self.candidate_recovery_queue.push_recovery(hash);
585 }
586 }
587
588 pub async fn run(mut self) {
590 let mut imported_blocks = self.parachain_client.import_notification_stream().fuse();
591 let mut finalized_blocks = self.parachain_client.finality_notification_stream().fuse();
592 let pending_candidates = match pending_candidates(
593 self.relay_chain_interface.clone(),
594 self.para_id,
595 self.parachain_sync_service.clone(),
596 )
597 .await
598 {
599 Ok(pending_candidates_stream) => pending_candidates_stream.fuse(),
600 Err(err) => {
601 tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve pending candidate stream.");
602 return
603 },
604 };
605
606 futures::pin_mut!(pending_candidates);
607 loop {
608 select! {
609 next_pending_candidates = pending_candidates.next() => {
610 if let Some((candidates, session_index, _)) = next_pending_candidates {
611 for candidate in candidates {
612 self.handle_pending_candidate(candidate, session_index);
613 }
614 } else {
615 tracing::debug!(target: LOG_TARGET, "Pending candidates stream ended");
616 return;
617 }
618 },
619 recovery_req = self.recovery_chan_rx.next() => {
620 if let Some(req) = recovery_req {
621 self.recover(req);
622 } else {
623 tracing::debug!(target: LOG_TARGET, "Recovery channel stream ended");
624 return;
625 }
626 },
627 imported = imported_blocks.next() => {
628 if let Some(imported) = imported {
629 self.clear_waiting_recovery(&imported.hash);
630
631 if let Some(waiting_blocks) = self.waiting_for_parent.remove(&imported.hash) {
635 for block in waiting_blocks {
636 tracing::debug!(target: LOG_TARGET, block_hash = ?block.hash(), resolved_parent = ?imported.hash, "Found new waiting child block during import, queuing.");
637 self.import_blocks(std::iter::once(block));
638 }
639 };
640
641 } else {
642 tracing::debug!(target: LOG_TARGET, "Imported blocks stream ended");
643 return;
644 }
645 },
646 finalized = finalized_blocks.next() => {
647 if let Some(finalized) = finalized {
648 self.handle_block_finalized(*finalized.header.number());
649 } else {
650 tracing::debug!(target: LOG_TARGET, "Finalized blocks stream ended");
651 return;
652 }
653 },
654 next_to_recover = self.candidate_recovery_queue.next_recovery().fuse() => {
655 self.recover_candidate(next_to_recover).await;
656 },
657 (block_hash, pov) =
658 self.active_candidate_recovery.wait_for_recovery().fuse() =>
659 {
660 self.handle_candidate_recovered(block_hash, pov.as_deref()).await;
661 },
662 }
663 }
664 }
665}