1use sc_client_api::{BlockBackend, BlockchainEvents, UsageProvider};
51use sc_consensus::import_queue::{ImportQueueService, IncomingBlock};
52use sp_consensus::{BlockOrigin, BlockStatus, SyncOracle};
53use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
54
55use polkadot_node_primitives::{PoV, POV_BOMB_LIMIT};
56use polkadot_node_subsystem::messages::AvailabilityRecoveryMessage;
57use polkadot_overseer::Handle as OverseerHandle;
58use polkadot_primitives::{
59 CandidateReceiptV2 as CandidateReceipt,
60 CommittedCandidateReceiptV2 as CommittedCandidateReceipt, Id as ParaId, SessionIndex,
61};
62
63use cumulus_primitives_core::ParachainBlockData;
64use cumulus_relay_chain_interface::RelayChainInterface;
65use cumulus_relay_chain_streams::pending_candidates;
66
67use codec::{Decode, DecodeAll};
68use futures::{
69 channel::mpsc::Receiver, select, stream::FuturesUnordered, Future, FutureExt, StreamExt,
70};
71use futures_timer::Delay;
72use rand::{distributions::Uniform, prelude::Distribution, thread_rng};
73
74use std::{
75 collections::{HashMap, HashSet, VecDeque},
76 pin::Pin,
77 sync::Arc,
78 time::Duration,
79};
80
81#[cfg(test)]
82mod tests;
83
84mod active_candidate_recovery;
85use active_candidate_recovery::ActiveCandidateRecovery;
86
/// Target used by every `tracing` event emitted from this module.
const LOG_TARGET: &str = "cumulus-pov-recovery";
88
89#[async_trait::async_trait]
92pub trait RecoveryHandle: Send {
93 async fn send_recovery_msg(
94 &mut self,
95 message: AvailabilityRecoveryMessage,
96 origin: &'static str,
97 );
98}
99
100#[async_trait::async_trait]
101impl RecoveryHandle for OverseerHandle {
102 async fn send_recovery_msg(
103 &mut self,
104 message: AvailabilityRecoveryMessage,
105 origin: &'static str,
106 ) {
107 self.send_msg(message, origin).await;
108 }
109}
110
/// How far a recovery request should reach.
#[derive(Debug, PartialEq)]
pub enum RecoveryKind {
	/// Recover only the requested block.
	Simple,
	/// Recover the requested block and walk back through its known, not-yet-imported
	/// candidate ancestors, recovering those too.
	Full,
}
119
120pub struct RecoveryRequest<Block: BlockT> {
122 pub hash: Block::Hash,
124 pub kind: RecoveryKind,
126}
127
/// Bounds of the random delay applied before a queued recovery is actually started.
#[derive(Clone, Copy)]
pub struct RecoveryDelayRange {
	/// Shortest possible delay.
	pub min: Duration,
	/// Longest possible delay (inclusive).
	pub max: Duration,
}
140
141impl RecoveryDelayRange {
142 fn duration(&self) -> Duration {
144 Uniform::from(self.min..=self.max).sample(&mut thread_rng())
145 }
146}
147
148struct Candidate<Block: BlockT> {
150 receipt: CandidateReceipt,
151 session_index: SessionIndex,
152 block_number: NumberFor<Block>,
153 parent_hash: Block::Hash,
154 waiting_recovery: bool,
158}
159
160struct RecoveryQueue<Block: BlockT> {
162 recovery_delay_range: RecoveryDelayRange,
163 recovery_queue: VecDeque<Block::Hash>,
165 signaling_queue: FuturesUnordered<Pin<Box<dyn Future<Output = ()> + Send>>>,
167}
168
169impl<Block: BlockT> RecoveryQueue<Block> {
170 pub fn new(recovery_delay_range: RecoveryDelayRange) -> Self {
171 Self {
172 recovery_delay_range,
173 recovery_queue: Default::default(),
174 signaling_queue: Default::default(),
175 }
176 }
177
178 pub fn push_recovery(&mut self, hash: Block::Hash) {
181 let delay = self.recovery_delay_range.duration();
182 tracing::debug!(
183 target: LOG_TARGET,
184 block_hash = ?hash,
185 "Adding block to queue and adding new recovery slot in {:?} sec",
186 delay.as_secs(),
187 );
188 self.recovery_queue.push_back(hash);
189 self.signaling_queue.push(
190 async move {
191 Delay::new(delay).await;
192 }
193 .boxed(),
194 );
195 }
196
197 pub async fn next_recovery(&mut self) -> Block::Hash {
199 loop {
200 if self.signaling_queue.next().await.is_some() {
201 if let Some(hash) = self.recovery_queue.pop_front() {
202 return hash;
203 } else {
204 tracing::error!(
205 target: LOG_TARGET,
206 "Recovery was signaled, but no candidate hash available. This is a bug."
207 );
208 };
209 }
210 futures::pending!()
211 }
212 }
213}
214
215pub struct PoVRecovery<Block: BlockT, PC, RC> {
217 candidates: HashMap<Block::Hash, Candidate<Block>>,
220 candidate_recovery_queue: RecoveryQueue<Block>,
225 active_candidate_recovery: ActiveCandidateRecovery<Block>,
226 waiting_for_parent: HashMap<Block::Hash, Vec<Block>>,
230 parachain_client: Arc<PC>,
231 parachain_import_queue: Box<dyn ImportQueueService<Block>>,
232 relay_chain_interface: RC,
233 para_id: ParaId,
234 recovery_chan_rx: Receiver<RecoveryRequest<Block>>,
236 candidates_in_retry: HashSet<Block::Hash>,
238 parachain_sync_service: Arc<dyn SyncOracle + Sync + Send>,
239}
240
241impl<Block: BlockT, PC, RCInterface> PoVRecovery<Block, PC, RCInterface>
242where
243 PC: BlockBackend<Block> + BlockchainEvents<Block> + UsageProvider<Block>,
244 RCInterface: RelayChainInterface + Clone,
245{
246 pub fn new(
248 recovery_handle: Box<dyn RecoveryHandle>,
249 recovery_delay_range: RecoveryDelayRange,
250 parachain_client: Arc<PC>,
251 parachain_import_queue: Box<dyn ImportQueueService<Block>>,
252 relay_chain_interface: RCInterface,
253 para_id: ParaId,
254 recovery_chan_rx: Receiver<RecoveryRequest<Block>>,
255 parachain_sync_service: Arc<dyn SyncOracle + Sync + Send>,
256 ) -> Self {
257 Self {
258 candidates: HashMap::new(),
259 candidate_recovery_queue: RecoveryQueue::new(recovery_delay_range),
260 active_candidate_recovery: ActiveCandidateRecovery::new(recovery_handle),
261 waiting_for_parent: HashMap::new(),
262 parachain_client,
263 parachain_import_queue,
264 relay_chain_interface,
265 para_id,
266 candidates_in_retry: HashSet::new(),
267 recovery_chan_rx,
268 parachain_sync_service,
269 }
270 }
271
272 fn handle_pending_candidate(
274 &mut self,
275 receipt: CommittedCandidateReceipt,
276 session_index: SessionIndex,
277 ) {
278 let header = match Block::Header::decode(&mut &receipt.commitments.head_data.0[..]) {
279 Ok(header) => header,
280 Err(e) => {
281 tracing::warn!(
282 target: LOG_TARGET,
283 error = ?e,
284 "Failed to decode parachain header from pending candidate",
285 );
286 return;
287 },
288 };
289
290 if *header.number() <= self.parachain_client.usage_info().chain.finalized_number {
291 return;
292 }
293
294 let hash = header.hash();
295
296 if self.candidates.contains_key(&hash) {
297 return;
298 }
299
300 tracing::debug!(target: LOG_TARGET, block_hash = ?hash, "Adding outstanding candidate");
301 self.candidates.insert(
302 hash,
303 Candidate {
304 block_number: *header.number(),
305 receipt: receipt.to_plain(),
306 session_index,
307 parent_hash: *header.parent_hash(),
308 waiting_recovery: false,
309 },
310 );
311
312 self.recover(RecoveryRequest { hash, kind: RecoveryKind::Simple });
315 }
316
317 fn clear_waiting_recovery(&mut self, block_hash: &Block::Hash) {
319 if let Some(candidate) = self.candidates.get_mut(block_hash) {
320 candidate.waiting_recovery = false;
322 }
323 }
324
325 fn handle_block_finalized(&mut self, block_number: NumberFor<Block>) {
327 self.candidates.retain(|_, pc| pc.block_number > block_number);
328 }
329
330 async fn recover_candidate(&mut self, block_hash: Block::Hash) {
332 match self.candidates.get(&block_hash) {
333 Some(candidate) if candidate.waiting_recovery => {
334 tracing::debug!(target: LOG_TARGET, ?block_hash, "Issuing recovery request");
335 self.active_candidate_recovery.recover_candidate(block_hash, candidate).await;
336 },
337 _ => (),
338 }
339 }
340
341 fn reset_candidate(&mut self, hash: Block::Hash) {
344 let mut blocks_to_delete = vec![hash];
345
346 while let Some(delete) = blocks_to_delete.pop() {
347 if let Some(children) = self.waiting_for_parent.remove(&delete) {
348 blocks_to_delete.extend(children.iter().map(BlockT::hash));
349 }
350 }
351 self.clear_waiting_recovery(&hash);
352 }
353
354 fn decode_parachain_block_data(
358 data: &[u8],
359 expected_block_hash: Block::Hash,
360 ) -> Option<ParachainBlockData<Block>> {
361 match ParachainBlockData::<Block>::decode_all(&mut &data[..]) {
362 Ok(block_data) => {
363 if block_data.blocks().last().map_or(false, |b| b.hash() == expected_block_hash) {
364 return Some(block_data);
365 }
366
367 tracing::debug!(
368 target: LOG_TARGET,
369 ?expected_block_hash,
370 "Could not find the expected block hash as latest block in `ParachainBlockData`"
371 );
372 },
373 Err(error) => {
374 tracing::debug!(
375 target: LOG_TARGET,
376 ?expected_block_hash,
377 ?error,
378 "Could not decode `ParachainBlockData` from recovered PoV",
379 );
380 },
381 }
382
383 None
384 }
385
386 async fn handle_candidate_recovered(&mut self, block_hash: Block::Hash, pov: Option<&PoV>) {
388 let pov = match pov {
389 Some(pov) => {
390 self.candidates_in_retry.remove(&block_hash);
391 pov
392 },
393 None => {
394 if self.candidates_in_retry.insert(block_hash) {
395 tracing::debug!(target: LOG_TARGET, ?block_hash, "Recovery failed, retrying.");
396 self.candidate_recovery_queue.push_recovery(block_hash);
397 return;
398 } else {
399 tracing::warn!(
400 target: LOG_TARGET,
401 ?block_hash,
402 "Unable to recover block after retry.",
403 );
404 self.candidates_in_retry.remove(&block_hash);
405 self.reset_candidate(block_hash);
406 return;
407 }
408 },
409 };
410
411 let raw_block_data =
412 match sp_maybe_compressed_blob::decompress(&pov.block_data.0, POV_BOMB_LIMIT) {
413 Ok(r) => r,
414 Err(error) => {
415 tracing::debug!(target: LOG_TARGET, ?error, "Failed to decompress PoV");
416
417 self.reset_candidate(block_hash);
418 return;
419 },
420 };
421
422 let Some(block_data) = Self::decode_parachain_block_data(&raw_block_data, block_hash)
423 else {
424 self.reset_candidate(block_hash);
425 return;
426 };
427
428 let blocks = block_data.into_blocks();
429
430 let Some(parent) = blocks.first().map(|b| *b.header().parent_hash()) else {
431 tracing::debug!(
432 target: LOG_TARGET,
433 ?block_hash,
434 "Recovered candidate doesn't contain any blocks.",
435 );
436
437 self.reset_candidate(block_hash);
438 return;
439 };
440
441 match self.parachain_client.block_status(parent) {
442 Ok(BlockStatus::Unknown) => {
443 let parent_scheduled_for_recovery =
446 self.candidates.get(&parent).map_or(false, |parent| parent.waiting_recovery);
447 if parent_scheduled_for_recovery {
448 tracing::debug!(
449 target: LOG_TARGET,
450 ?block_hash,
451 parent_hash = ?parent,
452 parent_scheduled_for_recovery,
453 waiting_blocks = self.waiting_for_parent.len(),
454 "Waiting for recovery of parent.",
455 );
456
457 blocks.into_iter().for_each(|b| {
458 self.waiting_for_parent
459 .entry(*b.header().parent_hash())
460 .or_default()
461 .push(b);
462 });
463 return;
464 } else {
465 tracing::debug!(
466 target: LOG_TARGET,
467 ?block_hash,
468 parent_hash = ?parent,
469 "Parent not found while trying to import recovered block.",
470 );
471
472 self.reset_candidate(block_hash);
473 return;
474 }
475 },
476 Err(error) => {
477 tracing::debug!(
478 target: LOG_TARGET,
479 block_hash = ?parent,
480 ?error,
481 "Error while checking block status",
482 );
483
484 self.reset_candidate(block_hash);
485 return;
486 },
487 _ => (),
489 }
490
491 self.import_blocks(blocks.into_iter());
492 }
493
494 fn import_blocks(&mut self, blocks: impl Iterator<Item = Block>) {
498 let mut blocks = VecDeque::from_iter(blocks);
499
500 tracing::debug!(
501 target: LOG_TARGET,
502 blocks = ?blocks.iter().map(|b| b.hash()),
503 "Importing blocks retrieved using pov_recovery",
504 );
505
506 let mut incoming_blocks = Vec::new();
507
508 while let Some(block) = blocks.pop_front() {
509 let block_hash = block.hash();
510 let (header, body) = block.deconstruct();
511
512 incoming_blocks.push(IncomingBlock {
513 hash: block_hash,
514 header: Some(header),
515 body: Some(body),
516 import_existing: false,
517 allow_missing_state: false,
518 justifications: None,
519 origin: None,
520 skip_execution: false,
521 state: None,
522 indexed_body: None,
523 });
524
525 if let Some(waiting) = self.waiting_for_parent.remove(&block_hash) {
526 blocks.extend(waiting);
527 }
528 }
529
530 self.parachain_import_queue
531 .import_blocks(BlockOrigin::ConsensusBroadcast, incoming_blocks);
534 }
535
536 pub fn recover(&mut self, req: RecoveryRequest<Block>) {
538 let RecoveryRequest { mut hash, kind } = req;
539 let mut to_recover = Vec::new();
540
541 loop {
542 let candidate = match self.candidates.get_mut(&hash) {
543 Some(candidate) => candidate,
544 None => {
545 tracing::debug!(
546 target: LOG_TARGET,
547 block_hash = ?hash,
548 "Could not recover. Block was never announced as candidate"
549 );
550 return;
551 },
552 };
553
554 match self.parachain_client.block_status(hash) {
555 Ok(BlockStatus::Unknown) if !candidate.waiting_recovery => {
556 candidate.waiting_recovery = true;
557 to_recover.push(hash);
558 },
559 Ok(_) => break,
560 Err(e) => {
561 tracing::error!(
562 target: LOG_TARGET,
563 error = ?e,
564 block_hash = ?hash,
565 "Failed to get block status",
566 );
567 for hash in to_recover {
568 self.clear_waiting_recovery(&hash);
569 }
570 return;
571 },
572 }
573
574 if kind == RecoveryKind::Simple {
575 break;
576 }
577
578 hash = candidate.parent_hash;
579 }
580
581 for hash in to_recover.into_iter().rev() {
582 self.candidate_recovery_queue.push_recovery(hash);
583 }
584 }
585
586 pub async fn run(mut self) {
588 let mut imported_blocks = self.parachain_client.import_notification_stream().fuse();
589 let mut finalized_blocks = self.parachain_client.finality_notification_stream().fuse();
590 let pending_candidates = match pending_candidates(
591 self.relay_chain_interface.clone(),
592 self.para_id,
593 self.parachain_sync_service.clone(),
594 )
595 .await
596 {
597 Ok(pending_candidates_stream) => pending_candidates_stream.fuse(),
598 Err(err) => {
599 tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve pending candidate stream.");
600 return;
601 },
602 };
603
604 futures::pin_mut!(pending_candidates);
605 loop {
606 select! {
607 next_pending_candidates = pending_candidates.next() => {
608 if let Some((candidates, session_index, _)) = next_pending_candidates {
609 for candidate in candidates {
610 self.handle_pending_candidate(candidate, session_index);
611 }
612 } else {
613 tracing::debug!(target: LOG_TARGET, "Pending candidates stream ended");
614 return;
615 }
616 },
617 recovery_req = self.recovery_chan_rx.next() => {
618 if let Some(req) = recovery_req {
619 self.recover(req);
620 } else {
621 tracing::debug!(target: LOG_TARGET, "Recovery channel stream ended");
622 return;
623 }
624 },
625 imported = imported_blocks.next() => {
626 if let Some(imported) = imported {
627 self.clear_waiting_recovery(&imported.hash);
628
629 if let Some(waiting_blocks) = self.waiting_for_parent.remove(&imported.hash) {
633 for block in waiting_blocks {
634 tracing::debug!(target: LOG_TARGET, block_hash = ?block.hash(), resolved_parent = ?imported.hash, "Found new waiting child block during import, queuing.");
635 self.import_blocks(std::iter::once(block));
636 }
637 };
638
639 } else {
640 tracing::debug!(target: LOG_TARGET, "Imported blocks stream ended");
641 return;
642 }
643 },
644 finalized = finalized_blocks.next() => {
645 if let Some(finalized) = finalized {
646 self.handle_block_finalized(*finalized.header.number());
647 } else {
648 tracing::debug!(target: LOG_TARGET, "Finalized blocks stream ended");
649 return;
650 }
651 },
652 next_to_recover = self.candidate_recovery_queue.next_recovery().fuse() => {
653 self.recover_candidate(next_to_recover).await;
654 },
655 (block_hash, pov) =
656 self.active_candidate_recovery.wait_for_recovery().fuse() =>
657 {
658 self.handle_candidate_recovered(block_hash, pov.as_deref()).await;
659 },
660 }
661 }
662 }
663}