referrerpolicy=no-referrer-when-downgrade

polkadot_node_core_candidate_validation/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17//! The Candidate Validation subsystem.
18//!
19//! This handles incoming requests from other subsystems to validate candidates
20//! according to a validation function. This delegates validation to an underlying
21//! pool of processes used for execution of the Wasm.
22
23#![deny(unused_crate_dependencies, unused_results)]
24#![warn(missing_docs)]
25
26use polkadot_node_core_pvf::{
27	InternalValidationError, InvalidCandidate as WasmInvalidCandidate, PossiblyInvalidError,
28	PrepareError, PrepareJobKind, PvfPrepData, ValidationError, ValidationHost,
29};
30use polkadot_node_primitives::{InvalidCandidate, PoV, ValidationResult};
31use polkadot_node_subsystem::{
32	errors::RuntimeApiError,
33	messages::{
34		CandidateValidationMessage, ChainApiMessage, PreCheckOutcome, PvfExecKind,
35		RuntimeApiMessage, RuntimeApiRequest, ValidationFailed,
36	},
37	overseer, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError, SubsystemResult,
38	SubsystemSender,
39};
40use polkadot_node_subsystem_util::{
41	self as util,
42	runtime::{fetch_scheduling_lookahead, ClaimQueueSnapshot},
43};
44use polkadot_overseer::{ActivatedLeaf, ActiveLeavesUpdate};
45use polkadot_parachain_primitives::primitives::ValidationResult as WasmValidationResult;
46use polkadot_primitives::{
47	executor_params::{
48		DEFAULT_APPROVAL_EXECUTION_TIMEOUT, DEFAULT_BACKING_EXECUTION_TIMEOUT,
49		DEFAULT_LENIENT_PREPARATION_TIMEOUT, DEFAULT_PRECHECK_PREPARATION_TIMEOUT,
50	},
51	transpose_claim_queue, AuthorityDiscoveryId, CandidateCommitments,
52	CandidateDescriptorV2 as CandidateDescriptor, CandidateEvent,
53	CandidateReceiptV2 as CandidateReceipt,
54	CommittedCandidateReceiptV2 as CommittedCandidateReceipt, ExecutorParams, Hash,
55	PersistedValidationData, PvfExecKind as RuntimePvfExecKind, PvfPrepKind, SessionIndex,
56	ValidationCode, ValidationCodeHash, ValidatorId,
57};
58use sp_application_crypto::{AppCrypto, ByteArray};
59use sp_keystore::KeystorePtr;
60
61use codec::Encode;
62
63use futures::{channel::oneshot, prelude::*, stream::FuturesUnordered};
64
65use std::{
66	collections::HashSet,
67	path::PathBuf,
68	pin::Pin,
69	sync::Arc,
70	time::{Duration, Instant},
71};
72
73use async_trait::async_trait;
74
75mod metrics;
76use self::metrics::Metrics;
77
78#[cfg(test)]
79mod tests;
80
/// Tracing target used for all log output of this subsystem.
const LOG_TARGET: &str = "parachain::candidate-validation";

/// The amount of time to wait before retrying after a retry-able approval validation error. We use
/// a higher value for the approval case since we have more time, and if we wait longer it is more
/// likely that transient conditions will resolve.
///
/// Test builds use a much shorter delay so retry paths run quickly.
#[cfg(not(test))]
const PVF_APPROVAL_EXECUTION_RETRY_DELAY: Duration = Duration::from_secs(3);
#[cfg(test)]
const PVF_APPROVAL_EXECUTION_RETRY_DELAY: Duration = Duration::from_millis(200);

// The maximum number of validation tasks in flight at once. Once it is reached, `run` stops
// reading new messages until a task completes. The size is chosen to be somewhat bigger than the
// PVF host incoming queue size, to allow exhaustive validation messages to fall through in case
// the tasks are clogged.
const TASK_LIMIT: usize = 30;
94
/// Configuration for the candidate validation subsystem.
///
/// Its fields are handed to the PVF validation host when the subsystem starts. If
/// `CandidateValidationSubsystem` is created without a `Config`, it runs as a dummy subsystem.
#[derive(Clone, Debug, Default)]
pub struct Config {
	/// The path where candidate validation can store compiled artifacts for PVFs.
	pub artifacts_cache_path: PathBuf,
	/// The version of the node. `None` can be passed to skip the version check (only for tests).
	pub node_version: Option<String>,
	/// Whether the node is attempting to run as a secure validator.
	pub secure_validator_mode: bool,
	/// Path to the preparation worker binary.
	pub prep_worker_path: PathBuf,
	/// Path to the execution worker binary.
	pub exec_worker_path: PathBuf,
	/// The maximum number of PVF execution workers.
	pub pvf_execute_workers_max_num: usize,
	/// The maximum number of PVF workers that can be spawned in the PVF prepare pool for tasks
	/// with the priority below critical.
	pub pvf_prepare_workers_soft_max_num: usize,
	/// The absolute maximum number of PVF workers that can be spawned in the PVF prepare pool.
	pub pvf_prepare_workers_hard_max_num: usize,
}
116
117/// The candidate validation subsystem.
118pub struct CandidateValidationSubsystem {
119	keystore: KeystorePtr,
120	#[allow(missing_docs)]
121	pub metrics: Metrics,
122	#[allow(missing_docs)]
123	pub pvf_metrics: polkadot_node_core_pvf::Metrics,
124	config: Option<Config>,
125}
126
127impl CandidateValidationSubsystem {
128	/// Create a new `CandidateValidationSubsystem`.
129	pub fn with_config(
130		config: Option<Config>,
131		keystore: KeystorePtr,
132		metrics: Metrics,
133		pvf_metrics: polkadot_node_core_pvf::Metrics,
134	) -> Self {
135		CandidateValidationSubsystem { keystore, config, metrics, pvf_metrics }
136	}
137}
138
139#[overseer::subsystem(CandidateValidation, error=SubsystemError, prefix=self::overseer)]
140impl<Context> CandidateValidationSubsystem {
141	fn start(self, ctx: Context) -> SpawnedSubsystem {
142		if let Some(config) = self.config {
143			let future = run(ctx, self.keystore, self.metrics, self.pvf_metrics, config)
144				.map_err(|e| SubsystemError::with_origin("candidate-validation", e))
145				.boxed();
146			SpawnedSubsystem { name: "candidate-validation-subsystem", future }
147		} else {
148			polkadot_overseer::DummySubsystem.start(ctx)
149		}
150	}
151}
152
153// Returns the claim queue at relay parent and logs a warning if it is not available.
154async fn claim_queue<Sender>(relay_parent: Hash, sender: &mut Sender) -> Option<ClaimQueueSnapshot>
155where
156	Sender: SubsystemSender<RuntimeApiMessage>,
157{
158	match util::runtime::fetch_claim_queue(sender, relay_parent).await {
159		Ok(cq) => Some(cq),
160		Err(err) => {
161			gum::warn!(
162				target: LOG_TARGET,
163				?relay_parent,
164				?err,
165				"Claim queue not available"
166			);
167			None
168		},
169	}
170}
171
172fn handle_validation_message<S>(
173	mut sender: S,
174	validation_host: ValidationHost,
175	metrics: Metrics,
176	msg: CandidateValidationMessage,
177) -> Pin<Box<dyn Future<Output = ()> + Send>>
178where
179	S: SubsystemSender<RuntimeApiMessage>,
180{
181	match msg {
182		CandidateValidationMessage::ValidateFromExhaustive {
183			validation_data,
184			validation_code,
185			candidate_receipt,
186			pov,
187			executor_params,
188			exec_kind,
189			response_sender,
190			..
191		} => async move {
192			let _timer = metrics.time_validate_from_exhaustive();
193			let relay_parent = candidate_receipt.descriptor.relay_parent();
194
195			let maybe_claim_queue = claim_queue(relay_parent, &mut sender).await;
196			let Some(session_index) = get_session_index(&mut sender, relay_parent).await else {
197				let error = "cannot fetch session index from the runtime";
198				gum::warn!(
199					target: LOG_TARGET,
200					?relay_parent,
201					error,
202				);
203
204				let _ = response_sender
205					.send(Err(ValidationFailed("Session index not found".to_string())));
206				return
207			};
208
209			// This will return a default value for the limit if runtime API is not available.
210			// however we still error out if there is a weird runtime API error.
211			let Ok(validation_code_bomb_limit) = util::runtime::fetch_validation_code_bomb_limit(
212				relay_parent,
213				session_index,
214				&mut sender,
215			)
216			.await
217			else {
218				let error = "cannot fetch validation code bomb limit from the runtime";
219				gum::warn!(
220					target: LOG_TARGET,
221					?relay_parent,
222					error,
223				);
224
225				let _ = response_sender.send(Err(ValidationFailed(
226					"Validation code bomb limit not available".to_string(),
227				)));
228				return
229			};
230
231			let res = validate_candidate_exhaustive(
232				session_index,
233				validation_host,
234				validation_data,
235				validation_code,
236				candidate_receipt,
237				pov,
238				executor_params,
239				exec_kind,
240				&metrics,
241				maybe_claim_queue,
242				validation_code_bomb_limit,
243			)
244			.await;
245
246			metrics.on_validation_event(&res);
247			let _ = response_sender.send(res);
248		}
249		.boxed(),
250		CandidateValidationMessage::PreCheck {
251			relay_parent,
252			validation_code_hash,
253			response_sender,
254			..
255		} => async move {
256			let Some(session_index) = get_session_index(&mut sender, relay_parent).await else {
257				let error = "cannot fetch session index from the runtime";
258				gum::warn!(
259					target: LOG_TARGET,
260					?relay_parent,
261					error,
262				);
263
264				let _ = response_sender.send(PreCheckOutcome::Failed);
265				return
266			};
267
268			// This will return a default value for the limit if runtime API is not available.
269			// however we still error out if there is a weird runtime API error.
270			let Ok(validation_code_bomb_limit) = util::runtime::fetch_validation_code_bomb_limit(
271				relay_parent,
272				session_index,
273				&mut sender,
274			)
275			.await
276			else {
277				let error = "cannot fetch validation code bomb limit from the runtime";
278				gum::warn!(
279					target: LOG_TARGET,
280					?relay_parent,
281					error,
282				);
283
284				let _ = response_sender.send(PreCheckOutcome::Failed);
285				return
286			};
287
288			let precheck_result = precheck_pvf(
289				&mut sender,
290				validation_host,
291				relay_parent,
292				validation_code_hash,
293				validation_code_bomb_limit,
294			)
295			.await;
296
297			let _ = response_sender.send(precheck_result);
298		}
299		.boxed(),
300	}
301}
302
303#[overseer::contextbounds(CandidateValidation, prefix = self::overseer)]
304async fn run<Context>(
305	mut ctx: Context,
306	keystore: KeystorePtr,
307	metrics: Metrics,
308	pvf_metrics: polkadot_node_core_pvf::Metrics,
309	Config {
310		artifacts_cache_path,
311		node_version,
312		secure_validator_mode,
313		prep_worker_path,
314		exec_worker_path,
315		pvf_execute_workers_max_num,
316		pvf_prepare_workers_soft_max_num,
317		pvf_prepare_workers_hard_max_num,
318	}: Config,
319) -> SubsystemResult<()> {
320	let (mut validation_host, task) = polkadot_node_core_pvf::start(
321		polkadot_node_core_pvf::Config::new(
322			artifacts_cache_path,
323			node_version,
324			secure_validator_mode,
325			prep_worker_path,
326			exec_worker_path,
327			pvf_execute_workers_max_num,
328			pvf_prepare_workers_soft_max_num,
329			pvf_prepare_workers_hard_max_num,
330		),
331		pvf_metrics,
332	)
333	.await?;
334	ctx.spawn_blocking("pvf-validation-host", task.boxed())?;
335
336	let mut tasks = FuturesUnordered::new();
337	let mut prepare_state = PrepareValidationState::default();
338
339	loop {
340		loop {
341			futures::select! {
342				comm = ctx.recv().fuse() => {
343					match comm {
344						Ok(FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update))) => {
345							handle_active_leaves_update(
346								ctx.sender(),
347								keystore.clone(),
348								&mut validation_host,
349								update,
350								&mut prepare_state,
351							).await
352						},
353						Ok(FromOrchestra::Signal(OverseerSignal::BlockFinalized(..))) => {},
354						Ok(FromOrchestra::Signal(OverseerSignal::Conclude)) => return Ok(()),
355						Ok(FromOrchestra::Communication { msg }) => {
356							let task = handle_validation_message(ctx.sender().clone(), validation_host.clone(), metrics.clone(), msg);
357							tasks.push(task);
358							if tasks.len() >= TASK_LIMIT {
359								break
360							}
361						},
362						Err(e) => return Err(SubsystemError::from(e)),
363					}
364				},
365				_ = tasks.select_next_some() => ()
366			}
367		}
368
369		gum::debug!(target: LOG_TARGET, "Validation task limit hit");
370
371		loop {
372			futures::select! {
373				signal = ctx.recv_signal().fuse() => {
374					match signal {
375						Ok(OverseerSignal::ActiveLeaves(_)) => {},
376						Ok(OverseerSignal::BlockFinalized(..)) => {},
377						Ok(OverseerSignal::Conclude) => return Ok(()),
378						Err(e) => return Err(SubsystemError::from(e)),
379					}
380				},
381				_ = tasks.select_next_some() => {
382					if tasks.len() < TASK_LIMIT {
383						break
384					}
385				}
386			}
387		}
388	}
389}
390
391struct PrepareValidationState {
392	session_index: Option<SessionIndex>,
393	is_next_session_authority: bool,
394	// PVF host won't prepare the same code hash twice, so here we just avoid extra communication
395	already_prepared_code_hashes: HashSet<ValidationCodeHash>,
396	// How many PVFs per block we take to prepare themselves for the next session validation
397	per_block_limit: usize,
398}
399
400impl Default for PrepareValidationState {
401	fn default() -> Self {
402		Self {
403			session_index: None,
404			is_next_session_authority: false,
405			already_prepared_code_hashes: HashSet::new(),
406			per_block_limit: 1,
407		}
408	}
409}
410
411async fn handle_active_leaves_update<Sender>(
412	sender: &mut Sender,
413	keystore: KeystorePtr,
414	validation_host: &mut impl ValidationBackend,
415	update: ActiveLeavesUpdate,
416	prepare_state: &mut PrepareValidationState,
417) where
418	Sender: SubsystemSender<ChainApiMessage> + SubsystemSender<RuntimeApiMessage>,
419{
420	let maybe_session_index = update_active_leaves(sender, validation_host, update.clone()).await;
421
422	if let Some(activated) = update.activated {
423		let maybe_new_session_index = match (prepare_state.session_index, maybe_session_index) {
424			(Some(existing_index), Some(new_index)) =>
425				(new_index > existing_index).then_some(new_index),
426			(None, Some(new_index)) => Some(new_index),
427			_ => None,
428		};
429		maybe_prepare_validation(
430			sender,
431			keystore.clone(),
432			validation_host,
433			activated,
434			prepare_state,
435			maybe_new_session_index,
436		)
437		.await;
438	}
439}
440
441async fn maybe_prepare_validation<Sender>(
442	sender: &mut Sender,
443	keystore: KeystorePtr,
444	validation_backend: &mut impl ValidationBackend,
445	leaf: ActivatedLeaf,
446	state: &mut PrepareValidationState,
447	new_session_index: Option<SessionIndex>,
448) where
449	Sender: SubsystemSender<RuntimeApiMessage>,
450{
451	if new_session_index.is_some() {
452		state.session_index = new_session_index;
453		state.already_prepared_code_hashes.clear();
454		state.is_next_session_authority = check_next_session_authority(
455			sender,
456			keystore,
457			leaf.hash,
458			state.session_index.expect("qed: just checked above"),
459		)
460		.await;
461	}
462
463	// On every active leaf check candidates and prepare PVFs our node doesn't have yet.
464	if state.is_next_session_authority {
465		let code_hashes = prepare_pvfs_for_backed_candidates(
466			sender,
467			validation_backend,
468			leaf.hash,
469			&state.already_prepared_code_hashes,
470			state.per_block_limit,
471		)
472		.await;
473		state.already_prepared_code_hashes.extend(code_hashes.unwrap_or_default());
474	}
475}
476
477async fn get_session_index<Sender>(sender: &mut Sender, relay_parent: Hash) -> Option<SessionIndex>
478where
479	Sender: SubsystemSender<RuntimeApiMessage>,
480{
481	let Ok(Ok(session_index)) =
482		util::request_session_index_for_child(relay_parent, sender).await.await
483	else {
484		gum::warn!(
485			target: LOG_TARGET,
486			?relay_parent,
487			"cannot fetch session index from runtime API",
488		);
489		return None
490	};
491
492	Some(session_index)
493}
494
495// Returns true if the node is an authority in the next session.
496async fn check_next_session_authority<Sender>(
497	sender: &mut Sender,
498	keystore: KeystorePtr,
499	relay_parent: Hash,
500	session_index: SessionIndex,
501) -> bool
502where
503	Sender: SubsystemSender<RuntimeApiMessage>,
504{
505	// In spite of function name here we request past, present and future authorities.
506	// It's ok to stil prepare PVFs in other cases, but better to request only future ones.
507	let Ok(Ok(authorities)) = util::request_authorities(relay_parent, sender).await.await else {
508		gum::warn!(
509			target: LOG_TARGET,
510			?relay_parent,
511			"cannot fetch authorities from runtime API",
512		);
513		return false
514	};
515
516	// We need to exclude at least current session authority from the previous request
517	let Ok(Ok(Some(session_info))) =
518		util::request_session_info(relay_parent, session_index, sender).await.await
519	else {
520		gum::warn!(
521			target: LOG_TARGET,
522			?relay_parent,
523			"cannot fetch session info from runtime API",
524		);
525		return false
526	};
527
528	let is_past_present_or_future_authority = authorities
529		.iter()
530		.any(|v| keystore.has_keys(&[(v.to_raw_vec(), AuthorityDiscoveryId::ID)]));
531
532	// We could've checked discovery_keys but on Kusama validators.len() < discovery_keys.len().
533	let is_present_validator = session_info
534		.validators
535		.iter()
536		.any(|v| keystore.has_keys(&[(v.to_raw_vec(), ValidatorId::ID)]));
537
538	// There is still a chance to be a previous session authority, but this extra work does not
539	// affect the finalization.
540	is_past_present_or_future_authority && !is_present_validator
541}
542
543// Sends PVF with unknown code hashes to the validation host returning the list of code hashes sent.
544async fn prepare_pvfs_for_backed_candidates<Sender>(
545	sender: &mut Sender,
546	validation_backend: &mut impl ValidationBackend,
547	relay_parent: Hash,
548	already_prepared: &HashSet<ValidationCodeHash>,
549	per_block_limit: usize,
550) -> Option<Vec<ValidationCodeHash>>
551where
552	Sender: SubsystemSender<RuntimeApiMessage>,
553{
554	let Ok(Ok(events)) = util::request_candidate_events(relay_parent, sender).await.await else {
555		gum::warn!(
556			target: LOG_TARGET,
557			?relay_parent,
558			"cannot fetch candidate events from runtime API",
559		);
560		return None
561	};
562	let code_hashes = events
563		.into_iter()
564		.filter_map(|e| match e {
565			CandidateEvent::CandidateBacked(receipt, ..) => {
566				let h = receipt.descriptor.validation_code_hash();
567				if already_prepared.contains(&h) {
568					None
569				} else {
570					Some(h)
571				}
572			},
573			_ => None,
574		})
575		.take(per_block_limit)
576		.collect::<Vec<_>>();
577
578	let Ok(executor_params) = util::executor_params_at_relay_parent(relay_parent, sender).await
579	else {
580		gum::warn!(
581			target: LOG_TARGET,
582			?relay_parent,
583			"cannot fetch executor params for the session",
584		);
585		return None
586	};
587	let timeout = pvf_prep_timeout(&executor_params, PvfPrepKind::Prepare);
588
589	let mut active_pvfs = vec![];
590	let mut processed_code_hashes = vec![];
591	for code_hash in code_hashes {
592		let Ok(Ok(Some(validation_code))) =
593			util::request_validation_code_by_hash(relay_parent, code_hash, sender)
594				.await
595				.await
596		else {
597			gum::warn!(
598				target: LOG_TARGET,
599				?relay_parent,
600				?code_hash,
601				"cannot fetch validation code hash from runtime API",
602			);
603			continue;
604		};
605
606		let Some(session_index) = get_session_index(sender, relay_parent).await else { continue };
607
608		let validation_code_bomb_limit = match util::runtime::fetch_validation_code_bomb_limit(
609			relay_parent,
610			session_index,
611			sender,
612		)
613		.await
614		{
615			Ok(limit) => limit,
616			Err(err) => {
617				gum::warn!(
618					target: LOG_TARGET,
619					?relay_parent,
620					?err,
621					"cannot fetch validation code bomb limit from runtime API",
622				);
623				continue;
624			},
625		};
626
627		let pvf = PvfPrepData::from_code(
628			validation_code.0,
629			executor_params.clone(),
630			timeout,
631			PrepareJobKind::Prechecking,
632			validation_code_bomb_limit,
633		);
634
635		active_pvfs.push(pvf);
636		processed_code_hashes.push(code_hash);
637	}
638
639	if active_pvfs.is_empty() {
640		return None
641	}
642
643	if let Err(err) = validation_backend.heads_up(active_pvfs).await {
644		gum::warn!(
645			target: LOG_TARGET,
646			?relay_parent,
647			?err,
648			"cannot prepare PVF for the next session",
649		);
650		return None
651	};
652
653	gum::debug!(
654		target: LOG_TARGET,
655		?relay_parent,
656		?processed_code_hashes,
657		"Prepared PVF for the next session",
658	);
659
660	Some(processed_code_hashes)
661}
662
663async fn update_active_leaves<Sender>(
664	sender: &mut Sender,
665	validation_backend: &mut impl ValidationBackend,
666	update: ActiveLeavesUpdate,
667) -> Option<SessionIndex>
668where
669	Sender: SubsystemSender<ChainApiMessage> + SubsystemSender<RuntimeApiMessage>,
670{
671	let maybe_new_leaf = if let Some(activated) = &update.activated {
672		get_session_index(sender, activated.hash)
673			.await
674			.map(|index| (activated.hash, index))
675	} else {
676		None
677	};
678
679	let ancestors = get_block_ancestors(sender, maybe_new_leaf).await;
680	if let Err(err) = validation_backend.update_active_leaves(update, ancestors).await {
681		gum::warn!(
682			target: LOG_TARGET,
683			?err,
684			"cannot update active leaves in validation backend",
685		);
686	};
687
688	maybe_new_leaf.map(|l| l.1)
689}
690
691async fn get_block_ancestors<Sender>(
692	sender: &mut Sender,
693	maybe_new_leaf: Option<(Hash, SessionIndex)>,
694) -> Vec<Hash>
695where
696	Sender: SubsystemSender<ChainApiMessage> + SubsystemSender<RuntimeApiMessage>,
697{
698	let Some((relay_parent, session_index)) = maybe_new_leaf else { return vec![] };
699	let scheduling_lookahead =
700		match fetch_scheduling_lookahead(relay_parent, session_index, sender).await {
701			Ok(scheduling_lookahead) => scheduling_lookahead,
702			res => {
703				gum::warn!(target: LOG_TARGET, ?res, "Failed to request scheduling lookahead");
704				return vec![]
705			},
706		};
707
708	let (tx, rx) = oneshot::channel();
709	sender
710		.send_message(ChainApiMessage::Ancestors {
711			hash: relay_parent,
712			// Subtract 1 from the claim queue length, as it includes current `relay_parent`.
713			k: scheduling_lookahead.saturating_sub(1) as usize,
714			response_channel: tx,
715		})
716		.await;
717	match rx.await {
718		Ok(Ok(x)) => x,
719		res => {
720			gum::warn!(target: LOG_TARGET, ?res, "cannot request ancestors");
721			vec![]
722		},
723	}
724}
725
726struct RuntimeRequestFailed;
727
728async fn runtime_api_request<T, Sender>(
729	sender: &mut Sender,
730	relay_parent: Hash,
731	request: RuntimeApiRequest,
732	receiver: oneshot::Receiver<Result<T, RuntimeApiError>>,
733) -> Result<T, RuntimeRequestFailed>
734where
735	Sender: SubsystemSender<RuntimeApiMessage>,
736{
737	sender
738		.send_message(RuntimeApiMessage::Request(relay_parent, request).into())
739		.await;
740
741	receiver
742		.await
743		.map_err(|_| {
744			gum::debug!(target: LOG_TARGET, ?relay_parent, "Runtime API request dropped");
745
746			RuntimeRequestFailed
747		})
748		.and_then(|res| {
749			res.map_err(|e| {
750				gum::debug!(
751					target: LOG_TARGET,
752					?relay_parent,
753					err = ?e,
754					"Runtime API request internal error"
755				);
756
757				RuntimeRequestFailed
758			})
759		})
760}
761
762async fn request_validation_code_by_hash<Sender>(
763	sender: &mut Sender,
764	relay_parent: Hash,
765	validation_code_hash: ValidationCodeHash,
766) -> Result<Option<ValidationCode>, RuntimeRequestFailed>
767where
768	Sender: SubsystemSender<RuntimeApiMessage>,
769{
770	let (tx, rx) = oneshot::channel();
771	runtime_api_request(
772		sender,
773		relay_parent,
774		RuntimeApiRequest::ValidationCodeByHash(validation_code_hash, tx),
775		rx,
776	)
777	.await
778}
779
780async fn precheck_pvf<Sender>(
781	sender: &mut Sender,
782	mut validation_backend: impl ValidationBackend,
783	relay_parent: Hash,
784	validation_code_hash: ValidationCodeHash,
785	validation_code_bomb_limit: u32,
786) -> PreCheckOutcome
787where
788	Sender: SubsystemSender<RuntimeApiMessage>,
789{
790	let validation_code =
791		match request_validation_code_by_hash(sender, relay_parent, validation_code_hash).await {
792			Ok(Some(code)) => code,
793			_ => {
794				// The reasoning why this is "failed" and not invalid is because we assume that
795				// during pre-checking voting the relay-chain will pin the code. In case the code
796				// actually is not there, we issue failed since this looks more like a bug.
797				gum::warn!(
798					target: LOG_TARGET,
799					?relay_parent,
800					?validation_code_hash,
801					"precheck: requested validation code is not found on-chain!",
802				);
803				return PreCheckOutcome::Failed
804			},
805		};
806
807	let executor_params = if let Ok(executor_params) =
808		util::executor_params_at_relay_parent(relay_parent, sender).await
809	{
810		gum::debug!(
811			target: LOG_TARGET,
812			?relay_parent,
813			?validation_code_hash,
814			"precheck: acquired executor params for the session: {:?}",
815			executor_params,
816		);
817		executor_params
818	} else {
819		gum::warn!(
820			target: LOG_TARGET,
821			?relay_parent,
822			?validation_code_hash,
823			"precheck: failed to acquire executor params for the session, thus voting against.",
824		);
825		return PreCheckOutcome::Invalid
826	};
827
828	let timeout = pvf_prep_timeout(&executor_params, PvfPrepKind::Precheck);
829
830	let pvf = PvfPrepData::from_code(
831		validation_code.0,
832		executor_params,
833		timeout,
834		PrepareJobKind::Prechecking,
835		validation_code_bomb_limit,
836	);
837
838	match validation_backend.precheck_pvf(pvf).await {
839		Ok(_) => PreCheckOutcome::Valid,
840		Err(prepare_err) =>
841			if prepare_err.is_deterministic() {
842				PreCheckOutcome::Invalid
843			} else {
844				PreCheckOutcome::Failed
845			},
846	}
847}
848
849async fn validate_candidate_exhaustive(
850	expected_session_index: SessionIndex,
851	mut validation_backend: impl ValidationBackend + Send,
852	persisted_validation_data: PersistedValidationData,
853	validation_code: ValidationCode,
854	candidate_receipt: CandidateReceipt,
855	pov: Arc<PoV>,
856	executor_params: ExecutorParams,
857	exec_kind: PvfExecKind,
858	metrics: &Metrics,
859	maybe_claim_queue: Option<ClaimQueueSnapshot>,
860	validation_code_bomb_limit: u32,
861) -> Result<ValidationResult, ValidationFailed> {
862	let _timer = metrics.time_validate_candidate_exhaustive();
863	let validation_code_hash = validation_code.hash();
864	let relay_parent = candidate_receipt.descriptor.relay_parent();
865	let para_id = candidate_receipt.descriptor.para_id();
866	let candidate_hash = candidate_receipt.hash();
867
868	gum::debug!(
869		target: LOG_TARGET,
870		?validation_code_hash,
871		?candidate_hash,
872		?para_id,
873		"About to validate a candidate.",
874	);
875
876	// We only check the session index for backing.
877	match (exec_kind, candidate_receipt.descriptor.session_index()) {
878		(PvfExecKind::Backing(_) | PvfExecKind::BackingSystemParas(_), Some(session_index)) =>
879			if session_index != expected_session_index {
880				return Ok(ValidationResult::Invalid(InvalidCandidate::InvalidSessionIndex))
881			},
882		(_, _) => {},
883	};
884
885	if let Err(e) = perform_basic_checks(
886		&candidate_receipt.descriptor,
887		persisted_validation_data.max_pov_size,
888		&pov,
889		&validation_code_hash,
890	) {
891		gum::debug!(target: LOG_TARGET, ?para_id, ?candidate_hash, "Invalid candidate (basic checks)");
892		return Ok(ValidationResult::Invalid(e))
893	}
894
895	let persisted_validation_data = Arc::new(persisted_validation_data);
896	let result = match exec_kind {
897		// Retry is disabled to reduce the chance of nondeterministic blocks getting backed and
898		// honest backers getting slashed.
899		PvfExecKind::Backing(_) | PvfExecKind::BackingSystemParas(_) => {
900			let prep_timeout = pvf_prep_timeout(&executor_params, PvfPrepKind::Prepare);
901			let exec_timeout = pvf_exec_timeout(&executor_params, exec_kind.into());
902			let pvf = PvfPrepData::from_code(
903				validation_code.0,
904				executor_params,
905				prep_timeout,
906				PrepareJobKind::Compilation,
907				validation_code_bomb_limit,
908			);
909
910			validation_backend
911				.validate_candidate(
912					pvf,
913					exec_timeout,
914					persisted_validation_data.clone(),
915					pov,
916					exec_kind.into(),
917					exec_kind,
918				)
919				.await
920		},
921		PvfExecKind::Approval | PvfExecKind::Dispute =>
922			validation_backend
923				.validate_candidate_with_retry(
924					validation_code.0,
925					pvf_exec_timeout(&executor_params, exec_kind.into()),
926					persisted_validation_data.clone(),
927					pov,
928					executor_params,
929					PVF_APPROVAL_EXECUTION_RETRY_DELAY,
930					exec_kind.into(),
931					exec_kind,
932					validation_code_bomb_limit,
933				)
934				.await,
935	};
936
937	if let Err(ref error) = result {
938		gum::info!(target: LOG_TARGET, ?para_id, ?candidate_hash, ?error, "Failed to validate candidate");
939	}
940
941	match result {
942		Err(ValidationError::Internal(e)) => {
943			gum::warn!(
944				target: LOG_TARGET,
945				?para_id,
946				?candidate_hash,
947				?e,
948				"An internal error occurred during validation, will abstain from voting",
949			);
950			Err(ValidationFailed(e.to_string()))
951		},
952		Err(ValidationError::Invalid(WasmInvalidCandidate::HardTimeout)) =>
953			Ok(ValidationResult::Invalid(InvalidCandidate::Timeout)),
954		Err(ValidationError::Invalid(WasmInvalidCandidate::WorkerReportedInvalid(e))) =>
955			Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(e))),
956		Err(ValidationError::Invalid(WasmInvalidCandidate::PoVDecompressionFailure)) =>
957			Ok(ValidationResult::Invalid(InvalidCandidate::PoVDecompressionFailure)),
958		Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath)) =>
959			Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(
960				"ambiguous worker death".to_string(),
961			))),
962		Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::JobError(err))) =>
963			Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(err))),
964		Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::RuntimeConstruction(err))) =>
965			Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(err))),
966		Err(ValidationError::PossiblyInvalid(err @ PossiblyInvalidError::CorruptedArtifact)) =>
967			Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(err.to_string()))),
968
969		Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousJobDeath(err))) =>
970			Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(format!(
971				"ambiguous job death: {err}"
972			)))),
973		Err(ValidationError::Preparation(e)) => {
974			gum::warn!(
975				target: LOG_TARGET,
976				?para_id,
977				?e,
978				"Deterministic error occurred during preparation (should have been ruled out by pre-checking phase)",
979			);
980			Err(ValidationFailed(e.to_string()))
981		},
982		Err(e @ ValidationError::ExecutionDeadline) => {
983			gum::warn!(
984				target: LOG_TARGET,
985				?para_id,
986				?e,
987				"Job assigned too late, execution queue probably overloaded",
988			);
989			Err(ValidationFailed(e.to_string()))
990		},
991		Ok(res) =>
992			if res.head_data.hash() != candidate_receipt.descriptor.para_head() {
993				gum::info!(target: LOG_TARGET, ?para_id, "Invalid candidate (para_head)");
994				Ok(ValidationResult::Invalid(InvalidCandidate::ParaHeadHashMismatch))
995			} else {
996				let committed_candidate_receipt = CommittedCandidateReceipt {
997					descriptor: candidate_receipt.descriptor.clone(),
998					commitments: CandidateCommitments {
999						head_data: res.head_data,
1000						upward_messages: res.upward_messages,
1001						horizontal_messages: res.horizontal_messages,
1002						new_validation_code: res.new_validation_code,
1003						processed_downward_messages: res.processed_downward_messages,
1004						hrmp_watermark: res.hrmp_watermark,
1005					},
1006				};
1007
1008				if candidate_receipt.commitments_hash !=
1009					committed_candidate_receipt.commitments.hash()
1010				{
1011					gum::info!(
1012						target: LOG_TARGET,
1013						?para_id,
1014						?candidate_hash,
1015						"Invalid candidate (commitments hash)"
1016					);
1017
1018					gum::trace!(
1019						target: LOG_TARGET,
1020						?para_id,
1021						?candidate_hash,
1022						produced_commitments = ?committed_candidate_receipt.commitments,
1023						"Invalid candidate commitments"
1024					);
1025
1026					// If validation produced a new set of commitments, we treat the candidate as
1027					// invalid.
1028					Ok(ValidationResult::Invalid(InvalidCandidate::CommitmentsHashMismatch))
1029				} else {
1030					match exec_kind {
1031						// Core selectors are optional for V2 descriptors, but we still check the
1032						// descriptor core index.
1033						PvfExecKind::Backing(_) | PvfExecKind::BackingSystemParas(_) => {
1034							let Some(claim_queue) = maybe_claim_queue else {
1035								let error = "cannot fetch the claim queue from the runtime";
1036								gum::warn!(
1037									target: LOG_TARGET,
1038									?relay_parent,
1039									error
1040								);
1041
1042								return Err(ValidationFailed(error.into()))
1043							};
1044
1045							if let Err(err) = committed_candidate_receipt
1046								.parse_ump_signals(&transpose_claim_queue(claim_queue.0))
1047							{
1048								gum::warn!(
1049									target: LOG_TARGET,
1050									candidate_hash = ?candidate_receipt.hash(),
1051									"Invalid UMP signals: {}",
1052									err
1053								);
1054								return Ok(ValidationResult::Invalid(
1055									InvalidCandidate::InvalidUMPSignals(err),
1056								))
1057							}
1058						},
1059						// No checks for approvals and disputes
1060						_ => {},
1061					}
1062
1063					Ok(ValidationResult::Valid(
1064						committed_candidate_receipt.commitments,
1065						(*persisted_validation_data).clone(),
1066					))
1067				}
1068			},
1069	}
1070}
1071
#[async_trait]
/// Backend that prepares and executes PVFs on behalf of the candidate validation subsystem.
///
/// Implemented below for `ValidationHost`.
trait ValidationBackend {
	/// Tries executing a PVF a single time (no retries).
	async fn validate_candidate(
		&mut self,
		pvf: PvfPrepData,
		exec_timeout: Duration,
		pvd: Arc<PersistedValidationData>,
		pov: Arc<PoV>,
		// The priority for the preparation job.
		prepare_priority: polkadot_node_core_pvf::Priority,
		// The kind for the execution job.
		exec_kind: PvfExecKind,
	) -> Result<WasmValidationResult, ValidationError>;

	/// Tries executing a PVF, retrying when an error is encountered that may have been
	/// transient.
	///
	/// Each category of possibly-transient error has its own budget of a single retry:
	/// - ambiguous worker/job death;
	/// - job error;
	/// - internal error;
	/// - runtime construction failure / corrupted artifact.
	///
	/// Several retries can therefore happen overall if different categories are hit.
	///
	/// Retrying stops as soon as the elapsed time plus `retry_delay` would exceed
	/// `exec_timeout`. Each retry is given whatever is left of `exec_timeout` as its timeout.
	///
	/// The `prepare_priority` is relevant in the context of the caller. Currently we expect
	/// that `approval` context has priority over `backing` context.
	///
	/// NOTE: Should retry only on errors that are a result of execution itself, and not of
	/// preparation.
	///
	/// Returns the result of the last attempt. The one exception is the runtime-construction /
	/// corrupted-artifact case: there the PVF is pre-checked before retrying, and a pre-check
	/// failure is propagated immediately via `?`.
	async fn validate_candidate_with_retry(
		&mut self,
		code: Vec<u8>,
		exec_timeout: Duration,
		pvd: Arc<PersistedValidationData>,
		pov: Arc<PoV>,
		executor_params: ExecutorParams,
		retry_delay: Duration,
		// The priority for the preparation job.
		prepare_priority: polkadot_node_core_pvf::Priority,
		// The kind for the execution job.
		exec_kind: PvfExecKind,
		validation_code_bomb_limit: u32,
	) -> Result<WasmValidationResult, ValidationError> {
		let prep_timeout = pvf_prep_timeout(&executor_params, PvfPrepKind::Prepare);
		// Construct the PVF a single time, since it is an expensive operation. Cloning it is cheap.
		let pvf = PvfPrepData::from_code(
			code,
			executor_params,
			prep_timeout,
			PrepareJobKind::Compilation,
			validation_code_bomb_limit,
		);
		// We keep track of the total time that has passed and stop retrying if we are taking too
		// long.
		let total_time_start = Instant::now();

		// First attempt, using the caller-provided `prepare_priority` and the full
		// `exec_timeout`. A success is returned straight away without entering the retry loop.
		let mut validation_result = self
			.validate_candidate(
				pvf.clone(),
				exec_timeout,
				pvd.clone(),
				pov.clone(),
				prepare_priority,
				exec_kind,
			)
			.await;
		if validation_result.is_ok() {
			return validation_result
		}

		// Consumes one retry from `$counter`. When the counter is already exhausted, it breaks
		// out of the enclosing retry `loop` instead.
		macro_rules! break_if_no_retries_left {
			($counter:ident) => {
				if $counter > 0 {
					$counter -= 1;
				} else {
					break
				}
			};
		}

		// Allow limited retries for each kind of error.
		let mut num_death_retries_left = 1;
		let mut num_job_error_retries_left = 1;
		let mut num_internal_retries_left = 1;
		let mut num_execution_error_retries_left = 1;
		loop {
			// Stop retrying if we exceeded the timeout.
			if total_time_start.elapsed() + retry_delay > exec_timeout {
				break
			}
			// Set only by the runtime-construction / corrupted-artifact arm, which skips the
			// `retry_delay` wait below.
			let mut retry_immediately = false;
			match validation_result {
				Err(ValidationError::PossiblyInvalid(
					PossiblyInvalidError::AmbiguousWorkerDeath |
					PossiblyInvalidError::AmbiguousJobDeath(_),
				)) => break_if_no_retries_left!(num_death_retries_left),

				Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::JobError(_))) =>
					break_if_no_retries_left!(num_job_error_retries_left),

				Err(ValidationError::Internal(_)) =>
					break_if_no_retries_left!(num_internal_retries_left),

				Err(ValidationError::PossiblyInvalid(
					PossiblyInvalidError::RuntimeConstruction(_) |
					PossiblyInvalidError::CorruptedArtifact,
				)) => {
					break_if_no_retries_left!(num_execution_error_retries_left);
					// Pre-check the PVF before retrying; a pre-check failure ends validation
					// here and is returned to the caller via `?`.
					self.precheck_pvf(pvf.clone()).await?;
					// In this case the error is deterministic, and a retry forces the
					// ValidationBackend to re-prepare the artifact, so there is no need to wait
					// before the retry.
					retry_immediately = true;
				},

				// Success, definitely-invalid candidates, preparation errors and missed
				// execution deadlines are never retried.
				Ok(_) |
				Err(
					ValidationError::Invalid(_) |
					ValidationError::Preparation(_) |
					ValidationError::ExecutionDeadline,
				) => break,
			}

			// If we got a possibly transient error, retry once after a brief delay, on the
			// assumption that the conditions that caused this error may have resolved on their own.
			{
				// In case of many transient errors it is necessary to wait a little bit
				// for the error to be probably resolved
				if !retry_immediately {
					futures_timer::Delay::new(retry_delay).await;
				}

				// The retry only gets whatever is left of the overall execution timeout.
				let new_timeout = exec_timeout.saturating_sub(total_time_start.elapsed());

				gum::warn!(
					target: LOG_TARGET,
					?pvf,
					?new_timeout,
					"Re-trying failed candidate validation due to possible transient error: {:?}",
					validation_result
				);

				validation_result = self
					.validate_candidate(
						pvf.clone(),
						new_timeout,
						pvd.clone(),
						pov.clone(),
						prepare_priority,
						exec_kind,
					)
					.await;
			}
		}

		validation_result
	}

	/// Pre-checks the given PVF, reporting failures as a `PrepareError`.
	async fn precheck_pvf(&mut self, pvf: PvfPrepData) -> Result<(), PrepareError>;

	/// Hands the backend the list of currently active PVFs. Errors are reported as a `String`.
	async fn heads_up(&mut self, active_pvfs: Vec<PvfPrepData>) -> Result<(), String>;

	/// Forwards an active-leaves update, together with the given ancestor hashes, to the
	/// backend. Errors are reported as a `String`.
	async fn update_active_leaves(
		&mut self,
		update: ActiveLeavesUpdate,
		ancestors: Vec<Hash>,
	) -> Result<(), String>;
}
1236
1237#[async_trait]
1238impl ValidationBackend for ValidationHost {
1239	/// Tries executing a PVF a single time (no retries).
1240	async fn validate_candidate(
1241		&mut self,
1242		pvf: PvfPrepData,
1243		exec_timeout: Duration,
1244		pvd: Arc<PersistedValidationData>,
1245		pov: Arc<PoV>,
1246		// The priority for the preparation job.
1247		prepare_priority: polkadot_node_core_pvf::Priority,
1248		// The kind for the execution job.
1249		exec_kind: PvfExecKind,
1250	) -> Result<WasmValidationResult, ValidationError> {
1251		let (tx, rx) = oneshot::channel();
1252		if let Err(err) = self
1253			.execute_pvf(pvf, exec_timeout, pvd, pov, prepare_priority, exec_kind, tx)
1254			.await
1255		{
1256			return Err(InternalValidationError::HostCommunication(format!(
1257				"cannot send pvf to the validation host, it might have shut down: {:?}",
1258				err
1259			))
1260			.into())
1261		}
1262
1263		rx.await.map_err(|_| {
1264			ValidationError::from(InternalValidationError::HostCommunication(
1265				"validation was cancelled".into(),
1266			))
1267		})?
1268	}
1269
1270	async fn precheck_pvf(&mut self, pvf: PvfPrepData) -> Result<(), PrepareError> {
1271		let (tx, rx) = oneshot::channel();
1272		if let Err(err) = self.precheck_pvf(pvf, tx).await {
1273			// Return an IO error if there was an error communicating with the host.
1274			return Err(PrepareError::IoErr(err))
1275		}
1276
1277		let precheck_result = rx.await.map_err(|err| PrepareError::IoErr(err.to_string()))?;
1278
1279		precheck_result
1280	}
1281
1282	async fn heads_up(&mut self, active_pvfs: Vec<PvfPrepData>) -> Result<(), String> {
1283		self.heads_up(active_pvfs).await
1284	}
1285
1286	async fn update_active_leaves(
1287		&mut self,
1288		update: ActiveLeavesUpdate,
1289		ancestors: Vec<Hash>,
1290	) -> Result<(), String> {
1291		self.update_active_leaves(update, ancestors).await
1292	}
1293}
1294
1295/// Does basic checks of a candidate. Provide the encoded PoV-block. Returns `Ok` if basic checks
1296/// are passed, `Err` otherwise.
1297fn perform_basic_checks(
1298	candidate: &CandidateDescriptor,
1299	max_pov_size: u32,
1300	pov: &PoV,
1301	validation_code_hash: &ValidationCodeHash,
1302) -> Result<(), InvalidCandidate> {
1303	let pov_hash = pov.hash();
1304
1305	let encoded_pov_size = pov.encoded_size();
1306	if encoded_pov_size > max_pov_size as usize {
1307		return Err(InvalidCandidate::ParamsTooLarge(encoded_pov_size as u64))
1308	}
1309
1310	if pov_hash != candidate.pov_hash() {
1311		return Err(InvalidCandidate::PoVHashMismatch)
1312	}
1313
1314	if *validation_code_hash != candidate.validation_code_hash() {
1315		return Err(InvalidCandidate::CodeHashMismatch)
1316	}
1317
1318	Ok(())
1319}
1320
1321/// To determine the amount of timeout time for the pvf execution.
1322///
1323/// Precheck
1324///	The time period after which the preparation worker is considered
1325/// unresponsive and will be killed.
1326///
1327/// Prepare
1328///The time period after which the preparation worker is considered
1329/// unresponsive and will be killed.
1330fn pvf_prep_timeout(executor_params: &ExecutorParams, kind: PvfPrepKind) -> Duration {
1331	if let Some(timeout) = executor_params.pvf_prep_timeout(kind) {
1332		return timeout
1333	}
1334	match kind {
1335		PvfPrepKind::Precheck => DEFAULT_PRECHECK_PREPARATION_TIMEOUT,
1336		PvfPrepKind::Prepare => DEFAULT_LENIENT_PREPARATION_TIMEOUT,
1337	}
1338}
1339
1340/// To determine the amount of timeout time for the pvf execution.
1341///
1342/// Backing subsystem
1343/// The amount of time to spend on execution during backing.
1344///
1345/// Approval subsystem
1346/// The amount of time to spend on execution during approval or disputes.
1347/// This should be much longer than the backing execution timeout to ensure that in the
1348/// absence of extremely large disparities between hardware, blocks that pass backing are
1349/// considered executable by approval checkers or dispute participants.
1350fn pvf_exec_timeout(executor_params: &ExecutorParams, kind: RuntimePvfExecKind) -> Duration {
1351	if let Some(timeout) = executor_params.pvf_exec_timeout(kind) {
1352		return timeout
1353	}
1354	match kind {
1355		RuntimePvfExecKind::Backing => DEFAULT_BACKING_EXECUTION_TIMEOUT,
1356		RuntimePvfExecKind::Approval => DEFAULT_APPROVAL_EXECUTION_TIMEOUT,
1357	}
1358}