1#![deny(unused_crate_dependencies, unused_results)]
24#![warn(missing_docs)]
25
26use polkadot_node_core_pvf::{
27 InternalValidationError, InvalidCandidate as WasmInvalidCandidate, PossiblyInvalidError,
28 PrepareError, PrepareJobKind, PvfPrepData, ValidationError, ValidationHost,
29};
30use polkadot_node_primitives::{InvalidCandidate, PoV, ValidationResult};
31use polkadot_node_subsystem::{
32 errors::RuntimeApiError,
33 messages::{
34 CandidateValidationMessage, ChainApiMessage, PreCheckOutcome, PvfExecKind,
35 RuntimeApiMessage, RuntimeApiRequest, ValidationFailed,
36 },
37 overseer, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError, SubsystemResult,
38 SubsystemSender,
39};
40use polkadot_node_subsystem_util::{
41 self as util,
42 runtime::{fetch_scheduling_lookahead, ClaimQueueSnapshot},
43};
44use polkadot_overseer::{ActivatedLeaf, ActiveLeavesUpdate};
45use polkadot_parachain_primitives::primitives::ValidationResult as WasmValidationResult;
46use polkadot_primitives::{
47 executor_params::{
48 DEFAULT_APPROVAL_EXECUTION_TIMEOUT, DEFAULT_BACKING_EXECUTION_TIMEOUT,
49 DEFAULT_LENIENT_PREPARATION_TIMEOUT, DEFAULT_PRECHECK_PREPARATION_TIMEOUT,
50 },
51 transpose_claim_queue, AuthorityDiscoveryId, CandidateCommitments,
52 CandidateDescriptorV2 as CandidateDescriptor, CandidateEvent,
53 CandidateReceiptV2 as CandidateReceipt,
54 CommittedCandidateReceiptV2 as CommittedCandidateReceipt, ExecutorParams, Hash,
55 PersistedValidationData, PvfExecKind as RuntimePvfExecKind, PvfPrepKind, SessionIndex,
56 ValidationCode, ValidationCodeHash, ValidatorId,
57};
58use sp_application_crypto::{AppCrypto, ByteArray};
59use sp_keystore::KeystorePtr;
60
61use codec::Encode;
62
63use futures::{channel::oneshot, prelude::*, stream::FuturesUnordered};
64
65use std::{
66 collections::HashSet,
67 path::PathBuf,
68 pin::Pin,
69 sync::Arc,
70 time::{Duration, Instant},
71};
72
73use async_trait::async_trait;
74
75mod metrics;
76use self::metrics::Metrics;
77
78#[cfg(test)]
79mod tests;
80
/// Log target attached to every `gum` log statement emitted by this subsystem.
const LOG_TARGET: &str = "parachain::candidate-validation";

/// Delay inserted between attempts when an approval or dispute validation is retried.
///
/// Passed as `retry_delay` to `ValidationBackend::validate_candidate_with_retry`.
#[cfg(not(test))]
const PVF_APPROVAL_EXECUTION_RETRY_DELAY: Duration = Duration::from_secs(3);
/// Shortened retry delay used in test builds.
#[cfg(test)]
const PVF_APPROVAL_EXECUTION_RETRY_DELAY: Duration = Duration::from_millis(200);

/// Number of in-flight validation tasks at which the main loop stops accepting new
/// messages and only drains tasks (while still reacting to overseer signals).
const TASK_LIMIT: usize = 30;
94
/// Configuration for the candidate validation subsystem.
///
/// All fields are forwarded, in declaration order, to `polkadot_node_core_pvf::Config::new`
/// when the subsystem starts its PVF validation host.
#[derive(Clone, Debug, Default)]
pub struct Config {
    /// Artifacts cache path handed to the PVF host.
    pub artifacts_cache_path: PathBuf,
    /// Node version handed to the PVF host; `None` when no version is supplied.
    pub node_version: Option<String>,
    /// Secure-validator-mode flag handed to the PVF host.
    pub secure_validator_mode: bool,
    /// Path of the preparation worker handed to the PVF host.
    pub prep_worker_path: PathBuf,
    /// Path of the execution worker handed to the PVF host.
    pub exec_worker_path: PathBuf,
    /// Maximum number of PVF execution workers.
    pub pvf_execute_workers_max_num: usize,
    /// Soft maximum for the number of PVF preparation workers.
    pub pvf_prepare_workers_soft_max_num: usize,
    /// Hard maximum for the number of PVF preparation workers.
    pub pvf_prepare_workers_hard_max_num: usize,
}
116
/// The candidate validation subsystem.
///
/// When `config` is `None`, starting the subsystem falls back to
/// `polkadot_overseer::DummySubsystem` instead of running the validation loop.
pub struct CandidateValidationSubsystem {
    // Keystore handed to the main loop; used there to look up locally held authority and
    // validator keys.
    keystore: KeystorePtr,
    #[allow(missing_docs)]
    pub metrics: Metrics,
    #[allow(missing_docs)]
    pub pvf_metrics: polkadot_node_core_pvf::Metrics,
    // Optional runtime configuration; `None` disables the real subsystem.
    config: Option<Config>,
}
126
127impl CandidateValidationSubsystem {
128 pub fn with_config(
130 config: Option<Config>,
131 keystore: KeystorePtr,
132 metrics: Metrics,
133 pvf_metrics: polkadot_node_core_pvf::Metrics,
134 ) -> Self {
135 CandidateValidationSubsystem { keystore, config, metrics, pvf_metrics }
136 }
137}
138
139#[overseer::subsystem(CandidateValidation, error=SubsystemError, prefix=self::overseer)]
140impl<Context> CandidateValidationSubsystem {
141 fn start(self, ctx: Context) -> SpawnedSubsystem {
142 if let Some(config) = self.config {
143 let future = run(ctx, self.keystore, self.metrics, self.pvf_metrics, config)
144 .map_err(|e| SubsystemError::with_origin("candidate-validation", e))
145 .boxed();
146 SpawnedSubsystem { name: "candidate-validation-subsystem", future }
147 } else {
148 polkadot_overseer::DummySubsystem.start(ctx)
149 }
150 }
151}
152
153async fn claim_queue<Sender>(relay_parent: Hash, sender: &mut Sender) -> Option<ClaimQueueSnapshot>
155where
156 Sender: SubsystemSender<RuntimeApiMessage>,
157{
158 match util::runtime::fetch_claim_queue(sender, relay_parent).await {
159 Ok(cq) => Some(cq),
160 Err(err) => {
161 gum::warn!(
162 target: LOG_TARGET,
163 ?relay_parent,
164 ?err,
165 "Claim queue not available"
166 );
167 None
168 },
169 }
170}
171
172fn handle_validation_message<S>(
173 mut sender: S,
174 validation_host: ValidationHost,
175 metrics: Metrics,
176 msg: CandidateValidationMessage,
177) -> Pin<Box<dyn Future<Output = ()> + Send>>
178where
179 S: SubsystemSender<RuntimeApiMessage>,
180{
181 match msg {
182 CandidateValidationMessage::ValidateFromExhaustive {
183 validation_data,
184 validation_code,
185 candidate_receipt,
186 pov,
187 executor_params,
188 exec_kind,
189 response_sender,
190 ..
191 } => async move {
192 let _timer = metrics.time_validate_from_exhaustive();
193 let relay_parent = candidate_receipt.descriptor.relay_parent();
194
195 let maybe_claim_queue = claim_queue(relay_parent, &mut sender).await;
196 let Some(session_index) = get_session_index(&mut sender, relay_parent).await else {
197 let error = "cannot fetch session index from the runtime";
198 gum::warn!(
199 target: LOG_TARGET,
200 ?relay_parent,
201 error,
202 );
203
204 let _ = response_sender
205 .send(Err(ValidationFailed("Session index not found".to_string())));
206 return
207 };
208
209 let Ok(validation_code_bomb_limit) = util::runtime::fetch_validation_code_bomb_limit(
212 relay_parent,
213 session_index,
214 &mut sender,
215 )
216 .await
217 else {
218 let error = "cannot fetch validation code bomb limit from the runtime";
219 gum::warn!(
220 target: LOG_TARGET,
221 ?relay_parent,
222 error,
223 );
224
225 let _ = response_sender.send(Err(ValidationFailed(
226 "Validation code bomb limit not available".to_string(),
227 )));
228 return
229 };
230
231 let res = validate_candidate_exhaustive(
232 session_index,
233 validation_host,
234 validation_data,
235 validation_code,
236 candidate_receipt,
237 pov,
238 executor_params,
239 exec_kind,
240 &metrics,
241 maybe_claim_queue,
242 validation_code_bomb_limit,
243 )
244 .await;
245
246 metrics.on_validation_event(&res);
247 let _ = response_sender.send(res);
248 }
249 .boxed(),
250 CandidateValidationMessage::PreCheck {
251 relay_parent,
252 validation_code_hash,
253 response_sender,
254 ..
255 } => async move {
256 let Some(session_index) = get_session_index(&mut sender, relay_parent).await else {
257 let error = "cannot fetch session index from the runtime";
258 gum::warn!(
259 target: LOG_TARGET,
260 ?relay_parent,
261 error,
262 );
263
264 let _ = response_sender.send(PreCheckOutcome::Failed);
265 return
266 };
267
268 let Ok(validation_code_bomb_limit) = util::runtime::fetch_validation_code_bomb_limit(
271 relay_parent,
272 session_index,
273 &mut sender,
274 )
275 .await
276 else {
277 let error = "cannot fetch validation code bomb limit from the runtime";
278 gum::warn!(
279 target: LOG_TARGET,
280 ?relay_parent,
281 error,
282 );
283
284 let _ = response_sender.send(PreCheckOutcome::Failed);
285 return
286 };
287
288 let precheck_result = precheck_pvf(
289 &mut sender,
290 validation_host,
291 relay_parent,
292 validation_code_hash,
293 validation_code_bomb_limit,
294 )
295 .await;
296
297 let _ = response_sender.send(precheck_result);
298 }
299 .boxed(),
300 }
301}
302
/// Main loop of the candidate validation subsystem.
///
/// Starts the PVF validation host from the given `Config`, spawns the host task as a
/// blocking task, then alternates between two phases:
///
/// 1. Receive overseer signals and messages, spawning one task per message, until
///    `TASK_LIMIT` tasks are in flight.
/// 2. Stop receiving messages and only handle signals while draining tasks, until the number
///    of in-flight tasks drops below `TASK_LIMIT` again.
///
/// Returns `Ok(())` on `OverseerSignal::Conclude` and an error if starting the host, spawning
/// its task, or receiving from the overseer fails.
#[overseer::contextbounds(CandidateValidation, prefix = self::overseer)]
async fn run<Context>(
    mut ctx: Context,
    keystore: KeystorePtr,
    metrics: Metrics,
    pvf_metrics: polkadot_node_core_pvf::Metrics,
    Config {
        artifacts_cache_path,
        node_version,
        secure_validator_mode,
        prep_worker_path,
        exec_worker_path,
        pvf_execute_workers_max_num,
        pvf_prepare_workers_soft_max_num,
        pvf_prepare_workers_hard_max_num,
    }: Config,
) -> SubsystemResult<()> {
    let (mut validation_host, task) = polkadot_node_core_pvf::start(
        polkadot_node_core_pvf::Config::new(
            artifacts_cache_path,
            node_version,
            secure_validator_mode,
            prep_worker_path,
            exec_worker_path,
            pvf_execute_workers_max_num,
            pvf_prepare_workers_soft_max_num,
            pvf_prepare_workers_hard_max_num,
        ),
        pvf_metrics,
    )
    .await?;
    ctx.spawn_blocking("pvf-validation-host", task.boxed())?;

    // In-flight validation/precheck tasks, polled concurrently with the overseer channel.
    let mut tasks = FuturesUnordered::new();
    let mut prepare_state = PrepareValidationState::default();

    loop {
        // Phase 1: accept both signals and messages until the task limit is reached.
        loop {
            futures::select! {
                comm = ctx.recv().fuse() => {
                    match comm {
                        Ok(FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update))) => {
                            handle_active_leaves_update(
                                ctx.sender(),
                                keystore.clone(),
                                &mut validation_host,
                                update,
                                &mut prepare_state,
                            ).await
                        },
                        Ok(FromOrchestra::Signal(OverseerSignal::BlockFinalized(..))) => {},
                        Ok(FromOrchestra::Signal(OverseerSignal::Conclude)) => return Ok(()),
                        Ok(FromOrchestra::Communication { msg }) => {
                            let task = handle_validation_message(ctx.sender().clone(), validation_host.clone(), metrics.clone(), msg);
                            tasks.push(task);
                            if tasks.len() >= TASK_LIMIT {
                                break
                            }
                        },
                        Err(e) => return Err(SubsystemError::from(e)),
                    }
                },
                // A task finished; nothing to do besides removing it from the set.
                _ = tasks.select_next_some() => ()
            }
        }

        gum::debug!(target: LOG_TARGET, "Validation task limit hit");

        // Phase 2: only signals are received (messages stay queued) while tasks drain.
        // Active-leaves updates are not processed in this phase.
        loop {
            futures::select! {
                signal = ctx.recv_signal().fuse() => {
                    match signal {
                        Ok(OverseerSignal::ActiveLeaves(_)) => {},
                        Ok(OverseerSignal::BlockFinalized(..)) => {},
                        Ok(OverseerSignal::Conclude) => return Ok(()),
                        Err(e) => return Err(SubsystemError::from(e)),
                    }
                },
                _ = tasks.select_next_some() => {
                    if tasks.len() < TASK_LIMIT {
                        break
                    }
                }
            }
        }
    }
}
390
/// State used to decide whether, and which, PVFs to prepare ahead of time.
struct PrepareValidationState {
    // Latest session index seen on an activated leaf, if any.
    session_index: Option<SessionIndex>,
    // Result of the last `check_next_session_authority` call.
    is_next_session_authority: bool,
    // Code hashes already handed to the validation backend; cleared on a new session.
    already_prepared_code_hashes: HashSet<ValidationCodeHash>,
    // Maximum number of new code hashes picked per activated leaf.
    per_block_limit: usize,
}
399
impl Default for PrepareValidationState {
    /// Initial state: no session seen yet, not a next-session authority, nothing prepared,
    /// and at most one PVF prepared per block.
    fn default() -> Self {
        Self {
            session_index: None,
            is_next_session_authority: false,
            already_prepared_code_hashes: HashSet::new(),
            per_block_limit: 1,
        }
    }
}
410
411async fn handle_active_leaves_update<Sender>(
412 sender: &mut Sender,
413 keystore: KeystorePtr,
414 validation_host: &mut impl ValidationBackend,
415 update: ActiveLeavesUpdate,
416 prepare_state: &mut PrepareValidationState,
417) where
418 Sender: SubsystemSender<ChainApiMessage> + SubsystemSender<RuntimeApiMessage>,
419{
420 let maybe_session_index = update_active_leaves(sender, validation_host, update.clone()).await;
421
422 if let Some(activated) = update.activated {
423 let maybe_new_session_index = match (prepare_state.session_index, maybe_session_index) {
424 (Some(existing_index), Some(new_index)) =>
425 (new_index > existing_index).then_some(new_index),
426 (None, Some(new_index)) => Some(new_index),
427 _ => None,
428 };
429 maybe_prepare_validation(
430 sender,
431 keystore.clone(),
432 validation_host,
433 activated,
434 prepare_state,
435 maybe_new_session_index,
436 )
437 .await;
438 }
439}
440
441async fn maybe_prepare_validation<Sender>(
442 sender: &mut Sender,
443 keystore: KeystorePtr,
444 validation_backend: &mut impl ValidationBackend,
445 leaf: ActivatedLeaf,
446 state: &mut PrepareValidationState,
447 new_session_index: Option<SessionIndex>,
448) where
449 Sender: SubsystemSender<RuntimeApiMessage>,
450{
451 if new_session_index.is_some() {
452 state.session_index = new_session_index;
453 state.already_prepared_code_hashes.clear();
454 state.is_next_session_authority = check_next_session_authority(
455 sender,
456 keystore,
457 leaf.hash,
458 state.session_index.expect("qed: just checked above"),
459 )
460 .await;
461 }
462
463 if state.is_next_session_authority {
465 let code_hashes = prepare_pvfs_for_backed_candidates(
466 sender,
467 validation_backend,
468 leaf.hash,
469 &state.already_prepared_code_hashes,
470 state.per_block_limit,
471 )
472 .await;
473 state.already_prepared_code_hashes.extend(code_hashes.unwrap_or_default());
474 }
475}
476
477async fn get_session_index<Sender>(sender: &mut Sender, relay_parent: Hash) -> Option<SessionIndex>
478where
479 Sender: SubsystemSender<RuntimeApiMessage>,
480{
481 let Ok(Ok(session_index)) =
482 util::request_session_index_for_child(relay_parent, sender).await.await
483 else {
484 gum::warn!(
485 target: LOG_TARGET,
486 ?relay_parent,
487 "cannot fetch session index from runtime API",
488 );
489 return None
490 };
491
492 Some(session_index)
493}
494
495async fn check_next_session_authority<Sender>(
497 sender: &mut Sender,
498 keystore: KeystorePtr,
499 relay_parent: Hash,
500 session_index: SessionIndex,
501) -> bool
502where
503 Sender: SubsystemSender<RuntimeApiMessage>,
504{
505 let Ok(Ok(authorities)) = util::request_authorities(relay_parent, sender).await.await else {
508 gum::warn!(
509 target: LOG_TARGET,
510 ?relay_parent,
511 "cannot fetch authorities from runtime API",
512 );
513 return false
514 };
515
516 let Ok(Ok(Some(session_info))) =
518 util::request_session_info(relay_parent, session_index, sender).await.await
519 else {
520 gum::warn!(
521 target: LOG_TARGET,
522 ?relay_parent,
523 "cannot fetch session info from runtime API",
524 );
525 return false
526 };
527
528 let is_past_present_or_future_authority = authorities
529 .iter()
530 .any(|v| keystore.has_keys(&[(v.to_raw_vec(), AuthorityDiscoveryId::ID)]));
531
532 let is_present_validator = session_info
534 .validators
535 .iter()
536 .any(|v| keystore.has_keys(&[(v.to_raw_vec(), ValidatorId::ID)]));
537
538 is_past_present_or_future_authority && !is_present_validator
541}
542
543async fn prepare_pvfs_for_backed_candidates<Sender>(
545 sender: &mut Sender,
546 validation_backend: &mut impl ValidationBackend,
547 relay_parent: Hash,
548 already_prepared: &HashSet<ValidationCodeHash>,
549 per_block_limit: usize,
550) -> Option<Vec<ValidationCodeHash>>
551where
552 Sender: SubsystemSender<RuntimeApiMessage>,
553{
554 let Ok(Ok(events)) = util::request_candidate_events(relay_parent, sender).await.await else {
555 gum::warn!(
556 target: LOG_TARGET,
557 ?relay_parent,
558 "cannot fetch candidate events from runtime API",
559 );
560 return None
561 };
562 let code_hashes = events
563 .into_iter()
564 .filter_map(|e| match e {
565 CandidateEvent::CandidateBacked(receipt, ..) => {
566 let h = receipt.descriptor.validation_code_hash();
567 if already_prepared.contains(&h) {
568 None
569 } else {
570 Some(h)
571 }
572 },
573 _ => None,
574 })
575 .take(per_block_limit)
576 .collect::<Vec<_>>();
577
578 let Ok(executor_params) = util::executor_params_at_relay_parent(relay_parent, sender).await
579 else {
580 gum::warn!(
581 target: LOG_TARGET,
582 ?relay_parent,
583 "cannot fetch executor params for the session",
584 );
585 return None
586 };
587 let timeout = pvf_prep_timeout(&executor_params, PvfPrepKind::Prepare);
588
589 let mut active_pvfs = vec![];
590 let mut processed_code_hashes = vec![];
591 for code_hash in code_hashes {
592 let Ok(Ok(Some(validation_code))) =
593 util::request_validation_code_by_hash(relay_parent, code_hash, sender)
594 .await
595 .await
596 else {
597 gum::warn!(
598 target: LOG_TARGET,
599 ?relay_parent,
600 ?code_hash,
601 "cannot fetch validation code hash from runtime API",
602 );
603 continue;
604 };
605
606 let Some(session_index) = get_session_index(sender, relay_parent).await else { continue };
607
608 let validation_code_bomb_limit = match util::runtime::fetch_validation_code_bomb_limit(
609 relay_parent,
610 session_index,
611 sender,
612 )
613 .await
614 {
615 Ok(limit) => limit,
616 Err(err) => {
617 gum::warn!(
618 target: LOG_TARGET,
619 ?relay_parent,
620 ?err,
621 "cannot fetch validation code bomb limit from runtime API",
622 );
623 continue;
624 },
625 };
626
627 let pvf = PvfPrepData::from_code(
628 validation_code.0,
629 executor_params.clone(),
630 timeout,
631 PrepareJobKind::Prechecking,
632 validation_code_bomb_limit,
633 );
634
635 active_pvfs.push(pvf);
636 processed_code_hashes.push(code_hash);
637 }
638
639 if active_pvfs.is_empty() {
640 return None
641 }
642
643 if let Err(err) = validation_backend.heads_up(active_pvfs).await {
644 gum::warn!(
645 target: LOG_TARGET,
646 ?relay_parent,
647 ?err,
648 "cannot prepare PVF for the next session",
649 );
650 return None
651 };
652
653 gum::debug!(
654 target: LOG_TARGET,
655 ?relay_parent,
656 ?processed_code_hashes,
657 "Prepared PVF for the next session",
658 );
659
660 Some(processed_code_hashes)
661}
662
663async fn update_active_leaves<Sender>(
664 sender: &mut Sender,
665 validation_backend: &mut impl ValidationBackend,
666 update: ActiveLeavesUpdate,
667) -> Option<SessionIndex>
668where
669 Sender: SubsystemSender<ChainApiMessage> + SubsystemSender<RuntimeApiMessage>,
670{
671 let maybe_new_leaf = if let Some(activated) = &update.activated {
672 get_session_index(sender, activated.hash)
673 .await
674 .map(|index| (activated.hash, index))
675 } else {
676 None
677 };
678
679 let ancestors = get_block_ancestors(sender, maybe_new_leaf).await;
680 if let Err(err) = validation_backend.update_active_leaves(update, ancestors).await {
681 gum::warn!(
682 target: LOG_TARGET,
683 ?err,
684 "cannot update active leaves in validation backend",
685 );
686 };
687
688 maybe_new_leaf.map(|l| l.1)
689}
690
691async fn get_block_ancestors<Sender>(
692 sender: &mut Sender,
693 maybe_new_leaf: Option<(Hash, SessionIndex)>,
694) -> Vec<Hash>
695where
696 Sender: SubsystemSender<ChainApiMessage> + SubsystemSender<RuntimeApiMessage>,
697{
698 let Some((relay_parent, session_index)) = maybe_new_leaf else { return vec![] };
699 let scheduling_lookahead =
700 match fetch_scheduling_lookahead(relay_parent, session_index, sender).await {
701 Ok(scheduling_lookahead) => scheduling_lookahead,
702 res => {
703 gum::warn!(target: LOG_TARGET, ?res, "Failed to request scheduling lookahead");
704 return vec![]
705 },
706 };
707
708 let (tx, rx) = oneshot::channel();
709 sender
710 .send_message(ChainApiMessage::Ancestors {
711 hash: relay_parent,
712 k: scheduling_lookahead.saturating_sub(1) as usize,
714 response_channel: tx,
715 })
716 .await;
717 match rx.await {
718 Ok(Ok(x)) => x,
719 res => {
720 gum::warn!(target: LOG_TARGET, ?res, "cannot request ancestors");
721 vec![]
722 },
723 }
724}
725
/// Marker error returned by `runtime_api_request` when the request was dropped or the
/// runtime API answered with an error.
struct RuntimeRequestFailed;
727
728async fn runtime_api_request<T, Sender>(
729 sender: &mut Sender,
730 relay_parent: Hash,
731 request: RuntimeApiRequest,
732 receiver: oneshot::Receiver<Result<T, RuntimeApiError>>,
733) -> Result<T, RuntimeRequestFailed>
734where
735 Sender: SubsystemSender<RuntimeApiMessage>,
736{
737 sender
738 .send_message(RuntimeApiMessage::Request(relay_parent, request).into())
739 .await;
740
741 receiver
742 .await
743 .map_err(|_| {
744 gum::debug!(target: LOG_TARGET, ?relay_parent, "Runtime API request dropped");
745
746 RuntimeRequestFailed
747 })
748 .and_then(|res| {
749 res.map_err(|e| {
750 gum::debug!(
751 target: LOG_TARGET,
752 ?relay_parent,
753 err = ?e,
754 "Runtime API request internal error"
755 );
756
757 RuntimeRequestFailed
758 })
759 })
760}
761
762async fn request_validation_code_by_hash<Sender>(
763 sender: &mut Sender,
764 relay_parent: Hash,
765 validation_code_hash: ValidationCodeHash,
766) -> Result<Option<ValidationCode>, RuntimeRequestFailed>
767where
768 Sender: SubsystemSender<RuntimeApiMessage>,
769{
770 let (tx, rx) = oneshot::channel();
771 runtime_api_request(
772 sender,
773 relay_parent,
774 RuntimeApiRequest::ValidationCodeByHash(validation_code_hash, tx),
775 rx,
776 )
777 .await
778}
779
780async fn precheck_pvf<Sender>(
781 sender: &mut Sender,
782 mut validation_backend: impl ValidationBackend,
783 relay_parent: Hash,
784 validation_code_hash: ValidationCodeHash,
785 validation_code_bomb_limit: u32,
786) -> PreCheckOutcome
787where
788 Sender: SubsystemSender<RuntimeApiMessage>,
789{
790 let validation_code =
791 match request_validation_code_by_hash(sender, relay_parent, validation_code_hash).await {
792 Ok(Some(code)) => code,
793 _ => {
794 gum::warn!(
798 target: LOG_TARGET,
799 ?relay_parent,
800 ?validation_code_hash,
801 "precheck: requested validation code is not found on-chain!",
802 );
803 return PreCheckOutcome::Failed
804 },
805 };
806
807 let executor_params = if let Ok(executor_params) =
808 util::executor_params_at_relay_parent(relay_parent, sender).await
809 {
810 gum::debug!(
811 target: LOG_TARGET,
812 ?relay_parent,
813 ?validation_code_hash,
814 "precheck: acquired executor params for the session: {:?}",
815 executor_params,
816 );
817 executor_params
818 } else {
819 gum::warn!(
820 target: LOG_TARGET,
821 ?relay_parent,
822 ?validation_code_hash,
823 "precheck: failed to acquire executor params for the session, thus voting against.",
824 );
825 return PreCheckOutcome::Invalid
826 };
827
828 let timeout = pvf_prep_timeout(&executor_params, PvfPrepKind::Precheck);
829
830 let pvf = PvfPrepData::from_code(
831 validation_code.0,
832 executor_params,
833 timeout,
834 PrepareJobKind::Prechecking,
835 validation_code_bomb_limit,
836 );
837
838 match validation_backend.precheck_pvf(pvf).await {
839 Ok(_) => PreCheckOutcome::Valid,
840 Err(prepare_err) =>
841 if prepare_err.is_deterministic() {
842 PreCheckOutcome::Invalid
843 } else {
844 PreCheckOutcome::Failed
845 },
846 }
847}
848
/// Validate a candidate against its PoV, validation code and persisted validation data.
///
/// Steps, in order:
/// 1. For backing kinds, a session index carried by the descriptor must equal
///    `expected_session_index`.
/// 2. Basic checks (`perform_basic_checks`): PoV size, PoV hash, validation code hash.
/// 3. Execute the PVF through `validation_backend` — a single attempt for backing kinds,
///    with retries for approval and dispute kinds.
/// 4. Map backend errors to a verdict, then compare the produced para head and commitments
///    hash with the receipt; for backing kinds also check the UMP signals against the claim
///    queue.
///
/// Returns `Ok(ValidationResult::Valid(..))` / `Ok(ValidationResult::Invalid(..))` for a
/// definite verdict and `Err(ValidationFailed)` when no verdict can be given (internal
/// error, preparation error, execution deadline, or a missing claim queue while backing).
async fn validate_candidate_exhaustive(
    expected_session_index: SessionIndex,
    mut validation_backend: impl ValidationBackend + Send,
    persisted_validation_data: PersistedValidationData,
    validation_code: ValidationCode,
    candidate_receipt: CandidateReceipt,
    pov: Arc<PoV>,
    executor_params: ExecutorParams,
    exec_kind: PvfExecKind,
    metrics: &Metrics,
    maybe_claim_queue: Option<ClaimQueueSnapshot>,
    validation_code_bomb_limit: u32,
) -> Result<ValidationResult, ValidationFailed> {
    let _timer = metrics.time_validate_candidate_exhaustive();
    let validation_code_hash = validation_code.hash();
    let relay_parent = candidate_receipt.descriptor.relay_parent();
    let para_id = candidate_receipt.descriptor.para_id();
    let candidate_hash = candidate_receipt.hash();

    gum::debug!(
        target: LOG_TARGET,
        ?validation_code_hash,
        ?candidate_hash,
        ?para_id,
        "About to validate a candidate.",
    );

    // The session index is only enforced for backing kinds, and only when the descriptor
    // carries one.
    match (exec_kind, candidate_receipt.descriptor.session_index()) {
        (PvfExecKind::Backing(_) | PvfExecKind::BackingSystemParas(_), Some(session_index)) =>
            if session_index != expected_session_index {
                return Ok(ValidationResult::Invalid(InvalidCandidate::InvalidSessionIndex))
            },
        (_, _) => {},
    };

    if let Err(e) = perform_basic_checks(
        &candidate_receipt.descriptor,
        persisted_validation_data.max_pov_size,
        &pov,
        &validation_code_hash,
    ) {
        gum::debug!(target: LOG_TARGET, ?para_id, ?candidate_hash, "Invalid candidate (basic checks)");
        return Ok(ValidationResult::Invalid(e))
    }

    let persisted_validation_data = Arc::new(persisted_validation_data);
    let result = match exec_kind {
        // Backing: a single execution attempt.
        PvfExecKind::Backing(_) | PvfExecKind::BackingSystemParas(_) => {
            let prep_timeout = pvf_prep_timeout(&executor_params, PvfPrepKind::Prepare);
            let exec_timeout = pvf_exec_timeout(&executor_params, exec_kind.into());
            let pvf = PvfPrepData::from_code(
                validation_code.0,
                executor_params,
                prep_timeout,
                PrepareJobKind::Compilation,
                validation_code_bomb_limit,
            );

            validation_backend
                .validate_candidate(
                    pvf,
                    exec_timeout,
                    persisted_validation_data.clone(),
                    pov,
                    exec_kind.into(),
                    exec_kind,
                )
                .await
        },
        // Approval and dispute: execution goes through the retrying entry point.
        PvfExecKind::Approval | PvfExecKind::Dispute =>
            validation_backend
                .validate_candidate_with_retry(
                    validation_code.0,
                    pvf_exec_timeout(&executor_params, exec_kind.into()),
                    persisted_validation_data.clone(),
                    pov,
                    executor_params,
                    PVF_APPROVAL_EXECUTION_RETRY_DELAY,
                    exec_kind.into(),
                    exec_kind,
                    validation_code_bomb_limit,
                )
                .await,
    };

    if let Err(ref error) = result {
        gum::info!(target: LOG_TARGET, ?para_id, ?candidate_hash, ?error, "Failed to validate candidate");
    }

    match result {
        // Internal errors say nothing about the candidate: no verdict.
        Err(ValidationError::Internal(e)) => {
            gum::warn!(
                target: LOG_TARGET,
                ?para_id,
                ?candidate_hash,
                ?e,
                "An internal error occurred during validation, will abstain from voting",
            );
            Err(ValidationFailed(e.to_string()))
        },
        Err(ValidationError::Invalid(WasmInvalidCandidate::HardTimeout)) =>
            Ok(ValidationResult::Invalid(InvalidCandidate::Timeout)),
        Err(ValidationError::Invalid(WasmInvalidCandidate::WorkerReportedInvalid(e))) =>
            Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(e))),
        Err(ValidationError::Invalid(WasmInvalidCandidate::PoVDecompressionFailure)) =>
            Ok(ValidationResult::Invalid(InvalidCandidate::PoVDecompressionFailure)),
        // All "possibly invalid" errors are reported as invalid with an execution error.
        Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath)) =>
            Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(
                "ambiguous worker death".to_string(),
            ))),
        Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::JobError(err))) =>
            Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(err))),
        Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::RuntimeConstruction(err))) =>
            Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(err))),
        Err(ValidationError::PossiblyInvalid(err @ PossiblyInvalidError::CorruptedArtifact)) =>
            Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(err.to_string()))),

        Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousJobDeath(err))) =>
            Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(format!(
                "ambiguous job death: {err}"
            )))),
        Err(ValidationError::Preparation(e)) => {
            gum::warn!(
                target: LOG_TARGET,
                ?para_id,
                ?e,
                "Deterministic error occurred during preparation (should have been ruled out by pre-checking phase)",
            );
            Err(ValidationFailed(e.to_string()))
        },
        Err(e @ ValidationError::ExecutionDeadline) => {
            gum::warn!(
                target: LOG_TARGET,
                ?para_id,
                ?e,
                "Job assigned too late, execution queue probably overloaded",
            );
            Err(ValidationFailed(e.to_string()))
        },
        Ok(res) =>
            if res.head_data.hash() != candidate_receipt.descriptor.para_head() {
                gum::info!(target: LOG_TARGET, ?para_id, "Invalid candidate (para_head)");
                Ok(ValidationResult::Invalid(InvalidCandidate::ParaHeadHashMismatch))
            } else {
                // Rebuild the commitments from the execution output so their hash can be
                // compared with the hash recorded in the receipt.
                let committed_candidate_receipt = CommittedCandidateReceipt {
                    descriptor: candidate_receipt.descriptor.clone(),
                    commitments: CandidateCommitments {
                        head_data: res.head_data,
                        upward_messages: res.upward_messages,
                        horizontal_messages: res.horizontal_messages,
                        new_validation_code: res.new_validation_code,
                        processed_downward_messages: res.processed_downward_messages,
                        hrmp_watermark: res.hrmp_watermark,
                    },
                };

                if candidate_receipt.commitments_hash !=
                    committed_candidate_receipt.commitments.hash()
                {
                    gum::info!(
                        target: LOG_TARGET,
                        ?para_id,
                        ?candidate_hash,
                        "Invalid candidate (commitments hash)"
                    );

                    gum::trace!(
                        target: LOG_TARGET,
                        ?para_id,
                        ?candidate_hash,
                        produced_commitments = ?committed_candidate_receipt.commitments,
                        "Invalid candidate commitments"
                    );

                    Ok(ValidationResult::Invalid(InvalidCandidate::CommitmentsHashMismatch))
                } else {
                    match exec_kind {
                        // UMP signals are only checked when backing, and that check needs
                        // the claim queue.
                        PvfExecKind::Backing(_) | PvfExecKind::BackingSystemParas(_) => {
                            let Some(claim_queue) = maybe_claim_queue else {
                                let error = "cannot fetch the claim queue from the runtime";
                                gum::warn!(
                                    target: LOG_TARGET,
                                    ?relay_parent,
                                    error
                                );

                                return Err(ValidationFailed(error.into()))
                            };

                            if let Err(err) = committed_candidate_receipt
                                .parse_ump_signals(&transpose_claim_queue(claim_queue.0))
                            {
                                gum::warn!(
                                    target: LOG_TARGET,
                                    candidate_hash = ?candidate_receipt.hash(),
                                    "Invalid UMP signals: {}",
                                    err
                                );
                                return Ok(ValidationResult::Invalid(
                                    InvalidCandidate::InvalidUMPSignals(err),
                                ))
                            }
                        },
                        _ => {},
                    }

                    Ok(ValidationResult::Valid(
                        committed_candidate_receipt.commitments,
                        (*persisted_validation_data).clone(),
                    ))
                }
            },
    }
}
1071
/// Abstraction over the PVF validation host used by this subsystem.
#[async_trait]
trait ValidationBackend {
    /// Execute the given PVF once with the given execution timeout, persisted validation
    /// data and PoV.
    async fn validate_candidate(
        &mut self,
        pvf: PvfPrepData,
        exec_timeout: Duration,
        pvd: Arc<PersistedValidationData>,
        pov: Arc<PoV>,
        prepare_priority: polkadot_node_core_pvf::Priority,
        exec_kind: PvfExecKind,
    ) -> Result<WasmValidationResult, ValidationError>;

    /// Execute a PVF, retrying on errors that may be transient.
    ///
    /// The first attempt uses the full `exec_timeout`. Each error class (worker/job death,
    /// job error, internal error, runtime construction / corrupted artifact) gets one retry
    /// of its own. Retries stop once the elapsed time plus `retry_delay` would exceed
    /// `exec_timeout`, and every retry only gets the remaining time as its timeout.
    ///
    /// Returns the result of the last attempt, or the preparation error if re-checking the
    /// PVF after a runtime construction / corrupted artifact error fails.
    async fn validate_candidate_with_retry(
        &mut self,
        code: Vec<u8>,
        exec_timeout: Duration,
        pvd: Arc<PersistedValidationData>,
        pov: Arc<PoV>,
        executor_params: ExecutorParams,
        retry_delay: Duration,
        prepare_priority: polkadot_node_core_pvf::Priority,
        exec_kind: PvfExecKind,
        validation_code_bomb_limit: u32,
    ) -> Result<WasmValidationResult, ValidationError> {
        let prep_timeout = pvf_prep_timeout(&executor_params, PvfPrepKind::Prepare);
        let pvf = PvfPrepData::from_code(
            code,
            executor_params,
            prep_timeout,
            PrepareJobKind::Compilation,
            validation_code_bomb_limit,
        );
        // Measures the total time spent across all attempts, including delays.
        let total_time_start = Instant::now();

        let mut validation_result = self
            .validate_candidate(
                pvf.clone(),
                exec_timeout,
                pvd.clone(),
                pov.clone(),
                prepare_priority,
                exec_kind,
            )
            .await;
        if validation_result.is_ok() {
            return validation_result
        }

        // Consume one retry from `$counter`, or leave the retry loop if none is left.
        macro_rules! break_if_no_retries_left {
            ($counter:ident) => {
                if $counter > 0 {
                    $counter -= 1;
                } else {
                    break
                }
            };
        }

        // One retry per error class.
        let mut num_death_retries_left = 1;
        let mut num_job_error_retries_left = 1;
        let mut num_internal_retries_left = 1;
        let mut num_execution_error_retries_left = 1;
        loop {
            // Stop if waiting for another retry would exceed the execution timeout.
            if total_time_start.elapsed() + retry_delay > exec_timeout {
                break
            }
            let mut retry_immediately = false;
            match validation_result {
                Err(ValidationError::PossiblyInvalid(
                    PossiblyInvalidError::AmbiguousWorkerDeath |
                    PossiblyInvalidError::AmbiguousJobDeath(_),
                )) => break_if_no_retries_left!(num_death_retries_left),

                Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::JobError(_))) =>
                    break_if_no_retries_left!(num_job_error_retries_left),

                Err(ValidationError::Internal(_)) =>
                    break_if_no_retries_left!(num_internal_retries_left),

                Err(ValidationError::PossiblyInvalid(
                    PossiblyInvalidError::RuntimeConstruction(_) |
                    PossiblyInvalidError::CorruptedArtifact,
                )) => {
                    break_if_no_retries_left!(num_execution_error_retries_left);
                    // Pre-check the same PVF again before retrying; a preparation error is
                    // propagated to the caller. This retry skips the delay.
                    self.precheck_pvf(pvf.clone()).await?;
                    retry_immediately = true;
                },

                // Success and the remaining error classes are final.
                Ok(_) |
                Err(
                    ValidationError::Invalid(_) |
                    ValidationError::Preparation(_) |
                    ValidationError::ExecutionDeadline,
                ) => break,
            }

            {
                if !retry_immediately {
                    futures_timer::Delay::new(retry_delay).await;
                }

                // Only the time still left of the original timeout is granted to the retry.
                let new_timeout = exec_timeout.saturating_sub(total_time_start.elapsed());

                gum::warn!(
                    target: LOG_TARGET,
                    ?pvf,
                    ?new_timeout,
                    "Re-trying failed candidate validation due to possible transient error: {:?}",
                    validation_result
                );

                validation_result = self
                    .validate_candidate(
                        pvf.clone(),
                        new_timeout,
                        pvd.clone(),
                        pov.clone(),
                        prepare_priority,
                        exec_kind,
                    )
                    .await;
            }
        }

        validation_result
    }

    /// Pre-check the given PVF.
    async fn precheck_pvf(&mut self, pvf: PvfPrepData) -> Result<(), PrepareError>;

    /// Notify the backend about PVFs that should be prepared ahead of time.
    async fn heads_up(&mut self, active_pvfs: Vec<PvfPrepData>) -> Result<(), String>;

    /// Forward an active-leaves update and the ancestors of the activated leaf to the
    /// backend.
    async fn update_active_leaves(
        &mut self,
        update: ActiveLeavesUpdate,
        ancestors: Vec<Hash>,
    ) -> Result<(), String>;
}
1236
1237#[async_trait]
1238impl ValidationBackend for ValidationHost {
1239 async fn validate_candidate(
1241 &mut self,
1242 pvf: PvfPrepData,
1243 exec_timeout: Duration,
1244 pvd: Arc<PersistedValidationData>,
1245 pov: Arc<PoV>,
1246 prepare_priority: polkadot_node_core_pvf::Priority,
1248 exec_kind: PvfExecKind,
1250 ) -> Result<WasmValidationResult, ValidationError> {
1251 let (tx, rx) = oneshot::channel();
1252 if let Err(err) = self
1253 .execute_pvf(pvf, exec_timeout, pvd, pov, prepare_priority, exec_kind, tx)
1254 .await
1255 {
1256 return Err(InternalValidationError::HostCommunication(format!(
1257 "cannot send pvf to the validation host, it might have shut down: {:?}",
1258 err
1259 ))
1260 .into())
1261 }
1262
1263 rx.await.map_err(|_| {
1264 ValidationError::from(InternalValidationError::HostCommunication(
1265 "validation was cancelled".into(),
1266 ))
1267 })?
1268 }
1269
1270 async fn precheck_pvf(&mut self, pvf: PvfPrepData) -> Result<(), PrepareError> {
1271 let (tx, rx) = oneshot::channel();
1272 if let Err(err) = self.precheck_pvf(pvf, tx).await {
1273 return Err(PrepareError::IoErr(err))
1275 }
1276
1277 let precheck_result = rx.await.map_err(|err| PrepareError::IoErr(err.to_string()))?;
1278
1279 precheck_result
1280 }
1281
1282 async fn heads_up(&mut self, active_pvfs: Vec<PvfPrepData>) -> Result<(), String> {
1283 self.heads_up(active_pvfs).await
1284 }
1285
1286 async fn update_active_leaves(
1287 &mut self,
1288 update: ActiveLeavesUpdate,
1289 ancestors: Vec<Hash>,
1290 ) -> Result<(), String> {
1291 self.update_active_leaves(update, ancestors).await
1292 }
1293}
1294
1295fn perform_basic_checks(
1298 candidate: &CandidateDescriptor,
1299 max_pov_size: u32,
1300 pov: &PoV,
1301 validation_code_hash: &ValidationCodeHash,
1302) -> Result<(), InvalidCandidate> {
1303 let pov_hash = pov.hash();
1304
1305 let encoded_pov_size = pov.encoded_size();
1306 if encoded_pov_size > max_pov_size as usize {
1307 return Err(InvalidCandidate::ParamsTooLarge(encoded_pov_size as u64))
1308 }
1309
1310 if pov_hash != candidate.pov_hash() {
1311 return Err(InvalidCandidate::PoVHashMismatch)
1312 }
1313
1314 if *validation_code_hash != candidate.validation_code_hash() {
1315 return Err(InvalidCandidate::CodeHashMismatch)
1316 }
1317
1318 Ok(())
1319}
1320
1321fn pvf_prep_timeout(executor_params: &ExecutorParams, kind: PvfPrepKind) -> Duration {
1331 if let Some(timeout) = executor_params.pvf_prep_timeout(kind) {
1332 return timeout
1333 }
1334 match kind {
1335 PvfPrepKind::Precheck => DEFAULT_PRECHECK_PREPARATION_TIMEOUT,
1336 PvfPrepKind::Prepare => DEFAULT_LENIENT_PREPARATION_TIMEOUT,
1337 }
1338}
1339
1340fn pvf_exec_timeout(executor_params: &ExecutorParams, kind: RuntimePvfExecKind) -> Duration {
1351 if let Some(timeout) = executor_params.pvf_exec_timeout(kind) {
1352 return timeout
1353 }
1354 match kind {
1355 RuntimePvfExecKind::Backing => DEFAULT_BACKING_EXECUTION_TIMEOUT,
1356 RuntimePvfExecKind::Approval => DEFAULT_APPROVAL_EXECUTION_TIMEOUT,
1357 }
1358}