1use std::sync::Arc;
29
30use error::FatalError;
31use futures::FutureExt;
32
33use gum::CandidateHash;
34use sc_keystore::LocalKeystore;
35
36use polkadot_node_primitives::{
37 CandidateVotes, DisputeMessage, DisputeMessageCheckError, SignedDisputeStatement,
38 DISPUTE_WINDOW,
39};
40use polkadot_node_subsystem::{
41 messages::DisputeDistributionMessage, overseer, ActivatedLeaf, FromOrchestra, OverseerSignal,
42 SpawnedSubsystem, SubsystemError,
43};
44use polkadot_node_subsystem_util::{
45 database::Database,
46 runtime::{Config as RuntimeInfoConfig, RuntimeInfo},
47 ControlledValidatorIndices,
48};
49use polkadot_primitives::{
50 DisputeStatement, ScrapedOnChainVotes, SessionIndex, SessionInfo, ValidatorIndex,
51};
52
53use crate::{
54 error::{FatalResult, Result},
55 metrics::Metrics,
56 status::{get_active_with_status, SystemClock},
57};
58use backend::{Backend, OverlayedBackend};
59use db::v1::DbBackend;
60use fatality::Split;
61
62use self::{
63 import::{CandidateEnvironment, CandidateVoteState},
64 participation::{ParticipationPriority, ParticipationRequest},
65 spam_slots::{SpamSlots, UnconfirmedDisputes},
66};
67
68pub(crate) mod backend;
69pub(crate) mod db;
70pub(crate) mod error;
71
72mod initialized;
74use initialized::{InitialData, Initialized};
75
76mod scraping;
85use scraping::ChainScraper;
86
87mod spam_slots;
95
96pub(crate) mod participation;
103
104pub(crate) mod import;
106
107mod metrics;
109
110mod status;
112
113use crate::status::Clock;
114
115#[cfg(test)]
116mod tests;
117
118pub(crate) const LOG_TARGET: &str = "parachain::dispute-coordinator";
119
120pub struct DisputeCoordinatorSubsystem {
122 config: Config,
123 store: Arc<dyn Database>,
124 keystore: Arc<LocalKeystore>,
125 metrics: Metrics,
126}
127
128#[derive(Debug, Clone, Copy)]
130pub struct Config {
131 pub col_dispute_data: u32,
133}
134
135impl Config {
136 fn column_config(&self) -> db::v1::ColumnConfiguration {
137 db::v1::ColumnConfiguration { col_dispute_data: self.col_dispute_data }
138 }
139}
140
141#[overseer::subsystem(DisputeCoordinator, error=SubsystemError, prefix=self::overseer)]
142impl<Context: Send> DisputeCoordinatorSubsystem {
143 fn start(self, ctx: Context) -> SpawnedSubsystem {
144 let future = async {
145 let backend = DbBackend::new(
146 self.store.clone(),
147 self.config.column_config(),
148 self.metrics.clone(),
149 );
150 self.run(ctx, backend, Box::new(SystemClock))
151 .await
152 .map_err(|e| SubsystemError::with_origin("dispute-coordinator", e))
153 }
154 .boxed();
155
156 SpawnedSubsystem { name: "dispute-coordinator-subsystem", future }
157 }
158}
159
160#[overseer::contextbounds(DisputeCoordinator, prefix = self::overseer)]
161impl DisputeCoordinatorSubsystem {
162 pub fn new(
164 store: Arc<dyn Database>,
165 config: Config,
166 keystore: Arc<LocalKeystore>,
167 metrics: Metrics,
168 ) -> Self {
169 Self { store, config, keystore, metrics }
170 }
171
172 async fn run<B, Context>(
174 self,
175 mut ctx: Context,
176 backend: B,
177 clock: Box<dyn Clock>,
178 ) -> FatalResult<()>
179 where
180 B: Backend + 'static,
181 {
182 let res = self.initialize(&mut ctx, backend, &*clock).await?;
183
184 let (participations, votes, first_leaf, initialized, backend) = match res {
185 None => return Ok(()),
187 Some(r) => r,
188 };
189
190 initialized
191 .run(ctx, backend, Some(InitialData { participations, votes, leaf: first_leaf }), clock)
192 .await
193 }
194
195 async fn initialize<B, Context>(
197 self,
198 ctx: &mut Context,
199 mut backend: B,
200 clock: &(dyn Clock),
201 ) -> FatalResult<
202 Option<(
203 Vec<(ParticipationPriority, ParticipationRequest)>,
204 Vec<ScrapedOnChainVotes>,
205 ActivatedLeaf,
206 Initialized,
207 B,
208 )>,
209 >
210 where
211 B: Backend + 'static,
212 {
213 loop {
214 let first_leaf = match wait_for_first_leaf(ctx).await {
215 Ok(Some(activated_leaf)) => activated_leaf,
216 Ok(None) => continue,
217 Err(e) => {
218 e.split()?.log();
219 continue
220 },
221 };
222
223 let mut runtime_info = RuntimeInfo::new_with_config(RuntimeInfoConfig {
226 keystore: None,
227 session_cache_lru_size: DISPUTE_WINDOW.get(),
228 });
229 let mut overlay_db = OverlayedBackend::new(&mut backend);
230 let (
231 participations,
232 votes,
233 spam_slots,
234 ordering_provider,
235 highest_session_seen,
236 gaps_in_cache,
237 offchain_disabled_validators,
238 controlled_validator_indices,
239 ) = match self
240 .handle_startup(ctx, first_leaf.clone(), &mut runtime_info, &mut overlay_db, clock)
241 .await
242 {
243 Ok(v) => v,
244 Err(e) => {
245 e.split()?.log();
246 continue
247 },
248 };
249 if !overlay_db.is_empty() {
250 let ops = overlay_db.into_write_ops();
251 backend.write(ops)?;
252 }
253
254 return Ok(Some((
255 participations,
256 votes,
257 first_leaf,
258 Initialized::new(
259 self,
260 runtime_info,
261 spam_slots,
262 ordering_provider,
263 highest_session_seen,
264 gaps_in_cache,
265 offchain_disabled_validators,
266 controlled_validator_indices,
267 ),
268 backend,
269 )))
270 }
271 }
272
273 async fn handle_startup<Context>(
279 &self,
280 ctx: &mut Context,
281 initial_head: ActivatedLeaf,
282 runtime_info: &mut RuntimeInfo,
283 overlay_db: &mut OverlayedBackend<'_, impl Backend>,
284 clock: &dyn Clock,
285 ) -> Result<(
286 Vec<(ParticipationPriority, ParticipationRequest)>,
287 Vec<ScrapedOnChainVotes>,
288 SpamSlots,
289 ChainScraper,
290 SessionIndex,
291 bool,
292 initialized::OffchainDisabledValidators,
293 ControlledValidatorIndices,
294 )> {
295 let now = clock.now();
296
297 let highest_session = runtime_info
300 .get_session_index_for_child(ctx.sender(), initial_head.hash)
301 .await?;
302 let earliest_session = highest_session.saturating_sub(DISPUTE_WINDOW.get() - 1);
303
304 let recent_disputes = match overlay_db.load_recent_disputes() {
306 Ok(disputes) => disputes.unwrap_or_default(),
307 Err(e) => {
308 gum::error!(target: LOG_TARGET, "Failed initial load of recent disputes: {:?}", e);
309 return Err(e.into())
310 },
311 };
312
313 let offchain_disabled_validators = initialized::OffchainDisabledValidators::new_from_state(
315 &recent_disputes,
316 |session, candidate_hash| match overlay_db.load_candidate_votes(session, candidate_hash)
317 {
318 Ok(Some(votes)) => Some(votes.into()),
319 _ => None,
320 },
321 earliest_session,
322 );
323
324 let active_disputes = get_active_with_status(recent_disputes.into_iter(), now);
325
326 let mut gap_in_cache = false;
327 for idx in earliest_session..=highest_session {
330 let disabled: Vec<u32> = offchain_disabled_validators.iter(idx).map(|i| i.0).collect();
332 if !disabled.is_empty() {
333 gum::info!(
334 target: LOG_TARGET,
335 disabled = ?disabled,
336 session = idx,
337 "Detected disabled validators on startup",
338 );
339 }
340
341 if let Err(e) = runtime_info
342 .get_session_info_by_index(ctx.sender(), initial_head.hash, idx)
343 .await
344 {
345 gum::debug!(
346 target: LOG_TARGET,
347 leaf_hash = ?initial_head.hash,
348 session_idx = idx,
349 err = ?e,
350 "Can't cache SessionInfo during subsystem initialization. Skipping session."
351 );
352 gap_in_cache = true;
353 continue
354 };
355 }
356
357 db::v1::note_earliest_session(overlay_db, earliest_session)?;
359
360 let mut participation_requests = Vec::new();
361 let mut spam_disputes: UnconfirmedDisputes = UnconfirmedDisputes::new();
362 let mut controlled_indices =
363 ControlledValidatorIndices::new(self.keystore.clone(), DISPUTE_WINDOW.get());
364 let leaf_hash = initial_head.hash;
365 let (scraper, votes) = ChainScraper::new(ctx.sender(), initial_head).await?;
366 for ((session, ref candidate_hash), _) in active_disputes {
367 let env = match CandidateEnvironment::new(
368 ctx,
369 runtime_info,
370 highest_session,
371 leaf_hash,
372 offchain_disabled_validators.iter(session),
373 &mut controlled_indices,
374 )
375 .await
376 {
377 None => {
378 gum::warn!(
379 target: LOG_TARGET,
380 session,
381 "We are lacking a `SessionInfo` for handling db votes on startup."
382 );
383
384 continue
385 },
386 Some(env) => env,
387 };
388
389 let votes: CandidateVotes =
390 match overlay_db.load_candidate_votes(session, candidate_hash) {
391 Ok(Some(votes)) => votes.into(),
392 Ok(None) => continue,
393 Err(e) => {
394 gum::error!(
395 target: LOG_TARGET,
396 "Failed initial load of candidate votes: {:?}",
397 e
398 );
399 continue
400 },
401 };
402 let vote_state = CandidateVoteState::new(votes, &env, now);
403 let is_disabled = |v: &ValidatorIndex| env.disabled_indices().contains(v);
404 let potential_spam =
405 is_potential_spam(&scraper, &vote_state, candidate_hash, is_disabled);
406 let is_included =
407 scraper.is_candidate_included(&vote_state.votes().candidate_receipt.hash());
408
409 if potential_spam {
410 gum::trace!(
411 target: LOG_TARGET,
412 ?session,
413 ?candidate_hash,
414 "Found potential spam dispute on startup"
415 );
416 spam_disputes
417 .insert((session, *candidate_hash), vote_state.votes().voted_indices());
418 } else {
419 if vote_state.own_vote_missing() {
421 gum::trace!(
422 target: LOG_TARGET,
423 ?session,
424 ?candidate_hash,
425 "Found valid dispute, with no vote from us on startup - participating."
426 );
427 let request_timer = self.metrics.time_participation_pipeline();
428 participation_requests.push((
429 ParticipationPriority::with_priority_if(is_included),
430 ParticipationRequest::new(
431 vote_state.votes().candidate_receipt.clone(),
432 session,
433 env.executor_params().clone(),
434 request_timer,
435 ),
436 ));
437 }
438 else {
440 gum::trace!(
441 target: LOG_TARGET,
442 ?session,
443 ?candidate_hash,
444 "Found valid dispute, with vote from us on startup - send vote."
445 );
446 send_dispute_messages(ctx, &env, &vote_state).await;
447 }
448 }
449 }
450
451 Ok((
452 participation_requests,
453 votes,
454 SpamSlots::recover_from_state(spam_disputes),
455 scraper,
456 highest_session,
457 gap_in_cache,
458 offchain_disabled_validators,
459 controlled_indices,
460 ))
461 }
462}
463
464#[overseer::contextbounds(DisputeCoordinator, prefix = self::overseer)]
466async fn wait_for_first_leaf<Context>(ctx: &mut Context) -> Result<Option<ActivatedLeaf>> {
467 loop {
468 match ctx.recv().await.map_err(FatalError::SubsystemReceive)? {
469 FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(None),
470 FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => {
471 if let Some(activated) = update.activated {
472 return Ok(Some(activated))
473 }
474 },
475 FromOrchestra::Signal(OverseerSignal::BlockFinalized(_, _)) => {},
476 FromOrchestra::Communication { msg } =>
477 {
485 gum::warn!(
486 target: LOG_TARGET,
487 ?msg,
488 "Received msg before first active leaves update. This is not expected - message will be dropped."
489 )
490 },
491 }
492 }
493}
494
495pub fn is_potential_spam(
499 scraper: &ChainScraper,
500 vote_state: &CandidateVoteState<CandidateVotes>,
501 candidate_hash: &CandidateHash,
502 is_disabled: impl FnMut(&ValidatorIndex) -> bool,
503) -> bool {
504 let is_disputed = vote_state.is_disputed();
505 let is_included = scraper.is_candidate_included(candidate_hash);
506 let is_backed = scraper.is_candidate_backed(candidate_hash);
507 let is_confirmed = vote_state.is_confirmed();
508 let all_invalid_votes_disabled = vote_state.invalid_votes_all_disabled(is_disabled);
509 let ignore_disabled = !is_confirmed && all_invalid_votes_disabled;
510
511 gum::trace!(
512 target: LOG_TARGET,
513 ?candidate_hash,
514 ?is_disputed,
515 ?is_included,
516 ?is_backed,
517 ?is_confirmed,
518 ?all_invalid_votes_disabled,
519 ?ignore_disabled,
520 "Checking for potential spam"
521 );
522
523 (is_disputed && !is_included && !is_backed && !is_confirmed) || ignore_disabled
524}
525
526#[overseer::contextbounds(DisputeCoordinator, prefix = self::overseer)]
530async fn send_dispute_messages<Context>(
531 ctx: &mut Context,
532 env: &CandidateEnvironment<'_>,
533 vote_state: &CandidateVoteState<CandidateVotes>,
534) {
535 for own_vote in vote_state.own_votes().into_iter().flatten() {
536 let (validator_index, (kind, sig)) = own_vote;
537 let public_key = if let Some(key) = env.session_info().validators.get(*validator_index) {
538 key.clone()
539 } else {
540 gum::error!(
541 target: LOG_TARGET,
542 ?validator_index,
543 session_index = ?env.session_index(),
544 "Could not find our own key in `SessionInfo`"
545 );
546 continue
547 };
548 let our_vote_signed = SignedDisputeStatement::new_checked(
549 kind.clone(),
550 vote_state.votes().candidate_receipt.hash(),
551 env.session_index(),
552 public_key,
553 sig.clone(),
554 );
555 let our_vote_signed = match our_vote_signed {
556 Ok(signed) => signed,
557 Err(()) => {
558 gum::error!(
559 target: LOG_TARGET,
560 "Checking our own signature failed - db corruption?"
561 );
562 continue
563 },
564 };
565 let dispute_message = match make_dispute_message(
566 env.session_info(),
567 vote_state.votes(),
568 our_vote_signed,
569 *validator_index,
570 ) {
571 Err(err) => {
572 gum::debug!(target: LOG_TARGET, ?err, "Creating dispute message failed.");
573 continue
574 },
575 Ok(dispute_message) => dispute_message,
576 };
577
578 ctx.send_message(DisputeDistributionMessage::SendDispute(dispute_message)).await;
579 }
580}
581
582#[derive(Debug, thiserror::Error)]
583pub enum DisputeMessageCreationError {
584 #[error("There was no opposite vote available")]
585 NoOppositeVote,
586 #[error("Found vote had an invalid validator index that could not be found")]
587 InvalidValidatorIndex,
588 #[error("Statement found in votes had invalid signature.")]
589 InvalidStoredStatement,
590 #[error(transparent)]
591 InvalidStatementCombination(DisputeMessageCheckError),
592}
593
594pub fn make_dispute_message(
596 info: &SessionInfo,
597 votes: &CandidateVotes,
598 our_vote: SignedDisputeStatement,
599 our_index: ValidatorIndex,
600) -> std::result::Result<DisputeMessage, DisputeMessageCreationError> {
601 let validators = &info.validators;
602
603 let (valid_statement, valid_index, invalid_statement, invalid_index) =
604 if let DisputeStatement::Valid(_) = our_vote.statement() {
605 let (validator_index, (statement_kind, validator_signature)) =
606 votes.invalid.iter().next().ok_or(DisputeMessageCreationError::NoOppositeVote)?;
607 let other_vote = SignedDisputeStatement::new_checked(
608 DisputeStatement::Invalid(*statement_kind),
609 *our_vote.candidate_hash(),
610 our_vote.session_index(),
611 validators
612 .get(*validator_index)
613 .ok_or(DisputeMessageCreationError::InvalidValidatorIndex)?
614 .clone(),
615 validator_signature.clone(),
616 )
617 .map_err(|()| DisputeMessageCreationError::InvalidStoredStatement)?;
618 (our_vote, our_index, other_vote, *validator_index)
619 } else {
620 let (validator_index, (statement_kind, validator_signature)) = votes
621 .valid
622 .raw()
623 .iter()
624 .next()
625 .ok_or(DisputeMessageCreationError::NoOppositeVote)?;
626 let other_vote = SignedDisputeStatement::new_checked(
627 DisputeStatement::Valid(statement_kind.clone()),
628 *our_vote.candidate_hash(),
629 our_vote.session_index(),
630 validators
631 .get(*validator_index)
632 .ok_or(DisputeMessageCreationError::InvalidValidatorIndex)?
633 .clone(),
634 validator_signature.clone(),
635 )
636 .map_err(|()| DisputeMessageCreationError::InvalidStoredStatement)?;
637 (other_vote, *validator_index, our_vote, our_index)
638 };
639
640 DisputeMessage::from_signed_statements(
641 valid_statement,
642 valid_index,
643 invalid_statement,
644 invalid_index,
645 votes.candidate_receipt.clone(),
646 info,
647 )
648 .map_err(DisputeMessageCreationError::InvalidStatementCombination)
649}