1use sc_client_api::{BlockBackend, BlockchainEvents, UsageProvider};
51use sc_consensus::import_queue::{ImportQueueService, IncomingBlock};
52use sp_consensus::{BlockOrigin, BlockStatus, SyncOracle};
53use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
54
55use polkadot_node_primitives::{PoV, POV_BOMB_LIMIT};
56use polkadot_node_subsystem::messages::AvailabilityRecoveryMessage;
57use polkadot_overseer::Handle as OverseerHandle;
58use polkadot_primitives::{
59 CandidateReceiptV2 as CandidateReceipt,
60 CommittedCandidateReceiptV2 as CommittedCandidateReceipt, Id as ParaId, SessionIndex,
61};
62
63use cumulus_primitives_core::ParachainBlockData;
64use cumulus_relay_chain_interface::RelayChainInterface;
65use cumulus_relay_chain_streams::pending_candidates;
66
67use codec::{Decode, DecodeAll};
68use futures::{
69 channel::mpsc::Receiver, select, stream::FuturesUnordered, Future, FutureExt, StreamExt,
70};
71use futures_timer::Delay;
72use rand::{distributions::Uniform, prelude::Distribution, thread_rng};
73
74use std::{
75 collections::{HashMap, HashSet, VecDeque},
76 pin::Pin,
77 sync::Arc,
78 time::Duration,
79};
80
81#[cfg(test)]
82mod tests;
83
84mod active_candidate_recovery;
85use active_candidate_recovery::ActiveCandidateRecovery;
86
/// `tracing` target used for the PoV recovery log output.
const LOG_TARGET: &str = "cumulus-pov-recovery";
88
/// Sink for availability-recovery requests issued by the PoV recovery worker.
///
/// Implemented for [`OverseerHandle`]; presumably kept as a trait so that tests can plug in a
/// mock handle — TODO confirm.
#[async_trait::async_trait]
pub trait RecoveryHandle: Send {
	/// Sends `message` to the availability-recovery side.
	///
	/// `origin` is a static label forwarded along with the message.
	async fn send_recovery_msg(
		&mut self,
		message: AvailabilityRecoveryMessage,
		origin: &'static str,
	);
}
99
/// Forwards recovery requests straight to the overseer.
#[async_trait::async_trait]
impl RecoveryHandle for OverseerHandle {
	async fn send_recovery_msg(
		&mut self,
		message: AvailabilityRecoveryMessage,
		origin: &'static str,
	) {
		// Delegates to the overseer handle's own `send_msg`, passing `origin` through unchanged.
		self.send_msg(message, origin).await;
	}
}
110
/// How far an explicit recovery request reaches.
#[derive(Debug, PartialEq)]
pub enum RecoveryKind {
	/// Recover only the requested block.
	Simple,
	/// Recover the requested block and walk its ancestors, also recovering every tracked
	/// ancestor that is unknown locally and not already scheduled.
	Full,
}
119
/// An explicit request to recover a parachain block from its PoV.
pub struct RecoveryRequest<Block: BlockT> {
	/// Hash of the parachain block to recover.
	pub hash: Block::Hash,
	/// Whether only this block or also its ancestors should be recovered.
	pub kind: RecoveryKind,
}
127
/// Range from which the random delay before each recovery attempt is drawn.
///
/// The delay is sampled uniformly from `min..=max` every time a block is queued for recovery.
#[derive(Clone, Copy)]
pub struct RecoveryDelayRange {
	/// Lower bound (inclusive) of the delay.
	pub min: Duration,
	/// Upper bound (inclusive) of the delay.
	pub max: Duration,
}
140
141impl RecoveryDelayRange {
142 fn duration(&self) -> Duration {
144 Uniform::from(self.min..=self.max).sample(&mut thread_rng())
145 }
146}
147
/// A parachain candidate reported by the relay chain whose block may need to be recovered.
struct Candidate<Block: BlockT> {
	/// Plain receipt of the candidate; handed to the active recovery when its PoV is requested.
	receipt: CandidateReceipt,
	/// Session index delivered alongside the candidate by the pending candidates stream.
	session_index: SessionIndex,
	/// Number of the parachain block, decoded from the candidate's head data.
	block_number: NumberFor<Block>,
	/// Parent hash of the parachain block, decoded from the candidate's head data.
	parent_hash: Block::Hash,
	/// `true` while a recovery of this candidate is queued or in flight.
	///
	/// Cleared when the block is imported or when its recovery is given up.
	waiting_recovery: bool,
}
159
/// FIFO of block hashes that are released for recovery after randomized delays.
struct RecoveryQueue<Block: BlockT> {
	/// Range the per-entry delay is sampled from.
	recovery_delay_range: RecoveryDelayRange,
	/// Hashes waiting to be recovered, oldest first.
	recovery_queue: VecDeque<Block::Hash>,
	/// One delay future per queued hash; each completed future releases the front of
	/// `recovery_queue` (not necessarily the hash it was created for).
	signaling_queue: FuturesUnordered<Pin<Box<dyn Future<Output = ()> + Send>>>,
}
168
169impl<Block: BlockT> RecoveryQueue<Block> {
170 pub fn new(recovery_delay_range: RecoveryDelayRange) -> Self {
171 Self {
172 recovery_delay_range,
173 recovery_queue: Default::default(),
174 signaling_queue: Default::default(),
175 }
176 }
177
178 pub fn push_recovery(&mut self, hash: Block::Hash) {
181 let delay = self.recovery_delay_range.duration();
182 tracing::debug!(
183 target: LOG_TARGET,
184 block_hash = ?hash,
185 "Adding block to queue and adding new recovery slot in {:?} sec",
186 delay.as_secs(),
187 );
188 self.recovery_queue.push_back(hash);
189 self.signaling_queue.push(
190 async move {
191 Delay::new(delay).await;
192 }
193 .boxed(),
194 );
195 }
196
197 pub async fn next_recovery(&mut self) -> Block::Hash {
199 loop {
200 if self.signaling_queue.next().await.is_some() {
201 if let Some(hash) = self.recovery_queue.pop_front() {
202 return hash
203 } else {
204 tracing::error!(
205 target: LOG_TARGET,
206 "Recovery was signaled, but no candidate hash available. This is a bug."
207 );
208 };
209 }
210 futures::pending!()
211 }
212 }
213}
214
/// Recovers parachain blocks from the PoVs of candidates reported by the relay chain and hands
/// them to the local parachain import queue.
pub struct PoVRecovery<Block: BlockT, PC, RC> {
	/// Tracked candidates, keyed by parachain block hash. Only blocks above the local finalized
	/// block are kept.
	candidates: HashMap<Block::Hash, Candidate<Block>>,
	/// Delayed queue of candidate hashes to recover.
	candidate_recovery_queue: RecoveryQueue<Block>,
	/// Issues availability-recovery requests and yields their outcome
	/// (see `recover_candidate` / `wait_for_recovery`).
	active_candidate_recovery: ActiveCandidateRecovery<Block>,
	/// Recovered blocks whose parent is not imported yet, keyed by the parent hash.
	waiting_for_parent: HashMap<Block::Hash, Vec<Block>>,
	/// Local parachain client (block status, usage info, import/finality notifications).
	parachain_client: Arc<PC>,
	/// Import queue that recovered blocks are submitted to.
	parachain_import_queue: Box<dyn ImportQueueService<Block>>,
	/// Relay chain access used to follow pending candidates.
	relay_chain_interface: RC,
	/// Parachain whose candidates are followed.
	para_id: ParaId,
	/// Receiver of explicit recovery requests.
	recovery_chan_rx: Receiver<RecoveryRequest<Block>>,
	/// Candidates whose first recovery attempt failed and that are being retried once.
	candidates_in_retry: HashSet<Block::Hash>,
	/// Sync oracle passed to the pending candidates stream.
	parachain_sync_service: Arc<dyn SyncOracle + Sync + Send>,
}
240
241impl<Block: BlockT, PC, RCInterface> PoVRecovery<Block, PC, RCInterface>
242where
243 PC: BlockBackend<Block> + BlockchainEvents<Block> + UsageProvider<Block>,
244 RCInterface: RelayChainInterface + Clone,
245{
246 pub fn new(
248 recovery_handle: Box<dyn RecoveryHandle>,
249 recovery_delay_range: RecoveryDelayRange,
250 parachain_client: Arc<PC>,
251 parachain_import_queue: Box<dyn ImportQueueService<Block>>,
252 relay_chain_interface: RCInterface,
253 para_id: ParaId,
254 recovery_chan_rx: Receiver<RecoveryRequest<Block>>,
255 parachain_sync_service: Arc<dyn SyncOracle + Sync + Send>,
256 ) -> Self {
257 Self {
258 candidates: HashMap::new(),
259 candidate_recovery_queue: RecoveryQueue::new(recovery_delay_range),
260 active_candidate_recovery: ActiveCandidateRecovery::new(recovery_handle),
261 waiting_for_parent: HashMap::new(),
262 parachain_client,
263 parachain_import_queue,
264 relay_chain_interface,
265 para_id,
266 candidates_in_retry: HashSet::new(),
267 recovery_chan_rx,
268 parachain_sync_service,
269 }
270 }
271
272 fn handle_pending_candidate(
274 &mut self,
275 receipt: CommittedCandidateReceipt,
276 session_index: SessionIndex,
277 ) {
278 let header = match Block::Header::decode(&mut &receipt.commitments.head_data.0[..]) {
279 Ok(header) => header,
280 Err(e) => {
281 tracing::warn!(
282 target: LOG_TARGET,
283 error = ?e,
284 "Failed to decode parachain header from pending candidate",
285 );
286 return
287 },
288 };
289
290 if *header.number() <= self.parachain_client.usage_info().chain.finalized_number {
291 return
292 }
293
294 let hash = header.hash();
295
296 if self.candidates.contains_key(&hash) {
297 return
298 }
299
300 tracing::debug!(target: LOG_TARGET, block_hash = ?hash, "Adding outstanding candidate");
301 self.candidates.insert(
302 hash,
303 Candidate {
304 block_number: *header.number(),
305 receipt: receipt.to_plain(),
306 session_index,
307 parent_hash: *header.parent_hash(),
308 waiting_recovery: false,
309 },
310 );
311
312 self.recover(RecoveryRequest { hash, kind: RecoveryKind::Simple });
315 }
316
317 fn clear_waiting_recovery(&mut self, block_hash: &Block::Hash) {
319 if let Some(candidate) = self.candidates.get_mut(block_hash) {
320 candidate.waiting_recovery = false;
322 }
323 }
324
325 fn handle_block_finalized(&mut self, block_number: NumberFor<Block>) {
327 self.candidates.retain(|_, pc| pc.block_number > block_number);
328 }
329
330 async fn recover_candidate(&mut self, block_hash: Block::Hash) {
332 match self.candidates.get(&block_hash) {
333 Some(candidate) if candidate.waiting_recovery => {
334 tracing::debug!(target: LOG_TARGET, ?block_hash, "Issuing recovery request");
335 self.active_candidate_recovery.recover_candidate(block_hash, candidate).await;
336 },
337 _ => (),
338 }
339 }
340
341 fn reset_candidate(&mut self, hash: Block::Hash) {
344 let mut blocks_to_delete = vec![hash];
345
346 while let Some(delete) = blocks_to_delete.pop() {
347 if let Some(children) = self.waiting_for_parent.remove(&delete) {
348 blocks_to_delete.extend(children.iter().map(BlockT::hash));
349 }
350 }
351 self.clear_waiting_recovery(&hash);
352 }
353
354 fn decode_parachain_block_data(
358 data: &[u8],
359 expected_block_hash: Block::Hash,
360 ) -> Option<ParachainBlockData<Block>> {
361 match ParachainBlockData::<Block>::decode_all(&mut &data[..]) {
362 Ok(block_data) => {
363 if block_data.blocks().last().map_or(false, |b| b.hash() == expected_block_hash) {
364 return Some(block_data)
365 }
366
367 tracing::debug!(
368 target: LOG_TARGET,
369 ?expected_block_hash,
370 "Could not find the expected block hash as latest block in `ParachainBlockData`"
371 );
372 },
373 Err(error) => {
374 tracing::debug!(
375 target: LOG_TARGET,
376 ?expected_block_hash,
377 ?error,
378 "Could not decode `ParachainBlockData` from recovered PoV",
379 );
380 },
381 }
382
383 None
384 }
385
386 async fn handle_candidate_recovered(&mut self, block_hash: Block::Hash, pov: Option<&PoV>) {
388 let pov = match pov {
389 Some(pov) => {
390 self.candidates_in_retry.remove(&block_hash);
391 pov
392 },
393 None =>
394 if self.candidates_in_retry.insert(block_hash) {
395 tracing::debug!(target: LOG_TARGET, ?block_hash, "Recovery failed, retrying.");
396 self.candidate_recovery_queue.push_recovery(block_hash);
397 return
398 } else {
399 tracing::warn!(
400 target: LOG_TARGET,
401 ?block_hash,
402 "Unable to recover block after retry.",
403 );
404 self.candidates_in_retry.remove(&block_hash);
405 self.reset_candidate(block_hash);
406 return
407 },
408 };
409
410 let raw_block_data =
411 match sp_maybe_compressed_blob::decompress(&pov.block_data.0, POV_BOMB_LIMIT) {
412 Ok(r) => r,
413 Err(error) => {
414 tracing::debug!(target: LOG_TARGET, ?error, "Failed to decompress PoV");
415
416 self.reset_candidate(block_hash);
417 return
418 },
419 };
420
421 let Some(block_data) = Self::decode_parachain_block_data(&raw_block_data, block_hash)
422 else {
423 self.reset_candidate(block_hash);
424 return
425 };
426
427 let blocks = block_data.into_blocks();
428
429 let Some(parent) = blocks.first().map(|b| *b.header().parent_hash()) else {
430 tracing::debug!(
431 target: LOG_TARGET,
432 ?block_hash,
433 "Recovered candidate doesn't contain any blocks.",
434 );
435
436 self.reset_candidate(block_hash);
437 return;
438 };
439
440 match self.parachain_client.block_status(parent) {
441 Ok(BlockStatus::Unknown) => {
442 let parent_scheduled_for_recovery =
445 self.candidates.get(&parent).map_or(false, |parent| parent.waiting_recovery);
446 if parent_scheduled_for_recovery {
447 tracing::debug!(
448 target: LOG_TARGET,
449 ?block_hash,
450 parent_hash = ?parent,
451 parent_scheduled_for_recovery,
452 waiting_blocks = self.waiting_for_parent.len(),
453 "Waiting for recovery of parent.",
454 );
455
456 blocks.into_iter().for_each(|b| {
457 self.waiting_for_parent
458 .entry(*b.header().parent_hash())
459 .or_default()
460 .push(b);
461 });
462 return
463 } else {
464 tracing::debug!(
465 target: LOG_TARGET,
466 ?block_hash,
467 parent_hash = ?parent,
468 "Parent not found while trying to import recovered block.",
469 );
470
471 self.reset_candidate(block_hash);
472 return
473 }
474 },
475 Err(error) => {
476 tracing::debug!(
477 target: LOG_TARGET,
478 block_hash = ?parent,
479 ?error,
480 "Error while checking block status",
481 );
482
483 self.reset_candidate(block_hash);
484 return
485 },
486 _ => (),
488 }
489
490 self.import_blocks(blocks.into_iter());
491 }
492
493 fn import_blocks(&mut self, blocks: impl Iterator<Item = Block>) {
497 let mut blocks = VecDeque::from_iter(blocks);
498
499 tracing::debug!(
500 target: LOG_TARGET,
501 blocks = ?blocks.iter().map(|b| b.hash()),
502 "Importing blocks retrieved using pov_recovery",
503 );
504
505 let mut incoming_blocks = Vec::new();
506
507 while let Some(block) = blocks.pop_front() {
508 let block_hash = block.hash();
509 let (header, body) = block.deconstruct();
510
511 incoming_blocks.push(IncomingBlock {
512 hash: block_hash,
513 header: Some(header),
514 body: Some(body),
515 import_existing: false,
516 allow_missing_state: false,
517 justifications: None,
518 origin: None,
519 skip_execution: false,
520 state: None,
521 indexed_body: None,
522 });
523
524 if let Some(waiting) = self.waiting_for_parent.remove(&block_hash) {
525 blocks.extend(waiting);
526 }
527 }
528
529 self.parachain_import_queue
530 .import_blocks(BlockOrigin::ConsensusBroadcast, incoming_blocks);
533 }
534
535 pub fn recover(&mut self, req: RecoveryRequest<Block>) {
537 let RecoveryRequest { mut hash, kind } = req;
538 let mut to_recover = Vec::new();
539
540 loop {
541 let candidate = match self.candidates.get_mut(&hash) {
542 Some(candidate) => candidate,
543 None => {
544 tracing::debug!(
545 target: LOG_TARGET,
546 block_hash = ?hash,
547 "Could not recover. Block was never announced as candidate"
548 );
549 return
550 },
551 };
552
553 match self.parachain_client.block_status(hash) {
554 Ok(BlockStatus::Unknown) if !candidate.waiting_recovery => {
555 candidate.waiting_recovery = true;
556 to_recover.push(hash);
557 },
558 Ok(_) => break,
559 Err(e) => {
560 tracing::error!(
561 target: LOG_TARGET,
562 error = ?e,
563 block_hash = ?hash,
564 "Failed to get block status",
565 );
566 for hash in to_recover {
567 self.clear_waiting_recovery(&hash);
568 }
569 return
570 },
571 }
572
573 if kind == RecoveryKind::Simple {
574 break
575 }
576
577 hash = candidate.parent_hash;
578 }
579
580 for hash in to_recover.into_iter().rev() {
581 self.candidate_recovery_queue.push_recovery(hash);
582 }
583 }
584
585 pub async fn run(mut self) {
587 let mut imported_blocks = self.parachain_client.import_notification_stream().fuse();
588 let mut finalized_blocks = self.parachain_client.finality_notification_stream().fuse();
589 let pending_candidates = match pending_candidates(
590 self.relay_chain_interface.clone(),
591 self.para_id,
592 self.parachain_sync_service.clone(),
593 )
594 .await
595 {
596 Ok(pending_candidates_stream) => pending_candidates_stream.fuse(),
597 Err(err) => {
598 tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve pending candidate stream.");
599 return
600 },
601 };
602
603 futures::pin_mut!(pending_candidates);
604 loop {
605 select! {
606 next_pending_candidates = pending_candidates.next() => {
607 if let Some((candidates, session_index, _)) = next_pending_candidates {
608 for candidate in candidates {
609 self.handle_pending_candidate(candidate, session_index);
610 }
611 } else {
612 tracing::debug!(target: LOG_TARGET, "Pending candidates stream ended");
613 return;
614 }
615 },
616 recovery_req = self.recovery_chan_rx.next() => {
617 if let Some(req) = recovery_req {
618 self.recover(req);
619 } else {
620 tracing::debug!(target: LOG_TARGET, "Recovery channel stream ended");
621 return;
622 }
623 },
624 imported = imported_blocks.next() => {
625 if let Some(imported) = imported {
626 self.clear_waiting_recovery(&imported.hash);
627
628 if let Some(waiting_blocks) = self.waiting_for_parent.remove(&imported.hash) {
632 for block in waiting_blocks {
633 tracing::debug!(target: LOG_TARGET, block_hash = ?block.hash(), resolved_parent = ?imported.hash, "Found new waiting child block during import, queuing.");
634 self.import_blocks(std::iter::once(block));
635 }
636 };
637
638 } else {
639 tracing::debug!(target: LOG_TARGET, "Imported blocks stream ended");
640 return;
641 }
642 },
643 finalized = finalized_blocks.next() => {
644 if let Some(finalized) = finalized {
645 self.handle_block_finalized(*finalized.header.number());
646 } else {
647 tracing::debug!(target: LOG_TARGET, "Finalized blocks stream ended");
648 return;
649 }
650 },
651 next_to_recover = self.candidate_recovery_queue.next_recovery().fuse() => {
652 self.recover_candidate(next_to_recover).await;
653 },
654 (block_hash, pov) =
655 self.active_candidate_recovery.wait_for_recovery().fuse() =>
656 {
657 self.handle_candidate_recovered(block_hash, pov.as_deref()).await;
658 },
659 }
660 }
661 }
662}