use std::sync::Arc;
use error::FatalError;
use futures::FutureExt;
use gum::CandidateHash;
use sc_keystore::LocalKeystore;
use polkadot_node_primitives::{
CandidateVotes, DisputeMessage, DisputeMessageCheckError, SignedDisputeStatement,
DISPUTE_WINDOW,
};
use polkadot_node_subsystem::{
messages::DisputeDistributionMessage, overseer, ActivatedLeaf, FromOrchestra, OverseerSignal,
SpawnedSubsystem, SubsystemError,
};
use polkadot_node_subsystem_util::{
database::Database,
runtime::{Config as RuntimeInfoConfig, RuntimeInfo},
};
use polkadot_primitives::{
vstaging::ScrapedOnChainVotes, DisputeStatement, SessionIndex, SessionInfo, ValidatorIndex,
};
use crate::{
error::{FatalResult, Result},
metrics::Metrics,
status::{get_active_with_status, SystemClock},
};
use backend::{Backend, OverlayedBackend};
use db::v1::DbBackend;
use fatality::Split;
use self::{
import::{CandidateEnvironment, CandidateVoteState},
participation::{ParticipationPriority, ParticipationRequest},
spam_slots::{SpamSlots, UnconfirmedDisputes},
};
pub(crate) mod backend;
pub(crate) mod db;
pub(crate) mod error;
mod initialized;
use initialized::{InitialData, Initialized};
mod scraping;
use scraping::ChainScraper;
mod spam_slots;
pub(crate) mod participation;
pub(crate) mod import;
mod metrics;
mod status;
use crate::status::Clock;
#[cfg(test)]
mod tests;
pub(crate) const LOG_TARGET: &str = "parachain::dispute-coordinator";
pub struct DisputeCoordinatorSubsystem {
config: Config,
store: Arc<dyn Database>,
keystore: Arc<LocalKeystore>,
metrics: Metrics,
approval_voting_parallel_enabled: bool,
}
#[derive(Debug, Clone, Copy)]
pub struct Config {
pub col_dispute_data: u32,
}
impl Config {
fn column_config(&self) -> db::v1::ColumnConfiguration {
db::v1::ColumnConfiguration { col_dispute_data: self.col_dispute_data }
}
}
#[overseer::subsystem(DisputeCoordinator, error=SubsystemError, prefix=self::overseer)]
impl<Context: Send> DisputeCoordinatorSubsystem {
fn start(self, ctx: Context) -> SpawnedSubsystem {
let future = async {
let backend = DbBackend::new(
self.store.clone(),
self.config.column_config(),
self.metrics.clone(),
);
self.run(ctx, backend, Box::new(SystemClock))
.await
.map_err(|e| SubsystemError::with_origin("dispute-coordinator", e))
}
.boxed();
SpawnedSubsystem { name: "dispute-coordinator-subsystem", future }
}
}
#[overseer::contextbounds(DisputeCoordinator, prefix = self::overseer)]
impl DisputeCoordinatorSubsystem {
pub fn new(
store: Arc<dyn Database>,
config: Config,
keystore: Arc<LocalKeystore>,
metrics: Metrics,
approval_voting_parallel_enabled: bool,
) -> Self {
Self { store, config, keystore, metrics, approval_voting_parallel_enabled }
}
async fn run<B, Context>(
self,
mut ctx: Context,
backend: B,
clock: Box<dyn Clock>,
) -> FatalResult<()>
where
B: Backend + 'static,
{
let res = self.initialize(&mut ctx, backend, &*clock).await?;
let (participations, votes, first_leaf, initialized, backend) = match res {
None => return Ok(()),
Some(r) => r,
};
initialized
.run(ctx, backend, Some(InitialData { participations, votes, leaf: first_leaf }), clock)
.await
}
async fn initialize<B, Context>(
self,
ctx: &mut Context,
mut backend: B,
clock: &(dyn Clock),
) -> FatalResult<
Option<(
Vec<(ParticipationPriority, ParticipationRequest)>,
Vec<ScrapedOnChainVotes>,
ActivatedLeaf,
Initialized,
B,
)>,
>
where
B: Backend + 'static,
{
loop {
let first_leaf = match wait_for_first_leaf(ctx).await {
Ok(Some(activated_leaf)) => activated_leaf,
Ok(None) => continue,
Err(e) => {
e.split()?.log();
continue
},
};
let mut runtime_info = RuntimeInfo::new_with_config(RuntimeInfoConfig {
keystore: None,
session_cache_lru_size: DISPUTE_WINDOW.get(),
});
let mut overlay_db = OverlayedBackend::new(&mut backend);
let (
participations,
votes,
spam_slots,
ordering_provider,
highest_session_seen,
gaps_in_cache,
) = match self
.handle_startup(ctx, first_leaf.clone(), &mut runtime_info, &mut overlay_db, clock)
.await
{
Ok(v) => v,
Err(e) => {
e.split()?.log();
continue
},
};
if !overlay_db.is_empty() {
let ops = overlay_db.into_write_ops();
backend.write(ops)?;
}
return Ok(Some((
participations,
votes,
first_leaf,
Initialized::new(
self,
runtime_info,
spam_slots,
ordering_provider,
highest_session_seen,
gaps_in_cache,
),
backend,
)))
}
}
async fn handle_startup<Context>(
&self,
ctx: &mut Context,
initial_head: ActivatedLeaf,
runtime_info: &mut RuntimeInfo,
overlay_db: &mut OverlayedBackend<'_, impl Backend>,
clock: &dyn Clock,
) -> Result<(
Vec<(ParticipationPriority, ParticipationRequest)>,
Vec<ScrapedOnChainVotes>,
SpamSlots,
ChainScraper,
SessionIndex,
bool,
)> {
let now = clock.now();
let active_disputes = match overlay_db.load_recent_disputes() {
Ok(disputes) => disputes
.map(|disputes| get_active_with_status(disputes.into_iter(), now))
.into_iter()
.flatten(),
Err(e) => {
gum::error!(target: LOG_TARGET, "Failed initial load of recent disputes: {:?}", e);
return Err(e.into())
},
};
let highest_session = runtime_info
.get_session_index_for_child(ctx.sender(), initial_head.hash)
.await?;
let mut gap_in_cache = false;
for idx in highest_session.saturating_sub(DISPUTE_WINDOW.get() - 1)..=highest_session {
if let Err(e) = runtime_info
.get_session_info_by_index(ctx.sender(), initial_head.hash, idx)
.await
{
gum::debug!(
target: LOG_TARGET,
leaf_hash = ?initial_head.hash,
session_idx = idx,
err = ?e,
"Can't cache SessionInfo during subsystem initialization. Skipping session."
);
gap_in_cache = true;
continue
};
}
db::v1::note_earliest_session(
overlay_db,
highest_session.saturating_sub(DISPUTE_WINDOW.get() - 1),
)?;
let mut participation_requests = Vec::new();
let mut spam_disputes: UnconfirmedDisputes = UnconfirmedDisputes::new();
let leaf_hash = initial_head.hash;
let (scraper, votes) = ChainScraper::new(ctx.sender(), initial_head).await?;
for ((session, ref candidate_hash), _) in active_disputes {
let env = match CandidateEnvironment::new(
&self.keystore,
ctx,
runtime_info,
highest_session,
leaf_hash,
std::iter::empty(),
)
.await
{
None => {
gum::warn!(
target: LOG_TARGET,
session,
"We are lacking a `SessionInfo` for handling db votes on startup."
);
continue
},
Some(env) => env,
};
let votes: CandidateVotes =
match overlay_db.load_candidate_votes(session, candidate_hash) {
Ok(Some(votes)) => votes.into(),
Ok(None) => continue,
Err(e) => {
gum::error!(
target: LOG_TARGET,
"Failed initial load of candidate votes: {:?}",
e
);
continue
},
};
let vote_state = CandidateVoteState::new(votes, &env, now);
let is_disabled = |v: &ValidatorIndex| env.disabled_indices().contains(v);
let potential_spam =
is_potential_spam(&scraper, &vote_state, candidate_hash, is_disabled);
let is_included =
scraper.is_candidate_included(&vote_state.votes().candidate_receipt.hash());
if potential_spam {
gum::trace!(
target: LOG_TARGET,
?session,
?candidate_hash,
"Found potential spam dispute on startup"
);
spam_disputes
.insert((session, *candidate_hash), vote_state.votes().voted_indices());
} else {
if vote_state.own_vote_missing() {
gum::trace!(
target: LOG_TARGET,
?session,
?candidate_hash,
"Found valid dispute, with no vote from us on startup - participating."
);
let request_timer = self.metrics.time_participation_pipeline();
participation_requests.push((
ParticipationPriority::with_priority_if(is_included),
ParticipationRequest::new(
vote_state.votes().candidate_receipt.clone(),
session,
env.executor_params().clone(),
request_timer,
),
));
}
else {
gum::trace!(
target: LOG_TARGET,
?session,
?candidate_hash,
"Found valid dispute, with vote from us on startup - send vote."
);
send_dispute_messages(ctx, &env, &vote_state).await;
}
}
}
Ok((
participation_requests,
votes,
SpamSlots::recover_from_state(spam_disputes),
scraper,
highest_session,
gap_in_cache,
))
}
}
#[overseer::contextbounds(DisputeCoordinator, prefix = self::overseer)]
async fn wait_for_first_leaf<Context>(ctx: &mut Context) -> Result<Option<ActivatedLeaf>> {
loop {
match ctx.recv().await.map_err(FatalError::SubsystemReceive)? {
FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(None),
FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => {
if let Some(activated) = update.activated {
return Ok(Some(activated))
}
},
FromOrchestra::Signal(OverseerSignal::BlockFinalized(_, _)) => {},
FromOrchestra::Communication { msg } =>
{
gum::warn!(
target: LOG_TARGET,
?msg,
"Received msg before first active leaves update. This is not expected - message will be dropped."
)
},
}
}
}
pub fn is_potential_spam(
scraper: &ChainScraper,
vote_state: &CandidateVoteState<CandidateVotes>,
candidate_hash: &CandidateHash,
is_disabled: impl FnMut(&ValidatorIndex) -> bool,
) -> bool {
let is_disputed = vote_state.is_disputed();
let is_included = scraper.is_candidate_included(candidate_hash);
let is_backed = scraper.is_candidate_backed(candidate_hash);
let is_confirmed = vote_state.is_confirmed();
let all_invalid_votes_disabled = vote_state.invalid_votes_all_disabled(is_disabled);
let ignore_disabled = !is_confirmed && all_invalid_votes_disabled;
gum::trace!(
target: LOG_TARGET,
?candidate_hash,
?is_disputed,
?is_included,
?is_backed,
?is_confirmed,
?all_invalid_votes_disabled,
?ignore_disabled,
"Checking for potential spam"
);
(is_disputed && !is_included && !is_backed && !is_confirmed) || ignore_disabled
}
#[overseer::contextbounds(DisputeCoordinator, prefix = self::overseer)]
async fn send_dispute_messages<Context>(
ctx: &mut Context,
env: &CandidateEnvironment<'_>,
vote_state: &CandidateVoteState<CandidateVotes>,
) {
for own_vote in vote_state.own_votes().into_iter().flatten() {
let (validator_index, (kind, sig)) = own_vote;
let public_key = if let Some(key) = env.session_info().validators.get(*validator_index) {
key.clone()
} else {
gum::error!(
target: LOG_TARGET,
?validator_index,
session_index = ?env.session_index(),
"Could not find our own key in `SessionInfo`"
);
continue
};
let our_vote_signed = SignedDisputeStatement::new_checked(
kind.clone(),
vote_state.votes().candidate_receipt.hash(),
env.session_index(),
public_key,
sig.clone(),
);
let our_vote_signed = match our_vote_signed {
Ok(signed) => signed,
Err(()) => {
gum::error!(
target: LOG_TARGET,
"Checking our own signature failed - db corruption?"
);
continue
},
};
let dispute_message = match make_dispute_message(
env.session_info(),
vote_state.votes(),
our_vote_signed,
*validator_index,
) {
Err(err) => {
gum::debug!(target: LOG_TARGET, ?err, "Creating dispute message failed.");
continue
},
Ok(dispute_message) => dispute_message,
};
ctx.send_message(DisputeDistributionMessage::SendDispute(dispute_message)).await;
}
}
#[derive(Debug, thiserror::Error)]
pub enum DisputeMessageCreationError {
#[error("There was no opposite vote available")]
NoOppositeVote,
#[error("Found vote had an invalid validator index that could not be found")]
InvalidValidatorIndex,
#[error("Statement found in votes had invalid signature.")]
InvalidStoredStatement,
#[error(transparent)]
InvalidStatementCombination(DisputeMessageCheckError),
}
pub fn make_dispute_message(
info: &SessionInfo,
votes: &CandidateVotes,
our_vote: SignedDisputeStatement,
our_index: ValidatorIndex,
) -> std::result::Result<DisputeMessage, DisputeMessageCreationError> {
let validators = &info.validators;
let (valid_statement, valid_index, invalid_statement, invalid_index) =
if let DisputeStatement::Valid(_) = our_vote.statement() {
let (validator_index, (statement_kind, validator_signature)) =
votes.invalid.iter().next().ok_or(DisputeMessageCreationError::NoOppositeVote)?;
let other_vote = SignedDisputeStatement::new_checked(
DisputeStatement::Invalid(*statement_kind),
*our_vote.candidate_hash(),
our_vote.session_index(),
validators
.get(*validator_index)
.ok_or(DisputeMessageCreationError::InvalidValidatorIndex)?
.clone(),
validator_signature.clone(),
)
.map_err(|()| DisputeMessageCreationError::InvalidStoredStatement)?;
(our_vote, our_index, other_vote, *validator_index)
} else {
let (validator_index, (statement_kind, validator_signature)) = votes
.valid
.raw()
.iter()
.next()
.ok_or(DisputeMessageCreationError::NoOppositeVote)?;
let other_vote = SignedDisputeStatement::new_checked(
DisputeStatement::Valid(statement_kind.clone()),
*our_vote.candidate_hash(),
our_vote.session_index(),
validators
.get(*validator_index)
.ok_or(DisputeMessageCreationError::InvalidValidatorIndex)?
.clone(),
validator_signature.clone(),
)
.map_err(|()| DisputeMessageCreationError::InvalidStoredStatement)?;
(other_vote, *validator_index, our_vote, our_index)
};
DisputeMessage::from_signed_statements(
valid_statement,
valid_index,
invalid_statement,
invalid_index,
votes.candidate_receipt.clone(),
info,
)
.map_err(DisputeMessageCreationError::InvalidStatementCombination)
}