1#![deny(unused_crate_dependencies)]
67
68use std::{
69 collections::{HashMap, HashSet},
70 sync::Arc,
71};
72
73use bitvec::vec::BitVec;
74use futures::{
75 channel::{mpsc, oneshot},
76 future::BoxFuture,
77 stream::FuturesOrdered,
78 FutureExt, SinkExt, StreamExt, TryFutureExt,
79};
80use schnellru::{ByLength, LruMap};
81
82use error::{Error, FatalResult};
83use polkadot_node_primitives::{
84 AvailableData, InvalidCandidate, PoV, SignedFullStatementWithPVD, StatementWithPVD,
85 ValidationResult,
86};
87use polkadot_node_subsystem::{
88 messages::{
89 AvailabilityDistributionMessage, AvailabilityStoreMessage, CanSecondRequest,
90 CandidateBackingMessage, CandidateValidationMessage, CollatorProtocolMessage,
91 HypotheticalCandidate, HypotheticalMembershipRequest, IntroduceSecondedCandidateRequest,
92 ProspectiveParachainsMessage, ProvisionableData, ProvisionerMessage, PvfExecKind,
93 RuntimeApiMessage, RuntimeApiRequest, StatementDistributionMessage,
94 StoreAvailableDataError,
95 },
96 overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, RuntimeApiError, SpawnedSubsystem,
97 SubsystemError,
98};
99use polkadot_node_subsystem_util::{
100 self as util,
101 backing_implicit_view::View as ImplicitView,
102 request_claim_queue, request_disabled_validators, request_min_backing_votes,
103 request_node_features, request_session_executor_params, request_session_index_for_child,
104 request_validator_groups, request_validators,
105 runtime::{self, ClaimQueueSnapshot},
106 Validator,
107};
108use polkadot_parachain_primitives::primitives::IsSystem;
109use polkadot_primitives::{
110 BackedCandidate, CandidateCommitments, CandidateHash, CandidateReceiptV2 as CandidateReceipt,
111 CommittedCandidateReceiptV2 as CommittedCandidateReceipt, CoreIndex, ExecutorParams,
112 GroupIndex, GroupRotationInfo, Hash, Id as ParaId, IndexedVec, NodeFeatures,
113 PersistedValidationData, SessionIndex, SigningContext, ValidationCode, ValidatorId,
114 ValidatorIndex, ValidatorSignature, ValidityAttestation,
115};
116use polkadot_statement_table::{
117 generic::AttestedCandidate as TableAttestedCandidate,
118 v2::{
119 SignedStatement as TableSignedStatement, Statement as TableStatement,
120 Summary as TableSummary,
121 },
122 Context as TableContextTrait, Table,
123};
124use sp_keystore::KeystorePtr;
125
126mod error;
127
128mod metrics;
129use self::metrics::Metrics;
130
131#[cfg(test)]
132mod tests;
133
134const LOG_TARGET: &str = "parachain::candidate-backing";
135
136enum PoVData {
138 Ready(Arc<PoV>),
140 FetchFromValidator {
142 from_validator: ValidatorIndex,
143 candidate_hash: CandidateHash,
144 pov_hash: Hash,
145 },
146}
147
148enum ValidatedCandidateCommand {
149 Second(BackgroundValidationResult),
151 Attest(BackgroundValidationResult),
153 AttestNoPoV(CandidateHash),
155}
156
157impl std::fmt::Debug for ValidatedCandidateCommand {
158 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
159 let candidate_hash = self.candidate_hash();
160 match *self {
161 ValidatedCandidateCommand::Second(_) => write!(f, "Second({})", candidate_hash),
162 ValidatedCandidateCommand::Attest(_) => write!(f, "Attest({})", candidate_hash),
163 ValidatedCandidateCommand::AttestNoPoV(_) => write!(f, "Attest({})", candidate_hash),
164 }
165 }
166}
167
168impl ValidatedCandidateCommand {
169 fn candidate_hash(&self) -> CandidateHash {
170 match *self {
171 ValidatedCandidateCommand::Second(Ok(ref outputs)) => outputs.candidate.hash(),
172 ValidatedCandidateCommand::Second(Err(ref candidate)) => candidate.hash(),
173 ValidatedCandidateCommand::Attest(Ok(ref outputs)) => outputs.candidate.hash(),
174 ValidatedCandidateCommand::Attest(Err(ref candidate)) => candidate.hash(),
175 ValidatedCandidateCommand::AttestNoPoV(candidate_hash) => candidate_hash,
176 }
177 }
178}
179
180pub struct CandidateBackingSubsystem {
182 keystore: KeystorePtr,
183 metrics: Metrics,
184}
185
186impl CandidateBackingSubsystem {
187 pub fn new(keystore: KeystorePtr, metrics: Metrics) -> Self {
189 Self { keystore, metrics }
190 }
191}
192
193#[overseer::subsystem(CandidateBacking, error = SubsystemError, prefix = self::overseer)]
194impl<Context> CandidateBackingSubsystem
195where
196 Context: Send + Sync,
197{
198 fn start(self, ctx: Context) -> SpawnedSubsystem {
199 let future = async move {
200 run(ctx, self.keystore, self.metrics)
201 .await
202 .map_err(|e| SubsystemError::with_origin("candidate-backing", e))
203 }
204 .boxed();
205
206 SpawnedSubsystem { name: "candidate-backing-subsystem", future }
207 }
208}
209
210struct PerRelayParentState {
211 parent: Hash,
213 node_features: NodeFeatures,
215 executor_params: Arc<ExecutorParams>,
217 assigned_core: Option<CoreIndex>,
219 backed: HashSet<CandidateHash>,
221 table: Table<TableContext>,
223 table_context: TableContext,
225 issued_statements: HashSet<CandidateHash>,
227 awaiting_validation: HashSet<CandidateHash>,
229 fallbacks: HashMap<CandidateHash, AttestingData>,
231 minimum_backing_votes: u32,
233 n_cores: u32,
235 claim_queue: ClaimQueueSnapshot,
238 validator_to_group: Arc<IndexedVec<ValidatorIndex, Option<GroupIndex>>>,
240 group_rotation_info: GroupRotationInfo,
242}
243
244struct PerCandidateState {
245 persisted_validation_data: PersistedValidationData,
246 seconded_locally: bool,
247 relay_parent: Hash,
248}
249
250struct PerSessionCache {
253 validators_cache: LruMap<SessionIndex, Arc<Vec<ValidatorId>>>,
255 node_features_cache: LruMap<SessionIndex, NodeFeatures>,
257 executor_params_cache: LruMap<SessionIndex, Arc<ExecutorParams>>,
259 minimum_backing_votes_cache: LruMap<SessionIndex, u32>,
261 validator_to_group_cache:
263 LruMap<SessionIndex, Arc<IndexedVec<ValidatorIndex, Option<GroupIndex>>>>,
264}
265
266impl Default for PerSessionCache {
267 fn default() -> Self {
269 Self::new(2)
270 }
271}
272
273impl PerSessionCache {
274 fn new(capacity: u32) -> Self {
276 PerSessionCache {
277 validators_cache: LruMap::new(ByLength::new(capacity)),
278 node_features_cache: LruMap::new(ByLength::new(capacity)),
279 executor_params_cache: LruMap::new(ByLength::new(capacity)),
280 minimum_backing_votes_cache: LruMap::new(ByLength::new(capacity)),
281 validator_to_group_cache: LruMap::new(ByLength::new(capacity)),
282 }
283 }
284
285 async fn validators(
287 &mut self,
288 session_index: SessionIndex,
289 parent: Hash,
290 sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
291 ) -> Result<Arc<Vec<ValidatorId>>, RuntimeApiError> {
292 if let Some(validators) = self.validators_cache.get(&session_index) {
294 return Ok(Arc::clone(validators));
295 }
296
297 let validators: Vec<ValidatorId> =
299 request_validators(parent, sender).await.await.map_err(|err| {
300 RuntimeApiError::Execution { runtime_api_name: "Validators", source: Arc::new(err) }
301 })??;
302
303 let validators = Arc::new(validators);
305
306 self.validators_cache.insert(session_index, Arc::clone(&validators));
308
309 Ok(validators)
310 }
311
312 async fn node_features(
314 &mut self,
315 session_index: SessionIndex,
316 parent: Hash,
317 sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
318 ) -> Result<NodeFeatures, RuntimeApiError> {
319 if let Some(node_features) = self.node_features_cache.get(&session_index) {
321 return Ok(node_features.clone());
322 }
323
324 let node_features = request_node_features(parent, session_index, sender)
326 .await
327 .await
328 .map_err(|err| RuntimeApiError::Execution {
329 runtime_api_name: "NodeFeatures",
330 source: Arc::new(err),
331 })??;
332
333 self.node_features_cache.insert(session_index, node_features.clone());
335
336 Ok(node_features)
337 }
338
339 async fn executor_params(
342 &mut self,
343 session_index: SessionIndex,
344 parent: Hash,
345 sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
346 ) -> Result<Arc<ExecutorParams>, RuntimeApiError> {
347 if let Some(executor_params) = self.executor_params_cache.get(&session_index) {
349 return Ok(Arc::clone(executor_params));
350 }
351
352 let executor_params = request_session_executor_params(parent, session_index, sender)
354 .await
355 .await
356 .map_err(|err| RuntimeApiError::Execution {
357 runtime_api_name: "SessionExecutorParams",
358 source: Arc::new(err),
359 })??
360 .ok_or_else(|| RuntimeApiError::Execution {
361 runtime_api_name: "SessionExecutorParams",
362 source: Arc::new(Error::MissingExecutorParams),
363 })?;
364
365 let executor_params = Arc::new(executor_params);
367
368 self.executor_params_cache.insert(session_index, Arc::clone(&executor_params));
370
371 Ok(executor_params)
372 }
373
374 async fn minimum_backing_votes(
377 &mut self,
378 session_index: SessionIndex,
379 parent: Hash,
380 sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
381 ) -> Result<u32, RuntimeApiError> {
382 if let Some(minimum_backing_votes) = self.minimum_backing_votes_cache.get(&session_index) {
384 return Ok(*minimum_backing_votes);
385 }
386
387 let minimum_backing_votes = request_min_backing_votes(parent, session_index, sender)
389 .await
390 .await
391 .map_err(|err| RuntimeApiError::Execution {
392 runtime_api_name: "MinimumBackingVotes",
393 source: Arc::new(err),
394 })??;
395
396 self.minimum_backing_votes_cache.insert(session_index, minimum_backing_votes);
398
399 Ok(minimum_backing_votes)
400 }
401
402 fn validator_to_group(
404 &mut self,
405 session_index: SessionIndex,
406 validators: &[ValidatorId],
407 validator_groups: &[Vec<ValidatorIndex>],
408 ) -> Arc<IndexedVec<ValidatorIndex, Option<GroupIndex>>> {
409 let validator_to_group = self
410 .validator_to_group_cache
411 .get_or_insert(session_index, || {
412 let mut vector = vec![None; validators.len()];
413
414 for (group_idx, validator_group) in validator_groups.iter().enumerate() {
415 for validator in validator_group {
416 vector[validator.0 as usize] = Some(GroupIndex(group_idx as u32));
417 }
418 }
419
420 Arc::new(IndexedVec::<_, _>::from(vector))
421 })
422 .expect("Just inserted");
423
424 Arc::clone(validator_to_group)
425 }
426}
427
428struct State {
430 implicit_view: ImplicitView,
432 per_relay_parent: HashMap<Hash, PerRelayParentState>,
435 per_candidate: HashMap<CandidateHash, PerCandidateState>,
440 per_session_cache: PerSessionCache,
443 background_validation_tx: mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
446 keystore: KeystorePtr,
448}
449
450impl State {
451 fn new(
452 background_validation_tx: mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
453 keystore: KeystorePtr,
454 ) -> Self {
455 State {
456 implicit_view: ImplicitView::default(),
457 per_relay_parent: HashMap::default(),
458 per_candidate: HashMap::new(),
459 per_session_cache: PerSessionCache::default(),
460 background_validation_tx,
461 keystore,
462 }
463 }
464}
465
466#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
467async fn run<Context>(
468 mut ctx: Context,
469 keystore: KeystorePtr,
470 metrics: Metrics,
471) -> FatalResult<()> {
472 let (background_validation_tx, mut background_validation_rx) = mpsc::channel(16);
473 let mut state = State::new(background_validation_tx, keystore);
474
475 loop {
476 let res =
477 run_iteration(&mut ctx, &mut state, &metrics, &mut background_validation_rx).await;
478
479 match res {
480 Ok(()) => break,
481 Err(e) => crate::error::log_error(Err(e))?,
482 }
483 }
484
485 Ok(())
486}
487
488#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
489async fn run_iteration<Context>(
490 ctx: &mut Context,
491 state: &mut State,
492 metrics: &Metrics,
493 background_validation_rx: &mut mpsc::Receiver<(Hash, ValidatedCandidateCommand)>,
494) -> Result<(), Error> {
495 loop {
496 futures::select!(
497 validated_command = background_validation_rx.next().fuse() => {
498 if let Some((relay_parent, command)) = validated_command {
499 handle_validated_candidate_command(
500 &mut *ctx,
501 state,
502 relay_parent,
503 command,
504 metrics,
505 ).await?;
506 } else {
507 panic!("background_validation_tx always alive at this point; qed");
508 }
509 }
510 from_overseer = ctx.recv().fuse() => {
511 match from_overseer.map_err(Error::OverseerExited)? {
512 FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => {
513 handle_active_leaves_update(
514 &mut *ctx,
515 update,
516 state,
517 ).await?;
518 }
519 FromOrchestra::Signal(OverseerSignal::BlockFinalized(..)) => {}
520 FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()),
521 FromOrchestra::Communication { msg } => {
522 handle_communication(&mut *ctx, state, msg, metrics).await?;
523 }
524 }
525 }
526 )
527 }
528}
529
530#[derive(Clone)]
536struct AttestingData {
537 candidate: CandidateReceipt,
539 pov_hash: Hash,
541 from_validator: ValidatorIndex,
543 backing: Vec<ValidatorIndex>,
545}
546
547#[derive(Default, Debug)]
548struct TableContext {
549 validator: Option<Validator>,
550 groups: HashMap<CoreIndex, Vec<ValidatorIndex>>,
551 validators: Vec<ValidatorId>,
552 disabled_validators: Vec<ValidatorIndex>,
553}
554
555impl TableContext {
556 pub fn validator_is_disabled(&self, validator_idx: &ValidatorIndex) -> bool {
558 self.disabled_validators
559 .iter()
560 .any(|disabled_val_idx| *disabled_val_idx == *validator_idx)
561 }
562
563 pub fn local_validator_is_disabled(&self) -> Option<bool> {
565 self.validator.as_ref().map(|v| v.disabled())
566 }
567}
568
569impl TableContextTrait for TableContext {
570 type AuthorityId = ValidatorIndex;
571 type Digest = CandidateHash;
572 type GroupId = CoreIndex;
573 type Signature = ValidatorSignature;
574 type Candidate = CommittedCandidateReceipt;
575
576 fn candidate_digest(candidate: &CommittedCandidateReceipt) -> CandidateHash {
577 candidate.hash()
578 }
579
580 fn is_member_of(&self, authority: &ValidatorIndex, core: &CoreIndex) -> bool {
581 self.groups.get(core).map_or(false, |g| g.iter().any(|a| a == authority))
582 }
583
584 fn get_group_size(&self, group: &CoreIndex) -> Option<usize> {
585 self.groups.get(group).map(|g| g.len())
586 }
587}
588
589fn primitive_statement_to_table(s: &SignedFullStatementWithPVD) -> TableSignedStatement {
592 let statement = match s.payload() {
593 StatementWithPVD::Seconded(c, _) => TableStatement::Seconded(c.clone()),
594 StatementWithPVD::Valid(h) => TableStatement::Valid(*h),
595 };
596
597 TableSignedStatement {
598 statement,
599 signature: s.signature().clone(),
600 sender: s.validator_index(),
601 }
602}
603
604fn table_attested_to_backed(
605 attested: TableAttestedCandidate<
606 CoreIndex,
607 CommittedCandidateReceipt,
608 ValidatorIndex,
609 ValidatorSignature,
610 >,
611 table_context: &TableContext,
612) -> Option<BackedCandidate> {
613 let TableAttestedCandidate { candidate, validity_votes, group_id: core_index } = attested;
614
615 let (ids, validity_votes): (Vec<_>, Vec<ValidityAttestation>) =
616 validity_votes.into_iter().map(|(id, vote)| (id, vote.into())).unzip();
617
618 let group = table_context.groups.get(&core_index)?;
619
620 let mut validator_indices = BitVec::with_capacity(group.len());
621
622 validator_indices.resize(group.len(), false);
623
624 let mut vote_positions = Vec::with_capacity(validity_votes.len());
628 for (orig_idx, id) in ids.iter().enumerate() {
629 if let Some(position) = group.iter().position(|x| x == id) {
630 validator_indices.set(position, true);
631 vote_positions.push((orig_idx, position));
632 } else {
633 gum::warn!(
634 target: LOG_TARGET,
635 "Logic error: Validity vote from table does not correspond to group",
636 );
637
638 return None
639 }
640 }
641 vote_positions.sort_by_key(|(_orig, pos_in_group)| *pos_in_group);
642
643 Some(BackedCandidate::new(
644 candidate,
645 vote_positions
646 .into_iter()
647 .map(|(pos_in_votes, _pos_in_group)| validity_votes[pos_in_votes].clone())
648 .collect(),
649 validator_indices,
650 core_index,
651 ))
652}
653
654async fn store_available_data(
655 sender: &mut impl overseer::CandidateBackingSenderTrait,
656 n_validators: u32,
657 candidate_hash: CandidateHash,
658 available_data: AvailableData,
659 expected_erasure_root: Hash,
660 core_index: CoreIndex,
661 node_features: NodeFeatures,
662) -> Result<(), Error> {
663 let (tx, rx) = oneshot::channel();
664 sender
669 .send_message(AvailabilityStoreMessage::StoreAvailableData {
670 candidate_hash,
671 n_validators,
672 available_data,
673 expected_erasure_root,
674 core_index,
675 node_features,
676 tx,
677 })
678 .await;
679
680 rx.await
681 .map_err(Error::StoreAvailableDataChannel)?
682 .map_err(Error::StoreAvailableData)
683}
684
685async fn make_pov_available(
693 sender: &mut impl overseer::CandidateBackingSenderTrait,
694 n_validators: usize,
695 pov: Arc<PoV>,
696 candidate_hash: CandidateHash,
697 validation_data: PersistedValidationData,
698 expected_erasure_root: Hash,
699 core_index: CoreIndex,
700 node_features: NodeFeatures,
701) -> Result<(), Error> {
702 store_available_data(
703 sender,
704 n_validators as u32,
705 candidate_hash,
706 AvailableData { pov, validation_data },
707 expected_erasure_root,
708 core_index,
709 node_features,
710 )
711 .await
712}
713
714async fn request_pov(
715 sender: &mut impl overseer::CandidateBackingSenderTrait,
716 relay_parent: Hash,
717 from_validator: ValidatorIndex,
718 para_id: ParaId,
719 candidate_hash: CandidateHash,
720 pov_hash: Hash,
721) -> Result<Arc<PoV>, Error> {
722 let (tx, rx) = oneshot::channel();
723 sender
724 .send_message(AvailabilityDistributionMessage::FetchPoV {
725 relay_parent,
726 from_validator,
727 para_id,
728 candidate_hash,
729 pov_hash,
730 tx,
731 })
732 .await;
733
734 let pov = rx.await.map_err(|_| Error::FetchPoV)?;
735 Ok(Arc::new(pov))
736}
737
738async fn request_candidate_validation(
739 sender: &mut impl overseer::CandidateBackingSenderTrait,
740 validation_data: PersistedValidationData,
741 validation_code: ValidationCode,
742 candidate_receipt: CandidateReceipt,
743 pov: Arc<PoV>,
744 executor_params: ExecutorParams,
745) -> Result<ValidationResult, Error> {
746 let (tx, rx) = oneshot::channel();
747 let is_system = candidate_receipt.descriptor.para_id().is_system();
748 let relay_parent = candidate_receipt.descriptor.relay_parent();
749
750 sender
751 .send_message(CandidateValidationMessage::ValidateFromExhaustive {
752 validation_data,
753 validation_code,
754 candidate_receipt,
755 pov,
756 executor_params,
757 exec_kind: if is_system {
758 PvfExecKind::BackingSystemParas(relay_parent)
759 } else {
760 PvfExecKind::Backing(relay_parent)
761 },
762 response_sender: tx,
763 })
764 .await;
765
766 match rx.await {
767 Ok(Ok(validation_result)) => Ok(validation_result),
768 Ok(Err(err)) => Err(Error::ValidationFailed(err)),
769 Err(err) => Err(Error::ValidateFromExhaustive(err)),
770 }
771}
772
773struct BackgroundValidationOutputs {
774 candidate: CandidateReceipt,
775 commitments: CandidateCommitments,
776 persisted_validation_data: PersistedValidationData,
777}
778
779type BackgroundValidationResult = Result<BackgroundValidationOutputs, CandidateReceipt>;
780
781struct BackgroundValidationParams<S: overseer::CandidateBackingSenderTrait, F> {
782 sender: S,
783 tx_command: mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
784 candidate: CandidateReceipt,
785 relay_parent: Hash,
786 node_features: NodeFeatures,
787 executor_params: Arc<ExecutorParams>,
788 persisted_validation_data: PersistedValidationData,
789 pov: PoVData,
790 n_validators: usize,
791 make_command: F,
792}
793
794async fn validate_and_make_available(
795 params: BackgroundValidationParams<
796 impl overseer::CandidateBackingSenderTrait,
797 impl Fn(BackgroundValidationResult) -> ValidatedCandidateCommand + Sync,
798 >,
799 core_index: CoreIndex,
800) -> Result<(), Error> {
801 let BackgroundValidationParams {
802 mut sender,
803 mut tx_command,
804 candidate,
805 relay_parent,
806 node_features,
807 executor_params,
808 persisted_validation_data,
809 pov,
810 n_validators,
811 make_command,
812 } = params;
813
814 let validation_code = {
815 let validation_code_hash = candidate.descriptor().validation_code_hash();
816 let (tx, rx) = oneshot::channel();
817 sender
818 .send_message(RuntimeApiMessage::Request(
819 relay_parent,
820 RuntimeApiRequest::ValidationCodeByHash(validation_code_hash, tx),
821 ))
822 .await;
823
824 let code = rx.await.map_err(Error::RuntimeApiUnavailable)?;
825 match code {
826 Err(e) => return Err(Error::FetchValidationCode(validation_code_hash, e)),
827 Ok(None) => return Err(Error::NoValidationCode(validation_code_hash)),
828 Ok(Some(c)) => c,
829 }
830 };
831
832 let pov = match pov {
833 PoVData::Ready(pov) => pov,
834 PoVData::FetchFromValidator { from_validator, candidate_hash, pov_hash } =>
835 match request_pov(
836 &mut sender,
837 relay_parent,
838 from_validator,
839 candidate.descriptor.para_id(),
840 candidate_hash,
841 pov_hash,
842 )
843 .await
844 {
845 Err(Error::FetchPoV) => {
846 tx_command
847 .send((
848 relay_parent,
849 ValidatedCandidateCommand::AttestNoPoV(candidate.hash()),
850 ))
851 .await
852 .map_err(Error::BackgroundValidationMpsc)?;
853 return Ok(())
854 },
855 Err(err) => return Err(err),
856 Ok(pov) => pov,
857 },
858 };
859
860 let v = {
861 request_candidate_validation(
862 &mut sender,
863 persisted_validation_data,
864 validation_code,
865 candidate.clone(),
866 pov.clone(),
867 executor_params.as_ref().clone(),
868 )
869 .await?
870 };
871
872 let res = match v {
873 ValidationResult::Valid(commitments, validation_data) => {
874 gum::debug!(
875 target: LOG_TARGET,
876 candidate_hash = ?candidate.hash(),
877 "Validation successful",
878 );
879
880 let erasure_valid = make_pov_available(
881 &mut sender,
882 n_validators,
883 pov.clone(),
884 candidate.hash(),
885 validation_data.clone(),
886 candidate.descriptor.erasure_root(),
887 core_index,
888 node_features,
889 )
890 .await;
891
892 match erasure_valid {
893 Ok(()) => Ok(BackgroundValidationOutputs {
894 candidate,
895 commitments,
896 persisted_validation_data: validation_data,
897 }),
898 Err(Error::StoreAvailableData(StoreAvailableDataError::InvalidErasureRoot)) => {
899 gum::debug!(
900 target: LOG_TARGET,
901 candidate_hash = ?candidate.hash(),
902 actual_commitments = ?commitments,
903 "Erasure root doesn't match the announced by the candidate receipt",
904 );
905 Err(candidate)
906 },
907 Err(e) => return Err(e),
909 }
910 },
911 ValidationResult::Invalid(InvalidCandidate::CommitmentsHashMismatch) => {
912 gum::warn!(
914 target: LOG_TARGET,
915 candidate_hash = ?candidate.hash(),
916 "Validation yielded different commitments",
917 );
918 Err(candidate)
919 },
920 ValidationResult::Invalid(reason) => {
921 gum::warn!(
922 target: LOG_TARGET,
923 candidate_hash = ?candidate.hash(),
924 reason = ?reason,
925 "Validation yielded an invalid candidate",
926 );
927 Err(candidate)
928 },
929 };
930
931 tx_command.send((relay_parent, make_command(res))).await.map_err(Into::into)
932}
933
934#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
935async fn handle_communication<Context>(
936 ctx: &mut Context,
937 state: &mut State,
938 message: CandidateBackingMessage,
939 metrics: &Metrics,
940) -> Result<(), Error> {
941 match message {
942 CandidateBackingMessage::Second(_relay_parent, candidate, pvd, pov) => {
943 handle_second_message(ctx, state, candidate, pvd, pov, metrics).await?;
944 },
945 CandidateBackingMessage::Statement(relay_parent, statement) => {
946 handle_statement_message(ctx, state, relay_parent, statement, metrics).await?;
947 },
948 CandidateBackingMessage::GetBackableCandidates(requested_candidates, tx) =>
949 handle_get_backable_candidates_message(state, requested_candidates, tx, metrics)?,
950 CandidateBackingMessage::CanSecond(request, tx) =>
951 handle_can_second_request(ctx, state, request, tx).await,
952 }
953
954 Ok(())
955}
956
957#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
958async fn handle_active_leaves_update<Context>(
959 ctx: &mut Context,
960 update: ActiveLeavesUpdate,
961 state: &mut State,
962) -> Result<(), Error> {
963 let res = if let Some(leaf) = update.activated {
966 let leaf_hash = leaf.hash;
967 Some((leaf, state.implicit_view.activate_leaf(ctx.sender(), leaf_hash).await.map(|_| ())))
968 } else {
969 None
970 };
971
972 for deactivated in update.deactivated {
973 state.implicit_view.deactivate_leaf(deactivated);
974 }
975
976 {
980 let remaining: HashSet<_> = state.implicit_view.all_allowed_relay_parents().collect();
981
982 state.per_relay_parent.retain(|r, _| remaining.contains(&r));
983 }
984
985 state
988 .per_candidate
989 .retain(|_, pc| state.per_relay_parent.contains_key(&pc.relay_parent));
990
991 let fresh_relay_parents = match res {
994 None => return Ok(()),
995 Some((leaf, Ok(_))) => {
996 let fresh_relay_parents =
997 state.implicit_view.known_allowed_relay_parents_under(&leaf.hash, None);
998
999 let fresh_relay_parent = match fresh_relay_parents {
1000 Some(f) => f.to_vec(),
1001 None => {
1002 gum::warn!(
1003 target: LOG_TARGET,
1004 leaf_hash = ?leaf.hash,
1005 "Implicit view gave no relay-parents"
1006 );
1007
1008 vec![leaf.hash]
1009 },
1010 };
1011 fresh_relay_parent
1012 },
1013 Some((leaf, Err(e))) => {
1014 gum::debug!(
1015 target: LOG_TARGET,
1016 leaf_hash = ?leaf.hash,
1017 err = ?e,
1018 "Failed to load implicit view for leaf."
1019 );
1020
1021 return Ok(())
1022 },
1023 };
1024
1025 for maybe_new in fresh_relay_parents {
1027 if state.per_relay_parent.contains_key(&maybe_new) {
1028 continue
1029 }
1030
1031 let per = construct_per_relay_parent_state(
1034 ctx,
1035 maybe_new,
1036 &state.keystore,
1037 &mut state.per_session_cache,
1038 )
1039 .await?;
1040
1041 if let Some(per) = per {
1042 state.per_relay_parent.insert(maybe_new, per);
1043 }
1044 }
1045
1046 Ok(())
1047}
1048
1049macro_rules! try_runtime_api {
1050 ($x: expr) => {
1051 match $x {
1052 Ok(x) => x,
1053 Err(err) => {
1054 error::log_error(Err(Into::<runtime::Error>::into(err).into()))?;
1056
1057 return Ok(None)
1061 },
1062 }
1063 };
1064}
1065
1066fn core_index_from_statement(
1067 validator_to_group: &IndexedVec<ValidatorIndex, Option<GroupIndex>>,
1068 group_rotation_info: &GroupRotationInfo,
1069 n_cores: u32,
1070 claim_queue: &ClaimQueueSnapshot,
1071 statement: &SignedFullStatementWithPVD,
1072) -> Option<CoreIndex> {
1073 let compact_statement = statement.as_unchecked();
1074 let candidate_hash = CandidateHash(*compact_statement.unchecked_payload().candidate_hash());
1075
1076 gum::trace!(
1077 target:LOG_TARGET,
1078 ?group_rotation_info,
1079 ?statement,
1080 ?validator_to_group,
1081 n_cores,
1082 ?candidate_hash,
1083 "Extracting core index from statement"
1084 );
1085
1086 let statement_validator_index = statement.validator_index();
1087 let Some(Some(group_index)) = validator_to_group.get(statement_validator_index) else {
1088 gum::debug!(
1089 target: LOG_TARGET,
1090 ?group_rotation_info,
1091 ?statement,
1092 ?validator_to_group,
1093 n_cores,
1094 ?candidate_hash,
1095 "Invalid validator index: {:?}",
1096 statement_validator_index
1097 );
1098 return None
1099 };
1100
1101 let core_index = group_rotation_info.core_for_group(*group_index, n_cores as _);
1103
1104 if core_index.0 > n_cores {
1105 gum::warn!(target: LOG_TARGET, ?candidate_hash, ?core_index, n_cores, "Invalid CoreIndex");
1106 return None
1107 }
1108
1109 if let StatementWithPVD::Seconded(candidate, _pvd) = statement.payload() {
1110 let candidate_para_id = candidate.descriptor.para_id();
1111 let mut assigned_paras = claim_queue.iter_claims_for_core(&core_index);
1112
1113 if !assigned_paras.any(|id| id == &candidate_para_id) {
1114 gum::debug!(
1115 target: LOG_TARGET,
1116 ?candidate_hash,
1117 ?core_index,
1118 assigned_paras = ?claim_queue.iter_claims_for_core(&core_index).collect::<Vec<_>>(),
1119 ?candidate_para_id,
1120 "Invalid CoreIndex, core is not assigned to this para_id"
1121 );
1122 return None
1123 }
1124 return Some(core_index)
1125 } else {
1126 return Some(core_index)
1127 }
1128}
1129
1130#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1132async fn construct_per_relay_parent_state<Context>(
1133 ctx: &mut Context,
1134 relay_parent: Hash,
1135 keystore: &KeystorePtr,
1136 per_session_cache: &mut PerSessionCache,
1137) -> Result<Option<PerRelayParentState>, Error> {
1138 let parent = relay_parent;
1139
1140 let (session_index, groups, claim_queue, disabled_validators) = futures::try_join!(
1141 request_session_index_for_child(parent, ctx.sender()).await,
1142 request_validator_groups(parent, ctx.sender()).await,
1143 request_claim_queue(parent, ctx.sender()).await,
1144 request_disabled_validators(parent, ctx.sender()).await,
1145 )
1146 .map_err(Error::JoinMultiple)?;
1147
1148 let session_index = try_runtime_api!(session_index);
1149
1150 let validators = per_session_cache.validators(session_index, parent, ctx.sender()).await;
1151 let validators = try_runtime_api!(validators);
1152
1153 let node_features = per_session_cache.node_features(session_index, parent, ctx.sender()).await;
1154 let node_features = try_runtime_api!(node_features);
1155
1156 let executor_params =
1157 per_session_cache.executor_params(session_index, parent, ctx.sender()).await;
1158 let executor_params = try_runtime_api!(executor_params);
1159
1160 gum::debug!(target: LOG_TARGET, ?parent, "New state");
1161
1162 let (validator_groups, group_rotation_info) = try_runtime_api!(groups);
1163
1164 let minimum_backing_votes = per_session_cache
1165 .minimum_backing_votes(session_index, parent, ctx.sender())
1166 .await;
1167 let minimum_backing_votes = try_runtime_api!(minimum_backing_votes);
1168 let claim_queue = try_runtime_api!(claim_queue);
1169 let disabled_validators = try_runtime_api!(disabled_validators);
1170
1171 let signing_context = SigningContext { parent_hash: parent, session_index };
1172 let validator = match Validator::construct(
1173 &validators,
1174 &disabled_validators,
1175 signing_context.clone(),
1176 keystore.clone(),
1177 ) {
1178 Ok(v) => Some(v),
1179 Err(util::Error::NotAValidator) => None,
1180 Err(e) => {
1181 gum::warn!(
1182 target: LOG_TARGET,
1183 err = ?e,
1184 "Cannot participate in candidate backing",
1185 );
1186
1187 return Ok(None)
1188 },
1189 };
1190
1191 let n_cores = validator_groups.len();
1192
1193 let mut groups = HashMap::<CoreIndex, Vec<ValidatorIndex>>::new();
1194 let mut assigned_core = None;
1195
1196 for idx in 0..n_cores {
1197 let core_index = CoreIndex(idx as _);
1198
1199 if !claim_queue.contains_key(&core_index) {
1200 continue
1201 }
1202
1203 let group_index = group_rotation_info.group_for_core(core_index, n_cores);
1204 if let Some(g) = validator_groups.get(group_index.0 as usize) {
1205 if validator.as_ref().map_or(false, |v| g.contains(&v.index())) {
1206 assigned_core = Some(core_index);
1207 }
1208 groups.insert(core_index, g.clone());
1209 }
1210 }
1211 gum::debug!(target: LOG_TARGET, ?groups, "TableContext");
1212
1213 let validator_to_group =
1214 per_session_cache.validator_to_group(session_index, &validators, &validator_groups);
1215
1216 let table_context =
1217 TableContext { validator, groups, validators: validators.to_vec(), disabled_validators };
1218
1219 Ok(Some(PerRelayParentState {
1220 parent,
1221 node_features,
1222 executor_params,
1223 assigned_core,
1224 backed: HashSet::new(),
1225 table: Table::new(),
1226 table_context,
1227 issued_statements: HashSet::new(),
1228 awaiting_validation: HashSet::new(),
1229 fallbacks: HashMap::new(),
1230 minimum_backing_votes,
1231 n_cores: validator_groups.len() as u32,
1232 claim_queue: ClaimQueueSnapshot::from(claim_queue),
1233 validator_to_group,
1234 group_rotation_info,
1235 }))
1236}
1237
1238enum SecondingAllowed {
1239 No,
1240 Yes(Vec<Hash>),
1242}
1243
1244#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1247async fn seconding_sanity_check<Context>(
1248 ctx: &mut Context,
1249 implicit_view: &ImplicitView,
1250 hypothetical_candidate: HypotheticalCandidate,
1251) -> SecondingAllowed {
1252 let mut leaves_for_seconding = Vec::new();
1253 let mut responses = FuturesOrdered::<BoxFuture<'_, Result<_, oneshot::Canceled>>>::new();
1254
1255 let candidate_para = hypothetical_candidate.candidate_para();
1256 let candidate_relay_parent = hypothetical_candidate.relay_parent();
1257 let candidate_hash = hypothetical_candidate.candidate_hash();
1258
1259 for head in implicit_view.leaves() {
1260 let allowed_parents_for_para =
1263 implicit_view.known_allowed_relay_parents_under(head, Some(candidate_para));
1264 if !allowed_parents_for_para.unwrap_or_default().contains(&candidate_relay_parent) {
1265 continue
1266 }
1267
1268 let (tx, rx) = oneshot::channel();
1269 ctx.send_message(ProspectiveParachainsMessage::GetHypotheticalMembership(
1270 HypotheticalMembershipRequest {
1271 candidates: vec![hypothetical_candidate.clone()],
1272 fragment_chain_relay_parent: Some(*head),
1273 },
1274 tx,
1275 ))
1276 .await;
1277 let response = rx.map_ok(move |candidate_memberships| {
1278 let is_member_or_potential = candidate_memberships
1279 .into_iter()
1280 .find_map(|(candidate, leaves)| {
1281 (candidate.candidate_hash() == candidate_hash).then_some(leaves)
1282 })
1283 .and_then(|leaves| leaves.into_iter().find(|leaf| leaf == head))
1284 .is_some();
1285
1286 (is_member_or_potential, head)
1287 });
1288 responses.push_back(response.boxed());
1289 }
1290
1291 if responses.is_empty() {
1292 return SecondingAllowed::No
1293 }
1294
1295 while let Some(response) = responses.next().await {
1296 match response {
1297 Err(oneshot::Canceled) => {
1298 gum::warn!(
1299 target: LOG_TARGET,
1300 "Failed to reach prospective parachains subsystem for hypothetical membership",
1301 );
1302
1303 return SecondingAllowed::No
1304 },
1305 Ok((is_member_or_potential, head)) => match is_member_or_potential {
1306 false => {
1307 gum::debug!(
1308 target: LOG_TARGET,
1309 ?candidate_hash,
1310 leaf_hash = ?head,
1311 "Refusing to second candidate at leaf. Is not a potential member.",
1312 );
1313 },
1314 true => {
1315 leaves_for_seconding.push(*head);
1316 },
1317 },
1318 }
1319 }
1320
1321 if leaves_for_seconding.is_empty() {
1322 SecondingAllowed::No
1323 } else {
1324 SecondingAllowed::Yes(leaves_for_seconding)
1325 }
1326}
1327
1328#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1330async fn handle_can_second_request<Context>(
1331 ctx: &mut Context,
1332 state: &State,
1333 request: CanSecondRequest,
1334 tx: oneshot::Sender<bool>,
1335) {
1336 let relay_parent = request.candidate_relay_parent;
1337 let response = if state.per_relay_parent.get(&relay_parent).is_some() {
1338 let hypothetical_candidate = HypotheticalCandidate::Incomplete {
1339 candidate_hash: request.candidate_hash,
1340 candidate_para: request.candidate_para_id,
1341 parent_head_data_hash: request.parent_head_data_hash,
1342 candidate_relay_parent: relay_parent,
1343 };
1344
1345 let result =
1346 seconding_sanity_check(ctx, &state.implicit_view, hypothetical_candidate).await;
1347
1348 match result {
1349 SecondingAllowed::No => false,
1350 SecondingAllowed::Yes(leaves) => !leaves.is_empty(),
1351 }
1352 } else {
1353 false
1355 };
1356
1357 let _ = tx.send(response);
1358}
1359
1360#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1361async fn handle_validated_candidate_command<Context>(
1362 ctx: &mut Context,
1363 state: &mut State,
1364 relay_parent: Hash,
1365 command: ValidatedCandidateCommand,
1366 metrics: &Metrics,
1367) -> Result<(), Error> {
1368 match state.per_relay_parent.get_mut(&relay_parent) {
1369 Some(rp_state) => {
1370 let candidate_hash = command.candidate_hash();
1371 rp_state.awaiting_validation.remove(&candidate_hash);
1372
1373 match command {
1374 ValidatedCandidateCommand::Second(res) => match res {
1375 Ok(outputs) => {
1376 let BackgroundValidationOutputs {
1377 candidate,
1378 commitments,
1379 persisted_validation_data,
1380 } = outputs;
1381
1382 if rp_state.issued_statements.contains(&candidate_hash) {
1383 return Ok(())
1384 }
1385
1386 let receipt = CommittedCandidateReceipt {
1387 descriptor: candidate.descriptor.clone(),
1388 commitments,
1389 };
1390
1391 let hypothetical_candidate = HypotheticalCandidate::Complete {
1392 candidate_hash,
1393 receipt: Arc::new(receipt.clone()),
1394 persisted_validation_data: persisted_validation_data.clone(),
1395 };
1396 if let SecondingAllowed::No = seconding_sanity_check(
1400 ctx,
1401 &state.implicit_view,
1402 hypothetical_candidate,
1403 )
1404 .await
1405 {
1406 return Ok(())
1407 };
1408
1409 let statement =
1410 StatementWithPVD::Seconded(receipt, persisted_validation_data);
1411
1412 let res = sign_import_and_distribute_statement(
1416 ctx,
1417 rp_state,
1418 &mut state.per_candidate,
1419 statement,
1420 state.keystore.clone(),
1421 metrics,
1422 )
1423 .await;
1424
1425 if let Err(Error::RejectedByProspectiveParachains) = res {
1426 let candidate_hash = candidate.hash();
1427 gum::debug!(
1428 target: LOG_TARGET,
1429 relay_parent = ?candidate.descriptor().relay_parent(),
1430 ?candidate_hash,
1431 "Attempted to second candidate but was rejected by prospective parachains",
1432 );
1433
1434 ctx.send_message(CollatorProtocolMessage::Invalid(
1436 candidate.descriptor().relay_parent(),
1437 candidate,
1438 ))
1439 .await;
1440
1441 return Ok(())
1442 }
1443
1444 if let Some(stmt) = res? {
1445 match state.per_candidate.get_mut(&candidate_hash) {
1446 None => {
1447 gum::warn!(
1448 target: LOG_TARGET,
1449 ?candidate_hash,
1450 "Missing `per_candidate` for seconded candidate.",
1451 );
1452 },
1453 Some(p) => p.seconded_locally = true,
1454 }
1455
1456 rp_state.issued_statements.insert(candidate_hash);
1457
1458 metrics.on_candidate_seconded();
1459 ctx.send_message(CollatorProtocolMessage::Seconded(
1460 rp_state.parent,
1461 StatementWithPVD::drop_pvd_from_signed(stmt),
1462 ))
1463 .await;
1464 }
1465 },
1466 Err(candidate) => {
1467 ctx.send_message(CollatorProtocolMessage::Invalid(
1468 rp_state.parent,
1469 candidate,
1470 ))
1471 .await;
1472 },
1473 },
1474 ValidatedCandidateCommand::Attest(res) => {
1475 rp_state.fallbacks.remove(&candidate_hash);
1477 if !rp_state.issued_statements.contains(&candidate_hash) {
1479 if res.is_ok() {
1480 let statement = StatementWithPVD::Valid(candidate_hash);
1481
1482 sign_import_and_distribute_statement(
1483 ctx,
1484 rp_state,
1485 &mut state.per_candidate,
1486 statement,
1487 state.keystore.clone(),
1488 metrics,
1489 )
1490 .await?;
1491 }
1492 rp_state.issued_statements.insert(candidate_hash);
1493 }
1494 },
1495 ValidatedCandidateCommand::AttestNoPoV(candidate_hash) => {
1496 if let Some(attesting) = rp_state.fallbacks.get_mut(&candidate_hash) {
1497 if let Some(index) = attesting.backing.pop() {
1498 attesting.from_validator = index;
1499 let attesting = attesting.clone();
1500
1501 if let Some(pvd) = state
1507 .per_candidate
1508 .get(&candidate_hash)
1509 .map(|pc| pc.persisted_validation_data.clone())
1510 {
1511 kick_off_validation_work(
1512 ctx,
1513 rp_state,
1514 pvd,
1515 &state.background_validation_tx,
1516 attesting,
1517 )
1518 .await?;
1519 }
1520 }
1521 } else {
1522 gum::warn!(
1523 target: LOG_TARGET,
1524 "AttestNoPoV was triggered without fallback being available."
1525 );
1526 debug_assert!(false);
1527 }
1528 },
1529 }
1530 },
1531 None => {
1532 },
1535 }
1536
1537 Ok(())
1538}
1539
1540fn sign_statement(
1541 rp_state: &PerRelayParentState,
1542 statement: StatementWithPVD,
1543 keystore: KeystorePtr,
1544 metrics: &Metrics,
1545) -> Option<SignedFullStatementWithPVD> {
1546 let signed = rp_state
1547 .table_context
1548 .validator
1549 .as_ref()?
1550 .sign(keystore, statement)
1551 .ok()
1552 .flatten()?;
1553 metrics.on_statement_signed();
1554 Some(signed)
1555}
1556
1557#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1565async fn import_statement<Context>(
1566 ctx: &mut Context,
1567 rp_state: &mut PerRelayParentState,
1568 per_candidate: &mut HashMap<CandidateHash, PerCandidateState>,
1569 statement: &SignedFullStatementWithPVD,
1570) -> Result<Option<TableSummary>, Error> {
1571 let candidate_hash = statement.payload().candidate_hash();
1572
1573 gum::debug!(
1574 target: LOG_TARGET,
1575 statement = ?statement.payload().to_compact(),
1576 validator_index = statement.validator_index().0,
1577 ?candidate_hash,
1578 "Importing statement",
1579 );
1580
1581 if let StatementWithPVD::Seconded(candidate, pvd) = statement.payload() {
1594 if !per_candidate.contains_key(&candidate_hash) {
1595 let (tx, rx) = oneshot::channel();
1596 ctx.send_message(ProspectiveParachainsMessage::IntroduceSecondedCandidate(
1597 IntroduceSecondedCandidateRequest {
1598 candidate_para: candidate.descriptor.para_id(),
1599 candidate_receipt: candidate.clone(),
1600 persisted_validation_data: pvd.clone(),
1601 },
1602 tx,
1603 ))
1604 .await;
1605
1606 match rx.await {
1607 Err(oneshot::Canceled) => {
1608 gum::warn!(
1609 target: LOG_TARGET,
1610 "Could not reach the Prospective Parachains subsystem."
1611 );
1612
1613 return Err(Error::RejectedByProspectiveParachains)
1614 },
1615 Ok(false) => return Err(Error::RejectedByProspectiveParachains),
1616 Ok(true) => {},
1617 }
1618
1619 per_candidate.insert(
1621 candidate_hash,
1622 PerCandidateState {
1623 persisted_validation_data: pvd.clone(),
1624 seconded_locally: false,
1626 relay_parent: candidate.descriptor.relay_parent(),
1627 },
1628 );
1629 }
1630 }
1631
1632 let stmt = primitive_statement_to_table(statement);
1633
1634 let core = core_index_from_statement(
1635 &rp_state.validator_to_group,
1636 &rp_state.group_rotation_info,
1637 rp_state.n_cores,
1638 &rp_state.claim_queue,
1639 statement,
1640 )
1641 .ok_or(Error::CoreIndexUnavailable)?;
1642
1643 Ok(rp_state.table.import_statement(&rp_state.table_context, core, stmt))
1644}
1645
1646#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1649async fn post_import_statement_actions<Context>(
1650 ctx: &mut Context,
1651 rp_state: &mut PerRelayParentState,
1652 summary: Option<&TableSummary>,
1653) {
1654 if let Some(attested) = summary.as_ref().and_then(|s| {
1655 rp_state.table.attested_candidate(
1656 &s.candidate,
1657 &rp_state.table_context,
1658 rp_state.minimum_backing_votes,
1659 )
1660 }) {
1661 let candidate_hash = attested.candidate.hash();
1662
1663 if rp_state.backed.insert(candidate_hash) {
1665 if let Some(backed) = table_attested_to_backed(attested, &rp_state.table_context) {
1666 let para_id = backed.candidate().descriptor.para_id();
1667 gum::debug!(
1668 target: LOG_TARGET,
1669 candidate_hash = ?candidate_hash,
1670 relay_parent = ?rp_state.parent,
1671 %para_id,
1672 "Candidate backed",
1673 );
1674
1675 ctx.send_message(ProspectiveParachainsMessage::CandidateBacked(
1678 para_id,
1679 candidate_hash,
1680 ))
1681 .await;
1682 ctx.send_message(StatementDistributionMessage::Backed(candidate_hash)).await;
1684 } else {
1685 gum::debug!(target: LOG_TARGET, ?candidate_hash, "Cannot get BackedCandidate");
1686 }
1687 } else {
1688 gum::debug!(target: LOG_TARGET, ?candidate_hash, "Candidate already known");
1689 }
1690 } else {
1691 gum::debug!(target: LOG_TARGET, "No attested candidate");
1692 }
1693
1694 issue_new_misbehaviors(ctx, rp_state.parent, &mut rp_state.table);
1695}
1696
1697#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1699fn issue_new_misbehaviors<Context>(
1700 ctx: &mut Context,
1701 relay_parent: Hash,
1702 table: &mut Table<TableContext>,
1703) {
1704 let misbehaviors: Vec<_> = table.drain_misbehaviors().collect();
1706 for (validator_id, report) in misbehaviors {
1707 ctx.send_unbounded_message(ProvisionerMessage::ProvisionableData(
1713 relay_parent,
1714 ProvisionableData::MisbehaviorReport(relay_parent, validator_id, report),
1715 ));
1716 }
1717}
1718
1719#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1721async fn sign_import_and_distribute_statement<Context>(
1722 ctx: &mut Context,
1723 rp_state: &mut PerRelayParentState,
1724 per_candidate: &mut HashMap<CandidateHash, PerCandidateState>,
1725 statement: StatementWithPVD,
1726 keystore: KeystorePtr,
1727 metrics: &Metrics,
1728) -> Result<Option<SignedFullStatementWithPVD>, Error> {
1729 if let Some(signed_statement) = sign_statement(&*rp_state, statement, keystore, metrics) {
1730 let summary = import_statement(ctx, rp_state, per_candidate, &signed_statement).await?;
1731
1732 let smsg = StatementDistributionMessage::Share(rp_state.parent, signed_statement.clone());
1735 ctx.send_unbounded_message(smsg);
1736
1737 post_import_statement_actions(ctx, rp_state, summary.as_ref()).await;
1738
1739 Ok(Some(signed_statement))
1740 } else {
1741 Ok(None)
1742 }
1743}
1744
1745#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1746async fn background_validate_and_make_available<Context>(
1747 ctx: &mut Context,
1748 rp_state: &mut PerRelayParentState,
1749 params: BackgroundValidationParams<
1750 impl overseer::CandidateBackingSenderTrait,
1751 impl Fn(BackgroundValidationResult) -> ValidatedCandidateCommand + Send + 'static + Sync,
1752 >,
1753) -> Result<(), Error> {
1754 let candidate_hash = params.candidate.hash();
1755 let Some(core_index) = rp_state.assigned_core else { return Ok(()) };
1756 if rp_state.awaiting_validation.insert(candidate_hash) {
1757 let bg = async move {
1759 if let Err(error) = validate_and_make_available(params, core_index).await {
1760 if let Error::BackgroundValidationMpsc(error) = error {
1761 gum::debug!(
1762 target: LOG_TARGET,
1763 ?candidate_hash,
1764 ?error,
1765 "Mpsc background validation mpsc died during validation- leaf no longer active?"
1766 );
1767 } else {
1768 gum::error!(
1769 target: LOG_TARGET,
1770 ?candidate_hash,
1771 ?error,
1772 "Failed to validate and make available",
1773 );
1774 }
1775 }
1776 };
1777
1778 ctx.spawn("backing-validation", bg.boxed())
1779 .map_err(|_| Error::FailedToSpawnBackgroundTask)?;
1780 }
1781
1782 Ok(())
1783}
1784
1785#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1787async fn kick_off_validation_work<Context>(
1788 ctx: &mut Context,
1789 rp_state: &mut PerRelayParentState,
1790 persisted_validation_data: PersistedValidationData,
1791 background_validation_tx: &mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
1792 attesting: AttestingData,
1793) -> Result<(), Error> {
1794 match rp_state.table_context.local_validator_is_disabled() {
1796 Some(true) => {
1797 gum::info!(target: LOG_TARGET, "We are disabled - don't kick off validation");
1798 return Ok(())
1799 },
1800 Some(false) => {}, None => {
1802 gum::debug!(target: LOG_TARGET, "We are not a validator - don't kick off validation");
1803 return Ok(())
1804 },
1805 }
1806
1807 let candidate_hash = attesting.candidate.hash();
1808 if rp_state.issued_statements.contains(&candidate_hash) {
1809 return Ok(())
1810 }
1811
1812 gum::debug!(
1813 target: LOG_TARGET,
1814 candidate_hash = ?candidate_hash,
1815 candidate_receipt = ?attesting.candidate,
1816 "Kicking off validation",
1817 );
1818
1819 let bg_sender = ctx.sender().clone();
1820 let pov = PoVData::FetchFromValidator {
1821 from_validator: attesting.from_validator,
1822 candidate_hash,
1823 pov_hash: attesting.pov_hash,
1824 };
1825
1826 background_validate_and_make_available(
1827 ctx,
1828 rp_state,
1829 BackgroundValidationParams {
1830 sender: bg_sender,
1831 tx_command: background_validation_tx.clone(),
1832 candidate: attesting.candidate,
1833 relay_parent: rp_state.parent,
1834 node_features: rp_state.node_features.clone(),
1835 executor_params: Arc::clone(&rp_state.executor_params),
1836 persisted_validation_data,
1837 pov,
1838 n_validators: rp_state.table_context.validators.len(),
1839 make_command: ValidatedCandidateCommand::Attest,
1840 },
1841 )
1842 .await
1843}
1844
1845#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1847async fn maybe_validate_and_import<Context>(
1848 ctx: &mut Context,
1849 state: &mut State,
1850 relay_parent: Hash,
1851 statement: SignedFullStatementWithPVD,
1852) -> Result<(), Error> {
1853 let rp_state = match state.per_relay_parent.get_mut(&relay_parent) {
1854 Some(r) => r,
1855 None => {
1856 gum::trace!(
1857 target: LOG_TARGET,
1858 ?relay_parent,
1859 "Received statement for unknown relay-parent"
1860 );
1861
1862 return Ok(())
1863 },
1864 };
1865
1866 if rp_state.table_context.validator_is_disabled(&statement.validator_index()) {
1868 gum::debug!(
1869 target: LOG_TARGET,
1870 sender_validator_idx = ?statement.validator_index(),
1871 "Not importing statement because the sender is disabled"
1872 );
1873 return Ok(())
1874 }
1875
1876 let res = import_statement(ctx, rp_state, &mut state.per_candidate, &statement).await;
1877
1878 if let Err(Error::RejectedByProspectiveParachains) = res {
1881 gum::debug!(
1882 target: LOG_TARGET,
1883 ?relay_parent,
1884 "Statement rejected by prospective parachains."
1885 );
1886
1887 return Ok(())
1888 }
1889
1890 let summary = res?;
1891 post_import_statement_actions(ctx, rp_state, summary.as_ref()).await;
1892
1893 if let Some(summary) = summary {
1894 let candidate_hash = summary.candidate;
1899
1900 if Some(summary.group_id) != rp_state.assigned_core {
1901 return Ok(())
1902 }
1903
1904 let attesting = match statement.payload() {
1905 StatementWithPVD::Seconded(receipt, _) => {
1906 let attesting = AttestingData {
1907 candidate: rp_state
1908 .table
1909 .get_candidate(&candidate_hash)
1910 .ok_or(Error::CandidateNotFound)?
1911 .to_plain(),
1912 pov_hash: receipt.descriptor.pov_hash(),
1913 from_validator: statement.validator_index(),
1914 backing: Vec::new(),
1915 };
1916 rp_state.fallbacks.insert(summary.candidate, attesting.clone());
1917 attesting
1918 },
1919 StatementWithPVD::Valid(candidate_hash) => {
1920 if let Some(attesting) = rp_state.fallbacks.get_mut(candidate_hash) {
1921 let our_index = rp_state.table_context.validator.as_ref().map(|v| v.index());
1922 if our_index == Some(statement.validator_index()) {
1923 return Ok(())
1924 }
1925
1926 if rp_state.awaiting_validation.contains(candidate_hash) {
1927 attesting.backing.push(statement.validator_index());
1929 return Ok(())
1930 } else {
1931 attesting.from_validator = statement.validator_index();
1933 attesting.clone()
1934 }
1935 } else {
1936 return Ok(())
1937 }
1938 },
1939 };
1940
1941 if let Some(pvd) = state
1944 .per_candidate
1945 .get(&candidate_hash)
1946 .map(|pc| pc.persisted_validation_data.clone())
1947 {
1948 kick_off_validation_work(
1949 ctx,
1950 rp_state,
1951 pvd,
1952 &state.background_validation_tx,
1953 attesting,
1954 )
1955 .await?;
1956 }
1957 }
1958 Ok(())
1959}
1960
1961#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1963async fn validate_and_second<Context>(
1964 ctx: &mut Context,
1965 rp_state: &mut PerRelayParentState,
1966 persisted_validation_data: PersistedValidationData,
1967 candidate: &CandidateReceipt,
1968 pov: Arc<PoV>,
1969 background_validation_tx: &mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
1970) -> Result<(), Error> {
1971 let candidate_hash = candidate.hash();
1972
1973 gum::debug!(
1974 target: LOG_TARGET,
1975 candidate_hash = ?candidate_hash,
1976 candidate_receipt = ?candidate,
1977 "Validate and second candidate",
1978 );
1979
1980 let bg_sender = ctx.sender().clone();
1981 background_validate_and_make_available(
1982 ctx,
1983 rp_state,
1984 BackgroundValidationParams {
1985 sender: bg_sender,
1986 tx_command: background_validation_tx.clone(),
1987 candidate: candidate.clone(),
1988 relay_parent: rp_state.parent,
1989 node_features: rp_state.node_features.clone(),
1990 executor_params: Arc::clone(&rp_state.executor_params),
1991 persisted_validation_data,
1992 pov: PoVData::Ready(pov),
1993 n_validators: rp_state.table_context.validators.len(),
1994 make_command: ValidatedCandidateCommand::Second,
1995 },
1996 )
1997 .await?;
1998
1999 Ok(())
2000}
2001
2002#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
2003async fn handle_second_message<Context>(
2004 ctx: &mut Context,
2005 state: &mut State,
2006 candidate: CandidateReceipt,
2007 persisted_validation_data: PersistedValidationData,
2008 pov: PoV,
2009 metrics: &Metrics,
2010) -> Result<(), Error> {
2011 let _timer = metrics.time_process_second();
2012
2013 let candidate_hash = candidate.hash();
2014 let relay_parent = candidate.descriptor().relay_parent();
2015
2016 if candidate.descriptor().persisted_validation_data_hash() != persisted_validation_data.hash() {
2017 gum::warn!(
2018 target: LOG_TARGET,
2019 ?candidate_hash,
2020 "Candidate backing was asked to second candidate with wrong PVD",
2021 );
2022
2023 return Ok(())
2024 }
2025
2026 let rp_state = match state.per_relay_parent.get_mut(&relay_parent) {
2027 None => {
2028 gum::trace!(
2029 target: LOG_TARGET,
2030 ?relay_parent,
2031 ?candidate_hash,
2032 "We were asked to second a candidate outside of our view."
2033 );
2034
2035 return Ok(())
2036 },
2037 Some(r) => r,
2038 };
2039
2040 if rp_state.table_context.local_validator_is_disabled().unwrap_or(false) {
2043 gum::warn!(target: LOG_TARGET, "Local validator is disabled. Don't validate and second");
2044 return Ok(())
2045 }
2046
2047 let assigned_paras = rp_state.assigned_core.and_then(|core| rp_state.claim_queue.0.get(&core));
2048
2049 if !matches!(assigned_paras, Some(paras) if paras.contains(&candidate.descriptor().para_id())) {
2051 gum::debug!(
2052 target: LOG_TARGET,
2053 our_assignment_core = ?rp_state.assigned_core,
2054 our_assignment_paras = ?assigned_paras,
2055 collation = ?candidate.descriptor().para_id(),
2056 "Subsystem asked to second for para outside of our assignment",
2057 );
2058 return Ok(());
2059 }
2060
2061 gum::debug!(
2062 target: LOG_TARGET,
2063 our_assignment_core = ?rp_state.assigned_core,
2064 our_assignment_paras = ?assigned_paras,
2065 collation = ?candidate.descriptor().para_id(),
2066 "Current assignments vs collation",
2067 );
2068
2069 if !rp_state.issued_statements.contains(&candidate_hash) {
2077 let pov = Arc::new(pov);
2078
2079 validate_and_second(
2080 ctx,
2081 rp_state,
2082 persisted_validation_data,
2083 &candidate,
2084 pov,
2085 &state.background_validation_tx,
2086 )
2087 .await?;
2088 }
2089
2090 Ok(())
2091}
2092
2093#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
2094async fn handle_statement_message<Context>(
2095 ctx: &mut Context,
2096 state: &mut State,
2097 relay_parent: Hash,
2098 statement: SignedFullStatementWithPVD,
2099 metrics: &Metrics,
2100) -> Result<(), Error> {
2101 let _timer = metrics.time_process_statement();
2102
2103 match maybe_validate_and_import(ctx, state, relay_parent, statement).await {
2105 Err(Error::ValidationFailed(_)) => Ok(()),
2106 Err(e) => Err(e),
2107 Ok(()) => Ok(()),
2108 }
2109}
2110
2111fn handle_get_backable_candidates_message(
2112 state: &State,
2113 requested_candidates: HashMap<ParaId, Vec<(CandidateHash, Hash)>>,
2114 tx: oneshot::Sender<HashMap<ParaId, Vec<BackedCandidate>>>,
2115 metrics: &Metrics,
2116) -> Result<(), Error> {
2117 let _timer = metrics.time_get_backed_candidates();
2118
2119 let mut backed = HashMap::with_capacity(requested_candidates.len());
2120
2121 for (para_id, para_candidates) in requested_candidates {
2122 for (candidate_hash, relay_parent) in para_candidates.iter() {
2123 let rp_state = match state.per_relay_parent.get(&relay_parent) {
2124 Some(rp_state) => rp_state,
2125 None => {
2126 gum::debug!(
2127 target: LOG_TARGET,
2128 ?relay_parent,
2129 ?candidate_hash,
2130 "Requested candidate's relay parent is out of view",
2131 );
2132 break
2133 },
2134 };
2135 let maybe_backed_candidate = rp_state
2136 .table
2137 .attested_candidate(
2138 candidate_hash,
2139 &rp_state.table_context,
2140 rp_state.minimum_backing_votes,
2141 )
2142 .and_then(|attested| table_attested_to_backed(attested, &rp_state.table_context));
2143
2144 if let Some(backed_candidate) = maybe_backed_candidate {
2145 backed
2146 .entry(para_id)
2147 .or_insert_with(|| Vec::with_capacity(para_candidates.len()))
2148 .push(backed_candidate);
2149 } else {
2150 break
2151 }
2152 }
2153 }
2154
2155 tx.send(backed).map_err(|data| Error::Send(data))?;
2156 Ok(())
2157}