#![deny(unused_crate_dependencies, unused_results)]
#![warn(missing_docs)]
use polkadot_node_core_pvf::{
InternalValidationError, InvalidCandidate as WasmInvalidCandidate, PossiblyInvalidError,
PrepareError, PrepareJobKind, PvfPrepData, ValidationError, ValidationHost,
};
use polkadot_node_primitives::{InvalidCandidate, PoV, ValidationResult};
use polkadot_node_subsystem::{
errors::RuntimeApiError,
messages::{
CandidateValidationMessage, ChainApiMessage, PreCheckOutcome, PvfExecKind,
RuntimeApiMessage, RuntimeApiRequest, ValidationFailed,
},
overseer, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError, SubsystemResult,
SubsystemSender,
};
use polkadot_node_subsystem_util::{
self as util,
runtime::{prospective_parachains_mode, ClaimQueueSnapshot, ProspectiveParachainsMode},
};
use polkadot_overseer::ActiveLeavesUpdate;
use polkadot_parachain_primitives::primitives::ValidationResult as WasmValidationResult;
use polkadot_primitives::{
executor_params::{
DEFAULT_APPROVAL_EXECUTION_TIMEOUT, DEFAULT_BACKING_EXECUTION_TIMEOUT,
DEFAULT_LENIENT_PREPARATION_TIMEOUT, DEFAULT_PRECHECK_PREPARATION_TIMEOUT,
},
vstaging::{
transpose_claim_queue, CandidateDescriptorV2 as CandidateDescriptor, CandidateEvent,
CandidateReceiptV2 as CandidateReceipt,
CommittedCandidateReceiptV2 as CommittedCandidateReceipt,
},
AuthorityDiscoveryId, CandidateCommitments, ExecutorParams, Hash, PersistedValidationData,
PvfExecKind as RuntimePvfExecKind, PvfPrepKind, SessionIndex, ValidationCode,
ValidationCodeHash, ValidatorId,
};
use sp_application_crypto::{AppCrypto, ByteArray};
use sp_keystore::KeystorePtr;
use codec::Encode;
use futures::{channel::oneshot, prelude::*, stream::FuturesUnordered};
use std::{
collections::HashSet,
path::PathBuf,
pin::Pin,
sync::Arc,
time::{Duration, Instant},
};
use async_trait::async_trait;
mod metrics;
use self::metrics::Metrics;
#[cfg(test)]
mod tests;
/// Log target used by every `gum` log statement in this module.
const LOG_TARGET: &'static str = "parachain::candidate-validation";
/// Delay passed to `validate_candidate_with_retry` as the pause between a failed
/// approval/dispute execution and its retry (non-test builds).
#[cfg(not(test))]
const PVF_APPROVAL_EXECUTION_RETRY_DELAY: Duration = Duration::from_secs(3);
/// Shorter retry delay used when compiling tests.
#[cfg(test)]
const PVF_APPROVAL_EXECUTION_RETRY_DELAY: Duration = Duration::from_millis(200);
/// Number of in-flight validation tasks at which `run` stops accepting new messages and
/// only drains tasks and signals until the count drops below this value again.
const TASK_LIMIT: usize = 30;
/// Configuration for the candidate validation subsystem.
///
/// Every field is forwarded unchanged, in declaration order, to
/// `polkadot_node_core_pvf::Config::new` when the subsystem is run.
#[derive(Clone, Default)]
pub struct Config {
/// Path handed to the PVF host as its artifacts cache location.
pub artifacts_cache_path: PathBuf,
/// Optional node version string handed to the PVF host.
/// NOTE(review): presumably used for a node/worker version check — confirm in the PVF host.
pub node_version: Option<String>,
/// Secure-validator-mode flag handed to the PVF host.
pub secure_validator_mode: bool,
/// Path of the preparation worker, handed to the PVF host.
pub prep_worker_path: PathBuf,
/// Path of the execution worker, handed to the PVF host.
pub exec_worker_path: PathBuf,
/// Maximum number of PVF execution workers, handed to the PVF host.
pub pvf_execute_workers_max_num: usize,
/// Soft maximum number of PVF preparation workers, handed to the PVF host.
/// NOTE(review): exact soft/hard semantics are defined by the PVF host — confirm there.
pub pvf_prepare_workers_soft_max_num: usize,
/// Hard maximum number of PVF preparation workers, handed to the PVF host.
pub pvf_prepare_workers_hard_max_num: usize,
}
/// The candidate validation subsystem.
///
/// Holds everything `start` needs to launch the main loop. When `config` is `None`,
/// `start` launches a `polkadot_overseer::DummySubsystem` instead of the real loop.
pub struct CandidateValidationSubsystem {
// Keystore handed to the main loop; used there to decide whether PVFs should be
// prepared ahead of time.
keystore: KeystorePtr,
#[allow(missing_docs)]
pub metrics: Metrics,
#[allow(missing_docs)]
pub pvf_metrics: polkadot_node_core_pvf::Metrics,
// Subsystem configuration; `None` disables the subsystem.
config: Option<Config>,
}
impl CandidateValidationSubsystem {
    /// Builds the subsystem from its parts.
    ///
    /// Passing `None` as `config` yields a subsystem that starts as a dummy.
    pub fn with_config(
        config: Option<Config>,
        keystore: KeystorePtr,
        metrics: Metrics,
        pvf_metrics: polkadot_node_core_pvf::Metrics,
    ) -> Self {
        Self { keystore, metrics, pvf_metrics, config }
    }
}
#[overseer::subsystem(CandidateValidation, error=SubsystemError, prefix=self::overseer)]
impl<Context> CandidateValidationSubsystem {
    /// Spawns the subsystem: the real validation loop when a configuration is present,
    /// otherwise the overseer's dummy subsystem.
    fn start(self, ctx: Context) -> SpawnedSubsystem {
        match self.config {
            None => polkadot_overseer::DummySubsystem.start(ctx),
            Some(config) => {
                let future = run(ctx, self.keystore, self.metrics, self.pvf_metrics, config)
                    .map_err(|e| SubsystemError::with_origin("candidate-validation", e))
                    .boxed();
                SpawnedSubsystem { name: "candidate-validation-subsystem", future }
            },
        }
    }
}
/// Fetches the claim queue snapshot at `relay_parent`.
///
/// A failed fetch is logged as a warning and reported as `None`; otherwise whatever the
/// fetch produced (which may itself be `None`) is returned.
async fn claim_queue<Sender>(relay_parent: Hash, sender: &mut Sender) -> Option<ClaimQueueSnapshot>
where
    Sender: SubsystemSender<RuntimeApiMessage>,
{
    util::runtime::fetch_claim_queue(sender, relay_parent)
        .await
        .unwrap_or_else(|err| {
            gum::warn!(
                target: LOG_TARGET,
                ?relay_parent,
                ?err,
                "Claim queue not available"
            );
            None
        })
}
/// Converts one incoming [`CandidateValidationMessage`] into a boxed future that performs
/// the requested work and answers on the message's response channel.
///
/// * `ValidateFromExhaustive`: fetches the claim queue and the expected session index for
///   the candidate's relay parent, validates the candidate, records the outcome in
///   `metrics` and sends it back.
/// * `PreCheck`: pre-checks the PVF identified by the code hash and sends the outcome back.
///
/// A failure to deliver the response (receiver dropped) is ignored.
fn handle_validation_message<S>(
    mut sender: S,
    validation_host: ValidationHost,
    metrics: Metrics,
    msg: CandidateValidationMessage,
) -> Pin<Box<dyn Future<Output = ()> + Send>>
where
    S: SubsystemSender<RuntimeApiMessage>,
{
    match msg {
        CandidateValidationMessage::ValidateFromExhaustive {
            validation_data,
            validation_code,
            candidate_receipt,
            pov,
            executor_params,
            exec_kind,
            response_sender,
            ..
        } => {
            let validation = async move {
                let _timer = metrics.time_validate_from_exhaustive();
                let relay_parent = candidate_receipt.descriptor.relay_parent();

                let maybe_claim_queue = claim_queue(relay_parent, &mut sender).await;

                // Any failure (cancelled channel or runtime error) collapses to `None`.
                let session_index_response =
                    util::request_session_index_for_child(relay_parent, &mut sender)
                        .await
                        .await;
                let maybe_expected_session_index =
                    session_index_response.ok().and_then(Result::ok);

                let outcome = validate_candidate_exhaustive(
                    maybe_expected_session_index,
                    validation_host,
                    validation_data,
                    validation_code,
                    candidate_receipt,
                    pov,
                    executor_params,
                    exec_kind,
                    &metrics,
                    maybe_claim_queue,
                )
                .await;

                metrics.on_validation_event(&outcome);
                let _ = response_sender.send(outcome);
            };
            validation.boxed()
        },
        CandidateValidationMessage::PreCheck {
            relay_parent,
            validation_code_hash,
            response_sender,
            ..
        } => {
            let precheck = async move {
                let outcome =
                    precheck_pvf(&mut sender, validation_host, relay_parent, validation_code_hash)
                        .await;
                let _ = response_sender.send(outcome);
            };
            precheck.boxed()
        },
    }
}
/// Main loop of the candidate validation subsystem.
///
/// Starts the PVF validation host from the given configuration, spawns its task as a
/// blocking task and then alternates between two phases:
///
/// 1. Normal operation: overseer signals and messages are processed; every message becomes
///    a task pushed onto `tasks`. Once `TASK_LIMIT` tasks are in flight this phase ends.
/// 2. Back-pressure: only overseer signals are received (messages stay queued) while tasks
///    are drained, until fewer than `TASK_LIMIT` remain.
///
/// Returns `Ok(())` on `Conclude`, or an error if the host cannot be started, the host task
/// cannot be spawned, or receiving from the overseer fails.
#[overseer::contextbounds(CandidateValidation, prefix = self::overseer)]
async fn run<Context>(
mut ctx: Context,
keystore: KeystorePtr,
metrics: Metrics,
pvf_metrics: polkadot_node_core_pvf::Metrics,
Config {
artifacts_cache_path,
node_version,
secure_validator_mode,
prep_worker_path,
exec_worker_path,
pvf_execute_workers_max_num,
pvf_prepare_workers_soft_max_num,
pvf_prepare_workers_hard_max_num,
}: Config,
) -> SubsystemResult<()> {
// Start the PVF host; `task` is the host's own future and must be driven separately.
let (validation_host, task) = polkadot_node_core_pvf::start(
polkadot_node_core_pvf::Config::new(
artifacts_cache_path,
node_version,
secure_validator_mode,
prep_worker_path,
exec_worker_path,
pvf_execute_workers_max_num,
pvf_prepare_workers_soft_max_num,
pvf_prepare_workers_hard_max_num,
),
pvf_metrics,
)
.await?;
ctx.spawn_blocking("pvf-validation-host", task.boxed())?;
// In-flight validation/pre-check futures.
let mut tasks = FuturesUnordered::new();
// State carried across active-leaves updates for ahead-of-time PVF preparation.
let mut prepare_state = PrepareValidationState::default();
loop {
// Phase 1: accept both signals and messages until the task limit is reached.
loop {
futures::select! {
comm = ctx.recv().fuse() => {
match comm {
Ok(FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update))) => {
// Inform the backend first, then possibly prepare PVFs for the next session.
update_active_leaves(ctx.sender(), validation_host.clone(), update.clone()).await;
maybe_prepare_validation(ctx.sender(), keystore.clone(), validation_host.clone(), update, &mut prepare_state).await;
},
Ok(FromOrchestra::Signal(OverseerSignal::BlockFinalized(..))) => {},
Ok(FromOrchestra::Signal(OverseerSignal::Conclude)) => return Ok(()),
Ok(FromOrchestra::Communication { msg }) => {
let task = handle_validation_message(ctx.sender().clone(), validation_host.clone(), metrics.clone(), msg);
tasks.push(task);
// Stop taking new work once the limit is hit.
if tasks.len() >= TASK_LIMIT {
break
}
},
Err(e) => return Err(SubsystemError::from(e)),
}
},
// Drive in-flight tasks; their output is `()`.
_ = tasks.select_next_some() => ()
}
}
gum::debug!(target: LOG_TARGET, "Validation task limit hit");
// Phase 2: only signals are received; active-leaves updates are ignored here.
loop {
futures::select! {
signal = ctx.recv_signal().fuse() => {
match signal {
Ok(OverseerSignal::ActiveLeaves(_)) => {},
Ok(OverseerSignal::BlockFinalized(..)) => {},
Ok(OverseerSignal::Conclude) => return Ok(()),
Err(e) => return Err(SubsystemError::from(e)),
}
},
_ = tasks.select_next_some() => {
// A task finished; resume normal operation once below the limit.
if tasks.len() < TASK_LIMIT {
break
}
}
}
}
}
}
/// State kept between active-leaves updates by `maybe_prepare_validation`.
struct PrepareValidationState {
// Most recent session index observed; `None` until the first successful fetch.
session_index: Option<SessionIndex>,
// Result of the last `check_next_session_authority` call; recomputed on session change.
is_next_session_authority: bool,
// Code hashes already handed to the backend for preparation; cleared on session change.
already_prepared_code_hashes: HashSet<ValidationCodeHash>,
// Maximum number of code hashes picked from one block's backed candidates.
per_block_limit: usize,
}
impl Default for PrepareValidationState {
// Initial state: no session seen, not an authority, nothing prepared, and at most one
// PVF prepared per block.
fn default() -> Self {
Self {
session_index: None,
is_next_session_authority: false,
already_prepared_code_hashes: HashSet::new(),
per_block_limit: 1,
}
}
}
/// Reacts to a newly activated leaf by possibly preparing PVFs ahead of time.
///
/// Does nothing when the update activates no leaf. When the session index at the leaf is
/// newer than the one stored in `state` (or none is stored yet), the stored index is
/// replaced, the set of prepared code hashes is cleared and the authority check is redone.
/// If the node then counts as a next-session authority, PVFs of candidates backed at this
/// leaf are sent for preparation and their hashes are remembered in `state`.
async fn maybe_prepare_validation<Sender>(
    sender: &mut Sender,
    keystore: KeystorePtr,
    validation_backend: impl ValidationBackend,
    update: ActiveLeavesUpdate,
    state: &mut PrepareValidationState,
) where
    Sender: SubsystemSender<RuntimeApiMessage>,
{
    let leaf = match update.activated {
        Some(leaf) => leaf,
        None => return,
    };

    if let Some(index) = new_session_index(sender, state.session_index, leaf.hash).await {
        state.session_index = Some(index);
        state.already_prepared_code_hashes.clear();
        state.is_next_session_authority =
            check_next_session_authority(sender, keystore, leaf.hash, index).await;
    }

    if !state.is_next_session_authority {
        return
    }

    let prepared = prepare_pvfs_for_backed_candidates(
        sender,
        validation_backend,
        leaf.hash,
        &state.already_prepared_code_hashes,
        state.per_block_limit,
    )
    .await;
    if let Some(code_hashes) = prepared {
        state.already_prepared_code_hashes.extend(code_hashes);
    }
}
/// Returns the session index for the child of `relay_parent` if it is news to the caller.
///
/// Yields `Some(index)` when no index is known yet (`session_index` is `None`) or when the
/// fetched index is strictly greater than the known one. Yields `None` when the index has
/// not advanced or when the runtime request fails (the failure is logged as a warning).
async fn new_session_index<Sender>(
    sender: &mut Sender,
    session_index: Option<SessionIndex>,
    relay_parent: Hash,
) -> Option<SessionIndex>
where
    Sender: SubsystemSender<RuntimeApiMessage>,
{
    let fetched = match util::request_session_index_for_child(relay_parent, sender).await.await {
        Ok(Ok(index)) => index,
        _ => {
            gum::warn!(
                target: LOG_TARGET,
                ?relay_parent,
                "cannot fetch session index from runtime API",
            );
            return None
        },
    };

    match session_index {
        Some(known) if fetched <= known => None,
        _ => Some(fetched),
    }
}
/// Decides whether this node should prepare PVFs ahead of the next session.
///
/// Returns `true` when the keystore holds an authority-discovery key for one of the
/// authorities reported at `relay_parent` and holds none of the validator keys listed in
/// the session info of `session_index`. Returns `false` (after logging a warning) when
/// either runtime request fails.
async fn check_next_session_authority<Sender>(
    sender: &mut Sender,
    keystore: KeystorePtr,
    relay_parent: Hash,
    session_index: SessionIndex,
) -> bool
where
    Sender: SubsystemSender<RuntimeApiMessage>,
{
    let authorities = match util::request_authorities(relay_parent, sender).await.await {
        Ok(Ok(authorities)) => authorities,
        _ => {
            gum::warn!(
                target: LOG_TARGET,
                ?relay_parent,
                "cannot fetch authorities from runtime API",
            );
            return false
        },
    };

    let session_info =
        match util::request_session_info(relay_parent, session_index, sender).await.await {
            Ok(Ok(Some(info))) => info,
            _ => {
                gum::warn!(
                    target: LOG_TARGET,
                    ?relay_parent,
                    "cannot fetch session info from runtime API",
                );
                return false
            },
        };

    // Both keystore scans are always performed, in this order.
    let holds_authority_key = authorities
        .iter()
        .any(|id| keystore.has_keys(&[(id.to_raw_vec(), AuthorityDiscoveryId::ID)]));
    let holds_no_validator_key = session_info
        .validators
        .iter()
        .all(|id| !keystore.has_keys(&[(id.to_raw_vec(), ValidatorId::ID)]));

    holds_authority_key && holds_no_validator_key
}
/// Sends the PVFs of candidates backed at `relay_parent` to the backend for ahead-of-time
/// preparation.
///
/// Steps:
/// 1. Fetch the candidate events at `relay_parent` and collect the validation code hashes
///    of backed candidates that are not in `already_prepared`, at most `per_block_limit`.
/// 2. Fetch the executor params for the relay parent and derive the preparation timeout.
/// 3. Fetch the validation code for each hash (hashes whose code cannot be fetched are
///    skipped with a warning) and hand the resulting PVFs to the backend via `heads_up`.
///
/// Returns the code hashes that were handed to the backend, or `None` when a required
/// runtime request failed, nothing was left to prepare, or the backend rejected the request.
async fn prepare_pvfs_for_backed_candidates<Sender>(
    sender: &mut Sender,
    mut validation_backend: impl ValidationBackend,
    relay_parent: Hash,
    already_prepared: &HashSet<ValidationCodeHash>,
    per_block_limit: usize,
) -> Option<Vec<ValidationCodeHash>>
where
    Sender: SubsystemSender<RuntimeApiMessage>,
{
    let Ok(Ok(events)) = util::request_candidate_events(relay_parent, sender).await.await else {
        gum::warn!(
            target: LOG_TARGET,
            ?relay_parent,
            "cannot fetch candidate events from runtime API",
        );
        return None
    };
    let code_hashes = events
        .into_iter()
        .filter_map(|e| match e {
            CandidateEvent::CandidateBacked(receipt, ..) => {
                let h = receipt.descriptor.validation_code_hash();
                (!already_prepared.contains(&h)).then_some(h)
            },
            _ => None,
        })
        .take(per_block_limit)
        .collect::<Vec<_>>();

    let Ok(executor_params) = util::executor_params_at_relay_parent(relay_parent, sender).await
    else {
        gum::warn!(
            target: LOG_TARGET,
            ?relay_parent,
            "cannot fetch executor params for the session",
        );
        return None
    };
    let timeout = pvf_prep_timeout(&executor_params, PvfPrepKind::Prepare);

    let mut active_pvfs = Vec::with_capacity(code_hashes.len());
    let mut processed_code_hashes = Vec::with_capacity(code_hashes.len());
    for code_hash in code_hashes {
        let Ok(Ok(Some(validation_code))) =
            util::request_validation_code_by_hash(relay_parent, code_hash, sender)
                .await
                .await
        else {
            gum::warn!(
                target: LOG_TARGET,
                ?relay_parent,
                ?code_hash,
                "cannot fetch validation code hash from runtime API",
            );
            continue;
        };

        // These PVFs are prepared for later execution, so they use the same job kind as the
        // other `PvfPrepKind::Prepare` sites in this file; `Prechecking` is reserved for
        // pre-check votes and is paired with the `Precheck` timeout.
        let pvf = PvfPrepData::from_code(
            validation_code.0,
            executor_params.clone(),
            timeout,
            PrepareJobKind::Compilation,
        );
        active_pvfs.push(pvf);
        processed_code_hashes.push(code_hash);
    }

    if active_pvfs.is_empty() {
        return None
    }

    if let Err(err) = validation_backend.heads_up(active_pvfs).await {
        gum::warn!(
            target: LOG_TARGET,
            ?relay_parent,
            ?err,
            "cannot prepare PVF for the next session",
        );
        return None
    };

    gum::debug!(
        target: LOG_TARGET,
        ?relay_parent,
        ?processed_code_hashes,
        "Prepared PVF for the next session",
    );
    Some(processed_code_hashes)
}
/// Forwards an active-leaves update, together with the ancestors of the activated leaf (if
/// any), to the validation backend. A backend failure is only logged.
async fn update_active_leaves<Sender>(
    sender: &mut Sender,
    mut validation_backend: impl ValidationBackend,
    update: ActiveLeavesUpdate,
) where
    Sender: SubsystemSender<ChainApiMessage> + SubsystemSender<RuntimeApiMessage>,
{
    let activated_hash = update.activated.as_ref().map(|leaf| leaf.hash);
    let ancestors = get_block_ancestors(sender, activated_hash).await;

    let outcome = validation_backend.update_active_leaves(update, ancestors).await;
    if let Err(err) = outcome {
        gum::warn!(
            target: LOG_TARGET,
            ?err,
            "cannot update active leaves in validation backend",
        );
    }
}
/// Returns the allowed ancestry length at `relay_parent` when prospective parachains mode is
/// enabled there; any other outcome is logged as a warning and mapped to `None`.
async fn get_allowed_ancestry_len<Sender>(sender: &mut Sender, relay_parent: Hash) -> Option<usize>
where
    Sender: SubsystemSender<ChainApiMessage> + SubsystemSender<RuntimeApiMessage>,
{
    let res = prospective_parachains_mode(sender, relay_parent).await;
    if let Ok(ProspectiveParachainsMode::Enabled { allowed_ancestry_len, .. }) = &res {
        return Some(*allowed_ancestry_len)
    }

    gum::warn!(target: LOG_TARGET, ?res, "async backing is disabled");
    None
}
/// Requests the ancestors of the given relay parent from the chain API, going back as many
/// blocks as the allowed ancestry length.
///
/// Yields an empty list when no relay parent is given, when the allowed ancestry length is
/// unavailable, or when the ancestors request fails (logged as a warning).
async fn get_block_ancestors<Sender>(
    sender: &mut Sender,
    maybe_relay_parent: Option<Hash>,
) -> Vec<Hash>
where
    Sender: SubsystemSender<ChainApiMessage> + SubsystemSender<RuntimeApiMessage>,
{
    let relay_parent = match maybe_relay_parent {
        Some(hash) => hash,
        None => return vec![],
    };
    let allowed_ancestry_len = match get_allowed_ancestry_len(sender, relay_parent).await {
        Some(len) => len,
        None => return vec![],
    };

    let (tx, rx) = oneshot::channel();
    let request = ChainApiMessage::Ancestors {
        hash: relay_parent,
        k: allowed_ancestry_len,
        response_channel: tx,
    };
    sender.send_message(request).await;

    match rx.await {
        Ok(Ok(ancestors)) => ancestors,
        res => {
            gum::warn!(target: LOG_TARGET, ?res, "cannot request ancestors");
            vec![]
        },
    }
}
/// Marker error returned by `runtime_api_request` when the response channel was dropped or
/// the runtime API reported an error; the details are only logged.
struct RuntimeRequestFailed;
/// Sends `request` to the runtime API for `relay_parent` and waits for the answer on
/// `receiver`.
///
/// Both failure modes — the response channel being dropped and the runtime API returning an
/// error — are logged at debug level and collapsed into [`RuntimeRequestFailed`].
async fn runtime_api_request<T, Sender>(
    sender: &mut Sender,
    relay_parent: Hash,
    request: RuntimeApiRequest,
    receiver: oneshot::Receiver<Result<T, RuntimeApiError>>,
) -> Result<T, RuntimeRequestFailed>
where
    Sender: SubsystemSender<RuntimeApiMessage>,
{
    sender
        .send_message(RuntimeApiMessage::Request(relay_parent, request).into())
        .await;

    match receiver.await {
        Ok(Ok(value)) => Ok(value),
        Ok(Err(e)) => {
            gum::debug!(
                target: LOG_TARGET,
                ?relay_parent,
                err = ?e,
                "Runtime API request internal error"
            );
            Err(RuntimeRequestFailed)
        },
        Err(_) => {
            gum::debug!(target: LOG_TARGET, ?relay_parent, "Runtime API request dropped");
            Err(RuntimeRequestFailed)
        },
    }
}
/// Asks the runtime API at `relay_parent` for the validation code with the given hash.
///
/// `Ok(None)` means the runtime answered but has no such code; `Err` means the request
/// itself failed.
async fn request_validation_code_by_hash<Sender>(
    sender: &mut Sender,
    relay_parent: Hash,
    validation_code_hash: ValidationCodeHash,
) -> Result<Option<ValidationCode>, RuntimeRequestFailed>
where
    Sender: SubsystemSender<RuntimeApiMessage>,
{
    let (response_tx, response_rx) = oneshot::channel();
    let request = RuntimeApiRequest::ValidationCodeByHash(validation_code_hash, response_tx);
    runtime_api_request(sender, relay_parent, request, response_rx).await
}
/// Pre-checks the PVF identified by `validation_code_hash` at `relay_parent`.
///
/// Outcomes:
/// * `Failed` — the validation code could not be obtained, or preparation failed with a
///   non-deterministic error.
/// * `Invalid` — the executor params could not be obtained, or preparation failed with a
///   deterministic error.
/// * `Valid` — the backend pre-checked the PVF successfully.
async fn precheck_pvf<Sender>(
    sender: &mut Sender,
    mut validation_backend: impl ValidationBackend,
    relay_parent: Hash,
    validation_code_hash: ValidationCodeHash,
) -> PreCheckOutcome
where
    Sender: SubsystemSender<RuntimeApiMessage>,
{
    let code_lookup =
        request_validation_code_by_hash(sender, relay_parent, validation_code_hash).await;
    let Ok(Some(validation_code)) = code_lookup else {
        gum::warn!(
            target: LOG_TARGET,
            ?relay_parent,
            ?validation_code_hash,
            "precheck: requested validation code is not found on-chain!",
        );
        return PreCheckOutcome::Failed
    };

    let executor_params = match util::executor_params_at_relay_parent(relay_parent, sender).await
    {
        Ok(executor_params) => {
            gum::debug!(
                target: LOG_TARGET,
                ?relay_parent,
                ?validation_code_hash,
                "precheck: acquired executor params for the session: {:?}",
                executor_params,
            );
            executor_params
        },
        Err(_) => {
            gum::warn!(
                target: LOG_TARGET,
                ?relay_parent,
                ?validation_code_hash,
                "precheck: failed to acquire executor params for the session, thus voting against.",
            );
            return PreCheckOutcome::Invalid
        },
    };

    let timeout = pvf_prep_timeout(&executor_params, PvfPrepKind::Precheck);
    let pvf = PvfPrepData::from_code(
        validation_code.0,
        executor_params,
        timeout,
        PrepareJobKind::Prechecking,
    );

    match validation_backend.precheck_pvf(pvf).await {
        Ok(_) => PreCheckOutcome::Valid,
        Err(prepare_err) if prepare_err.is_deterministic() => PreCheckOutcome::Invalid,
        Err(_) => PreCheckOutcome::Failed,
    }
}
/// Validates a candidate with all data supplied by the caller.
///
/// Order of checks:
/// 1. For backing kinds, if the descriptor carries a session index, it must equal
///    `maybe_expected_session_index` (which must be available).
/// 2. Basic checks via `perform_basic_checks` (PoV size and hash, code hash, signature).
/// 3. Execution through the backend: a single attempt for backing kinds, an attempt with
///    retries for approval and dispute.
/// 4. On successful execution: para head hash, commitments hash and — for backing kinds
///    when the descriptor carries a core index — the core index against the claim queue.
///
/// Returns `Ok(ValidationResult::Valid(..))` / `Ok(ValidationResult::Invalid(..))` for a
/// conclusive verdict and `Err(ValidationFailed)` when no verdict could be reached
/// (internal, preparation or deadline errors, or missing runtime data).
async fn validate_candidate_exhaustive(
maybe_expected_session_index: Option<SessionIndex>,
mut validation_backend: impl ValidationBackend + Send,
persisted_validation_data: PersistedValidationData,
validation_code: ValidationCode,
candidate_receipt: CandidateReceipt,
pov: Arc<PoV>,
executor_params: ExecutorParams,
exec_kind: PvfExecKind,
metrics: &Metrics,
maybe_claim_queue: Option<ClaimQueueSnapshot>,
) -> Result<ValidationResult, ValidationFailed> {
let _timer = metrics.time_validate_candidate_exhaustive();
let validation_code_hash = validation_code.hash();
let relay_parent = candidate_receipt.descriptor.relay_parent();
let para_id = candidate_receipt.descriptor.para_id();
gum::debug!(
target: LOG_TARGET,
?validation_code_hash,
?para_id,
"About to validate a candidate.",
);
// The session index is only checked for backing kinds, and only when the descriptor has one.
match (exec_kind, candidate_receipt.descriptor.session_index()) {
(PvfExecKind::Backing(_) | PvfExecKind::BackingSystemParas(_), Some(session_index)) => {
// Without the expected index no verdict is possible.
let Some(expected_session_index) = maybe_expected_session_index else {
let error = "cannot fetch session index from the runtime";
gum::warn!(
target: LOG_TARGET,
?relay_parent,
error,
);
return Err(ValidationFailed(error.into()))
};
if session_index != expected_session_index {
return Ok(ValidationResult::Invalid(InvalidCandidate::InvalidSessionIndex))
}
},
(_, _) => {},
};
if let Err(e) = perform_basic_checks(
&candidate_receipt.descriptor,
persisted_validation_data.max_pov_size,
&pov,
&validation_code_hash,
) {
gum::info!(target: LOG_TARGET, ?para_id, "Invalid candidate (basic checks)");
return Ok(ValidationResult::Invalid(e))
}
// Shared with the backend; a copy is returned to the caller on success.
let persisted_validation_data = Arc::new(persisted_validation_data);
let result = match exec_kind {
// Backing kinds: a single execution attempt, no retries.
PvfExecKind::Backing(_) | PvfExecKind::BackingSystemParas(_) => {
let prep_timeout = pvf_prep_timeout(&executor_params, PvfPrepKind::Prepare);
let exec_timeout = pvf_exec_timeout(&executor_params, exec_kind.into());
let pvf = PvfPrepData::from_code(
validation_code.0,
executor_params,
prep_timeout,
PrepareJobKind::Compilation,
);
validation_backend
.validate_candidate(
pvf,
exec_timeout,
persisted_validation_data.clone(),
pov,
exec_kind.into(),
exec_kind,
)
.await
},
// Approval and dispute: execution with retries on possibly transient errors.
PvfExecKind::Approval | PvfExecKind::Dispute =>
validation_backend
.validate_candidate_with_retry(
validation_code.0,
pvf_exec_timeout(&executor_params, exec_kind.into()),
persisted_validation_data.clone(),
pov,
executor_params,
PVF_APPROVAL_EXECUTION_RETRY_DELAY,
exec_kind.into(),
exec_kind,
)
.await,
};
if let Err(ref error) = result {
gum::info!(target: LOG_TARGET, ?para_id, ?error, "Failed to validate candidate");
}
// Map the backend result onto the subsystem-level verdict.
match result {
// Internal errors: no verdict.
Err(ValidationError::Internal(e)) => {
gum::warn!(
target: LOG_TARGET,
?para_id,
?e,
"An internal error occurred during validation, will abstain from voting",
);
Err(ValidationFailed(e.to_string()))
},
// Definite invalidity reported by the backend.
Err(ValidationError::Invalid(WasmInvalidCandidate::HardTimeout)) =>
Ok(ValidationResult::Invalid(InvalidCandidate::Timeout)),
Err(ValidationError::Invalid(WasmInvalidCandidate::WorkerReportedInvalid(e))) =>
Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(e))),
Err(ValidationError::Invalid(WasmInvalidCandidate::PoVDecompressionFailure)) =>
Ok(ValidationResult::Invalid(InvalidCandidate::PoVDecompressionFailure)),
// Possibly-invalid errors are all reported as an invalid candidate with an execution error.
Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath)) =>
Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(
"ambiguous worker death".to_string(),
))),
Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::JobError(err))) =>
Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(err))),
Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::RuntimeConstruction(err))) =>
Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(err))),
Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousJobDeath(err))) =>
Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(format!(
"ambiguous job death: {err}"
)))),
// Preparation errors: no verdict.
Err(ValidationError::Preparation(e)) => {
gum::warn!(
target: LOG_TARGET,
?para_id,
?e,
"Deterministic error occurred during preparation (should have been ruled out by pre-checking phase)",
);
Err(ValidationFailed(e.to_string()))
},
// Execution deadline missed: no verdict.
Err(e @ ValidationError::ExecutionDeadline) => {
gum::warn!(
target: LOG_TARGET,
?para_id,
?e,
"Job assigned too late, execution queue probably overloaded",
);
Err(ValidationFailed(e.to_string()))
},
// Execution succeeded: compare the outputs against the receipt.
Ok(res) =>
if res.head_data.hash() != candidate_receipt.descriptor.para_head() {
gum::info!(target: LOG_TARGET, ?para_id, "Invalid candidate (para_head)");
Ok(ValidationResult::Invalid(InvalidCandidate::ParaHeadHashMismatch))
} else {
// Rebuild the commitments from the execution output to compare hashes.
let committed_candidate_receipt = CommittedCandidateReceipt {
descriptor: candidate_receipt.descriptor.clone(),
commitments: CandidateCommitments {
head_data: res.head_data,
upward_messages: res.upward_messages,
horizontal_messages: res.horizontal_messages,
new_validation_code: res.new_validation_code,
processed_downward_messages: res.processed_downward_messages,
hrmp_watermark: res.hrmp_watermark,
},
};
if candidate_receipt.commitments_hash !=
committed_candidate_receipt.commitments.hash()
{
gum::info!(
target: LOG_TARGET,
?para_id,
"Invalid candidate (commitments hash)"
);
Ok(ValidationResult::Invalid(InvalidCandidate::CommitmentsHashMismatch))
} else {
let core_index = candidate_receipt.descriptor.core_index();
// The core index is only checked for backing kinds, and only when present.
match (core_index, exec_kind) {
(
Some(_core_index),
PvfExecKind::Backing(_) | PvfExecKind::BackingSystemParas(_),
) => {
// Without the claim queue no verdict is possible.
let Some(claim_queue) = maybe_claim_queue else {
let error = "cannot fetch the claim queue from the runtime";
gum::warn!(
target: LOG_TARGET,
?relay_parent,
error
);
return Err(ValidationFailed(error.into()))
};
if let Err(err) = committed_candidate_receipt
.check_core_index(&transpose_claim_queue(claim_queue.0))
{
gum::warn!(
target: LOG_TARGET,
?err,
candidate_hash = ?candidate_receipt.hash(),
"Candidate core index is invalid",
);
return Ok(ValidationResult::Invalid(
InvalidCandidate::InvalidCoreIndex,
))
}
},
(_, _) => {},
}
Ok(ValidationResult::Valid(
committed_candidate_receipt.commitments,
(*persisted_validation_data).clone(),
))
}
},
}
}
/// Abstraction over the PVF validation host used by this subsystem.
#[async_trait]
trait ValidationBackend {
/// Executes the given PVF once against the supplied validation data and PoV, with the
/// given execution timeout, preparation priority and execution kind.
async fn validate_candidate(
&mut self,
pvf: PvfPrepData,
exec_timeout: Duration,
pvd: Arc<PersistedValidationData>,
pov: Arc<PoV>,
prepare_priority: polkadot_node_core_pvf::Priority,
exec_kind: PvfExecKind,
) -> Result<WasmValidationResult, ValidationError>;
/// Executes the PVF built from `code`, retrying on possibly transient errors.
///
/// After a failed first attempt, each of the error classes below is granted one retry:
/// ambiguous worker/job death, job error, internal error and runtime construction error.
/// Retries stop when the class has no retry left, when the error is not retryable
/// (`Invalid`, `Preparation`, `ExecutionDeadline`), or when the time elapsed since the
/// first attempt plus `retry_delay` exceeds `exec_timeout`. Each retry runs with the
/// remaining part of `exec_timeout`. The result of the last attempt is returned.
async fn validate_candidate_with_retry(
&mut self,
code: Vec<u8>,
exec_timeout: Duration,
pvd: Arc<PersistedValidationData>,
pov: Arc<PoV>,
executor_params: ExecutorParams,
retry_delay: Duration,
prepare_priority: polkadot_node_core_pvf::Priority,
exec_kind: PvfExecKind,
) -> Result<WasmValidationResult, ValidationError> {
let prep_timeout = pvf_prep_timeout(&executor_params, PvfPrepKind::Prepare);
let pvf = PvfPrepData::from_code(
code,
executor_params,
prep_timeout,
PrepareJobKind::Compilation,
);
// Reference point for the overall time budget across all attempts.
let total_time_start = Instant::now();
let mut validation_result = self
.validate_candidate(
pvf.clone(),
exec_timeout,
pvd.clone(),
pov.clone(),
prepare_priority,
exec_kind,
)
.await;
if validation_result.is_ok() {
return validation_result
}
// Consumes one retry from the given counter, or leaves the retry loop if none is left.
macro_rules! break_if_no_retries_left {
($counter:ident) => {
if $counter > 0 {
$counter -= 1;
} else {
break
}
};
}
// One retry per error class.
let mut num_death_retries_left = 1;
let mut num_job_error_retries_left = 1;
let mut num_internal_retries_left = 1;
let mut num_runtime_construction_retries_left = 1;
loop {
// Give up when waiting `retry_delay` would exceed the execution timeout.
if total_time_start.elapsed() + retry_delay > exec_timeout {
break
}
let mut retry_immediately = false;
match validation_result {
Err(ValidationError::PossiblyInvalid(
PossiblyInvalidError::AmbiguousWorkerDeath |
PossiblyInvalidError::AmbiguousJobDeath(_),
)) => break_if_no_retries_left!(num_death_retries_left),
Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::JobError(_))) =>
break_if_no_retries_left!(num_job_error_retries_left),
Err(ValidationError::Internal(_)) =>
break_if_no_retries_left!(num_internal_retries_left),
Err(ValidationError::PossiblyInvalid(
PossiblyInvalidError::RuntimeConstruction(_),
)) => {
break_if_no_retries_left!(num_runtime_construction_retries_left);
// Pre-check the PVF again before retrying; a pre-check failure is returned
// to the caller via `?`. The retry then happens without the delay.
self.precheck_pvf(pvf.clone()).await?;
retry_immediately = true;
},
// Success or a non-retryable error ends the loop.
Ok(_) |
Err(
ValidationError::Invalid(_) |
ValidationError::Preparation(_) |
ValidationError::ExecutionDeadline,
) => break,
}
{
if !retry_immediately {
futures_timer::Delay::new(retry_delay).await;
}
// The retry only gets what is left of the original execution timeout.
let new_timeout = exec_timeout.saturating_sub(total_time_start.elapsed());
gum::warn!(
target: LOG_TARGET,
?pvf,
?new_timeout,
"Re-trying failed candidate validation due to possible transient error: {:?}",
validation_result
);
validation_result = self
.validate_candidate(
pvf.clone(),
new_timeout,
pvd.clone(),
pov.clone(),
prepare_priority,
exec_kind,
)
.await;
}
}
validation_result
}
/// Pre-checks the given PVF.
async fn precheck_pvf(&mut self, pvf: PvfPrepData) -> Result<(), PrepareError>;
/// Hands the given PVFs to the backend ahead of their use.
async fn heads_up(&mut self, active_pvfs: Vec<PvfPrepData>) -> Result<(), String>;
/// Passes an active-leaves update and the ancestors of the activated leaf to the backend.
async fn update_active_leaves(
&mut self,
update: ActiveLeavesUpdate,
ancestors: Vec<Hash>,
) -> Result<(), String>;
}
/// [`ValidationBackend`] backed by the real PVF [`ValidationHost`]: each method forwards to
/// the host and, where the host answers over a channel, waits for that answer.
#[async_trait]
impl ValidationBackend for ValidationHost {
    /// Submits the PVF for execution and waits for the host's verdict. Failing to reach the
    /// host, or the host dropping the response channel, is a host-communication error.
    async fn validate_candidate(
        &mut self,
        pvf: PvfPrepData,
        exec_timeout: Duration,
        pvd: Arc<PersistedValidationData>,
        pov: Arc<PoV>,
        prepare_priority: polkadot_node_core_pvf::Priority,
        exec_kind: PvfExecKind,
    ) -> Result<WasmValidationResult, ValidationError> {
        let (result_tx, result_rx) = oneshot::channel();

        let submitted = self
            .execute_pvf(pvf, exec_timeout, pvd, pov, prepare_priority, exec_kind, result_tx)
            .await;
        if let Err(err) = submitted {
            let reason = format!(
                "cannot send pvf to the validation host, it might have shut down: {:?}",
                err
            );
            return Err(InternalValidationError::HostCommunication(reason).into())
        }

        match result_rx.await {
            Ok(outcome) => outcome,
            Err(_) => Err(ValidationError::from(InternalValidationError::HostCommunication(
                "validation was cancelled".into(),
            ))),
        }
    }

    /// Submits the PVF for pre-checking and waits for the host's answer. Communication
    /// failures in either direction are reported as `PrepareError::IoErr`.
    async fn precheck_pvf(&mut self, pvf: PvfPrepData) -> Result<(), PrepareError> {
        let (result_tx, result_rx) = oneshot::channel();
        self.precheck_pvf(pvf, result_tx).await.map_err(PrepareError::IoErr)?;
        result_rx.await.map_err(|err| PrepareError::IoErr(err.to_string()))?
    }

    /// Forwards the heads-up to the host.
    async fn heads_up(&mut self, active_pvfs: Vec<PvfPrepData>) -> Result<(), String> {
        self.heads_up(active_pvfs).await
    }

    /// Forwards the active-leaves update and ancestors to the host.
    async fn update_active_leaves(
        &mut self,
        update: ActiveLeavesUpdate,
        ancestors: Vec<Hash>,
    ) -> Result<(), String> {
        self.update_active_leaves(update, ancestors).await
    }
}
/// Runs the checks that need no PVF execution, returning the first failure in this order:
///
/// 1. the encoded PoV is larger than `max_pov_size` → `ParamsTooLarge`,
/// 2. the PoV hash differs from the one in the descriptor → `PoVHashMismatch`,
/// 3. the code hash differs from the one in the descriptor → `CodeHashMismatch`,
/// 4. the collator signature check fails → `BadSignature`.
fn perform_basic_checks(
    candidate: &CandidateDescriptor,
    max_pov_size: u32,
    pov: &PoV,
    validation_code_hash: &ValidationCodeHash,
) -> Result<(), InvalidCandidate> {
    let actual_pov_hash = pov.hash();
    let pov_size = pov.encoded_size();

    if pov_size > max_pov_size as usize {
        Err(InvalidCandidate::ParamsTooLarge(pov_size as u64))
    } else if actual_pov_hash != candidate.pov_hash() {
        Err(InvalidCandidate::PoVHashMismatch)
    } else if *validation_code_hash != candidate.validation_code_hash() {
        Err(InvalidCandidate::CodeHashMismatch)
    } else {
        candidate
            .check_collator_signature()
            .map_err(|()| InvalidCandidate::BadSignature)
    }
}
/// Preparation timeout for the given kind: the value from `executor_params` when one is
/// set, otherwise the default constant for that kind.
fn pvf_prep_timeout(executor_params: &ExecutorParams, kind: PvfPrepKind) -> Duration {
    executor_params.pvf_prep_timeout(kind).unwrap_or_else(|| match kind {
        PvfPrepKind::Precheck => DEFAULT_PRECHECK_PREPARATION_TIMEOUT,
        PvfPrepKind::Prepare => DEFAULT_LENIENT_PREPARATION_TIMEOUT,
    })
}
/// Execution timeout for the given kind: the value from `executor_params` when one is set,
/// otherwise the default constant for that kind.
fn pvf_exec_timeout(executor_params: &ExecutorParams, kind: RuntimePvfExecKind) -> Duration {
    executor_params.pvf_exec_timeout(kind).unwrap_or_else(|| match kind {
        RuntimePvfExecKind::Backing => DEFAULT_BACKING_EXECUTION_TIMEOUT,
        RuntimePvfExecKind::Approval => DEFAULT_APPROVAL_EXECUTION_TIMEOUT,
    })
}