1#![deny(unused_crate_dependencies)]
30
31use std::collections::{BTreeSet, HashMap, HashSet};
32
33use fragment_chain::CandidateStorage;
34use futures::{channel::oneshot, prelude::*};
35
36use polkadot_node_subsystem::{
37 messages::{
38 Ancestors, ChainApiMessage, HypotheticalCandidate, HypotheticalMembership,
39 HypotheticalMembershipRequest, IntroduceSecondedCandidateRequest, ParentHeadData,
40 ProspectiveParachainsMessage, ProspectiveValidationDataRequest, RuntimeApiMessage,
41 RuntimeApiRequest,
42 },
43 overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError,
44};
45use polkadot_node_subsystem_util::{
46 backing_implicit_view::{BlockInfoProspectiveParachains as BlockInfo, View as ImplicitView},
47 inclusion_emulator::{Constraints, RelayChainBlockInfo},
48 request_backing_constraints, request_candidates_pending_availability,
49 request_session_index_for_child,
50 runtime::{fetch_claim_queue, fetch_scheduling_lookahead},
51};
52use polkadot_primitives::{
53 transpose_claim_queue, BlockNumber, CandidateHash,
54 CommittedCandidateReceiptV2 as CommittedCandidateReceipt, Hash, Header, Id as ParaId,
55 PersistedValidationData,
56};
57
58use crate::{
59 error::{FatalError, FatalResult, JfyiError, JfyiErrorResult, Result},
60 fragment_chain::{
61 CandidateEntry, Error as FragmentChainError, FragmentChain, Scope as FragmentChainScope,
62 },
63};
64
65mod error;
66mod fragment_chain;
67#[cfg(test)]
68mod tests;
69
70mod metrics;
71use self::metrics::Metrics;
72
73const LOG_TARGET: &str = "parachain::prospective-parachains";
74
75struct RelayBlockViewData {
76 fragment_chains: HashMap<ParaId, FragmentChain>,
78}
79
80struct View {
81 per_relay_parent: HashMap<Hash, RelayBlockViewData>,
83 active_leaves: HashSet<Hash>,
86 implicit_view: ImplicitView,
88}
89
90impl View {
91 fn new() -> Self {
93 View {
94 per_relay_parent: HashMap::new(),
95 active_leaves: HashSet::new(),
96 implicit_view: ImplicitView::default(),
97 }
98 }
99
100 fn get_fragment_chains(&self, leaf: &Hash) -> Option<&HashMap<ParaId, FragmentChain>> {
102 self.per_relay_parent.get(&leaf).map(|view_data| &view_data.fragment_chains)
103 }
104}
105
106#[derive(Default)]
108pub struct ProspectiveParachainsSubsystem {
109 metrics: Metrics,
110}
111
112impl ProspectiveParachainsSubsystem {
113 pub fn new(metrics: Metrics) -> Self {
115 Self { metrics }
116 }
117}
118
119#[overseer::subsystem(ProspectiveParachains, error = SubsystemError, prefix = self::overseer)]
120impl<Context> ProspectiveParachainsSubsystem
121where
122 Context: Send + Sync,
123{
124 fn start(self, ctx: Context) -> SpawnedSubsystem {
125 SpawnedSubsystem {
126 future: run(ctx, self.metrics)
127 .map_err(|e| SubsystemError::with_origin("prospective-parachains", e))
128 .boxed(),
129 name: "prospective-parachains-subsystem",
130 }
131 }
132}
133
134#[overseer::contextbounds(ProspectiveParachains, prefix = self::overseer)]
135async fn run<Context>(mut ctx: Context, metrics: Metrics) -> FatalResult<()> {
136 let mut view = View::new();
137 loop {
138 crate::error::log_error(
139 run_iteration(&mut ctx, &mut view, &metrics).await,
140 "Encountered issue during run iteration",
141 )?;
142 }
143}
144
145#[overseer::contextbounds(ProspectiveParachains, prefix = self::overseer)]
146async fn run_iteration<Context>(
147 ctx: &mut Context,
148 view: &mut View,
149 metrics: &Metrics,
150) -> Result<()> {
151 loop {
152 match ctx.recv().await.map_err(FatalError::SubsystemReceive)? {
153 FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()),
154 FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => {
155 handle_active_leaves_update(&mut *ctx, view, update, metrics).await?;
156 },
157 FromOrchestra::Signal(OverseerSignal::BlockFinalized(..)) => {},
158 FromOrchestra::Communication { msg } => match msg {
159 ProspectiveParachainsMessage::IntroduceSecondedCandidate(request, tx) =>
160 handle_introduce_seconded_candidate(view, request, tx, metrics).await,
161 ProspectiveParachainsMessage::CandidateBacked(para, candidate_hash) =>
162 handle_candidate_backed(view, para, candidate_hash, metrics).await,
163 ProspectiveParachainsMessage::GetBackableCandidates(
164 relay_parent,
165 para,
166 count,
167 ancestors,
168 tx,
169 ) => answer_get_backable_candidates(&view, relay_parent, para, count, ancestors, tx),
170 ProspectiveParachainsMessage::GetHypotheticalMembership(request, tx) =>
171 answer_hypothetical_membership_request(&view, request, tx, metrics),
172 ProspectiveParachainsMessage::GetMinimumRelayParents(relay_parent, tx) =>
173 answer_minimum_relay_parents_request(&view, relay_parent, tx),
174 ProspectiveParachainsMessage::GetProspectiveValidationData(request, tx) =>
175 answer_prospective_validation_data_request(&view, request, tx),
176 },
177 }
178 }
179}
180
181#[overseer::contextbounds(ProspectiveParachains, prefix = self::overseer)]
182async fn handle_active_leaves_update<Context>(
183 ctx: &mut Context,
184 view: &mut View,
185 update: ActiveLeavesUpdate,
186 metrics: &Metrics,
187) -> JfyiErrorResult<()> {
188 let _timer = metrics.time_handle_active_leaves_update();
199
200 gum::trace!(
201 target: LOG_TARGET,
202 activated = ?update.activated,
203 deactivated = ?update.deactivated,
204 "Handle ActiveLeavesUpdate"
205 );
206
207 let mut temp_header_cache = HashMap::new();
208 for activated in update.activated.into_iter() {
210 if update.deactivated.contains(&activated.hash) {
211 continue
212 }
213
214 let hash = activated.hash;
215
216 let transposed_claim_queue =
217 transpose_claim_queue(fetch_claim_queue(ctx.sender(), hash).await?.0);
218
219 let block_info = match fetch_block_info(ctx, &mut temp_header_cache, hash).await? {
220 None => {
221 gum::warn!(
222 target: LOG_TARGET,
223 block_hash = ?hash,
224 "Failed to get block info for newly activated leaf block."
225 );
226
227 continue
231 },
232 Some(info) => info,
233 };
234
235 let session_index = request_session_index_for_child(hash, ctx.sender())
236 .await
237 .await
238 .map_err(JfyiError::RuntimeApiRequestCanceled)??;
239 let ancestry_len = fetch_scheduling_lookahead(hash, session_index, ctx.sender())
240 .await?
241 .saturating_sub(1);
242
243 let ancestry =
244 fetch_ancestry(ctx, &mut temp_header_cache, hash, ancestry_len as usize, session_index)
245 .await?;
246
247 let prev_fragment_chains =
248 ancestry.first().and_then(|prev_leaf| view.get_fragment_chains(&prev_leaf.hash));
249
250 let mut fragment_chains = HashMap::new();
251 for (para, claims_by_depth) in transposed_claim_queue.iter() {
252 let Some((constraints, pending_availability)) =
254 fetch_backing_constraints_and_candidates(ctx, hash, *para).await?
255 else {
256 gum::debug!(
258 target: LOG_TARGET,
259 para_id = ?para,
260 relay_parent = ?hash,
261 "Failed to get inclusion backing state."
262 );
263
264 continue
265 };
266
267 let pending_availability = preprocess_candidates_pending_availability(
268 ctx,
269 &mut temp_header_cache,
270 &constraints,
271 pending_availability,
272 )
273 .await?;
274 let mut compact_pending = Vec::with_capacity(pending_availability.len());
275
276 let mut pending_availability_storage = CandidateStorage::default();
277
278 for c in pending_availability {
279 let candidate_hash = c.compact.candidate_hash;
280 let res = pending_availability_storage.add_pending_availability_candidate(
281 candidate_hash,
282 c.candidate,
283 c.persisted_validation_data,
284 );
285
286 match res {
287 Ok(_) | Err(FragmentChainError::CandidateAlreadyKnown) => {},
288 Err(err) => {
289 gum::warn!(
290 target: LOG_TARGET,
291 ?candidate_hash,
292 para_id = ?para,
293 ?err,
294 "Scraped invalid candidate pending availability",
295 );
296
297 break
298 },
299 }
300
301 compact_pending.push(c.compact);
302 }
303
304 let max_backable_chain_len =
305 claims_by_depth.values().flatten().collect::<BTreeSet<_>>().len();
306 let scope = match FragmentChainScope::with_ancestors(
307 block_info.clone().into(),
308 constraints,
309 compact_pending,
310 max_backable_chain_len,
311 ancestry
312 .iter()
313 .map(|a| RelayChainBlockInfo::from(a.clone()))
314 .collect::<Vec<_>>(),
315 ) {
316 Ok(scope) => scope,
317 Err(unexpected_ancestors) => {
318 gum::warn!(
319 target: LOG_TARGET,
320 para_id = ?para,
321 max_backable_chain_len,
322 ?ancestry,
323 leaf = ?hash,
324 "Relay chain ancestors have wrong order: {:?}",
325 unexpected_ancestors
326 );
327 continue
328 },
329 };
330
331 gum::trace!(
332 target: LOG_TARGET,
333 relay_parent = ?hash,
334 min_relay_parent = scope.earliest_relay_parent().number,
335 max_backable_chain_len,
336 para_id = ?para,
337 ancestors = ?ancestry,
338 "Creating fragment chain"
339 );
340
341 let number_of_pending_candidates = pending_availability_storage.len();
342
343 let mut chain = FragmentChain::init(scope, pending_availability_storage);
345
346 if chain.best_chain_len() < number_of_pending_candidates {
347 gum::warn!(
348 target: LOG_TARGET,
349 relay_parent = ?hash,
350 para_id = ?para,
351 "Not all pending availability candidates could be introduced. Actual vs expected count: {}, {}",
352 chain.best_chain_len(),
353 number_of_pending_candidates
354 )
355 }
356
357 if let Some(prev_fragment_chain) =
360 prev_fragment_chains.and_then(|chains| chains.get(para))
361 {
362 chain.populate_from_previous(prev_fragment_chain);
363 }
364
365 gum::trace!(
366 target: LOG_TARGET,
367 relay_parent = ?hash,
368 para_id = ?para,
369 "Populated fragment chain with {} candidates: {:?}",
370 chain.best_chain_len(),
371 chain.best_chain_vec()
372 );
373
374 gum::trace!(
375 target: LOG_TARGET,
376 relay_parent = ?hash,
377 para_id = ?para,
378 "Potential candidate storage for para: {:?}",
379 chain.unconnected().map(|candidate| candidate.hash()).collect::<Vec<_>>()
380 );
381
382 fragment_chains.insert(*para, chain);
383 }
384
385 view.per_relay_parent.insert(hash, RelayBlockViewData { fragment_chains });
386
387 view.active_leaves.insert(hash);
388
389 view.implicit_view
390 .activate_leaf_from_prospective_parachains(block_info, &ancestry);
391 }
392
393 for deactivated in update.deactivated {
394 view.active_leaves.remove(&deactivated);
395 view.implicit_view.deactivate_leaf(deactivated);
396 }
397
398 {
399 let remaining: HashSet<_> = view.implicit_view.all_allowed_relay_parents().collect();
400
401 view.per_relay_parent.retain(|r, _| remaining.contains(&r));
402 }
403
404 if metrics.0.is_some() {
405 let mut active_connected = 0;
406 let mut active_unconnected = 0;
407 let mut candidates_in_implicit_view = 0;
408
409 for (hash, RelayBlockViewData { fragment_chains, .. }) in view.per_relay_parent.iter() {
410 if view.active_leaves.contains(hash) {
411 for chain in fragment_chains.values() {
412 active_connected += chain.best_chain_len();
413 active_unconnected += chain.unconnected_len();
414 }
415 } else {
416 for chain in fragment_chains.values() {
417 candidates_in_implicit_view += chain.best_chain_len();
418 candidates_in_implicit_view += chain.unconnected_len();
419 }
420 }
421 }
422
423 metrics.record_candidate_count(active_connected as u64, active_unconnected as u64);
424 metrics.record_candidate_count_in_implicit_view(candidates_in_implicit_view as u64);
425 }
426
427 let num_active_leaves = view.active_leaves.len() as u64;
428 let num_inactive_leaves =
429 (view.per_relay_parent.len() as u64).saturating_sub(num_active_leaves);
430 metrics.record_leaves_count(num_active_leaves, num_inactive_leaves);
431
432 Ok(())
433}
434
435struct ImportablePendingAvailability {
436 candidate: CommittedCandidateReceipt,
437 persisted_validation_data: PersistedValidationData,
438 compact: fragment_chain::PendingAvailability,
439}
440
441#[overseer::contextbounds(ProspectiveParachains, prefix = self::overseer)]
442async fn preprocess_candidates_pending_availability<Context>(
443 ctx: &mut Context,
444 cache: &mut HashMap<Hash, Header>,
445 constraints: &Constraints,
446 pending_availability: Vec<CommittedCandidateReceipt>,
447) -> JfyiErrorResult<Vec<ImportablePendingAvailability>> {
448 let mut required_parent = constraints.required_parent.clone();
449
450 let mut importable = Vec::new();
451 let expected_count = pending_availability.len();
452
453 for (i, pending) in pending_availability.into_iter().enumerate() {
454 let candidate_hash = pending.hash();
455 let Some(relay_parent) =
456 fetch_block_info(ctx, cache, pending.descriptor.relay_parent()).await?
457 else {
458 let para_id = pending.descriptor.para_id();
459 gum::debug!(
460 target: LOG_TARGET,
461 ?candidate_hash,
462 ?para_id,
463 index = ?i,
464 ?expected_count,
465 "Had to stop processing pending candidates early due to missing info.",
466 );
467
468 break
469 };
470
471 let next_required_parent = pending.commitments.head_data.clone();
472 importable.push(ImportablePendingAvailability {
473 candidate: CommittedCandidateReceipt {
474 descriptor: pending.descriptor,
475 commitments: pending.commitments,
476 },
477 persisted_validation_data: PersistedValidationData {
478 parent_head: required_parent,
479 max_pov_size: constraints.max_pov_size as _,
480 relay_parent_number: relay_parent.number,
481 relay_parent_storage_root: relay_parent.storage_root,
482 },
483 compact: fragment_chain::PendingAvailability {
484 candidate_hash,
485 relay_parent: relay_parent.into(),
486 },
487 });
488
489 required_parent = next_required_parent;
490 }
491
492 Ok(importable)
493}
494
495async fn handle_introduce_seconded_candidate(
496 view: &mut View,
497 request: IntroduceSecondedCandidateRequest,
498 tx: oneshot::Sender<bool>,
499 metrics: &Metrics,
500) {
501 let _timer = metrics.time_introduce_seconded_candidate();
502
503 let IntroduceSecondedCandidateRequest {
504 candidate_para: para,
505 candidate_receipt: candidate,
506 persisted_validation_data: pvd,
507 } = request;
508
509 let candidate_hash = candidate.hash();
510 let candidate_entry = match CandidateEntry::new_seconded(candidate_hash, candidate, pvd) {
511 Ok(candidate) => candidate,
512 Err(err) => {
513 gum::warn!(
514 target: LOG_TARGET,
515 para = ?para,
516 "Cannot add seconded candidate: {}",
517 err
518 );
519
520 let _ = tx.send(false);
521 return
522 },
523 };
524
525 let mut added = Vec::with_capacity(view.per_relay_parent.len());
526 let mut para_scheduled = false;
527 for (relay_parent, rp_data) in view.per_relay_parent.iter_mut() {
530 let Some(chain) = rp_data.fragment_chains.get_mut(¶) else { continue };
531 let is_active_leaf = view.active_leaves.contains(relay_parent);
532
533 para_scheduled = true;
534
535 match chain.try_adding_seconded_candidate(&candidate_entry) {
536 Ok(()) => {
537 added.push(*relay_parent);
538 },
539 Err(FragmentChainError::CandidateAlreadyKnown) => {
540 gum::trace!(
541 target: LOG_TARGET,
542 ?para,
543 ?relay_parent,
544 ?is_active_leaf,
545 "Attempting to introduce an already known candidate: {:?}",
546 candidate_hash
547 );
548 added.push(*relay_parent);
549 },
550 Err(err) => {
551 gum::trace!(
552 target: LOG_TARGET,
553 ?para,
554 ?relay_parent,
555 ?candidate_hash,
556 ?is_active_leaf,
557 "Cannot introduce seconded candidate: {}",
558 err
559 )
560 },
561 }
562 }
563
564 if !para_scheduled {
565 gum::warn!(
566 target: LOG_TARGET,
567 para_id = ?para,
568 ?candidate_hash,
569 "Received seconded candidate for inactive para",
570 );
571 }
572
573 if added.is_empty() {
574 gum::debug!(
575 target: LOG_TARGET,
576 para = ?para,
577 candidate = ?candidate_hash,
578 "Newly-seconded candidate cannot be kept under any relay parent",
579 );
580 } else {
581 gum::debug!(
582 target: LOG_TARGET,
583 ?para,
584 "Added/Kept seconded candidate {:?} on relay parents: {:?}",
585 candidate_hash,
586 added
587 );
588 }
589
590 let _ = tx.send(!added.is_empty());
591}
592
593async fn handle_candidate_backed(
594 view: &mut View,
595 para: ParaId,
596 candidate_hash: CandidateHash,
597 metrics: &Metrics,
598) {
599 let _timer = metrics.time_candidate_backed();
600
601 let mut found_candidate = false;
602 let mut found_para = false;
603
604 for (relay_parent, rp_data) in view.per_relay_parent.iter_mut() {
607 let Some(chain) = rp_data.fragment_chains.get_mut(¶) else { continue };
608 let is_active_leaf = view.active_leaves.contains(relay_parent);
609
610 found_para = true;
611 if chain.is_candidate_backed(&candidate_hash) {
612 gum::debug!(
613 target: LOG_TARGET,
614 ?para,
615 ?candidate_hash,
616 ?is_active_leaf,
617 "Received redundant instruction to mark as backed an already backed candidate",
618 );
619 found_candidate = true;
620 } else if chain.contains_unconnected_candidate(&candidate_hash) {
621 found_candidate = true;
622 chain.candidate_backed(&candidate_hash);
624
625 gum::trace!(
626 target: LOG_TARGET,
627 ?relay_parent,
628 ?para,
629 ?is_active_leaf,
630 "Candidate backed. Candidate chain for para: {:?}",
631 chain.best_chain_vec()
632 );
633
634 gum::trace!(
635 target: LOG_TARGET,
636 ?relay_parent,
637 ?para,
638 ?is_active_leaf,
639 "Potential candidate storage for para: {:?}",
640 chain.unconnected().map(|candidate| candidate.hash()).collect::<Vec<_>>()
641 );
642 }
643 }
644
645 if !found_para {
646 gum::warn!(
647 target: LOG_TARGET,
648 ?para,
649 ?candidate_hash,
650 "Received instruction to back a candidate for unscheduled para",
651 );
652
653 return
654 }
655
656 if !found_candidate {
657 gum::debug!(
660 target: LOG_TARGET,
661 ?para,
662 ?candidate_hash,
663 "Received instruction to back unknown candidate",
664 );
665 }
666}
667
668fn answer_get_backable_candidates(
669 view: &View,
670 relay_parent: Hash,
671 para: ParaId,
672 count: u32,
673 ancestors: Ancestors,
674 tx: oneshot::Sender<Vec<(CandidateHash, Hash)>>,
675) {
676 if !view.active_leaves.contains(&relay_parent) {
677 gum::debug!(
678 target: LOG_TARGET,
679 ?relay_parent,
680 para_id = ?para,
681 "Requested backable candidate for inactive relay-parent."
682 );
683
684 let _ = tx.send(vec![]);
685 return
686 }
687 let Some(data) = view.per_relay_parent.get(&relay_parent) else {
688 gum::debug!(
689 target: LOG_TARGET,
690 ?relay_parent,
691 para_id = ?para,
692 "Requested backable candidate for inexistent relay-parent."
693 );
694
695 let _ = tx.send(vec![]);
696 return
697 };
698
699 let Some(chain) = data.fragment_chains.get(¶) else {
700 gum::debug!(
701 target: LOG_TARGET,
702 ?relay_parent,
703 para_id = ?para,
704 "Requested backable candidate for inactive para."
705 );
706
707 let _ = tx.send(vec![]);
708 return
709 };
710
711 gum::trace!(
712 target: LOG_TARGET,
713 ?relay_parent,
714 para_id = ?para,
715 "Candidate chain for para: {:?}",
716 chain.best_chain_vec()
717 );
718
719 gum::trace!(
720 target: LOG_TARGET,
721 ?relay_parent,
722 para_id = ?para,
723 "Potential candidate storage for para: {:?}",
724 chain.unconnected().map(|candidate| candidate.hash()).collect::<Vec<_>>()
725 );
726
727 let backable_candidates = chain.find_backable_chain(ancestors.clone(), count);
728
729 if backable_candidates.is_empty() {
730 gum::trace!(
731 target: LOG_TARGET,
732 ?ancestors,
733 para_id = ?para,
734 %relay_parent,
735 "Could not find any backable candidate",
736 );
737 } else {
738 gum::trace!(
739 target: LOG_TARGET,
740 ?relay_parent,
741 ?backable_candidates,
742 ?ancestors,
743 "Found backable candidates",
744 );
745 }
746
747 let _ = tx.send(backable_candidates);
748}
749
750fn answer_hypothetical_membership_request(
751 view: &View,
752 request: HypotheticalMembershipRequest,
753 tx: oneshot::Sender<Vec<(HypotheticalCandidate, HypotheticalMembership)>>,
754 metrics: &Metrics,
755) {
756 let _timer = metrics.time_hypothetical_membership_request();
757
758 let mut response = Vec::with_capacity(request.candidates.len());
759 for candidate in request.candidates {
760 response.push((candidate, vec![]));
761 }
762
763 let required_active_leaf = request.fragment_chain_relay_parent;
764 for active_leaf in view
765 .active_leaves
766 .iter()
767 .filter(|h| required_active_leaf.as_ref().map_or(true, |x| h == &x))
768 {
769 let Some(leaf_view) = view.per_relay_parent.get(&active_leaf) else { continue };
770 for &mut (ref candidate, ref mut membership) in &mut response {
771 let para_id = &candidate.candidate_para();
772 let Some(fragment_chain) = leaf_view.fragment_chains.get(para_id) else { continue };
773
774 let res = fragment_chain.can_add_candidate_as_potential(candidate);
775 match res {
776 Err(FragmentChainError::CandidateAlreadyKnown) | Ok(()) => {
777 membership.push(*active_leaf);
778 },
779 Err(err) => {
780 gum::trace!(
781 target: LOG_TARGET,
782 para = ?para_id,
783 leaf = ?active_leaf,
784 candidate = ?candidate.candidate_hash(),
785 "Candidate is not a hypothetical member on: {}",
786 err
787 )
788 },
789 };
790 }
791 }
792
793 for (candidate, membership) in &response {
794 if membership.is_empty() {
795 gum::debug!(
796 target: LOG_TARGET,
797 para = ?candidate.candidate_para(),
798 active_leaves = ?view.active_leaves,
799 ?required_active_leaf,
800 candidate = ?candidate.candidate_hash(),
801 "Candidate is not a hypothetical member on any of the active leaves",
802 )
803 }
804 }
805
806 let _ = tx.send(response);
807}
808
809fn answer_minimum_relay_parents_request(
810 view: &View,
811 relay_parent: Hash,
812 tx: oneshot::Sender<Vec<(ParaId, BlockNumber)>>,
813) {
814 let mut v = Vec::new();
815 if view.active_leaves.contains(&relay_parent) {
816 if let Some(leaf_data) = view.per_relay_parent.get(&relay_parent) {
817 for (para_id, fragment_chain) in &leaf_data.fragment_chains {
818 v.push((*para_id, fragment_chain.scope().earliest_relay_parent().number));
819 }
820 }
821 }
822
823 let _ = tx.send(v);
824}
825
826fn answer_prospective_validation_data_request(
827 view: &View,
828 request: ProspectiveValidationDataRequest,
829 tx: oneshot::Sender<Option<PersistedValidationData>>,
830) {
831 let (mut head_data, parent_head_data_hash) = match request.parent_head_data {
834 ParentHeadData::OnlyHash(parent_head_data_hash) => (None, parent_head_data_hash),
835 ParentHeadData::WithData { head_data, hash } => (Some(head_data), hash),
836 };
837
838 let mut relay_parent_info = None;
839 let mut max_pov_size = None;
840
841 for fragment_chain in view.active_leaves.iter().filter_map(|x| {
842 view.per_relay_parent
843 .get(&x)
844 .and_then(|data| data.fragment_chains.get(&request.para_id))
845 }) {
846 if head_data.is_some() && relay_parent_info.is_some() && max_pov_size.is_some() {
847 break
848 }
849 if relay_parent_info.is_none() {
850 relay_parent_info = fragment_chain.scope().ancestor(&request.candidate_relay_parent);
851 }
852 if head_data.is_none() {
853 head_data = fragment_chain.get_head_data_by_hash(&parent_head_data_hash);
854 }
855 if max_pov_size.is_none() {
856 let contains_ancestor =
857 fragment_chain.scope().ancestor(&request.candidate_relay_parent).is_some();
858 if contains_ancestor {
859 max_pov_size = Some(fragment_chain.scope().base_constraints().max_pov_size);
864 }
865 }
866 }
867
868 let _ = tx.send(match (head_data, relay_parent_info, max_pov_size) {
869 (Some(h), Some(i), Some(m)) => Some(PersistedValidationData {
870 parent_head: h,
871 relay_parent_number: i.number,
872 relay_parent_storage_root: i.storage_root,
873 max_pov_size: m as _,
874 }),
875 _ => None,
876 });
877}
878
879#[overseer::contextbounds(ProspectiveParachains, prefix = self::overseer)]
880async fn fetch_backing_state<Context>(
881 ctx: &mut Context,
882 relay_parent: Hash,
883 para_id: ParaId,
884) -> JfyiErrorResult<Option<(Constraints, Vec<CommittedCandidateReceipt>)>> {
885 let (tx, rx) = oneshot::channel();
886 ctx.send_message(RuntimeApiMessage::Request(
887 relay_parent,
888 RuntimeApiRequest::ParaBackingState(para_id, tx),
889 ))
890 .await;
891
892 Ok(rx.await.map_err(JfyiError::RuntimeApiRequestCanceled)??.map(|s| {
893 (
894 From::from(s.constraints),
895 s.pending_availability
896 .into_iter()
897 .map(|c| CommittedCandidateReceipt {
898 descriptor: c.descriptor,
899 commitments: c.commitments,
900 })
901 .collect(),
902 )
903 }))
904}
905
906#[overseer::contextbounds(ProspectiveParachains, prefix = self::overseer)]
907async fn fetch_backing_constraints_and_candidates<Context>(
908 ctx: &mut Context,
909 relay_parent: Hash,
910 para_id: ParaId,
911) -> JfyiErrorResult<Option<(Constraints, Vec<CommittedCandidateReceipt>)>> {
912 match fetch_backing_constraints_and_candidates_inner(ctx, relay_parent, para_id).await {
913 Err(error) => {
914 gum::debug!(
915 target: LOG_TARGET,
916 ?para_id,
917 ?relay_parent,
918 ?error,
919 "Failed to get constraints and candidates pending availability."
920 );
921
922 fetch_backing_state(ctx, relay_parent, para_id).await
924 },
925 Ok(maybe_constraints_and_candidatest) => Ok(maybe_constraints_and_candidatest),
926 }
927}
928
929#[overseer::contextbounds(ProspectiveParachains, prefix = self::overseer)]
930async fn fetch_backing_constraints_and_candidates_inner<Context>(
931 ctx: &mut Context,
932 relay_parent: Hash,
933 para_id: ParaId,
934) -> JfyiErrorResult<Option<(Constraints, Vec<CommittedCandidateReceipt>)>> {
935 let maybe_constraints = request_backing_constraints(relay_parent, para_id, ctx.sender())
936 .await
937 .await
938 .map_err(JfyiError::RuntimeApiRequestCanceled)??;
939
940 let Some(constraints) = maybe_constraints else { return Ok(None) };
941
942 let pending_availability =
943 request_candidates_pending_availability(relay_parent, para_id, ctx.sender())
944 .await
945 .await
946 .map_err(JfyiError::RuntimeApiRequestCanceled)??;
947
948 Ok(Some((From::from(constraints), pending_availability)))
949}
950
951#[overseer::contextbounds(ProspectiveParachains, prefix = self::overseer)]
953async fn fetch_ancestry<Context>(
954 ctx: &mut Context,
955 cache: &mut HashMap<Hash, Header>,
956 relay_hash: Hash,
957 ancestors: usize,
958 required_session: u32,
959) -> JfyiErrorResult<Vec<BlockInfo>> {
960 if ancestors == 0 {
961 return Ok(Vec::new())
962 }
963
964 let (tx, rx) = oneshot::channel();
965 ctx.send_message(ChainApiMessage::Ancestors {
966 hash: relay_hash,
967 k: ancestors,
968 response_channel: tx,
969 })
970 .await;
971
972 let hashes = rx.map_err(JfyiError::ChainApiRequestCanceled).await??;
973
974 let mut block_info = Vec::with_capacity(hashes.len());
975 for hash in hashes {
976 let info = match fetch_block_info(ctx, cache, hash).await? {
977 None => {
978 gum::warn!(
979 target: LOG_TARGET,
980 relay_hash = ?hash,
981 "Failed to fetch info for hash returned from ancestry.",
982 );
983
984 break
986 },
987 Some(info) => info,
988 };
989
990 let session = request_session_index_for_child(hash, ctx.sender())
995 .await
996 .await
997 .map_err(JfyiError::RuntimeApiRequestCanceled)??;
998
999 if session == required_session {
1000 block_info.push(info);
1001 } else {
1002 break
1003 }
1004 }
1005
1006 Ok(block_info)
1007}
1008
1009#[overseer::contextbounds(ProspectiveParachains, prefix = self::overseer)]
1010async fn fetch_block_header_with_cache<Context>(
1011 ctx: &mut Context,
1012 cache: &mut HashMap<Hash, Header>,
1013 relay_hash: Hash,
1014) -> JfyiErrorResult<Option<Header>> {
1015 if let Some(h) = cache.get(&relay_hash) {
1016 return Ok(Some(h.clone()))
1017 }
1018
1019 let (tx, rx) = oneshot::channel();
1020
1021 ctx.send_message(ChainApiMessage::BlockHeader(relay_hash, tx)).await;
1022 let header = rx.map_err(JfyiError::ChainApiRequestCanceled).await??;
1023 if let Some(ref h) = header {
1024 cache.insert(relay_hash, h.clone());
1025 }
1026 Ok(header)
1027}
1028
1029#[overseer::contextbounds(ProspectiveParachains, prefix = self::overseer)]
1030async fn fetch_block_info<Context>(
1031 ctx: &mut Context,
1032 cache: &mut HashMap<Hash, Header>,
1033 relay_hash: Hash,
1034) -> JfyiErrorResult<Option<BlockInfo>> {
1035 let header = fetch_block_header_with_cache(ctx, cache, relay_hash).await?;
1036
1037 Ok(header.map(|header| BlockInfo {
1038 hash: relay_hash,
1039 number: header.number,
1040 parent_hash: header.parent_hash,
1041 storage_root: header.state_root,
1042 }))
1043}