use futures::{channel::oneshot, future::BoxFuture, prelude::*, stream::FuturesUnordered};
use polkadot_node_subsystem::{
messages::{CandidateValidationMessage, PreCheckOutcome, PvfCheckerMessage, RuntimeApiMessage},
overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError,
SubsystemResult, SubsystemSender,
};
use polkadot_primitives::{
BlockNumber, Hash, PvfCheckStatement, SessionIndex, ValidationCodeHash, ValidatorId,
ValidatorIndex,
};
use sp_keystore::KeystorePtr;
use std::collections::HashSet;
const LOG_TARGET: &str = "parachain::pvf-checker";
mod interest_view;
mod metrics;
mod runtime_api;
#[cfg(test)]
mod tests;
use self::{
interest_view::{InterestView, Judgement},
metrics::Metrics,
};
pub struct PvfCheckerSubsystem {
keystore: KeystorePtr,
metrics: Metrics,
}
impl PvfCheckerSubsystem {
pub fn new(keystore: KeystorePtr, metrics: Metrics) -> Self {
PvfCheckerSubsystem { keystore, metrics }
}
}
#[overseer::subsystem(PvfChecker, error=SubsystemError, prefix = self::overseer)]
impl<Context> PvfCheckerSubsystem {
fn start(self, ctx: Context) -> SpawnedSubsystem {
let future = run(ctx, self.keystore, self.metrics)
.map_err(|e| SubsystemError::with_origin("pvf-checker", e))
.boxed();
SpawnedSubsystem { name: "pvf-checker-subsystem", future }
}
}
struct SigningCredentials {
validator_key: ValidatorId,
validator_index: ValidatorIndex,
}
struct State {
credentials: Option<SigningCredentials>,
recent_block: Option<(BlockNumber, Hash)>,
latest_session: Option<SessionIndex>,
voted: HashSet<ValidationCodeHash>,
view: InterestView,
currently_checking:
FuturesUnordered<BoxFuture<'static, Option<(PreCheckOutcome, ValidationCodeHash)>>>,
}
#[overseer::contextbounds(PvfChecker, prefix = self::overseer)]
async fn run<Context>(
mut ctx: Context,
keystore: KeystorePtr,
metrics: Metrics,
) -> SubsystemResult<()> {
let mut state = State {
credentials: None,
recent_block: None,
latest_session: None,
voted: HashSet::with_capacity(16),
view: InterestView::new(),
currently_checking: FuturesUnordered::new(),
};
loop {
let mut sender = ctx.sender().clone();
futures::select! {
precheck_response = state.currently_checking.select_next_some() => {
if let Some((outcome, validation_code_hash)) = precheck_response {
handle_pvf_check(
&mut state,
&mut sender,
&keystore,
&metrics,
outcome,
validation_code_hash,
).await;
} else {
}
}
from_overseer = ctx.recv().fuse() => {
let outcome = handle_from_overseer(
&mut state,
&mut sender,
&keystore,
&metrics,
from_overseer?,
)
.await;
if let Some(Conclude) = outcome {
return Ok(());
}
}
}
}
}
async fn handle_pvf_check(
state: &mut State,
sender: &mut impl overseer::PvfCheckerSenderTrait,
keystore: &KeystorePtr,
metrics: &Metrics,
outcome: PreCheckOutcome,
validation_code_hash: ValidationCodeHash,
) {
gum::debug!(
target: LOG_TARGET,
?validation_code_hash,
"Received pre-check result: {:?}",
outcome,
);
let judgement = match outcome {
PreCheckOutcome::Valid => Judgement::Valid,
PreCheckOutcome::Invalid => Judgement::Invalid,
PreCheckOutcome::Failed => {
gum::info!(
target: LOG_TARGET,
?validation_code_hash,
"Pre-check failed, voting against",
);
Judgement::Invalid
},
};
match state.view.on_judgement(validation_code_hash, judgement) {
Ok(()) => (),
Err(()) => {
gum::debug!(
target: LOG_TARGET,
?validation_code_hash,
"received judgement for an unknown (or removed) PVF hash",
);
return
},
}
match (state.credentials.as_ref(), state.recent_block, state.latest_session) {
(Some(credentials), Some(recent_block), Some(session_index)) => {
sign_and_submit_pvf_check_statement(
sender,
keystore,
&mut state.voted,
credentials,
metrics,
recent_block.1,
session_index,
judgement,
validation_code_hash,
)
.await;
},
_ => (),
}
}
struct Conclude;
async fn handle_from_overseer(
state: &mut State,
sender: &mut impl overseer::PvfCheckerSenderTrait,
keystore: &KeystorePtr,
metrics: &Metrics,
from_overseer: FromOrchestra<PvfCheckerMessage>,
) -> Option<Conclude> {
match from_overseer {
FromOrchestra::Signal(OverseerSignal::Conclude) => {
gum::info!(target: LOG_TARGET, "Received `Conclude` signal, exiting");
Some(Conclude)
},
FromOrchestra::Signal(OverseerSignal::BlockFinalized(_, _)) => {
None
},
FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => {
handle_leaves_update(state, sender, keystore, metrics, update).await;
None
},
FromOrchestra::Communication { msg } => match msg {
},
}
}
async fn handle_leaves_update(
state: &mut State,
sender: &mut impl overseer::PvfCheckerSenderTrait,
keystore: &KeystorePtr,
metrics: &Metrics,
update: ActiveLeavesUpdate,
) {
if let Some(activated) = update.activated {
let ActivationEffect { new_session_index, recent_block, pending_pvfs } =
match examine_activation(state, sender, keystore, activated.hash, activated.number)
.await
{
None => {
return
},
Some(e) => e,
};
let recent_block_hash = recent_block.1;
state.recent_block = Some(recent_block);
let outcome = state
.view
.on_leaves_update(Some((activated.hash, pending_pvfs)), &update.deactivated);
metrics.on_pvf_observed(outcome.newcomers.len());
metrics.on_pvf_left(outcome.left_num);
for newcomer in outcome.newcomers {
initiate_precheck(state, sender, activated.hash, newcomer, metrics).await;
}
if let Some((new_session_index, credentials)) = new_session_index {
state.latest_session = Some(new_session_index);
state.voted.clear();
state.credentials = credentials;
if let Some(ref credentials) = state.credentials {
for (code_hash, judgement) in state.view.judgements() {
sign_and_submit_pvf_check_statement(
sender,
keystore,
&mut state.voted,
credentials,
metrics,
recent_block_hash,
new_session_index,
judgement,
code_hash,
)
.await;
}
}
}
} else {
state.view.on_leaves_update(None, &update.deactivated);
}
}
struct ActivationEffect {
new_session_index: Option<(SessionIndex, Option<SigningCredentials>)>,
recent_block: (BlockNumber, Hash),
pending_pvfs: Vec<ValidationCodeHash>,
}
async fn examine_activation(
state: &mut State,
sender: &mut impl overseer::PvfCheckerSenderTrait,
keystore: &KeystorePtr,
leaf_hash: Hash,
leaf_number: BlockNumber,
) -> Option<ActivationEffect> {
gum::debug!(
target: LOG_TARGET,
"Examining activation of leaf {:?} ({})",
leaf_hash,
leaf_number,
);
let pending_pvfs = match runtime_api::pvfs_require_precheck(sender, leaf_hash).await {
Err(runtime_api::RuntimeRequestError::NotSupported) => return None,
Err(_) => {
gum::debug!(
target: LOG_TARGET,
relay_parent = ?leaf_hash,
"cannot fetch PVFs that require pre-checking from runtime API",
);
Vec::new()
},
Ok(v) => v,
};
let recent_block = match state.recent_block {
Some((recent_block_num, recent_block_hash)) if leaf_number < recent_block_num => {
(recent_block_num, recent_block_hash)
},
_ => (leaf_number, leaf_hash),
};
let new_session_index = match runtime_api::session_index_for_child(sender, leaf_hash).await {
Ok(session_index) =>
if state.latest_session.map_or(true, |l| l < session_index) {
let signing_credentials =
check_signing_credentials(sender, keystore, leaf_hash).await;
Some((session_index, signing_credentials))
} else {
None
},
Err(e) => {
gum::warn!(
target: LOG_TARGET,
relay_parent = ?leaf_hash,
"cannot fetch session index from runtime API: {:?}",
e,
);
None
},
};
Some(ActivationEffect { new_session_index, recent_block, pending_pvfs })
}
async fn check_signing_credentials(
sender: &mut impl SubsystemSender<RuntimeApiMessage>,
keystore: &KeystorePtr,
leaf: Hash,
) -> Option<SigningCredentials> {
let validators = match runtime_api::validators(sender, leaf).await {
Ok(v) => v,
Err(e) => {
gum::warn!(
target: LOG_TARGET,
relay_parent = ?leaf,
"error occurred during requesting validators: {:?}",
e
);
return None
},
};
polkadot_node_subsystem_util::signing_key_and_index(&validators, keystore).map(
|(validator_key, validator_index)| SigningCredentials { validator_key, validator_index },
)
}
async fn sign_and_submit_pvf_check_statement(
sender: &mut impl overseer::PvfCheckerSenderTrait,
keystore: &KeystorePtr,
voted: &mut HashSet<ValidationCodeHash>,
credentials: &SigningCredentials,
metrics: &Metrics,
relay_parent: Hash,
session_index: SessionIndex,
judgement: Judgement,
validation_code_hash: ValidationCodeHash,
) {
gum::debug!(
target: LOG_TARGET,
?validation_code_hash,
?relay_parent,
"submitting a PVF check statement for validation code = {:?}",
judgement,
);
metrics.on_vote_submission_started();
if voted.contains(&validation_code_hash) {
gum::trace!(
target: LOG_TARGET,
relay_parent = ?relay_parent,
?validation_code_hash,
"already voted for this validation code",
);
metrics.on_vote_duplicate();
return
}
voted.insert(validation_code_hash);
let stmt = PvfCheckStatement {
accept: judgement.is_valid(),
session_index,
subject: validation_code_hash,
validator_index: credentials.validator_index,
};
let signature = match polkadot_node_subsystem_util::sign(
keystore,
&credentials.validator_key,
&stmt.signing_payload(),
) {
Ok(Some(signature)) => signature,
Ok(None) => {
gum::warn!(
target: LOG_TARGET,
?relay_parent,
validator_index = ?credentials.validator_index,
?validation_code_hash,
"private key for signing is not available",
);
return
},
Err(e) => {
gum::warn!(
target: LOG_TARGET,
?relay_parent,
validator_index = ?credentials.validator_index,
?validation_code_hash,
"error signing the statement: {:?}",
e,
);
return
},
};
match runtime_api::submit_pvf_check_statement(sender, relay_parent, stmt, signature).await {
Ok(()) => {
metrics.on_vote_submitted();
},
Err(e) => {
gum::warn!(
target: LOG_TARGET,
?relay_parent,
?validation_code_hash,
"error occurred during submitting a vote: {:?}",
e,
);
},
}
}
async fn initiate_precheck(
state: &mut State,
sender: &mut impl overseer::PvfCheckerSenderTrait,
relay_parent: Hash,
validation_code_hash: ValidationCodeHash,
metrics: &Metrics,
) {
gum::debug!(target: LOG_TARGET, ?validation_code_hash, ?relay_parent, "initiating a precheck",);
let (tx, rx) = oneshot::channel();
sender
.send_message(CandidateValidationMessage::PreCheck {
relay_parent,
validation_code_hash,
response_sender: tx,
})
.await;
let timer = metrics.time_pre_check_judgement();
state.currently_checking.push(Box::pin(async move {
let _timer = timer;
match rx.await {
Ok(accept) => Some((accept, validation_code_hash)),
Err(oneshot::Canceled) => {
gum::debug!(
target: LOG_TARGET,
?validation_code_hash,
?relay_parent,
"precheck request was canceled",
);
None
},
}
}));
}