1#![deny(unused_crate_dependencies)]
67
68use std::{
69 collections::{HashMap, HashSet},
70 sync::Arc,
71};
72
73use bitvec::vec::BitVec;
74use futures::{
75 channel::{mpsc, oneshot},
76 future::BoxFuture,
77 stream::FuturesOrdered,
78 FutureExt, SinkExt, StreamExt, TryFutureExt,
79};
80use schnellru::{ByLength, LruMap};
81
82use error::{Error, FatalResult};
83use polkadot_node_primitives::{
84 AvailableData, InvalidCandidate, PoV, SignedFullStatementWithPVD, StatementWithPVD,
85 ValidationResult, DISPUTE_WINDOW,
86};
87use polkadot_node_subsystem::{
88 messages::{
89 AvailabilityDistributionMessage, AvailabilityStoreMessage, BackableCandidateRef,
90 CanSecondRequest, CandidateBackingMessage, CandidateValidationMessage,
91 CollatorProtocolMessage, HypotheticalCandidate, HypotheticalMembershipRequest,
92 IntroduceSecondedCandidateRequest, ProspectiveParachainsMessage, ProvisionableData,
93 ProvisionerMessage, PvfExecKind, RuntimeApiMessage, RuntimeApiRequest,
94 StatementDistributionMessage, StoreAvailableDataError,
95 },
96 overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, RuntimeApiError, SpawnedSubsystem,
97 SubsystemError,
98};
99use polkadot_node_subsystem_util::{
100 self as util,
101 backing_implicit_view::View as ImplicitView,
102 request_claim_queue, request_disabled_validators, request_min_backing_votes,
103 request_node_features, request_session_index_for_child, request_validator_groups,
104 request_validators,
105 runtime::{self, ClaimQueueSnapshot},
106 Validator,
107};
108use polkadot_parachain_primitives::primitives::IsSystem;
109use polkadot_primitives::{
110 node_features::FeatureIndex, BackedCandidate, CandidateCommitments, CandidateHash,
111 CandidateReceiptV2 as CandidateReceipt,
112 CommittedCandidateReceiptV2 as CommittedCandidateReceipt, CoreIndex, GroupIndex,
113 GroupRotationInfo, Hash, Id as ParaId, IndexedVec, NodeFeatures, PersistedValidationData,
114 SessionIndex, SigningContext, ValidationCode, ValidatorId, ValidatorIndex, ValidatorSignature,
115 ValidityAttestation,
116};
117use polkadot_statement_table::{
118 generic::AttestedCandidate as TableAttestedCandidate,
119 v2::{
120 SignedStatement as TableSignedStatement, Statement as TableStatement,
121 Summary as TableSummary,
122 },
123 Context as TableContextTrait, Table,
124};
125use sp_keystore::KeystorePtr;
126
127mod error;
128
129mod metrics;
130use self::metrics::Metrics;
131
132#[cfg(test)]
133mod tests;
134
/// Log target passed as `target:` to the `gum` logging macros in this file.
const LOG_TARGET: &str = "parachain::candidate-backing";
136
/// Describes how `validate_and_make_available` obtains the PoV of a candidate.
enum PoVData {
    /// The PoV is already at hand and is used directly.
    Ready(Arc<PoV>),
    /// The PoV has to be requested first; the fields are forwarded to `request_pov`.
    FetchFromValidator {
        /// Validator the PoV is requested from.
        from_validator: ValidatorIndex,
        /// Hash of the candidate the PoV belongs to.
        candidate_hash: CandidateHash,
        /// Hash of the PoV being requested.
        pov_hash: Hash,
    },
}
148
/// Command sent over the background-validation channel and handed to
/// `handle_validated_candidate_command` by the main loop.
enum ValidatedCandidateCommand {
    /// Outcome of a background validation; presumably the candidate is to be seconded when the
    /// result is `Ok` (the handler is outside this view).
    Second(BackgroundValidationResult),
    /// Outcome of a background validation; presumably the candidate is to be attested when the
    /// result is `Ok` (the handler is outside this view).
    Attest(BackgroundValidationResult),
    /// Sent by `validate_and_make_available` when fetching the PoV failed with
    /// `Error::FetchPoV`; carries only the candidate hash.
    AttestNoPoV(CandidateHash),
}
157
158impl std::fmt::Debug for ValidatedCandidateCommand {
159 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
160 let candidate_hash = self.candidate_hash();
161 match *self {
162 ValidatedCandidateCommand::Second(_) => write!(f, "Second({})", candidate_hash),
163 ValidatedCandidateCommand::Attest(_) => write!(f, "Attest({})", candidate_hash),
164 ValidatedCandidateCommand::AttestNoPoV(_) => write!(f, "Attest({})", candidate_hash),
165 }
166 }
167}
168
169impl ValidatedCandidateCommand {
170 fn candidate_hash(&self) -> CandidateHash {
171 match *self {
172 ValidatedCandidateCommand::Second(Ok(ref outputs)) => outputs.candidate.hash(),
173 ValidatedCandidateCommand::Second(Err(ref candidate)) => candidate.hash(),
174 ValidatedCandidateCommand::Attest(Ok(ref outputs)) => outputs.candidate.hash(),
175 ValidatedCandidateCommand::Attest(Err(ref candidate)) => candidate.hash(),
176 ValidatedCandidateCommand::AttestNoPoV(candidate_hash) => candidate_hash,
177 }
178 }
179}
180
/// The candidate backing subsystem; its fields are handed to `run` when the subsystem starts.
pub struct CandidateBackingSubsystem {
    /// Keystore handle; passed on to `Validator::construct` when per-parent state is built.
    keystore: KeystorePtr,
    /// Metrics handle passed to the message handlers.
    metrics: Metrics,
}
186
187impl CandidateBackingSubsystem {
188 pub fn new(keystore: KeystorePtr, metrics: Metrics) -> Self {
190 Self { keystore, metrics }
191 }
192}
193
194#[overseer::subsystem(CandidateBacking, error = SubsystemError, prefix = self::overseer)]
195impl<Context> CandidateBackingSubsystem
196where
197 Context: Send + Sync,
198{
199 fn start(self, ctx: Context) -> SpawnedSubsystem {
200 let future = async move {
201 run(ctx, self.keystore, self.metrics)
202 .await
203 .map_err(|e| SubsystemError::with_origin("candidate-backing", e))
204 }
205 .boxed();
206
207 SpawnedSubsystem { name: "candidate-backing-subsystem", future }
208 }
209}
210
/// Backing state kept per scheduling parent; built by `construct_per_scheduling_parent_state`.
struct PerSchedulingParentState {
    /// Hash of the block this state was constructed for.
    parent: Hash,
    /// Node features of the session, as obtained through the per-session cache.
    node_features: NodeFeatures,
    /// Core whose rotated validator group contains the local validator, if any. Only cores that
    /// have a claim-queue entry are considered.
    assigned_core: Option<CoreIndex>,
    /// Starts empty; presumably the candidates already reported as backed (usage is outside
    /// this view).
    backed: HashSet<CandidateHash>,
    /// Statement table, created empty.
    table: Table<TableContext>,
    /// Validator/group information the table is evaluated against.
    table_context: TableContext,
    /// Starts empty; presumably the candidates we already issued a statement for (usage is
    /// outside this view).
    issued_statements: HashSet<CandidateHash>,
    /// Starts empty; presumably the candidates with a validation job in flight (usage is
    /// outside this view).
    awaiting_validation: HashSet<CandidateHash>,
    /// Starts empty; presumably alternative PoV sources per candidate (usage is outside this
    /// view).
    fallbacks: HashMap<CandidateHash, AttestingData>,
    /// Minimum backing votes for the session, as returned by the runtime.
    minimum_backing_votes: u32,
    /// Number of validator groups returned by the runtime for this parent.
    n_cores: u32,
    /// Snapshot of the claim queue fetched at this parent.
    claim_queue: ClaimQueueSnapshot,
    /// Validator index -> group index mapping, shared through the per-session cache.
    validator_to_group: Arc<IndexedVec<ValidatorIndex, Option<GroupIndex>>>,
    /// Session index for the child of `parent`.
    session_index: SessionIndex,
    /// Group rotation info returned alongside the validator groups.
    group_rotation_info: GroupRotationInfo,
}
245
/// State kept per known candidate.
struct PerCandidateState {
    /// Persisted validation data associated with the candidate.
    persisted_validation_data: PersistedValidationData,
    /// Presumably whether this node seconded the candidate itself (writes are outside this
    /// view).
    seconded_locally: bool,
    /// Scheduling parent of the candidate. `handle_active_leaves_update` drops this entry once
    /// no `PerSchedulingParentState` exists for that hash.
    scheduling_parent: Hash,
}
251
/// Length-bounded LRU caches for data that is looked up by session index.
struct PerSessionCache {
    /// Validator set per session.
    validators_cache: LruMap<SessionIndex, Arc<Vec<ValidatorId>>>,
    /// Node features per session.
    node_features_cache: LruMap<SessionIndex, NodeFeatures>,
    /// Minimum backing votes per session.
    minimum_backing_votes_cache: LruMap<SessionIndex, u32>,
    /// Validator index -> group index mapping per session.
    validator_to_group_cache:
        LruMap<SessionIndex, Arc<IndexedVec<ValidatorIndex, Option<GroupIndex>>>>,
}
265
266impl Default for PerSessionCache {
267 fn default() -> Self {
272 Self::new(DISPUTE_WINDOW.get())
273 }
274}
275
276impl PerSessionCache {
277 fn new(capacity: u32) -> Self {
279 PerSessionCache {
280 validators_cache: LruMap::new(ByLength::new(capacity)),
281 node_features_cache: LruMap::new(ByLength::new(capacity)),
282 minimum_backing_votes_cache: LruMap::new(ByLength::new(capacity)),
283 validator_to_group_cache: LruMap::new(ByLength::new(capacity)),
284 }
285 }
286
287 async fn validators(
289 &mut self,
290 session_index: SessionIndex,
291 parent: Hash,
292 sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
293 ) -> Result<Arc<Vec<ValidatorId>>, RuntimeApiError> {
294 if let Some(validators) = self.validators_cache.get(&session_index) {
296 return Ok(Arc::clone(validators));
297 }
298
299 let validators: Vec<ValidatorId> =
301 request_validators(parent, sender).await.await.map_err(|err| {
302 RuntimeApiError::Execution { runtime_api_name: "Validators", source: Arc::new(err) }
303 })??;
304
305 let validators = Arc::new(validators);
307
308 self.validators_cache.insert(session_index, Arc::clone(&validators));
310
311 Ok(validators)
312 }
313
314 async fn node_features(
316 &mut self,
317 session_index: SessionIndex,
318 parent: Hash,
319 sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
320 ) -> Result<NodeFeatures, RuntimeApiError> {
321 if let Some(node_features) = self.node_features_cache.get(&session_index) {
323 return Ok(node_features.clone());
324 }
325
326 let node_features = request_node_features(parent, session_index, sender)
328 .await
329 .await
330 .map_err(|err| RuntimeApiError::Execution {
331 runtime_api_name: "NodeFeatures",
332 source: Arc::new(err),
333 })??;
334
335 self.node_features_cache.insert(session_index, node_features.clone());
337
338 Ok(node_features)
339 }
340
341 async fn minimum_backing_votes(
344 &mut self,
345 session_index: SessionIndex,
346 parent: Hash,
347 sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
348 ) -> Result<u32, RuntimeApiError> {
349 if let Some(minimum_backing_votes) = self.minimum_backing_votes_cache.get(&session_index) {
351 return Ok(*minimum_backing_votes);
352 }
353
354 let minimum_backing_votes = request_min_backing_votes(parent, session_index, sender)
356 .await
357 .await
358 .map_err(|err| RuntimeApiError::Execution {
359 runtime_api_name: "MinimumBackingVotes",
360 source: Arc::new(err),
361 })??;
362
363 self.minimum_backing_votes_cache.insert(session_index, minimum_backing_votes);
365
366 Ok(minimum_backing_votes)
367 }
368
369 fn validator_to_group(
371 &mut self,
372 session_index: SessionIndex,
373 validators: &[ValidatorId],
374 validator_groups: &[Vec<ValidatorIndex>],
375 ) -> Arc<IndexedVec<ValidatorIndex, Option<GroupIndex>>> {
376 let validator_to_group = self
377 .validator_to_group_cache
378 .get_or_insert(session_index, || {
379 let mut vector = vec![None; validators.len()];
380
381 for (group_idx, validator_group) in validator_groups.iter().enumerate() {
382 for validator in validator_group {
383 vector[validator.0 as usize] = Some(GroupIndex(group_idx as u32));
384 }
385 }
386
387 Arc::new(IndexedVec::<_, _>::from(vector))
388 })
389 .expect("Just inserted");
390
391 Arc::clone(validator_to_group)
392 }
393}
394
/// Top-level state of the subsystem, owned by `run`.
struct State {
    /// View over active leaves and the relay parents allowed under them; updated on every
    /// active-leaves signal.
    implicit_view: ImplicitView,
    /// Backing state per scheduling parent; pruned to the relay parents still allowed by
    /// `implicit_view`.
    per_scheduling_parent: HashMap<Hash, PerSchedulingParentState>,
    /// State per candidate; entries are dropped once their scheduling parent is gone from
    /// `per_scheduling_parent`.
    per_candidate: HashMap<CandidateHash, PerCandidateState>,
    /// Cache for session-scoped runtime data.
    per_session_cache: PerSessionCache,
    /// Sending half of the background-validation channel created in `run`; the receiving half
    /// is polled in `run_iteration`.
    background_validation_tx: mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
    /// Keystore handle used when constructing per-parent state.
    keystore: KeystorePtr,
    /// Set once the `CandidateReceiptV3` node feature was observed at a finalized block; the
    /// check on finalization is skipped afterwards.
    v3_ever_seen: bool,
}
427
428impl State {
429 fn new(
430 background_validation_tx: mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
431 keystore: KeystorePtr,
432 ) -> Self {
433 State {
434 implicit_view: ImplicitView::default(),
435 per_scheduling_parent: HashMap::default(),
436 per_candidate: HashMap::new(),
437 per_session_cache: PerSessionCache::default(),
438 background_validation_tx,
439 keystore,
440 v3_ever_seen: false,
441 }
442 }
443}
444
445#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
446async fn run<Context>(
447 mut ctx: Context,
448 keystore: KeystorePtr,
449 metrics: Metrics,
450) -> FatalResult<()> {
451 let (background_validation_tx, mut background_validation_rx) = mpsc::channel(16);
452 let mut state = State::new(background_validation_tx, keystore);
453
454 loop {
455 let res =
456 run_iteration(&mut ctx, &mut state, &metrics, &mut background_validation_rx).await;
457
458 match res {
459 Ok(()) => break,
460 Err(e) => crate::error::log_error(Err(e))?,
461 }
462 }
463
464 Ok(())
465}
466
467#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
468async fn run_iteration<Context>(
469 ctx: &mut Context,
470 state: &mut State,
471 metrics: &Metrics,
472 background_validation_rx: &mut mpsc::Receiver<(Hash, ValidatedCandidateCommand)>,
473) -> Result<(), Error> {
474 loop {
475 futures::select!(
476 validated_command = background_validation_rx.next().fuse() => {
477 if let Some((scheduling_parent, command)) = validated_command {
478 handle_validated_candidate_command(
479 &mut *ctx,
480 state,
481 scheduling_parent,
482 command,
483 metrics,
484 ).await?;
485 } else {
486 panic!("background_validation_tx always alive at this point; qed");
487 }
488 }
489 from_overseer = ctx.recv().fuse() => {
490 match from_overseer.map_err(Error::OverseerExited)? {
491 FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => {
492 handle_active_leaves_update(
493 &mut *ctx,
494 update,
495 state,
496 ).await?;
497 }
498 FromOrchestra::Signal(OverseerSignal::BlockFinalized(hash, _number)) => {
499 if !state.v3_ever_seen {
500 check_v3_on_finalized(&mut *ctx, state, hash).await?;
501 }
502 }
503 FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()),
504 FromOrchestra::Communication { msg } => {
505 handle_communication(&mut *ctx, state, msg, metrics).await?;
506 }
507 }
508 }
509 )
510 }
511}
512
/// Per-candidate data stored in `PerSchedulingParentState::fallbacks`.
///
/// NOTE(review): the code using this type is outside this view; field descriptions below are
/// inferred from names and types and should be confirmed.
#[derive(Clone)]
struct AttestingData {
    /// Receipt of the candidate.
    candidate: CandidateReceipt,
    /// Hash of the candidate's PoV.
    pov_hash: Hash,
    /// Presumably the validator currently used as PoV source.
    from_validator: ValidatorIndex,
    /// Presumably further validators backing the candidate.
    backing: Vec<ValidatorIndex>,
}
529
/// Validator and group information the statement table is evaluated against.
#[derive(Default, Debug)]
struct TableContext {
    /// The local validator, or `None` when this node is not in the validator set.
    validator: Option<Validator>,
    /// Validator group per core; filled only for cores that have a claim-queue entry.
    groups: HashMap<CoreIndex, Vec<ValidatorIndex>>,
    /// All validators of the session.
    validators: Vec<ValidatorId>,
    /// Indices of the validators reported as disabled by the runtime.
    disabled_validators: Vec<ValidatorIndex>,
}
537
538impl TableContext {
539 pub fn validator_is_disabled(&self, validator_idx: &ValidatorIndex) -> bool {
541 self.disabled_validators
542 .iter()
543 .any(|disabled_val_idx| *disabled_val_idx == *validator_idx)
544 }
545
546 pub fn local_validator_is_disabled(&self) -> Option<bool> {
548 self.validator.as_ref().map(|v| v.disabled())
549 }
550}
551
552impl TableContextTrait for TableContext {
553 type AuthorityId = ValidatorIndex;
554 type Digest = CandidateHash;
555 type GroupId = CoreIndex;
556 type Signature = ValidatorSignature;
557 type Candidate = CommittedCandidateReceipt;
558
559 fn candidate_digest(candidate: &CommittedCandidateReceipt) -> CandidateHash {
560 candidate.hash()
561 }
562
563 fn is_member_of(&self, authority: &ValidatorIndex, core: &CoreIndex) -> bool {
564 self.groups.get(core).map_or(false, |g| g.iter().any(|a| a == authority))
565 }
566
567 fn get_group_size(&self, group: &CoreIndex) -> Option<usize> {
568 self.groups.get(group).map(|g| g.len())
569 }
570}
571
572fn primitive_statement_to_table(s: &SignedFullStatementWithPVD) -> TableSignedStatement {
575 let statement = match s.payload() {
576 StatementWithPVD::Seconded(c, _) => TableStatement::Seconded(c.clone()),
577 StatementWithPVD::Valid(h) => TableStatement::Valid(*h),
578 };
579
580 TableSignedStatement {
581 statement,
582 signature: s.signature().clone(),
583 sender: s.validator_index(),
584 }
585}
586
587fn table_attested_to_backed(
588 attested: TableAttestedCandidate<
589 CoreIndex,
590 CommittedCandidateReceipt,
591 ValidatorIndex,
592 ValidatorSignature,
593 >,
594 table_context: &TableContext,
595) -> Option<BackedCandidate> {
596 let TableAttestedCandidate { candidate, validity_votes, group_id: core_index } = attested;
597
598 let (ids, validity_votes): (Vec<_>, Vec<ValidityAttestation>) =
599 validity_votes.into_iter().map(|(id, vote)| (id, vote.into())).unzip();
600
601 let group = table_context.groups.get(&core_index)?;
602
603 let mut validator_indices = BitVec::with_capacity(group.len());
604
605 validator_indices.resize(group.len(), false);
606
607 let mut vote_positions = Vec::with_capacity(validity_votes.len());
611 for (orig_idx, id) in ids.iter().enumerate() {
612 if let Some(position) = group.iter().position(|x| x == id) {
613 validator_indices.set(position, true);
614 vote_positions.push((orig_idx, position));
615 } else {
616 gum::warn!(
617 target: LOG_TARGET,
618 "Logic error: Validity vote from table does not correspond to group",
619 );
620
621 return None;
622 }
623 }
624 vote_positions.sort_by_key(|(_orig, pos_in_group)| *pos_in_group);
625
626 Some(BackedCandidate::new(
627 candidate,
628 vote_positions
629 .into_iter()
630 .map(|(pos_in_votes, _pos_in_group)| validity_votes[pos_in_votes].clone())
631 .collect(),
632 validator_indices,
633 core_index,
634 ))
635}
636
637async fn store_available_data(
638 sender: &mut impl overseer::CandidateBackingSenderTrait,
639 n_validators: u32,
640 candidate_hash: CandidateHash,
641 available_data: AvailableData,
642 expected_erasure_root: Hash,
643 core_index: CoreIndex,
644 node_features: NodeFeatures,
645) -> Result<(), Error> {
646 let (tx, rx) = oneshot::channel();
647 sender
652 .send_message(AvailabilityStoreMessage::StoreAvailableData {
653 candidate_hash,
654 n_validators,
655 available_data,
656 expected_erasure_root,
657 core_index,
658 node_features,
659 tx,
660 })
661 .await;
662
663 rx.await
664 .map_err(Error::StoreAvailableDataChannel)?
665 .map_err(Error::StoreAvailableData)
666}
667
668async fn make_pov_available(
676 sender: &mut impl overseer::CandidateBackingSenderTrait,
677 n_validators: usize,
678 pov: Arc<PoV>,
679 candidate_hash: CandidateHash,
680 validation_data: PersistedValidationData,
681 expected_erasure_root: Hash,
682 core_index: CoreIndex,
683 node_features: NodeFeatures,
684) -> Result<(), Error> {
685 store_available_data(
686 sender,
687 n_validators as u32,
688 candidate_hash,
689 AvailableData { pov, validation_data },
690 expected_erasure_root,
691 core_index,
692 node_features,
693 )
694 .await
695}
696
697async fn request_pov(
698 sender: &mut impl overseer::CandidateBackingSenderTrait,
699 relay_parent: Hash,
700 from_validator: ValidatorIndex,
701 para_id: ParaId,
702 candidate_hash: CandidateHash,
703 pov_hash: Hash,
704) -> Result<Arc<PoV>, Error> {
705 let (tx, rx) = oneshot::channel();
706 sender
707 .send_message(AvailabilityDistributionMessage::FetchPoV {
708 relay_parent,
709 from_validator,
710 para_id,
711 candidate_hash,
712 pov_hash,
713 tx,
714 })
715 .await;
716
717 let pov = rx.await.map_err(|_| Error::FetchPoV)?;
718 Ok(Arc::new(pov))
719}
720
721async fn request_candidate_validation(
722 sender: &mut impl overseer::CandidateBackingSenderTrait,
723 validation_data: PersistedValidationData,
724 validation_code: ValidationCode,
725 candidate_receipt: CandidateReceipt,
726 pov: Arc<PoV>,
727 session_index: SessionIndex,
728) -> Result<ValidationResult, Error> {
729 let (tx, rx) = oneshot::channel();
730 let is_system = candidate_receipt.descriptor.para_id().is_system();
731 let scheduling_parent = candidate_receipt.descriptor.scheduling_parent();
733
734 sender
735 .send_message(CandidateValidationMessage::ValidateFromExhaustive {
736 validation_data,
737 validation_code,
738 candidate_receipt,
739 pov,
740 scheduling_session_index: session_index,
741 exec_kind: if is_system {
742 PvfExecKind::BackingSystemParas(scheduling_parent)
743 } else {
744 PvfExecKind::Backing(scheduling_parent)
745 },
746 response_sender: tx,
747 })
748 .await;
749
750 match rx.await {
751 Ok(Ok(validation_result)) => Ok(validation_result),
752 Ok(Err(err)) => Err(Error::ValidationFailed(err)),
753 Err(err) => Err(Error::ValidateFromExhaustive(err)),
754 }
755}
756
/// What `validate_and_make_available` reports for a candidate that validated successfully and
/// whose data was stored.
struct BackgroundValidationOutputs {
    /// The validated candidate's receipt.
    candidate: CandidateReceipt,
    /// Commitments returned by candidate validation.
    commitments: CandidateCommitments,
    /// Persisted validation data returned by candidate validation.
    persisted_validation_data: PersistedValidationData,
}
762
/// Result of a background validation: the outputs on success, or the receipt of the candidate
/// that turned out invalid.
type BackgroundValidationResult = Result<BackgroundValidationOutputs, CandidateReceipt>;
764
/// Everything `validate_and_make_available` needs to validate one candidate in the background.
struct BackgroundValidationParams<S: overseer::CandidateBackingSenderTrait, F> {
    /// Sender used for all subsystem requests made during validation.
    sender: S,
    /// Channel on which the resulting command is sent back, together with `scheduling_parent`.
    tx_command: mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
    /// Receipt of the candidate to validate.
    candidate: CandidateReceipt,
    /// Block at which the validation code is looked up and the PoV is requested; also sent
    /// along with the resulting command.
    scheduling_parent: Hash,
    /// Session index forwarded to candidate validation as `scheduling_session_index`.
    session_index: SessionIndex,
    /// Node features forwarded to the availability store.
    node_features: NodeFeatures,
    /// Persisted validation data forwarded to candidate validation.
    persisted_validation_data: PersistedValidationData,
    /// The PoV itself or instructions for fetching it.
    pov: PoVData,
    /// Number of validators, forwarded to the availability store.
    n_validators: usize,
    /// Wraps the validation result into the command to send back.
    make_command: F,
}
780
/// Validates a candidate in the background and, if valid, stores its available data.
///
/// Steps, in order: fetch the validation code by hash, obtain the PoV (fetching it if needed),
/// run candidate validation, store the available data, and finally send the command built by
/// `make_command` over `tx_command`.
///
/// If fetching the PoV fails with `Error::FetchPoV`, an `AttestNoPoV` command is sent instead and
/// the function returns `Ok(())`. Any other failure of an intermediate step is returned as
/// `Err` without sending a command. An invalid candidate (including an erasure-root mismatch
/// reported by the availability store) is not an error: it is reported as `Err(candidate)`
/// inside the command.
async fn validate_and_make_available(
    params: BackgroundValidationParams<
        impl overseer::CandidateBackingSenderTrait,
        impl Fn(BackgroundValidationResult) -> ValidatedCandidateCommand + Sync,
    >,
    core_index: CoreIndex,
) -> Result<(), Error> {
    let BackgroundValidationParams {
        mut sender,
        mut tx_command,
        candidate,
        scheduling_parent,
        session_index,
        node_features,
        persisted_validation_data,
        pov,
        n_validators,
        make_command,
    } = params;

    // Look up the validation code referenced by the candidate descriptor.
    let validation_code = {
        let validation_code_hash = candidate.descriptor().validation_code_hash();
        let (tx, rx) = oneshot::channel();
        sender
            .send_message(RuntimeApiMessage::Request(
                scheduling_parent,
                RuntimeApiRequest::ValidationCodeByHash(validation_code_hash, tx),
            ))
            .await;

        let code = rx.await.map_err(Error::RuntimeApiUnavailable)?;
        match code {
            Err(e) => return Err(Error::FetchValidationCode(validation_code_hash, e)),
            Ok(None) => return Err(Error::NoValidationCode(validation_code_hash)),
            Ok(Some(c)) => c,
        }
    };

    let pov = match pov {
        PoVData::Ready(pov) => pov,
        PoVData::FetchFromValidator { from_validator, candidate_hash, pov_hash } => {
            match request_pov(
                &mut sender,
                scheduling_parent,
                from_validator,
                candidate.descriptor.para_id(),
                candidate_hash,
                pov_hash,
            )
            .await
            {
                // A failed fetch is reported to the main loop rather than treated as an error
                // of this task.
                Err(Error::FetchPoV) => {
                    tx_command
                        .send((
                            scheduling_parent,
                            ValidatedCandidateCommand::AttestNoPoV(candidate.hash()),
                        ))
                        .await
                        .map_err(Error::BackgroundValidationMpsc)?;
                    return Ok(());
                },
                Err(err) => return Err(err),
                Ok(pov) => pov,
            }
        },
    };

    let v = {
        request_candidate_validation(
            &mut sender,
            persisted_validation_data,
            validation_code,
            candidate.clone(),
            pov.clone(),
            session_index,
        )
        .await?
    };

    let res = match v {
        ValidationResult::Valid(commitments, validation_data) => {
            gum::debug!(
                target: LOG_TARGET,
                candidate_hash = ?candidate.hash(),
                "Validation successful",
            );

            // Storing the data also checks the erasure root announced in the receipt.
            let erasure_valid = make_pov_available(
                &mut sender,
                n_validators,
                pov.clone(),
                candidate.hash(),
                validation_data.clone(),
                candidate.descriptor.erasure_root(),
                core_index,
                node_features,
            )
            .await;

            match erasure_valid {
                Ok(()) => Ok(BackgroundValidationOutputs {
                    candidate,
                    commitments,
                    persisted_validation_data: validation_data,
                }),
                // A wrong erasure root makes the candidate invalid; it is not a task failure.
                Err(Error::StoreAvailableData(StoreAvailableDataError::InvalidErasureRoot)) => {
                    gum::debug!(
                        target: LOG_TARGET,
                        candidate_hash = ?candidate.hash(),
                        actual_commitments = ?commitments,
                        "Erasure root doesn't match the announced by the candidate receipt",
                    );
                    Err(candidate)
                },
                // Every other storage error aborts the task.
                Err(e) => return Err(e),
            }
        },
        ValidationResult::Invalid(InvalidCandidate::CommitmentsHashMismatch) => {
            gum::warn!(
                target: LOG_TARGET,
                candidate_hash = ?candidate.hash(),
                "Validation yielded different commitments",
            );
            Err(candidate)
        },
        ValidationResult::Invalid(reason) => {
            gum::warn!(
                target: LOG_TARGET,
                candidate_hash = ?candidate.hash(),
                reason = ?reason,
                "Validation yielded an invalid candidate",
            );
            Err(candidate)
        },
    };

    // Report the outcome (valid or invalid) back to the main loop.
    tx_command
        .send((scheduling_parent, make_command(res)))
        .await
        .map_err(Into::into)
}
924
925#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
926async fn handle_communication<Context>(
927 ctx: &mut Context,
928 state: &mut State,
929 message: CandidateBackingMessage,
930 metrics: &Metrics,
931) -> Result<(), Error> {
932 match message {
933 CandidateBackingMessage::Second { scheduling_parent: _, candidate, pvd, pov } => {
934 handle_second_message(ctx, state, candidate, pvd, pov, metrics).await?;
935 },
936 CandidateBackingMessage::Statement { scheduling_parent, statement } => {
937 handle_statement_message(ctx, state, scheduling_parent, statement, metrics).await?;
938 },
939 CandidateBackingMessage::GetBackableCandidates { candidates, sender } => {
940 handle_get_backable_candidates_message(state, candidates, sender, metrics)?
941 },
942 CandidateBackingMessage::CanSecond(request, tx) => {
943 handle_can_second_request(ctx, state, request, tx).await
944 },
945 }
946
947 Ok(())
948}
949
950#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
951async fn handle_active_leaves_update<Context>(
952 ctx: &mut Context,
953 update: ActiveLeavesUpdate,
954 state: &mut State,
955) -> Result<(), Error> {
956 let res = if let Some(leaf) = update.activated {
959 let leaf_hash = leaf.hash;
960 Some((leaf, state.implicit_view.activate_leaf(ctx.sender(), leaf_hash).await.map(|_| ())))
961 } else {
962 None
963 };
964
965 for deactivated in update.deactivated {
966 state.implicit_view.deactivate_leaf(deactivated);
967 }
968
969 {
973 let remaining: HashSet<_> = state.implicit_view.all_allowed_relay_parents().collect();
974
975 state.per_scheduling_parent.retain(|r, _| remaining.contains(&r));
976 }
977
978 state
981 .per_candidate
982 .retain(|_, pc| state.per_scheduling_parent.contains_key(&pc.scheduling_parent));
983
984 let fresh_relay_parents = match res {
987 None => return Ok(()),
988 Some((leaf, Ok(_))) => {
989 let fresh_relay_parents =
990 state.implicit_view.known_allowed_relay_parents_under(&leaf.hash);
991
992 let fresh_relay_parent = match fresh_relay_parents {
993 Some(f) => f.to_vec(),
994 None => {
995 gum::warn!(
996 target: LOG_TARGET,
997 leaf_hash = ?leaf.hash,
998 "Implicit view gave no relay-parents"
999 );
1000
1001 vec![leaf.hash]
1002 },
1003 };
1004 fresh_relay_parent
1005 },
1006 Some((leaf, Err(e))) => {
1007 gum::debug!(
1008 target: LOG_TARGET,
1009 leaf_hash = ?leaf.hash,
1010 err = ?e,
1011 "Failed to load implicit view for leaf."
1012 );
1013
1014 return Ok(());
1015 },
1016 };
1017
1018 for maybe_new in fresh_relay_parents {
1020 if state.per_scheduling_parent.contains_key(&maybe_new) {
1021 continue;
1022 }
1023
1024 let per = construct_per_scheduling_parent_state(
1027 ctx,
1028 maybe_new,
1029 &state.keystore,
1030 &mut state.per_session_cache,
1031 )
1032 .await?;
1033
1034 if let Some(per) = per {
1035 state.per_scheduling_parent.insert(maybe_new, per);
1036 }
1037 }
1038
1039 Ok(())
1040}
1041
1042#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1045async fn check_v3_on_finalized<Context>(
1046 ctx: &mut Context,
1047 state: &mut State,
1048 finalized_hash: Hash,
1049) -> Result<(), Error> {
1050 let session_index = request_session_index_for_child(finalized_hash, ctx.sender())
1051 .await
1052 .await
1053 .map_err(runtime::Error::from)?
1054 .map_err(runtime::Error::from)?;
1055
1056 let node_features = state
1057 .per_session_cache
1058 .node_features(session_index, finalized_hash, ctx.sender())
1059 .await
1060 .map_err(runtime::Error::from)?;
1061
1062 if FeatureIndex::CandidateReceiptV3.is_set(&node_features) {
1063 gum::info!(
1064 target: LOG_TARGET,
1065 ?session_index,
1066 "CandidateReceiptV3 node feature detected in finalized block, \
1067 enabling V3 candidate support",
1068 );
1069 state.v3_ever_seen = true;
1070 }
1071
1072 Ok(())
1073}
1074
/// Unwraps the `Ok` value of a runtime API result, for use inside functions that return
/// `Result<Option<_>, _>`.
///
/// On `Err`, the error is converted into a `runtime::Error` and passed to `error::log_error`;
/// if that call itself returns `Err`, it is propagated with `?`, otherwise the enclosing
/// function returns `Ok(None)`.
macro_rules! try_runtime_api {
    ($x: expr) => {
        match $x {
            Ok(x) => x,
            Err(err) => {
                error::log_error(Err(Into::<runtime::Error>::into(err).into()))?;

                return Ok(None);
            },
        }
    };
}
1091
1092fn core_index_from_statement(
1093 sp_state: &PerSchedulingParentState,
1094 statement: &SignedFullStatementWithPVD,
1095) -> Option<CoreIndex> {
1096 let compact_statement = statement.as_unchecked();
1097 let candidate_hash = CandidateHash(*compact_statement.unchecked_payload().candidate_hash());
1098
1099 gum::trace!(
1100 target: LOG_TARGET,
1101 group_rotation_info = ?sp_state.group_rotation_info,
1102 ?statement,
1103 validator_to_group = ?sp_state.validator_to_group,
1104 n_cores = sp_state.n_cores,
1105 ?candidate_hash,
1106 "Extracting core index from statement"
1107 );
1108
1109 let statement_validator_index = statement.validator_index();
1110 let Some(Some(group_index)) = sp_state.validator_to_group.get(statement_validator_index) else {
1111 gum::debug!(
1112 target: LOG_TARGET,
1113 group_rotation_info = ?sp_state.group_rotation_info,
1114 ?statement,
1115 validator_to_group = ?sp_state.validator_to_group,
1116 n_cores = sp_state.n_cores,
1117 ?candidate_hash,
1118 "Invalid validator index: {:?}",
1119 statement_validator_index
1120 );
1121 return None;
1122 };
1123
1124 let core_index =
1126 sp_state.group_rotation_info.core_for_group(*group_index, sp_state.n_cores as _);
1127
1128 if core_index.0 > sp_state.n_cores {
1129 gum::warn!(target: LOG_TARGET, ?candidate_hash, ?core_index, n_cores = sp_state.n_cores, "Invalid CoreIndex");
1130 return None;
1131 }
1132
1133 if let StatementWithPVD::Seconded(candidate, _pvd) = statement.payload() {
1134 let candidate_para_id = candidate.descriptor.para_id();
1135 let mut assigned_paras = sp_state.claim_queue.iter_claims_for_core(&core_index);
1136
1137 if !assigned_paras.any(|id| id == &candidate_para_id) {
1138 gum::debug!(
1139 target: LOG_TARGET,
1140 ?candidate_hash,
1141 ?core_index,
1142 assigned_paras = ?sp_state.claim_queue.iter_claims_for_core(&core_index).collect::<Vec<_>>(),
1143 ?candidate_para_id,
1144 "Invalid CoreIndex, core is not assigned to this para_id"
1145 );
1146 return None;
1147 }
1148 Some(core_index)
1149 } else {
1150 Some(core_index)
1151 }
1152}
1153
/// Loads the runtime data needed for backing at `relay_parent` and assembles the
/// `PerSchedulingParentState` from it.
///
/// Returns `Ok(None)` when a runtime API request fails in a way `error::log_error` does not
/// propagate (see `try_runtime_api!`), or when `Validator::construct` fails for a reason other
/// than this node not being a validator. Returns `Err` when the joined requests fail
/// (`Error::JoinMultiple`) or when `log_error` propagates an error.
#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
async fn construct_per_scheduling_parent_state<Context>(
    ctx: &mut Context,
    relay_parent: Hash,
    keystore: &KeystorePtr,
    per_session_cache: &mut PerSessionCache,
) -> Result<Option<PerSchedulingParentState>, Error> {
    let parent = relay_parent;

    // The requests are sent one after another; their responses are awaited together.
    let (session_index, groups, claim_queue, disabled_validators) = futures::try_join!(
        request_session_index_for_child(parent, ctx.sender()).await,
        request_validator_groups(parent, ctx.sender()).await,
        request_claim_queue(parent, ctx.sender()).await,
        request_disabled_validators(parent, ctx.sender()).await,
    )
    .map_err(Error::JoinMultiple)?;

    let session_index = try_runtime_api!(session_index);

    // Session-scoped data goes through the cache, which needs the session index first.
    let validators = per_session_cache.validators(session_index, parent, ctx.sender()).await;
    let validators = try_runtime_api!(validators);

    let node_features = per_session_cache.node_features(session_index, parent, ctx.sender()).await;
    let node_features = try_runtime_api!(node_features);

    gum::debug!(target: LOG_TARGET, ?parent, "New state");

    let (validator_groups, group_rotation_info) = try_runtime_api!(groups);

    let minimum_backing_votes = per_session_cache
        .minimum_backing_votes(session_index, parent, ctx.sender())
        .await;
    let minimum_backing_votes = try_runtime_api!(minimum_backing_votes);
    let claim_queue = try_runtime_api!(claim_queue);
    let disabled_validators = try_runtime_api!(disabled_validators);

    let signing_context = SigningContext { parent_hash: parent, session_index };
    let validator = match Validator::construct(
        &validators,
        &disabled_validators,
        signing_context.clone(),
        keystore.clone(),
    ) {
        Ok(v) => Some(v),
        // Not being a validator is fine: state is still tracked, just without a local validator.
        Err(util::Error::NotAValidator) => None,
        Err(e) => {
            gum::warn!(
                target: LOG_TARGET,
                err = ?e,
                "Cannot participate in candidate backing",
            );

            return Ok(None);
        },
    };

    // One core per validator group.
    let n_cores = validator_groups.len();

    let mut groups = HashMap::<CoreIndex, Vec<ValidatorIndex>>::new();
    let mut assigned_core = None;

    for idx in 0..n_cores {
        let core_index = CoreIndex(idx as _);

        // Cores without a claim-queue entry are skipped entirely.
        if !claim_queue.contains_key(&core_index) {
            continue;
        }

        let group_index = group_rotation_info.group_for_core(core_index, n_cores);
        if let Some(g) = validator_groups.get(group_index.0 as usize) {
            // Remember the core whose group contains the local validator.
            if validator.as_ref().map_or(false, |v| g.contains(&v.index())) {
                assigned_core = Some(core_index);
            }
            groups.insert(core_index, g.clone());
        }
    }
    gum::debug!(target: LOG_TARGET, ?groups, "TableContext");

    let validator_to_group =
        per_session_cache.validator_to_group(session_index, &validators, &validator_groups);

    let table_context =
        TableContext { validator, groups, validators: validators.to_vec(), disabled_validators };

    Ok(Some(PerSchedulingParentState {
        parent,
        node_features,
        assigned_core,
        backed: HashSet::new(),
        table: Table::new(),
        table_context,
        issued_statements: HashSet::new(),
        awaiting_validation: HashSet::new(),
        fallbacks: HashMap::new(),
        minimum_backing_votes,
        n_cores: validator_groups.len() as u32,
        claim_queue: ClaimQueueSnapshot::from(claim_queue),
        validator_to_group,
        session_index,
        group_rotation_info,
    }))
}
1257
/// Result of `seconding_sanity_check`.
enum SecondingAllowed {
    /// The candidate must not be seconded.
    No,
    /// The candidate may be seconded; presumably carries the hashes of the leaves under which
    /// it is a member or potential member (the producing code is only partly visible here).
    Yes(Vec<Hash>),
}
1263
1264#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1267async fn seconding_sanity_check<Context>(
1268 ctx: &mut Context,
1269 implicit_view: &ImplicitView,
1270 hypothetical_candidate: HypotheticalCandidate,
1271) -> SecondingAllowed {
1272 let mut leaves_for_seconding = Vec::new();
1273 let mut responses = FuturesOrdered::<BoxFuture<'_, Result<_, oneshot::Canceled>>>::new();
1274
1275 let candidate_scheduling_parent = hypothetical_candidate.scheduling_parent();
1278 let candidate_hash = hypothetical_candidate.candidate_hash();
1279
1280 for head in implicit_view.leaves() {
1281 let allowed_parents_for_para = implicit_view.known_allowed_relay_parents_under(head);
1283 if !allowed_parents_for_para
1284 .unwrap_or_default()
1285 .contains(&candidate_scheduling_parent)
1286 {
1287 continue;
1288 }
1289
1290 let (tx, rx) = oneshot::channel();
1291 ctx.send_message(ProspectiveParachainsMessage::GetHypotheticalMembership(
1292 HypotheticalMembershipRequest {
1293 candidates: vec![hypothetical_candidate.clone()],
1294 fragment_chain_relay_parent: Some(*head),
1295 },
1296 tx,
1297 ))
1298 .await;
1299 let response = rx.map_ok(move |candidate_memberships| {
1300 let is_member_or_potential = candidate_memberships
1301 .into_iter()
1302 .find_map(|(candidate, leaves)| {
1303 (candidate.candidate_hash() == candidate_hash).then_some(leaves)
1304 })
1305 .and_then(|leaves| leaves.into_iter().find(|leaf| leaf == head))
1306 .is_some();
1307
1308 (is_member_or_potential, head)
1309 });
1310 responses.push_back(response.boxed());
1311 }
1312
1313 if responses.is_empty() {
1314 return SecondingAllowed::No;
1315 }
1316
1317 while let Some(response) = responses.next().await {
1318 match response {
1319 Err(oneshot::Canceled) => {
1320 gum::warn!(
1321 target: LOG_TARGET,
1322 "Failed to reach prospective parachains subsystem for hypothetical membership",
1323 );
1324
1325 return SecondingAllowed::No;
1326 },
1327 Ok((is_member_or_potential, head)) => match is_member_or_potential {
1328 false => {
1329 gum::debug!(
1330 target: LOG_TARGET,
1331 ?candidate_hash,
1332 leaf_hash = ?head,
1333 "Refusing to second candidate at leaf. Is not a potential member.",
1334 );
1335 },
1336 true => {
1337 leaves_for_seconding.push(*head);
1338 },
1339 },
1340 }
1341 }
1342
1343 if leaves_for_seconding.is_empty() {
1344 SecondingAllowed::No
1345 } else {
1346 SecondingAllowed::Yes(leaves_for_seconding)
1347 }
1348}
1349
1350#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1352async fn handle_can_second_request<Context>(
1353 ctx: &mut Context,
1354 state: &State,
1355 request: CanSecondRequest,
1356 tx: oneshot::Sender<bool>,
1357) {
1358 let scheduling_parent = request.candidate_scheduling_parent;
1359 let response = if state.per_scheduling_parent.get(&scheduling_parent).is_some() {
1360 let hypothetical_candidate = HypotheticalCandidate::Incomplete {
1361 candidate_hash: request.candidate_hash,
1362 candidate_para: request.candidate_para_id,
1363 parent_head_data_hash: request.parent_head_data_hash,
1364 candidate_scheduling_parent: scheduling_parent,
1365 };
1366
1367 let result =
1368 seconding_sanity_check(ctx, &state.implicit_view, hypothetical_candidate).await;
1369
1370 match result {
1371 SecondingAllowed::No => false,
1372 SecondingAllowed::Yes(leaves) => !leaves.is_empty(),
1373 }
1374 } else {
1375 false
1377 };
1378
1379 let _ = tx.send(response);
1380}
1381
1382#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1383async fn handle_validated_candidate_command<Context>(
1384 ctx: &mut Context,
1385 state: &mut State,
1386 scheduling_parent: Hash,
1387 command: ValidatedCandidateCommand,
1388 metrics: &Metrics,
1389) -> Result<(), Error> {
1390 match state.per_scheduling_parent.get_mut(&scheduling_parent) {
1391 Some(sp_state) => {
1392 let candidate_hash = command.candidate_hash();
1393 sp_state.awaiting_validation.remove(&candidate_hash);
1394
1395 match command {
1396 ValidatedCandidateCommand::Second(res) => match res {
1397 Ok(outputs) => {
1398 let BackgroundValidationOutputs {
1399 candidate,
1400 commitments,
1401 persisted_validation_data,
1402 } = outputs;
1403
1404 if sp_state.issued_statements.contains(&candidate_hash) {
1405 return Ok(());
1406 }
1407
1408 let receipt = CommittedCandidateReceipt {
1409 descriptor: candidate.descriptor.clone(),
1410 commitments,
1411 };
1412
1413 let hypothetical_candidate = HypotheticalCandidate::Complete {
1414 candidate_hash,
1415 receipt: Arc::new(receipt.clone()),
1416 persisted_validation_data: persisted_validation_data.clone(),
1417 };
1418 if let SecondingAllowed::No = seconding_sanity_check(
1422 ctx,
1423 &state.implicit_view,
1424 hypothetical_candidate,
1425 )
1426 .await
1427 {
1428 return Ok(());
1429 };
1430
1431 let statement =
1432 StatementWithPVD::Seconded(receipt, persisted_validation_data);
1433
1434 let res = sign_import_and_distribute_statement(
1438 ctx,
1439 sp_state,
1440 &mut state.per_candidate,
1441 statement,
1442 state.keystore.clone(),
1443 metrics,
1444 )
1445 .await;
1446
1447 if let Err(Error::RejectedByProspectiveParachains) = res {
1448 let candidate_hash = candidate.hash();
1449 gum::debug!(
1450 target: LOG_TARGET,
1451 scheduling_parent = ?candidate.descriptor().scheduling_parent(),
1452 ?candidate_hash,
1453 "Attempted to second candidate but was rejected by prospective parachains",
1454 );
1455
1456 ctx.send_message(CollatorProtocolMessage::Invalid(
1457 candidate.descriptor().scheduling_parent(),
1458 candidate,
1459 ))
1460 .await;
1461
1462 return Ok(());
1463 }
1464
1465 if let Some(stmt) = res? {
1466 match state.per_candidate.get_mut(&candidate_hash) {
1467 None => {
1468 gum::warn!(
1469 target: LOG_TARGET,
1470 ?candidate_hash,
1471 "Missing `per_candidate` for seconded candidate.",
1472 );
1473 },
1474 Some(p) => p.seconded_locally = true,
1475 }
1476
1477 sp_state.issued_statements.insert(candidate_hash);
1478
1479 metrics.on_candidate_seconded();
1480 ctx.send_message(CollatorProtocolMessage::Seconded(
1481 sp_state.parent,
1482 StatementWithPVD::drop_pvd_from_signed(stmt),
1483 ))
1484 .await;
1485 }
1486 },
1487 Err(candidate) => {
1488 ctx.send_message(CollatorProtocolMessage::Invalid(
1489 sp_state.parent,
1490 candidate,
1491 ))
1492 .await;
1493 },
1494 },
1495 ValidatedCandidateCommand::Attest(res) => {
1496 sp_state.fallbacks.remove(&candidate_hash);
1498 if !sp_state.issued_statements.contains(&candidate_hash) {
1500 if res.is_ok() {
1501 let statement = StatementWithPVD::Valid(candidate_hash);
1502
1503 sign_import_and_distribute_statement(
1504 ctx,
1505 sp_state,
1506 &mut state.per_candidate,
1507 statement,
1508 state.keystore.clone(),
1509 metrics,
1510 )
1511 .await?;
1512 }
1513 sp_state.issued_statements.insert(candidate_hash);
1514 }
1515 },
1516 ValidatedCandidateCommand::AttestNoPoV(candidate_hash) => {
1517 if let Some(attesting) = sp_state.fallbacks.get_mut(&candidate_hash) {
1518 if let Some(index) = attesting.backing.pop() {
1519 attesting.from_validator = index;
1520 let attesting = attesting.clone();
1521
1522 if let Some(pvd) = state
1528 .per_candidate
1529 .get(&candidate_hash)
1530 .map(|pc| pc.persisted_validation_data.clone())
1531 {
1532 kick_off_validation_work(
1533 ctx,
1534 sp_state,
1535 pvd,
1536 &state.background_validation_tx,
1537 attesting,
1538 )
1539 .await?;
1540 }
1541 }
1542 } else {
1543 gum::warn!(
1544 target: LOG_TARGET,
1545 "AttestNoPoV was triggered without fallback being available."
1546 );
1547 debug_assert!(false);
1548 }
1549 },
1550 }
1551 },
1552 None => {
1553 },
1556 }
1557
1558 Ok(())
1559}
1560
1561fn sign_statement(
1562 sp_state: &PerSchedulingParentState,
1563 statement: StatementWithPVD,
1564 keystore: KeystorePtr,
1565 metrics: &Metrics,
1566) -> Option<SignedFullStatementWithPVD> {
1567 let signed = sp_state
1568 .table_context
1569 .validator
1570 .as_ref()?
1571 .sign(keystore, statement)
1572 .ok()
1573 .flatten()?;
1574 metrics.on_statement_signed();
1575 Some(signed)
1576}
1577
1578#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1586async fn import_statement<Context>(
1587 ctx: &mut Context,
1588 sp_state: &mut PerSchedulingParentState,
1589 per_candidate: &mut HashMap<CandidateHash, PerCandidateState>,
1590 statement: &SignedFullStatementWithPVD,
1591) -> Result<Option<TableSummary>, Error> {
1592 let candidate_hash = statement.payload().candidate_hash();
1593
1594 gum::debug!(
1595 target: LOG_TARGET,
1596 statement = ?statement.payload().to_compact(),
1597 validator_index = statement.validator_index().0,
1598 ?candidate_hash,
1599 "Importing statement",
1600 );
1601
1602 if let StatementWithPVD::Seconded(candidate, pvd) = statement.payload() {
1615 if !per_candidate.contains_key(&candidate_hash) {
1616 let (tx, rx) = oneshot::channel();
1617 ctx.send_message(ProspectiveParachainsMessage::IntroduceSecondedCandidate(
1618 IntroduceSecondedCandidateRequest {
1619 candidate_para: candidate.descriptor.para_id(),
1620 candidate_receipt: candidate.clone(),
1621 persisted_validation_data: pvd.clone(),
1622 },
1623 tx,
1624 ))
1625 .await;
1626
1627 match rx.await {
1628 Err(oneshot::Canceled) => {
1629 gum::warn!(
1630 target: LOG_TARGET,
1631 "Could not reach the Prospective Parachains subsystem."
1632 );
1633
1634 return Err(Error::RejectedByProspectiveParachains);
1635 },
1636 Ok(false) => return Err(Error::RejectedByProspectiveParachains),
1637 Ok(true) => {},
1638 }
1639
1640 per_candidate.insert(
1642 candidate_hash,
1643 PerCandidateState {
1644 persisted_validation_data: pvd.clone(),
1645 seconded_locally: false,
1647 scheduling_parent: candidate.descriptor.scheduling_parent(),
1649 },
1650 );
1651 }
1652 }
1653
1654 let stmt = primitive_statement_to_table(statement);
1655
1656 let core = core_index_from_statement(sp_state, statement).ok_or(Error::CoreIndexUnavailable)?;
1657
1658 Ok(sp_state.table.import_statement(&sp_state.table_context, core, stmt))
1659}
1660
1661#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1664async fn post_import_statement_actions<Context>(
1665 ctx: &mut Context,
1666 sp_state: &mut PerSchedulingParentState,
1667 summary: Option<&TableSummary>,
1668) {
1669 if let Some(attested) = summary.as_ref().and_then(|s| {
1670 sp_state.table.attested_candidate(
1671 &s.candidate,
1672 &sp_state.table_context,
1673 sp_state.minimum_backing_votes,
1674 )
1675 }) {
1676 let candidate_hash = attested.candidate.hash();
1677
1678 if sp_state.backed.insert(candidate_hash) {
1680 if let Some(backed) = table_attested_to_backed(attested, &sp_state.table_context) {
1681 let para_id = backed.candidate().descriptor.para_id();
1682 gum::debug!(
1683 target: LOG_TARGET,
1684 candidate_hash = ?candidate_hash,
1685 scheduling_parent = ?sp_state.parent,
1686 %para_id,
1687 "Candidate backed",
1688 );
1689
1690 ctx.send_message(ProspectiveParachainsMessage::CandidateBacked(
1693 para_id,
1694 candidate_hash,
1695 ))
1696 .await;
1697 ctx.send_message(StatementDistributionMessage::Backed(candidate_hash)).await;
1699 } else {
1700 gum::debug!(target: LOG_TARGET, ?candidate_hash, "Cannot get BackedCandidate");
1701 }
1702 } else {
1703 gum::debug!(target: LOG_TARGET, ?candidate_hash, "Candidate already known");
1704 }
1705 } else {
1706 gum::debug!(target: LOG_TARGET, "No attested candidate");
1707 }
1708
1709 issue_new_misbehaviors(ctx, sp_state.parent, &mut sp_state.table);
1710}
1711
1712#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1714fn issue_new_misbehaviors<Context>(
1715 ctx: &mut Context,
1716 relay_parent: Hash,
1717 table: &mut Table<TableContext>,
1718) {
1719 let misbehaviors: Vec<_> = table.drain_misbehaviors().collect();
1721 for (validator_id, report) in misbehaviors {
1722 ctx.send_unbounded_message(ProvisionerMessage::ProvisionableData(
1728 relay_parent,
1729 ProvisionableData::MisbehaviorReport(relay_parent, validator_id, report),
1730 ));
1731 }
1732}
1733
1734#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1736async fn sign_import_and_distribute_statement<Context>(
1737 ctx: &mut Context,
1738 sp_state: &mut PerSchedulingParentState,
1739 per_candidate: &mut HashMap<CandidateHash, PerCandidateState>,
1740 statement: StatementWithPVD,
1741 keystore: KeystorePtr,
1742 metrics: &Metrics,
1743) -> Result<Option<SignedFullStatementWithPVD>, Error> {
1744 if let Some(signed_statement) = sign_statement(&*sp_state, statement, keystore, metrics) {
1745 let summary = import_statement(ctx, sp_state, per_candidate, &signed_statement).await?;
1746
1747 let smsg = StatementDistributionMessage::Share(sp_state.parent, signed_statement.clone());
1750 ctx.send_unbounded_message(smsg);
1751
1752 post_import_statement_actions(ctx, sp_state, summary.as_ref()).await;
1753
1754 Ok(Some(signed_statement))
1755 } else {
1756 Ok(None)
1757 }
1758}
1759
1760#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1761async fn background_validate_and_make_available<Context>(
1762 ctx: &mut Context,
1763 sp_state: &mut PerSchedulingParentState,
1764 params: BackgroundValidationParams<
1765 impl overseer::CandidateBackingSenderTrait,
1766 impl Fn(BackgroundValidationResult) -> ValidatedCandidateCommand + Send + 'static + Sync,
1767 >,
1768) -> Result<(), Error> {
1769 let candidate_hash = params.candidate.hash();
1770 let Some(core_index) = sp_state.assigned_core else { return Ok(()) };
1771 if sp_state.awaiting_validation.insert(candidate_hash) {
1772 let bg = async move {
1774 if let Err(error) = validate_and_make_available(params, core_index).await {
1775 if let Error::BackgroundValidationMpsc(error) = error {
1776 gum::debug!(
1777 target: LOG_TARGET,
1778 ?candidate_hash,
1779 ?error,
1780 "Mpsc background validation mpsc died during validation- leaf no longer active?"
1781 );
1782 } else {
1783 gum::error!(
1784 target: LOG_TARGET,
1785 ?candidate_hash,
1786 ?error,
1787 "Failed to validate and make available",
1788 );
1789 }
1790 }
1791 };
1792
1793 ctx.spawn("backing-validation", bg.boxed())
1794 .map_err(|_| Error::FailedToSpawnBackgroundTask)?;
1795 }
1796
1797 Ok(())
1798}
1799
1800#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1802async fn kick_off_validation_work<Context>(
1803 ctx: &mut Context,
1804 sp_state: &mut PerSchedulingParentState,
1805 persisted_validation_data: PersistedValidationData,
1806 background_validation_tx: &mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
1807 attesting: AttestingData,
1808) -> Result<(), Error> {
1809 match sp_state.table_context.local_validator_is_disabled() {
1811 Some(true) => {
1812 gum::info!(target: LOG_TARGET, "We are disabled - don't kick off validation");
1813 return Ok(());
1814 },
1815 Some(false) => {}, None => {
1817 gum::debug!(target: LOG_TARGET, "We are not a validator - don't kick off validation");
1818 return Ok(());
1819 },
1820 }
1821
1822 let candidate_hash = attesting.candidate.hash();
1823 if sp_state.issued_statements.contains(&candidate_hash) {
1824 return Ok(());
1825 }
1826
1827 gum::debug!(
1828 target: LOG_TARGET,
1829 candidate_hash = ?candidate_hash,
1830 candidate_receipt = ?attesting.candidate,
1831 "Kicking off validation",
1832 );
1833
1834 let bg_sender = ctx.sender().clone();
1835 let pov = PoVData::FetchFromValidator {
1836 from_validator: attesting.from_validator,
1837 candidate_hash,
1838 pov_hash: attesting.pov_hash,
1839 };
1840 let scheduling_parent = attesting.candidate.descriptor().scheduling_parent();
1841
1842 background_validate_and_make_available(
1843 ctx,
1844 sp_state,
1845 BackgroundValidationParams {
1846 sender: bg_sender,
1847 tx_command: background_validation_tx.clone(),
1848 candidate: attesting.candidate,
1849 scheduling_parent,
1850 session_index: sp_state.session_index,
1851 node_features: sp_state.node_features.clone(),
1852 persisted_validation_data,
1853 pov,
1854 n_validators: sp_state.table_context.validators.len(),
1855 make_command: ValidatedCandidateCommand::Attest,
1856 },
1857 )
1858 .await
1859}
1860
1861#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1863async fn maybe_validate_and_import<Context>(
1864 ctx: &mut Context,
1865 state: &mut State,
1866 scheduling_parent: Hash,
1867 statement: SignedFullStatementWithPVD,
1868) -> Result<(), Error> {
1869 let sp_state = match state.per_scheduling_parent.get_mut(&scheduling_parent) {
1870 Some(r) => r,
1871 None => {
1872 gum::trace!(
1873 target: LOG_TARGET,
1874 ?scheduling_parent,
1875 "Received statement for unknown scheduling parent"
1876 );
1877
1878 return Ok(());
1879 },
1880 };
1881
1882 if sp_state.table_context.validator_is_disabled(&statement.validator_index()) {
1884 gum::debug!(
1885 target: LOG_TARGET,
1886 sender_validator_idx = ?statement.validator_index(),
1887 "Not importing statement because the sender is disabled"
1888 );
1889 return Ok(());
1890 }
1891
1892 if let StatementWithPVD::Seconded(receipt, _) = statement.payload() {
1894 if let Err(reason) = receipt.descriptor.check_version_acceptance(state.v3_ever_seen) {
1895 gum::debug!(
1896 target: LOG_TARGET,
1897 ?scheduling_parent,
1898 "Not importing Seconded statement: {}",
1899 reason,
1900 );
1901 return Ok(());
1902 }
1903 }
1904
1905 let res = import_statement(ctx, sp_state, &mut state.per_candidate, &statement).await;
1906
1907 if let Err(Error::RejectedByProspectiveParachains) = res {
1910 gum::debug!(
1911 target: LOG_TARGET,
1912 ?scheduling_parent,
1913 "Statement rejected by prospective parachains."
1914 );
1915
1916 return Ok(());
1917 }
1918
1919 let summary = res?;
1920 post_import_statement_actions(ctx, sp_state, summary.as_ref()).await;
1921
1922 if let Some(summary) = summary {
1923 let candidate_hash = summary.candidate;
1928
1929 if Some(summary.group_id) != sp_state.assigned_core {
1930 return Ok(());
1931 }
1932
1933 let attesting = match statement.payload() {
1934 StatementWithPVD::Seconded(receipt, _) => {
1935 let attesting = AttestingData {
1936 candidate: sp_state
1937 .table
1938 .get_candidate(&candidate_hash)
1939 .ok_or(Error::CandidateNotFound)?
1940 .to_plain(),
1941 pov_hash: receipt.descriptor.pov_hash(),
1942 from_validator: statement.validator_index(),
1943 backing: Vec::new(),
1944 };
1945 sp_state.fallbacks.insert(summary.candidate, attesting.clone());
1946 attesting
1947 },
1948 StatementWithPVD::Valid(candidate_hash) => {
1949 if let Some(attesting) = sp_state.fallbacks.get_mut(candidate_hash) {
1950 let our_index = sp_state.table_context.validator.as_ref().map(|v| v.index());
1951 if our_index == Some(statement.validator_index()) {
1952 return Ok(());
1953 }
1954
1955 if sp_state.awaiting_validation.contains(candidate_hash) {
1956 attesting.backing.push(statement.validator_index());
1958 return Ok(());
1959 } else {
1960 attesting.from_validator = statement.validator_index();
1962 attesting.clone()
1963 }
1964 } else {
1965 return Ok(());
1966 }
1967 },
1968 };
1969
1970 match sp_state.table_context.local_validator_is_disabled() {
1972 Some(true) => return Ok(()),
1973 None => return Ok(()),
1974 Some(false) => {},
1975 }
1976
1977 if let Some(pvd) = state
1980 .per_candidate
1981 .get(&candidate_hash)
1982 .map(|pc| pc.persisted_validation_data.clone())
1983 {
1984 kick_off_validation_work(
1985 ctx,
1986 sp_state,
1987 pvd,
1988 &state.background_validation_tx,
1989 attesting,
1990 )
1991 .await?;
1992 }
1993 }
1994 Ok(())
1995}
1996
1997#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1999async fn validate_and_second<Context>(
2000 ctx: &mut Context,
2001 sp_state: &mut PerSchedulingParentState,
2002 persisted_validation_data: PersistedValidationData,
2003 candidate: &CandidateReceipt,
2004 pov: Arc<PoV>,
2005 background_validation_tx: &mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
2006) -> Result<(), Error> {
2007 let candidate_hash = candidate.hash();
2008
2009 gum::debug!(
2010 target: LOG_TARGET,
2011 candidate_hash = ?candidate_hash,
2012 candidate_receipt = ?candidate,
2013 "Validate and second candidate",
2014 );
2015
2016 let bg_sender = ctx.sender().clone();
2017 let scheduling_parent = candidate.descriptor.scheduling_parent();
2018 background_validate_and_make_available(
2019 ctx,
2020 sp_state,
2021 BackgroundValidationParams {
2022 sender: bg_sender,
2023 tx_command: background_validation_tx.clone(),
2024 candidate: candidate.clone(),
2025 scheduling_parent,
2026 session_index: sp_state.session_index,
2027 node_features: sp_state.node_features.clone(),
2028 persisted_validation_data,
2029 pov: PoVData::Ready(pov),
2030 n_validators: sp_state.table_context.validators.len(),
2031 make_command: ValidatedCandidateCommand::Second,
2032 },
2033 )
2034 .await?;
2035
2036 Ok(())
2037}
2038
2039#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
2040async fn handle_second_message<Context>(
2041 ctx: &mut Context,
2042 state: &mut State,
2043 candidate: CandidateReceipt,
2044 persisted_validation_data: PersistedValidationData,
2045 pov: PoV,
2046 metrics: &Metrics,
2047) -> Result<(), Error> {
2048 let _timer = metrics.time_process_second();
2049
2050 let candidate_hash = candidate.hash();
2051
2052 if candidate.descriptor().persisted_validation_data_hash() != persisted_validation_data.hash() {
2053 gum::warn!(
2054 target: LOG_TARGET,
2055 ?candidate_hash,
2056 "Candidate backing was asked to second candidate with wrong PVD",
2057 );
2058
2059 return Ok(());
2060 }
2061
2062 if let Err(reason) = candidate.descriptor().check_version_acceptance(state.v3_ever_seen) {
2064 gum::debug!(
2065 target: LOG_TARGET,
2066 ?candidate_hash,
2067 "Not seconding candidate: {}",
2068 reason,
2069 );
2070 ctx.send_message(CollatorProtocolMessage::Invalid(
2071 candidate.descriptor().scheduling_parent(),
2072 candidate,
2073 ))
2074 .await;
2075 return Ok(());
2076 }
2077
2078 let scheduling_parent = candidate.descriptor().scheduling_parent();
2080
2081 let sp_state = match state.per_scheduling_parent.get_mut(&scheduling_parent) {
2083 None => {
2084 gum::trace!(
2085 target: LOG_TARGET,
2086 ?scheduling_parent,
2087 ?candidate_hash,
2088 "Candidate has scheduling_parent outside of our view."
2089 );
2090
2091 return Ok(());
2092 },
2093 Some(r) => r,
2094 };
2095
2096 let assigned_core = sp_state.assigned_core;
2098 let claim_queue = &sp_state.claim_queue;
2099
2100 if sp_state.table_context.local_validator_is_disabled().unwrap_or(false) {
2103 gum::warn!(target: LOG_TARGET, "Local validator is disabled. Don't validate and second");
2104 return Ok(());
2105 }
2106
2107 let assigned_paras = assigned_core.and_then(|core| claim_queue.0.get(&core));
2109
2110 if !matches!(assigned_paras, Some(paras) if paras.contains(&candidate.descriptor().para_id())) {
2112 gum::debug!(
2113 target: LOG_TARGET,
2114 our_assignment_core = ?assigned_core,
2115 our_assignment_paras = ?assigned_paras,
2116 collation = ?candidate.descriptor().para_id(),
2117 "Subsystem asked to second for para outside of our assignment",
2118 );
2119 return Ok(());
2120 }
2121
2122 gum::debug!(
2123 target: LOG_TARGET,
2124 our_assignment_core = ?assigned_core,
2125 our_assignment_paras = ?assigned_paras,
2126 collation = ?candidate.descriptor().para_id(),
2127 "Current assignments vs collation",
2128 );
2129
2130 if !sp_state.issued_statements.contains(&candidate_hash) {
2138 let pov = Arc::new(pov);
2139
2140 validate_and_second(
2141 ctx,
2142 sp_state,
2143 persisted_validation_data,
2144 &candidate,
2145 pov,
2146 &state.background_validation_tx,
2147 )
2148 .await?;
2149 }
2150
2151 Ok(())
2152}
2153
2154#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
2155async fn handle_statement_message<Context>(
2156 ctx: &mut Context,
2157 state: &mut State,
2158 scheduling_parent: Hash,
2159 statement: SignedFullStatementWithPVD,
2160 metrics: &Metrics,
2161) -> Result<(), Error> {
2162 let _timer = metrics.time_process_statement();
2163
2164 match maybe_validate_and_import(ctx, state, scheduling_parent, statement).await {
2166 Err(Error::ValidationFailed(_)) => Ok(()),
2167 Err(e) => Err(e),
2168 Ok(()) => Ok(()),
2169 }
2170}
2171
2172fn handle_get_backable_candidates_message(
2173 state: &State,
2174 requested_candidates: HashMap<ParaId, Vec<BackableCandidateRef>>,
2175 tx: oneshot::Sender<HashMap<ParaId, Vec<BackedCandidate>>>,
2176 metrics: &Metrics,
2177) -> Result<(), Error> {
2178 let _timer = metrics.time_get_backed_candidates();
2179
2180 let mut backed = HashMap::with_capacity(requested_candidates.len());
2181
2182 for (para_id, para_candidates) in requested_candidates {
2183 for candidate_ref in para_candidates.iter() {
2184 let candidate_hash = candidate_ref.candidate_hash;
2185 let scheduling_parent = candidate_ref.scheduling_parent;
2186
2187 let sp_state = match state.per_scheduling_parent.get(&scheduling_parent) {
2188 Some(sp_state) => sp_state,
2189 None => {
2190 gum::debug!(
2191 target: LOG_TARGET,
2192 ?scheduling_parent,
2193 ?candidate_hash,
2194 "Requested candidate's scheduling parent is out of view",
2195 );
2196 break;
2197 },
2198 };
2199 let maybe_backed_candidate = sp_state
2200 .table
2201 .attested_candidate(
2202 &candidate_hash,
2203 &sp_state.table_context,
2204 sp_state.minimum_backing_votes,
2205 )
2206 .and_then(|attested| table_attested_to_backed(attested, &sp_state.table_context));
2207
2208 if let Some(backed_candidate) = maybe_backed_candidate {
2209 backed
2210 .entry(para_id)
2211 .or_insert_with(|| Vec::with_capacity(para_candidates.len()))
2212 .push(backed_candidate);
2213 } else {
2214 break;
2215 }
2216 }
2217 }
2218
2219 tx.send(backed).map_err(|data| Error::Send(data))?;
2220 Ok(())
2221}