#![deny(unused_crate_dependencies)]
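//! Implements the `CandidateBackingSubsystem`: it tracks parachain candidates under active
//! relay-chain leaves, validates and seconds candidates assigned to us, and collects
//! statements from other backing validators until candidates become backable.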
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use bitvec::vec::BitVec;
use futures::{
channel::{mpsc, oneshot},
future::BoxFuture,
stream::FuturesOrdered,
FutureExt, SinkExt, StreamExt, TryFutureExt,
};
use schnellru::{ByLength, LruMap};
use error::{Error, FatalResult};
use polkadot_node_primitives::{
AvailableData, InvalidCandidate, PoV, SignedFullStatementWithPVD, StatementWithPVD,
ValidationResult,
};
use polkadot_node_subsystem::{
messages::{
AvailabilityDistributionMessage, AvailabilityStoreMessage, CanSecondRequest,
CandidateBackingMessage, CandidateValidationMessage, CollatorProtocolMessage,
HypotheticalCandidate, HypotheticalMembershipRequest, IntroduceSecondedCandidateRequest,
ProspectiveParachainsMessage, ProvisionableData, ProvisionerMessage, PvfExecKind,
RuntimeApiMessage, RuntimeApiRequest, StatementDistributionMessage,
StoreAvailableDataError,
},
overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, RuntimeApiError, SpawnedSubsystem,
SubsystemError,
};
use polkadot_node_subsystem_util::{
self as util,
backing_implicit_view::View as ImplicitView,
request_claim_queue, request_disabled_validators, request_session_executor_params,
request_session_index_for_child, request_validator_groups, request_validators,
runtime::{self, request_min_backing_votes, ClaimQueueSnapshot},
Validator,
};
use polkadot_parachain_primitives::primitives::IsSystem;
use polkadot_primitives::{
node_features::FeatureIndex,
vstaging::{
BackedCandidate, CandidateReceiptV2 as CandidateReceipt,
CommittedCandidateReceiptV2 as CommittedCandidateReceipt,
},
CandidateCommitments, CandidateHash, CoreIndex, ExecutorParams, GroupIndex, GroupRotationInfo,
Hash, Id as ParaId, IndexedVec, NodeFeatures, PersistedValidationData, SessionIndex,
SigningContext, ValidationCode, ValidatorId, ValidatorIndex, ValidatorSignature,
ValidityAttestation,
};
use polkadot_statement_table::{
generic::AttestedCandidate as TableAttestedCandidate,
v2::{
SignedStatement as TableSignedStatement, Statement as TableStatement,
Summary as TableSummary,
},
Context as TableContextTrait, Table,
};
use sp_keystore::KeystorePtr;
use util::runtime::request_node_features;
mod error;
mod metrics;
use self::metrics::Metrics;
#[cfg(test)]
mod tests;
const LOG_TARGET: &str = "parachain::candidate-backing";
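/// The PoV for a candidate: either already available locally, or to be fetched from a
/// backing validator.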
enum PoVData {
Ready(Arc<PoV>),
FetchFromValidator {
from_validator: ValidatorIndex,
candidate_hash: CandidateHash,
pov_hash: Hash,
},
}
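/// Commands sent from background validation tasks back to the main subsystem loop.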
enum ValidatedCandidateCommand {
Second(BackgroundValidationResult),
Attest(BackgroundValidationResult),
AttestNoPoV(CandidateHash),
}
impl std::fmt::Debug for ValidatedCandidateCommand {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
let candidate_hash = self.candidate_hash();
match *self {
ValidatedCandidateCommand::Second(_) => write!(f, "Second({})", candidate_hash),
ValidatedCandidateCommand::Attest(_) => write!(f, "Attest({})", candidate_hash),
ValidatedCandidateCommand::AttestNoPoV(_) => write!(f, "AttestNoPoV({})", candidate_hash),
}
}
}
impl ValidatedCandidateCommand {
fn candidate_hash(&self) -> CandidateHash {
match *self {
ValidatedCandidateCommand::Second(Ok(ref outputs)) => outputs.candidate.hash(),
ValidatedCandidateCommand::Second(Err(ref candidate)) => candidate.hash(),
ValidatedCandidateCommand::Attest(Ok(ref outputs)) => outputs.candidate.hash(),
ValidatedCandidateCommand::Attest(Err(ref candidate)) => candidate.hash(),
ValidatedCandidateCommand::AttestNoPoV(candidate_hash) => candidate_hash,
}
}
}
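/// The candidate backing subsystem.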
pub struct CandidateBackingSubsystem {
keystore: KeystorePtr,
metrics: Metrics,
}
impl CandidateBackingSubsystem {
pub fn new(keystore: KeystorePtr, metrics: Metrics) -> Self {
Self { keystore, metrics }
}
}
#[overseer::subsystem(CandidateBacking, error = SubsystemError, prefix = self::overseer)]
impl<Context> CandidateBackingSubsystem
where
Context: Send + Sync,
{
fn start(self, ctx: Context) -> SpawnedSubsystem {
let future = async move {
run(ctx, self.keystore, self.metrics)
.await
.map_err(|e| SubsystemError::with_origin("candidate-backing", e))
}
.boxed();
SpawnedSubsystem { name: "candidate-backing-subsystem", future }
}
}
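/// All state the subsystem keeps per relay parent: our core assignment, the statement table
/// and its context, and bookkeeping for issued statements and in-flight validations.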
struct PerRelayParentState {
parent: Hash,
node_features: NodeFeatures,
executor_params: Arc<ExecutorParams>,
assigned_core: Option<CoreIndex>,
backed: HashSet<CandidateHash>,
table: Table<TableContext>,
table_context: TableContext,
issued_statements: HashSet<CandidateHash>,
awaiting_validation: HashSet<CandidateHash>,
fallbacks: HashMap<CandidateHash, AttestingData>,
minimum_backing_votes: u32,
inject_core_index: bool,
n_cores: u32,
claim_queue: ClaimQueueSnapshot,
validator_to_group: Arc<IndexedVec<ValidatorIndex, Option<GroupIndex>>>,
group_rotation_info: GroupRotationInfo,
}
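/// State kept per known candidate.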
struct PerCandidateState {
persisted_validation_data: PersistedValidationData,
seconded_locally: bool,
relay_parent: Hash,
}
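/// LRU caches of session-scoped data (validators, node features, executor parameters,
/// minimum backing votes and the validator-to-group mapping), so relay parents of the same
/// session don't repeat runtime queries.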
struct PerSessionCache {
validators_cache: LruMap<SessionIndex, Arc<Vec<ValidatorId>>>,
node_features_cache: LruMap<SessionIndex, Option<NodeFeatures>>,
executor_params_cache: LruMap<SessionIndex, Arc<ExecutorParams>>,
minimum_backing_votes_cache: LruMap<SessionIndex, u32>,
validator_to_group_cache:
LruMap<SessionIndex, Arc<IndexedVec<ValidatorIndex, Option<GroupIndex>>>>,
}
impl Default for PerSessionCache {
fn default() -> Self {
Self::new(2)
}
}
impl PerSessionCache {
fn new(capacity: u32) -> Self {
PerSessionCache {
validators_cache: LruMap::new(ByLength::new(capacity)),
node_features_cache: LruMap::new(ByLength::new(capacity)),
executor_params_cache: LruMap::new(ByLength::new(capacity)),
minimum_backing_votes_cache: LruMap::new(ByLength::new(capacity)),
validator_to_group_cache: LruMap::new(ByLength::new(capacity)),
}
}
async fn validators(
&mut self,
session_index: SessionIndex,
parent: Hash,
sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
) -> Result<Arc<Vec<ValidatorId>>, RuntimeApiError> {
if let Some(validators) = self.validators_cache.get(&session_index) {
return Ok(Arc::clone(validators));
}
let validators: Vec<ValidatorId> =
request_validators(parent, sender).await.await.map_err(|err| {
RuntimeApiError::Execution { runtime_api_name: "Validators", source: Arc::new(err) }
})??;
let validators = Arc::new(validators);
self.validators_cache.insert(session_index, Arc::clone(&validators));
Ok(validators)
}
async fn node_features(
&mut self,
session_index: SessionIndex,
parent: Hash,
sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
) -> Result<Option<NodeFeatures>, Error> {
if let Some(node_features) = self.node_features_cache.get(&session_index) {
return Ok(node_features.clone());
}
let node_features: Option<NodeFeatures> =
request_node_features(parent, session_index, sender).await?;
self.node_features_cache.insert(session_index, node_features.clone());
Ok(node_features)
}
async fn executor_params(
&mut self,
session_index: SessionIndex,
parent: Hash,
sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
) -> Result<Arc<ExecutorParams>, RuntimeApiError> {
if let Some(executor_params) = self.executor_params_cache.get(&session_index) {
return Ok(Arc::clone(executor_params));
}
let executor_params = request_session_executor_params(parent, session_index, sender)
.await
.await
.map_err(|err| RuntimeApiError::Execution {
runtime_api_name: "SessionExecutorParams",
source: Arc::new(err),
})??
.ok_or_else(|| RuntimeApiError::Execution {
runtime_api_name: "SessionExecutorParams",
source: Arc::new(Error::MissingExecutorParams),
})?;
let executor_params = Arc::new(executor_params);
self.executor_params_cache.insert(session_index, Arc::clone(&executor_params));
Ok(executor_params)
}
async fn minimum_backing_votes(
&mut self,
session_index: SessionIndex,
parent: Hash,
sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
) -> Result<u32, RuntimeApiError> {
if let Some(minimum_backing_votes) = self.minimum_backing_votes_cache.get(&session_index) {
return Ok(*minimum_backing_votes);
}
let minimum_backing_votes = request_min_backing_votes(parent, session_index, sender)
.await
.map_err(|err| RuntimeApiError::Execution {
runtime_api_name: "MinimumBackingVotes",
source: Arc::new(err),
})?;
self.minimum_backing_votes_cache.insert(session_index, minimum_backing_votes);
Ok(minimum_backing_votes)
}
fn validator_to_group(
&mut self,
session_index: SessionIndex,
validators: &[ValidatorId],
validator_groups: &[Vec<ValidatorIndex>],
) -> Arc<IndexedVec<ValidatorIndex, Option<GroupIndex>>> {
let validator_to_group = self
.validator_to_group_cache
.get_or_insert(session_index, || {
let mut vector = vec![None; validators.len()];
for (group_idx, validator_group) in validator_groups.iter().enumerate() {
for validator in validator_group {
vector[validator.0 as usize] = Some(GroupIndex(group_idx as u32));
}
}
Arc::new(IndexedVec::<_, _>::from(vector))
})
.expect("Just inserted");
Arc::clone(validator_to_group)
}
}
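/// The overall state of the subsystem.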
struct State {
implicit_view: ImplicitView,
per_relay_parent: HashMap<Hash, PerRelayParentState>,
per_candidate: HashMap<CandidateHash, PerCandidateState>,
per_session_cache: PerSessionCache,
background_validation_tx: mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
keystore: KeystorePtr,
}
impl State {
fn new(
background_validation_tx: mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
keystore: KeystorePtr,
) -> Self {
State {
implicit_view: ImplicitView::default(),
per_relay_parent: HashMap::default(),
per_candidate: HashMap::new(),
per_session_cache: PerSessionCache::default(),
background_validation_tx,
keystore,
}
}
}
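/// Run the subsystem's main loop, restarting `run_iteration` on non-fatal errors.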
#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
async fn run<Context>(
mut ctx: Context,
keystore: KeystorePtr,
metrics: Metrics,
) -> FatalResult<()> {
let (background_validation_tx, mut background_validation_rx) = mpsc::channel(16);
let mut state = State::new(background_validation_tx, keystore);
loop {
let res =
run_iteration(&mut ctx, &mut state, &metrics, &mut background_validation_rx).await;
match res {
Ok(()) => break,
Err(e) => crate::error::log_error(Err(e))?,
}
}
Ok(())
}
#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
async fn run_iteration<Context>(
ctx: &mut Context,
state: &mut State,
metrics: &Metrics,
background_validation_rx: &mut mpsc::Receiver<(Hash, ValidatedCandidateCommand)>,
) -> Result<(), Error> {
loop {
futures::select!(
validated_command = background_validation_rx.next().fuse() => {
if let Some((relay_parent, command)) = validated_command {
handle_validated_candidate_command(
&mut *ctx,
state,
relay_parent,
command,
metrics,
).await?;
} else {
panic!("background_validation_tx always alive at this point; qed");
}
}
from_overseer = ctx.recv().fuse() => {
match from_overseer.map_err(Error::OverseerExited)? {
FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => {
handle_active_leaves_update(
&mut *ctx,
update,
state,
).await?;
}
FromOrchestra::Signal(OverseerSignal::BlockFinalized(..)) => {}
FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()),
FromOrchestra::Communication { msg } => {
handle_communication(&mut *ctx, state, msg, metrics).await?;
}
}
}
)
}
}
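/// Data needed to retry fetching a PoV from other backing validators if the current one
/// does not provide it.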
#[derive(Clone)]
struct AttestingData {
candidate: CandidateReceipt,
pov_hash: Hash,
from_validator: ValidatorIndex,
backing: Vec<ValidatorIndex>,
}
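/// The context the statement table operates in: our validator identity (if any), the
/// validator group per core, the full validator set and the disabled validators.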
#[derive(Default, Debug)]
struct TableContext {
validator: Option<Validator>,
groups: HashMap<CoreIndex, Vec<ValidatorIndex>>,
validators: Vec<ValidatorId>,
disabled_validators: Vec<ValidatorIndex>,
}
impl TableContext {
pub fn validator_is_disabled(&self, validator_idx: &ValidatorIndex) -> bool {
self.disabled_validators
.iter()
.any(|disabled_val_idx| *disabled_val_idx == *validator_idx)
}
pub fn local_validator_is_disabled(&self) -> Option<bool> {
self.validator.as_ref().map(|v| v.disabled())
}
}
impl TableContextTrait for TableContext {
type AuthorityId = ValidatorIndex;
type Digest = CandidateHash;
type GroupId = CoreIndex;
type Signature = ValidatorSignature;
type Candidate = CommittedCandidateReceipt;
fn candidate_digest(candidate: &CommittedCandidateReceipt) -> CandidateHash {
candidate.hash()
}
fn is_member_of(&self, authority: &ValidatorIndex, core: &CoreIndex) -> bool {
self.groups.get(core).map_or(false, |g| g.iter().any(|a| a == authority))
}
fn get_group_size(&self, group: &CoreIndex) -> Option<usize> {
self.groups.get(group).map(|g| g.len())
}
}
fn primitive_statement_to_table(s: &SignedFullStatementWithPVD) -> TableSignedStatement {
let statement = match s.payload() {
StatementWithPVD::Seconded(c, _) => TableStatement::Seconded(c.clone()),
StatementWithPVD::Valid(h) => TableStatement::Valid(*h),
};
TableSignedStatement {
statement,
signature: s.signature().clone(),
sender: s.validator_index(),
}
}
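/// Convert an attested candidate from the statement table into a `BackedCandidate`, with
/// validity votes ordered by the validators' positions within the backing group.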
fn table_attested_to_backed(
attested: TableAttestedCandidate<
CoreIndex,
CommittedCandidateReceipt,
ValidatorIndex,
ValidatorSignature,
>,
table_context: &TableContext,
inject_core_index: bool,
) -> Option<BackedCandidate> {
let TableAttestedCandidate { candidate, validity_votes, group_id: core_index } = attested;
let (ids, validity_votes): (Vec<_>, Vec<ValidityAttestation>) =
validity_votes.into_iter().map(|(id, vote)| (id, vote.into())).unzip();
let group = table_context.groups.get(&core_index)?;
let mut validator_indices = BitVec::with_capacity(group.len());
validator_indices.resize(group.len(), false);
let mut vote_positions = Vec::with_capacity(validity_votes.len());
for (orig_idx, id) in ids.iter().enumerate() {
if let Some(position) = group.iter().position(|x| x == id) {
validator_indices.set(position, true);
vote_positions.push((orig_idx, position));
} else {
gum::warn!(
target: LOG_TARGET,
"Logic error: Validity vote from table does not correspond to group",
);
return None
}
}
vote_positions.sort_by_key(|(_orig, pos_in_group)| *pos_in_group);
Some(BackedCandidate::new(
candidate,
vote_positions
.into_iter()
.map(|(pos_in_votes, _pos_in_group)| validity_votes[pos_in_votes].clone())
.collect(),
validator_indices,
inject_core_index.then_some(core_index),
))
}
async fn store_available_data(
sender: &mut impl overseer::CandidateBackingSenderTrait,
n_validators: u32,
candidate_hash: CandidateHash,
available_data: AvailableData,
expected_erasure_root: Hash,
core_index: CoreIndex,
node_features: NodeFeatures,
) -> Result<(), Error> {
let (tx, rx) = oneshot::channel();
sender
.send_message(AvailabilityStoreMessage::StoreAvailableData {
candidate_hash,
n_validators,
available_data,
expected_erasure_root,
core_index,
node_features,
tx,
})
.await;
rx.await
.map_err(Error::StoreAvailableDataChannel)?
.map_err(Error::StoreAvailableData)
}
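/// Bundle the PoV and persisted validation data into `AvailableData` and hand it to the
/// availability store.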
async fn make_pov_available(
sender: &mut impl overseer::CandidateBackingSenderTrait,
n_validators: usize,
pov: Arc<PoV>,
candidate_hash: CandidateHash,
validation_data: PersistedValidationData,
expected_erasure_root: Hash,
core_index: CoreIndex,
node_features: NodeFeatures,
) -> Result<(), Error> {
store_available_data(
sender,
n_validators as u32,
candidate_hash,
AvailableData { pov, validation_data },
expected_erasure_root,
core_index,
node_features,
)
.await
}
async fn request_pov(
sender: &mut impl overseer::CandidateBackingSenderTrait,
relay_parent: Hash,
from_validator: ValidatorIndex,
para_id: ParaId,
candidate_hash: CandidateHash,
pov_hash: Hash,
) -> Result<Arc<PoV>, Error> {
let (tx, rx) = oneshot::channel();
sender
.send_message(AvailabilityDistributionMessage::FetchPoV {
relay_parent,
from_validator,
para_id,
candidate_hash,
pov_hash,
tx,
})
.await;
let pov = rx.await.map_err(|_| Error::FetchPoV)?;
Ok(Arc::new(pov))
}
async fn request_candidate_validation(
sender: &mut impl overseer::CandidateBackingSenderTrait,
validation_data: PersistedValidationData,
validation_code: ValidationCode,
candidate_receipt: CandidateReceipt,
pov: Arc<PoV>,
executor_params: ExecutorParams,
) -> Result<ValidationResult, Error> {
let (tx, rx) = oneshot::channel();
let is_system = candidate_receipt.descriptor.para_id().is_system();
let relay_parent = candidate_receipt.descriptor.relay_parent();
sender
.send_message(CandidateValidationMessage::ValidateFromExhaustive {
validation_data,
validation_code,
candidate_receipt,
pov,
executor_params,
exec_kind: if is_system {
PvfExecKind::BackingSystemParas(relay_parent)
} else {
PvfExecKind::Backing(relay_parent)
},
response_sender: tx,
})
.await;
match rx.await {
Ok(Ok(validation_result)) => Ok(validation_result),
Ok(Err(err)) => Err(Error::ValidationFailed(err)),
Err(err) => Err(Error::ValidateFromExhaustive(err)),
}
}
struct BackgroundValidationOutputs {
candidate: CandidateReceipt,
commitments: CandidateCommitments,
persisted_validation_data: PersistedValidationData,
}
type BackgroundValidationResult = Result<BackgroundValidationOutputs, CandidateReceipt>;
struct BackgroundValidationParams<S: overseer::CandidateBackingSenderTrait, F> {
sender: S,
tx_command: mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
candidate: CandidateReceipt,
relay_parent: Hash,
node_features: NodeFeatures,
executor_params: Arc<ExecutorParams>,
persisted_validation_data: PersistedValidationData,
pov: PoVData,
n_validators: usize,
make_command: F,
}
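/// Background validation task: fetch the validation code (and PoV, if needed), validate the
/// candidate, store its available data, and report the result back over the command channel.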
async fn validate_and_make_available(
params: BackgroundValidationParams<
impl overseer::CandidateBackingSenderTrait,
impl Fn(BackgroundValidationResult) -> ValidatedCandidateCommand + Sync,
>,
core_index: CoreIndex,
) -> Result<(), Error> {
let BackgroundValidationParams {
mut sender,
mut tx_command,
candidate,
relay_parent,
node_features,
executor_params,
persisted_validation_data,
pov,
n_validators,
make_command,
} = params;
let validation_code = {
let validation_code_hash = candidate.descriptor().validation_code_hash();
let (tx, rx) = oneshot::channel();
sender
.send_message(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::ValidationCodeByHash(validation_code_hash, tx),
))
.await;
let code = rx.await.map_err(Error::RuntimeApiUnavailable)?;
match code {
Err(e) => return Err(Error::FetchValidationCode(validation_code_hash, e)),
Ok(None) => return Err(Error::NoValidationCode(validation_code_hash)),
Ok(Some(c)) => c,
}
};
let pov = match pov {
PoVData::Ready(pov) => pov,
PoVData::FetchFromValidator { from_validator, candidate_hash, pov_hash } =>
match request_pov(
&mut sender,
relay_parent,
from_validator,
candidate.descriptor.para_id(),
candidate_hash,
pov_hash,
)
.await
{
Err(Error::FetchPoV) => {
tx_command
.send((
relay_parent,
ValidatedCandidateCommand::AttestNoPoV(candidate.hash()),
))
.await
.map_err(Error::BackgroundValidationMpsc)?;
return Ok(())
},
Err(err) => return Err(err),
Ok(pov) => pov,
},
};
let v = {
request_candidate_validation(
&mut sender,
persisted_validation_data,
validation_code,
candidate.clone(),
pov.clone(),
executor_params.as_ref().clone(),
)
.await?
};
let res = match v {
ValidationResult::Valid(commitments, validation_data) => {
gum::debug!(
target: LOG_TARGET,
candidate_hash = ?candidate.hash(),
"Validation successful",
);
let erasure_valid = make_pov_available(
&mut sender,
n_validators,
pov.clone(),
candidate.hash(),
validation_data.clone(),
candidate.descriptor.erasure_root(),
core_index,
node_features,
)
.await;
match erasure_valid {
Ok(()) => Ok(BackgroundValidationOutputs {
candidate,
commitments,
persisted_validation_data: validation_data,
}),
Err(Error::StoreAvailableData(StoreAvailableDataError::InvalidErasureRoot)) => {
gum::debug!(
target: LOG_TARGET,
candidate_hash = ?candidate.hash(),
actual_commitments = ?commitments,
"Erasure root doesn't match the announced by the candidate receipt",
);
Err(candidate)
},
Err(e) => return Err(e),
}
},
ValidationResult::Invalid(InvalidCandidate::CommitmentsHashMismatch) => {
gum::warn!(
target: LOG_TARGET,
candidate_hash = ?candidate.hash(),
"Validation yielded different commitments",
);
Err(candidate)
},
ValidationResult::Invalid(reason) => {
gum::warn!(
target: LOG_TARGET,
candidate_hash = ?candidate.hash(),
reason = ?reason,
"Validation yielded an invalid candidate",
);
Err(candidate)
},
};
tx_command.send((relay_parent, make_command(res))).await.map_err(Into::into)
}
#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
async fn handle_communication<Context>(
ctx: &mut Context,
state: &mut State,
message: CandidateBackingMessage,
metrics: &Metrics,
) -> Result<(), Error> {
match message {
CandidateBackingMessage::Second(_relay_parent, candidate, pvd, pov) => {
handle_second_message(ctx, state, candidate, pvd, pov, metrics).await?;
},
CandidateBackingMessage::Statement(relay_parent, statement) => {
handle_statement_message(ctx, state, relay_parent, statement, metrics).await?;
},
CandidateBackingMessage::GetBackableCandidates(requested_candidates, tx) =>
handle_get_backable_candidates_message(state, requested_candidates, tx, metrics)?,
CandidateBackingMessage::CanSecond(request, tx) =>
handle_can_second_request(ctx, state, request, tx).await,
}
Ok(())
}
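/// Update the implicit view with activated/deactivated leaves, prune per-relay-parent and
/// per-candidate state that is no longer in view, and construct state for newly allowed
/// relay parents.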
#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
async fn handle_active_leaves_update<Context>(
ctx: &mut Context,
update: ActiveLeavesUpdate,
state: &mut State,
) -> Result<(), Error> {
let res = if let Some(leaf) = update.activated {
let leaf_hash = leaf.hash;
Some((leaf, state.implicit_view.activate_leaf(ctx.sender(), leaf_hash).await.map(|_| ())))
} else {
None
};
for deactivated in update.deactivated {
state.implicit_view.deactivate_leaf(deactivated);
}
{
let remaining: HashSet<_> = state.implicit_view.all_allowed_relay_parents().collect();
state.per_relay_parent.retain(|r, _| remaining.contains(&r));
}
state
.per_candidate
.retain(|_, pc| state.per_relay_parent.contains_key(&pc.relay_parent));
let fresh_relay_parents = match res {
None => return Ok(()),
Some((leaf, Ok(_))) => {
let fresh_relay_parents =
state.implicit_view.known_allowed_relay_parents_under(&leaf.hash, None);
let fresh_relay_parent = match fresh_relay_parents {
Some(f) => f.to_vec(),
None => {
gum::warn!(
target: LOG_TARGET,
leaf_hash = ?leaf.hash,
"Implicit view gave no relay-parents"
);
vec![leaf.hash]
},
};
fresh_relay_parent
},
Some((leaf, Err(e))) => {
gum::debug!(
target: LOG_TARGET,
leaf_hash = ?leaf.hash,
err = ?e,
"Failed to load implicit view for leaf."
);
return Ok(())
},
};
for maybe_new in fresh_relay_parents {
if state.per_relay_parent.contains_key(&maybe_new) {
continue
}
let per = construct_per_relay_parent_state(
ctx,
maybe_new,
&state.keystore,
&mut state.per_session_cache,
)
.await?;
if let Some(per) = per {
state.per_relay_parent.insert(maybe_new, per);
}
}
Ok(())
}
macro_rules! try_runtime_api {
($x: expr) => {
match $x {
Ok(x) => x,
Err(err) => {
error::log_error(Err(Into::<runtime::Error>::into(err).into()))?;
return Ok(None)
},
}
};
}
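/// Derive the core index a statement applies to from the statement author's group
/// assignment, checking (for `Seconded` statements) that the para is actually scheduled on
/// that core.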
fn core_index_from_statement(
validator_to_group: &IndexedVec<ValidatorIndex, Option<GroupIndex>>,
group_rotation_info: &GroupRotationInfo,
n_cores: u32,
claim_queue: &ClaimQueueSnapshot,
statement: &SignedFullStatementWithPVD,
) -> Option<CoreIndex> {
let compact_statement = statement.as_unchecked();
let candidate_hash = CandidateHash(*compact_statement.unchecked_payload().candidate_hash());
gum::trace!(
target: LOG_TARGET,
?group_rotation_info,
?statement,
?validator_to_group,
n_cores,
?candidate_hash,
"Extracting core index from statement"
);
let statement_validator_index = statement.validator_index();
let Some(Some(group_index)) = validator_to_group.get(statement_validator_index) else {
gum::debug!(
target: LOG_TARGET,
?group_rotation_info,
?statement,
?validator_to_group,
n_cores,
?candidate_hash,
"Invalid validator index: {:?}",
statement_validator_index
);
return None
};
let core_index = group_rotation_info.core_for_group(*group_index, n_cores as _);
if core_index.0 > n_cores {
gum::warn!(target: LOG_TARGET, ?candidate_hash, ?core_index, n_cores, "Invalid CoreIndex");
return None
}
if let StatementWithPVD::Seconded(candidate, _pvd) = statement.payload() {
let candidate_para_id = candidate.descriptor.para_id();
let mut assigned_paras = claim_queue.iter_claims_for_core(&core_index);
if !assigned_paras.any(|id| id == &candidate_para_id) {
gum::debug!(
target: LOG_TARGET,
?candidate_hash,
?core_index,
assigned_paras = ?claim_queue.iter_claims_for_core(&core_index).collect::<Vec<_>>(),
?candidate_para_id,
"Invalid CoreIndex, core is not assigned to this para_id"
);
return None
}
}
Some(core_index)
}
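/// Query the runtime for everything needed to back candidates at this relay parent and
/// build the corresponding `PerRelayParentState`. Returns `Ok(None)` if a required runtime
/// API call fails.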
#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
async fn construct_per_relay_parent_state<Context>(
ctx: &mut Context,
relay_parent: Hash,
keystore: &KeystorePtr,
per_session_cache: &mut PerSessionCache,
) -> Result<Option<PerRelayParentState>, Error> {
let parent = relay_parent;
let (session_index, groups, claim_queue, disabled_validators) = futures::try_join!(
request_session_index_for_child(parent, ctx.sender()).await,
request_validator_groups(parent, ctx.sender()).await,
request_claim_queue(parent, ctx.sender()).await,
request_disabled_validators(parent, ctx.sender()).await,
)
.map_err(Error::JoinMultiple)?;
let session_index = try_runtime_api!(session_index);
let validators = per_session_cache.validators(session_index, parent, ctx.sender()).await;
let validators = try_runtime_api!(validators);
let node_features = per_session_cache
.node_features(session_index, parent, ctx.sender())
.await?
.unwrap_or(NodeFeatures::EMPTY);
let inject_core_index = node_features
.get(FeatureIndex::ElasticScalingMVP as usize)
.map(|b| *b)
.unwrap_or(false);
let executor_params =
per_session_cache.executor_params(session_index, parent, ctx.sender()).await;
let executor_params = try_runtime_api!(executor_params);
gum::debug!(target: LOG_TARGET, inject_core_index, ?parent, "New state");
let (validator_groups, group_rotation_info) = try_runtime_api!(groups);
let minimum_backing_votes = per_session_cache
.minimum_backing_votes(session_index, parent, ctx.sender())
.await;
let minimum_backing_votes = try_runtime_api!(minimum_backing_votes);
let claim_queue = try_runtime_api!(claim_queue);
let disabled_validators = try_runtime_api!(disabled_validators);
let signing_context = SigningContext { parent_hash: parent, session_index };
let validator = match Validator::construct(
&validators,
&disabled_validators,
signing_context.clone(),
keystore.clone(),
) {
Ok(v) => Some(v),
Err(util::Error::NotAValidator) => None,
Err(e) => {
gum::warn!(
target: LOG_TARGET,
err = ?e,
"Cannot participate in candidate backing",
);
return Ok(None)
},
};
let n_cores = validator_groups.len();
let mut groups = HashMap::<CoreIndex, Vec<ValidatorIndex>>::new();
let mut assigned_core = None;
for idx in 0..n_cores {
let core_index = CoreIndex(idx as _);
if !claim_queue.contains_key(&core_index) {
continue
}
let group_index = group_rotation_info.group_for_core(core_index, n_cores);
if let Some(g) = validator_groups.get(group_index.0 as usize) {
if validator.as_ref().map_or(false, |v| g.contains(&v.index())) {
assigned_core = Some(core_index);
}
groups.insert(core_index, g.clone());
}
}
gum::debug!(target: LOG_TARGET, ?groups, "TableContext");
let validator_to_group =
per_session_cache.validator_to_group(session_index, &validators, &validator_groups);
let table_context =
TableContext { validator, groups, validators: validators.to_vec(), disabled_validators };
Ok(Some(PerRelayParentState {
parent,
node_features,
executor_params,
assigned_core,
backed: HashSet::new(),
table: Table::new(),
table_context,
issued_statements: HashSet::new(),
awaiting_validation: HashSet::new(),
fallbacks: HashMap::new(),
minimum_backing_votes,
inject_core_index,
n_cores: validator_groups.len() as u32,
claim_queue: ClaimQueueSnapshot::from(claim_queue),
validator_to_group,
group_rotation_info,
}))
}
enum SecondingAllowed {
No,
Yes(Vec<Hash>),
}
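/// Ask the prospective parachains subsystem whether the hypothetical candidate could become
/// a member of a fragment chain under any of our active leaves, returning the leaves it is
/// allowed for.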
#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
async fn seconding_sanity_check<Context>(
ctx: &mut Context,
implicit_view: &ImplicitView,
hypothetical_candidate: HypotheticalCandidate,
) -> SecondingAllowed {
let mut leaves_for_seconding = Vec::new();
let mut responses = FuturesOrdered::<BoxFuture<'_, Result<_, oneshot::Canceled>>>::new();
let candidate_para = hypothetical_candidate.candidate_para();
let candidate_relay_parent = hypothetical_candidate.relay_parent();
let candidate_hash = hypothetical_candidate.candidate_hash();
for head in implicit_view.leaves() {
let allowed_parents_for_para =
implicit_view.known_allowed_relay_parents_under(head, Some(candidate_para));
if !allowed_parents_for_para.unwrap_or_default().contains(&candidate_relay_parent) {
continue
}
let (tx, rx) = oneshot::channel();
ctx.send_message(ProspectiveParachainsMessage::GetHypotheticalMembership(
HypotheticalMembershipRequest {
candidates: vec![hypothetical_candidate.clone()],
fragment_chain_relay_parent: Some(*head),
},
tx,
))
.await;
let response = rx.map_ok(move |candidate_memberships| {
let is_member_or_potential = candidate_memberships
.into_iter()
.find_map(|(candidate, leaves)| {
(candidate.candidate_hash() == candidate_hash).then_some(leaves)
})
.and_then(|leaves| leaves.into_iter().find(|leaf| leaf == head))
.is_some();
(is_member_or_potential, head)
});
responses.push_back(response.boxed());
}
if responses.is_empty() {
return SecondingAllowed::No
}
while let Some(response) = responses.next().await {
match response {
Err(oneshot::Canceled) => {
gum::warn!(
target: LOG_TARGET,
"Failed to reach prospective parachains subsystem for hypothetical membership",
);
return SecondingAllowed::No
},
Ok((is_member_or_potential, head)) => match is_member_or_potential {
false => {
gum::debug!(
target: LOG_TARGET,
?candidate_hash,
leaf_hash = ?head,
"Refusing to second candidate at leaf. Is not a potential member.",
);
},
true => {
leaves_for_seconding.push(*head);
},
},
}
}
if leaves_for_seconding.is_empty() {
SecondingAllowed::No
} else {
SecondingAllowed::Yes(leaves_for_seconding)
}
}
#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
async fn handle_can_second_request<Context>(
ctx: &mut Context,
state: &State,
request: CanSecondRequest,
tx: oneshot::Sender<bool>,
) {
let relay_parent = request.candidate_relay_parent;
let response = if state.per_relay_parent.get(&relay_parent).is_some() {
let hypothetical_candidate = HypotheticalCandidate::Incomplete {
candidate_hash: request.candidate_hash,
candidate_para: request.candidate_para_id,
parent_head_data_hash: request.parent_head_data_hash,
candidate_relay_parent: relay_parent,
};
let result =
seconding_sanity_check(ctx, &state.implicit_view, hypothetical_candidate).await;
match result {
SecondingAllowed::No => false,
SecondingAllowed::Yes(leaves) => !leaves.is_empty(),
}
} else {
false
};
let _ = tx.send(response);
}
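/// Handle the result of a background validation task: second the candidate, attest to it,
/// or retry fetching the PoV from another backing validator.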
#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
async fn handle_validated_candidate_command<Context>(
ctx: &mut Context,
state: &mut State,
relay_parent: Hash,
command: ValidatedCandidateCommand,
metrics: &Metrics,
) -> Result<(), Error> {
match state.per_relay_parent.get_mut(&relay_parent) {
Some(rp_state) => {
let candidate_hash = command.candidate_hash();
rp_state.awaiting_validation.remove(&candidate_hash);
match command {
ValidatedCandidateCommand::Second(res) => match res {
Ok(outputs) => {
let BackgroundValidationOutputs {
candidate,
commitments,
persisted_validation_data,
} = outputs;
if rp_state.issued_statements.contains(&candidate_hash) {
return Ok(())
}
let receipt = CommittedCandidateReceipt {
descriptor: candidate.descriptor.clone(),
commitments,
};
let hypothetical_candidate = HypotheticalCandidate::Complete {
candidate_hash,
receipt: Arc::new(receipt.clone()),
persisted_validation_data: persisted_validation_data.clone(),
};
if let SecondingAllowed::No = seconding_sanity_check(
ctx,
&state.implicit_view,
hypothetical_candidate,
)
.await
{
return Ok(())
};
let statement =
StatementWithPVD::Seconded(receipt, persisted_validation_data);
let res = sign_import_and_distribute_statement(
ctx,
rp_state,
&mut state.per_candidate,
statement,
state.keystore.clone(),
metrics,
)
.await;
if let Err(Error::RejectedByProspectiveParachains) = res {
let candidate_hash = candidate.hash();
gum::debug!(
target: LOG_TARGET,
relay_parent = ?candidate.descriptor().relay_parent(),
?candidate_hash,
"Attempted to second candidate but was rejected by prospective parachains",
);
ctx.send_message(CollatorProtocolMessage::Invalid(
candidate.descriptor().relay_parent(),
candidate,
))
.await;
return Ok(())
}
if let Some(stmt) = res? {
match state.per_candidate.get_mut(&candidate_hash) {
None => {
gum::warn!(
target: LOG_TARGET,
?candidate_hash,
"Missing `per_candidate` for seconded candidate.",
);
},
Some(p) => p.seconded_locally = true,
}
rp_state.issued_statements.insert(candidate_hash);
metrics.on_candidate_seconded();
ctx.send_message(CollatorProtocolMessage::Seconded(
rp_state.parent,
StatementWithPVD::drop_pvd_from_signed(stmt),
))
.await;
}
},
Err(candidate) => {
ctx.send_message(CollatorProtocolMessage::Invalid(
rp_state.parent,
candidate,
))
.await;
},
},
ValidatedCandidateCommand::Attest(res) => {
rp_state.fallbacks.remove(&candidate_hash);
if !rp_state.issued_statements.contains(&candidate_hash) {
if res.is_ok() {
let statement = StatementWithPVD::Valid(candidate_hash);
sign_import_and_distribute_statement(
ctx,
rp_state,
&mut state.per_candidate,
statement,
state.keystore.clone(),
metrics,
)
.await?;
}
rp_state.issued_statements.insert(candidate_hash);
}
},
ValidatedCandidateCommand::AttestNoPoV(candidate_hash) => {
if let Some(attesting) = rp_state.fallbacks.get_mut(&candidate_hash) {
if let Some(index) = attesting.backing.pop() {
attesting.from_validator = index;
let attesting = attesting.clone();
if let Some(pvd) = state
.per_candidate
.get(&candidate_hash)
.map(|pc| pc.persisted_validation_data.clone())
{
kick_off_validation_work(
ctx,
rp_state,
pvd,
&state.background_validation_tx,
attesting,
)
.await?;
}
}
} else {
gum::warn!(
target: LOG_TARGET,
"AttestNoPoV was triggered without fallback being available."
);
debug_assert!(false);
}
},
}
},
None => {
// The relay parent is no longer tracked (e.g. the leaf was deactivated while the
// background task ran); the command can be safely ignored.
},
Ok(())
}
fn sign_statement(
rp_state: &PerRelayParentState,
statement: StatementWithPVD,
keystore: KeystorePtr,
metrics: &Metrics,
) -> Option<SignedFullStatementWithPVD> {
let signed = rp_state
.table_context
.validator
.as_ref()?
.sign(keystore, statement)
.ok()
.flatten()?;
metrics.on_statement_signed();
Some(signed)
}
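/// Import a statement into the statement table, first introducing the candidate to the
/// prospective parachains subsystem if it is a newly seconded candidate.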
#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
async fn import_statement<Context>(
ctx: &mut Context,
rp_state: &mut PerRelayParentState,
per_candidate: &mut HashMap<CandidateHash, PerCandidateState>,
statement: &SignedFullStatementWithPVD,
) -> Result<Option<TableSummary>, Error> {
let candidate_hash = statement.payload().candidate_hash();
gum::debug!(
target: LOG_TARGET,
statement = ?statement.payload().to_compact(),
validator_index = statement.validator_index().0,
?candidate_hash,
"Importing statement",
);
if let StatementWithPVD::Seconded(candidate, pvd) = statement.payload() {
if !per_candidate.contains_key(&candidate_hash) {
let (tx, rx) = oneshot::channel();
ctx.send_message(ProspectiveParachainsMessage::IntroduceSecondedCandidate(
IntroduceSecondedCandidateRequest {
candidate_para: candidate.descriptor.para_id(),
candidate_receipt: candidate.clone(),
persisted_validation_data: pvd.clone(),
},
tx,
))
.await;
match rx.await {
Err(oneshot::Canceled) => {
gum::warn!(
target: LOG_TARGET,
"Could not reach the Prospective Parachains subsystem."
);
return Err(Error::RejectedByProspectiveParachains)
},
Ok(false) => return Err(Error::RejectedByProspectiveParachains),
Ok(true) => {},
}
per_candidate.insert(
candidate_hash,
PerCandidateState {
persisted_validation_data: pvd.clone(),
seconded_locally: false,
relay_parent: candidate.descriptor.relay_parent(),
},
);
}
}
let stmt = primitive_statement_to_table(statement);
let core = core_index_from_statement(
&rp_state.validator_to_group,
&rp_state.group_rotation_info,
rp_state.n_cores,
&rp_state.claim_queue,
statement,
)
.ok_or(Error::CoreIndexUnavailable)?;
Ok(rp_state.table.import_statement(&rp_state.table_context, core, stmt))
}
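/// After importing a statement: if the candidate just became backed, notify prospective
/// parachains and statement distribution, then report any new misbehaviors.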
#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
async fn post_import_statement_actions<Context>(
ctx: &mut Context,
rp_state: &mut PerRelayParentState,
summary: Option<&TableSummary>,
) {
if let Some(attested) = summary.as_ref().and_then(|s| {
rp_state.table.attested_candidate(
&s.candidate,
&rp_state.table_context,
rp_state.minimum_backing_votes,
)
}) {
let candidate_hash = attested.candidate.hash();
if rp_state.backed.insert(candidate_hash) {
if let Some(backed) = table_attested_to_backed(
attested,
&rp_state.table_context,
rp_state.inject_core_index,
) {
let para_id = backed.candidate().descriptor.para_id();
gum::debug!(
target: LOG_TARGET,
candidate_hash = ?candidate_hash,
relay_parent = ?rp_state.parent,
%para_id,
"Candidate backed",
);
ctx.send_message(ProspectiveParachainsMessage::CandidateBacked(
para_id,
candidate_hash,
))
.await;
ctx.send_message(StatementDistributionMessage::Backed(candidate_hash)).await;
} else {
gum::debug!(target: LOG_TARGET, ?candidate_hash, "Cannot get BackedCandidate");
}
} else {
gum::debug!(target: LOG_TARGET, ?candidate_hash, "Candidate already known");
}
} else {
gum::debug!(target: LOG_TARGET, "No attested candidate");
}
issue_new_misbehaviors(ctx, rp_state.parent, &mut rp_state.table);
}
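/// Drain newly detected misbehaviors from the statement table and forward them to the
/// provisioner.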
#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
fn issue_new_misbehaviors<Context>(
ctx: &mut Context,
relay_parent: Hash,
table: &mut Table<TableContext>,
) {
let misbehaviors: Vec<_> = table.drain_misbehaviors().collect();
for (validator_id, report) in misbehaviors {
ctx.send_unbounded_message(ProvisionerMessage::ProvisionableData(
relay_parent,
ProvisionableData::MisbehaviorReport(relay_parent, validator_id, report),
));
}
}
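/// Sign a statement with our validator key, import it into the table and share it via
/// statement distribution.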
#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
async fn sign_import_and_distribute_statement<Context>(
ctx: &mut Context,
rp_state: &mut PerRelayParentState,
per_candidate: &mut HashMap<CandidateHash, PerCandidateState>,
statement: StatementWithPVD,
keystore: KeystorePtr,
metrics: &Metrics,
) -> Result<Option<SignedFullStatementWithPVD>, Error> {
if let Some(signed_statement) = sign_statement(&*rp_state, statement, keystore, metrics) {
let summary = import_statement(ctx, rp_state, per_candidate, &signed_statement).await?;
let smsg = StatementDistributionMessage::Share(rp_state.parent, signed_statement.clone());
ctx.send_unbounded_message(smsg);
post_import_statement_actions(ctx, rp_state, summary.as_ref()).await;
Ok(Some(signed_statement))
} else {
Ok(None)
}
}
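/// Spawn a background task that validates a candidate and makes its data available, unless
/// validation for it is already in flight.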
#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
async fn background_validate_and_make_available<Context>(
ctx: &mut Context,
rp_state: &mut PerRelayParentState,
params: BackgroundValidationParams<
impl overseer::CandidateBackingSenderTrait,
impl Fn(BackgroundValidationResult) -> ValidatedCandidateCommand + Send + 'static + Sync,
>,
) -> Result<(), Error> {
let candidate_hash = params.candidate.hash();
let Some(core_index) = rp_state.assigned_core else { return Ok(()) };
if rp_state.awaiting_validation.insert(candidate_hash) {
let bg = async move {
if let Err(error) = validate_and_make_available(params, core_index).await {
if let Error::BackgroundValidationMpsc(error) = error {
gum::debug!(
target: LOG_TARGET,
?candidate_hash,
?error,
"Mpsc background validation mpsc died during validation- leaf no longer active?"
);
} else {
gum::error!(
target: LOG_TARGET,
?candidate_hash,
?error,
"Failed to validate and make available",
);
}
}
};
ctx.spawn("backing-validation", bg.boxed())
.map_err(|_| Error::FailedToSpawnBackgroundTask)?;
}
Ok(())
}
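/// Kick off background validation for a candidate we are attesting, fetching its PoV from a
/// backing validator. Skipped if we are disabled, not a validator, or have already issued a
/// statement for the candidate.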
#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
async fn kick_off_validation_work<Context>(
ctx: &mut Context,
rp_state: &mut PerRelayParentState,
persisted_validation_data: PersistedValidationData,
background_validation_tx: &mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
attesting: AttestingData,
) -> Result<(), Error> {
match rp_state.table_context.local_validator_is_disabled() {
Some(true) => {
gum::info!(target: LOG_TARGET, "We are disabled - don't kick off validation");
return Ok(())
},
Some(false) => {},
None => {
gum::debug!(target: LOG_TARGET, "We are not a validator - don't kick off validation");
return Ok(())
},
}
let candidate_hash = attesting.candidate.hash();
if rp_state.issued_statements.contains(&candidate_hash) {
return Ok(())
}
gum::debug!(
target: LOG_TARGET,
candidate_hash = ?candidate_hash,
candidate_receipt = ?attesting.candidate,
"Kicking off validation",
);
let bg_sender = ctx.sender().clone();
let pov = PoVData::FetchFromValidator {
from_validator: attesting.from_validator,
candidate_hash,
pov_hash: attesting.pov_hash,
};
background_validate_and_make_available(
ctx,
rp_state,
BackgroundValidationParams {
sender: bg_sender,
tx_command: background_validation_tx.clone(),
candidate: attesting.candidate,
relay_parent: rp_state.parent,
node_features: rp_state.node_features.clone(),
executor_params: Arc::clone(&rp_state.executor_params),
persisted_validation_data,
pov,
n_validators: rp_state.table_context.validators.len(),
make_command: ValidatedCandidateCommand::Attest,
},
)
.await
}
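/// Import a statement from another validator and, if the candidate belongs to our assigned
/// core and we haven't attested it yet, kick off validation work for it.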
#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
async fn maybe_validate_and_import<Context>(
ctx: &mut Context,
state: &mut State,
relay_parent: Hash,
statement: SignedFullStatementWithPVD,
) -> Result<(), Error> {
let rp_state = match state.per_relay_parent.get_mut(&relay_parent) {
Some(r) => r,
None => {
gum::trace!(
target: LOG_TARGET,
?relay_parent,
"Received statement for unknown relay-parent"
);
return Ok(())
},
};
if rp_state.table_context.validator_is_disabled(&statement.validator_index()) {
gum::debug!(
target: LOG_TARGET,
sender_validator_idx = ?statement.validator_index(),
"Not importing statement because the sender is disabled"
);
return Ok(())
}
let res = import_statement(ctx, rp_state, &mut state.per_candidate, &statement).await;
if let Err(Error::RejectedByProspectiveParachains) = res {
gum::debug!(
target: LOG_TARGET,
?relay_parent,
"Statement rejected by prospective parachains."
);
return Ok(())
}
let summary = res?;
post_import_statement_actions(ctx, rp_state, summary.as_ref()).await;
if let Some(summary) = summary {
let candidate_hash = summary.candidate;
if Some(summary.group_id) != rp_state.assigned_core {
return Ok(())
}
let attesting = match statement.payload() {
StatementWithPVD::Seconded(receipt, _) => {
let attesting = AttestingData {
candidate: rp_state
.table
.get_candidate(&candidate_hash)
.ok_or(Error::CandidateNotFound)?
.to_plain(),
pov_hash: receipt.descriptor.pov_hash(),
from_validator: statement.validator_index(),
backing: Vec::new(),
};
rp_state.fallbacks.insert(summary.candidate, attesting.clone());
attesting
},
StatementWithPVD::Valid(candidate_hash) => {
if let Some(attesting) = rp_state.fallbacks.get_mut(candidate_hash) {
let our_index = rp_state.table_context.validator.as_ref().map(|v| v.index());
if our_index == Some(statement.validator_index()) {
return Ok(())
}
if rp_state.awaiting_validation.contains(candidate_hash) {
attesting.backing.push(statement.validator_index());
return Ok(())
} else {
attesting.from_validator = statement.validator_index();
attesting.clone()
}
} else {
return Ok(())
}
},
};
if let Some(pvd) = state
.per_candidate
.get(&candidate_hash)
.map(|pc| pc.persisted_validation_data.clone())
{
kick_off_validation_work(
ctx,
rp_state,
pvd,
&state.background_validation_tx,
attesting,
)
.await?;
}
}
Ok(())
}
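/// Kick off background validation of a candidate we were asked to second.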
#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
async fn validate_and_second<Context>(
ctx: &mut Context,
rp_state: &mut PerRelayParentState,
persisted_validation_data: PersistedValidationData,
candidate: &CandidateReceipt,
pov: Arc<PoV>,
background_validation_tx: &mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
) -> Result<(), Error> {
let candidate_hash = candidate.hash();
gum::debug!(
target: LOG_TARGET,
candidate_hash = ?candidate_hash,
candidate_receipt = ?candidate,
"Validate and second candidate",
);
let bg_sender = ctx.sender().clone();
background_validate_and_make_available(
ctx,
rp_state,
BackgroundValidationParams {
sender: bg_sender,
tx_command: background_validation_tx.clone(),
candidate: candidate.clone(),
relay_parent: rp_state.parent,
node_features: rp_state.node_features.clone(),
executor_params: Arc::clone(&rp_state.executor_params),
persisted_validation_data,
pov: PoVData::Ready(pov),
n_validators: rp_state.table_context.validators.len(),
make_command: ValidatedCandidateCommand::Second,
},
)
.await?;
Ok(())
}
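/// Handle a `Second` message: check the persisted validation data and our core assignment,
/// then start background validation with the goal of seconding the candidate.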
#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
async fn handle_second_message<Context>(
ctx: &mut Context,
state: &mut State,
candidate: CandidateReceipt,
persisted_validation_data: PersistedValidationData,
pov: PoV,
metrics: &Metrics,
) -> Result<(), Error> {
let _timer = metrics.time_process_second();
let candidate_hash = candidate.hash();
let relay_parent = candidate.descriptor().relay_parent();
if candidate.descriptor().persisted_validation_data_hash() != persisted_validation_data.hash() {
gum::warn!(
target: LOG_TARGET,
?candidate_hash,
"Candidate backing was asked to second candidate with wrong PVD",
);
return Ok(())
}
let rp_state = match state.per_relay_parent.get_mut(&relay_parent) {
None => {
gum::trace!(
target: LOG_TARGET,
?relay_parent,
?candidate_hash,
"We were asked to second a candidate outside of our view."
);
return Ok(())
},
Some(r) => r,
};
if rp_state.table_context.local_validator_is_disabled().unwrap_or(false) {
gum::warn!(target: LOG_TARGET, "Local validator is disabled. Don't validate and second");
return Ok(())
}
let assigned_paras = rp_state.assigned_core.and_then(|core| rp_state.claim_queue.0.get(&core));
if !matches!(assigned_paras, Some(paras) if paras.contains(&candidate.descriptor().para_id())) {
gum::debug!(
target: LOG_TARGET,
our_assignment_core = ?rp_state.assigned_core,
our_assignment_paras = ?assigned_paras,
collation = ?candidate.descriptor().para_id(),
"Subsystem asked to second for para outside of our assignment",
);
return Ok(());
}
gum::debug!(
target: LOG_TARGET,
our_assignment_core = ?rp_state.assigned_core,
our_assignment_paras = ?assigned_paras,
collation = ?candidate.descriptor().para_id(),
"Current assignments vs collation",
);
if !rp_state.issued_statements.contains(&candidate_hash) {
let pov = Arc::new(pov);
validate_and_second(
ctx,
rp_state,
persisted_validation_data,
&candidate,
pov,
&state.background_validation_tx,
)
.await?;
}
Ok(())
}
#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
async fn handle_statement_message<Context>(
ctx: &mut Context,
state: &mut State,
relay_parent: Hash,
statement: SignedFullStatementWithPVD,
metrics: &Metrics,
) -> Result<(), Error> {
let _timer = metrics.time_process_statement();
match maybe_validate_and_import(ctx, state, relay_parent, statement).await {
Err(Error::ValidationFailed(_)) => Ok(()),
Err(e) => Err(e),
Ok(()) => Ok(()),
}
}
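/// Answer a `GetBackableCandidates` request by assembling backed candidates from the
/// statement tables of the requested relay parents.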
fn handle_get_backable_candidates_message(
state: &State,
requested_candidates: HashMap<ParaId, Vec<(CandidateHash, Hash)>>,
tx: oneshot::Sender<HashMap<ParaId, Vec<BackedCandidate>>>,
metrics: &Metrics,
) -> Result<(), Error> {
let _timer = metrics.time_get_backed_candidates();
let mut backed = HashMap::with_capacity(requested_candidates.len());
for (para_id, para_candidates) in requested_candidates {
for (candidate_hash, relay_parent) in para_candidates.iter() {
let rp_state = match state.per_relay_parent.get(&relay_parent) {
Some(rp_state) => rp_state,
None => {
gum::debug!(
target: LOG_TARGET,
?relay_parent,
?candidate_hash,
"Requested candidate's relay parent is out of view",
);
break
},
};
let maybe_backed_candidate = rp_state
.table
.attested_candidate(
candidate_hash,
&rp_state.table_context,
rp_state.minimum_backing_votes,
)
.and_then(|attested| {
table_attested_to_backed(
attested,
&rp_state.table_context,
rp_state.inject_core_index,
)
});
if let Some(backed_candidate) = maybe_backed_candidate {
backed
.entry(para_id)
.or_insert_with(|| Vec::with_capacity(para_candidates.len()))
.push(backed_candidate);
} else {
break
}
}
}
tx.send(backed).map_err(Error::Send)?;
Ok(())
}