// polkadot/node/core/dispute-coordinator/src/lib.rs

// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! Implements the dispute coordinator subsystem.
//!
//! This is the central subsystem of the node-side components which participate in disputes.
//! This subsystem wraps a database which tracks all statements observed by all validators over some
//! window of sessions. Votes older than this session window are pruned.
//!
//! This subsystem will be the point which produces dispute votes, either positive or negative, based
//! on locally-observed validation results as well as a sink for votes received by other subsystems.
//! When importing a dispute vote from another node, this will trigger dispute participation to
//! recover and validate the block.
use std::sync::Arc;

use error::FatalError;
use futures::FutureExt;

use gum::CandidateHash;
use sc_keystore::LocalKeystore;

use polkadot_node_primitives::{
	CandidateVotes, DisputeMessage, DisputeMessageCheckError, SignedDisputeStatement,
	DISPUTE_WINDOW,
};
use polkadot_node_subsystem::{
	messages::DisputeDistributionMessage, overseer, ActivatedLeaf, FromOrchestra, OverseerSignal,
	SpawnedSubsystem, SubsystemError,
};
use polkadot_node_subsystem_util::{
	database::Database,
	runtime::{Config as RuntimeInfoConfig, RuntimeInfo},
	ControlledValidatorIndices,
};
use polkadot_primitives::{
	DisputeStatement, ScrapedOnChainVotes, SessionIndex, SessionInfo, ValidatorIndex,
};

use crate::{
	error::{FatalResult, Result},
	metrics::Metrics,
	status::{get_active_with_status, SystemClock},
};
use backend::{Backend, OverlayedBackend};
use db::v1::DbBackend;
use fatality::Split;

use self::{
	import::{CandidateEnvironment, CandidateVoteState},
	participation::{ParticipationPriority, ParticipationRequest},
	spam_slots::{SpamSlots, UnconfirmedDisputes},
};

/// Storage backends (overlay + persistent) for dispute data.
pub(crate) mod backend;
/// Versioned database schema for dispute data.
pub(crate) mod db;
/// Error types (fatal vs. non-fatal via `fatality`).
pub(crate) mod error;

/// Subsystem after receiving the first active leaf.
mod initialized;
use initialized::{InitialData, Initialized};

/// Provider of data scraped from chain.
///
/// If we have seen a candidate included somewhere, we should treat it as priority and will be able
/// to provide an ordering for participation. Thus a dispute for a candidate where we can get some
/// ordering is high-priority (we know it is a valid dispute) and those can be ordered by
/// `participation` based on `relay_parent` block number and other metrics, so each validator will
/// participate in disputes in a similar order, which ensures we will be resolving disputes, even
/// under heavy load.
mod scraping;
use scraping::ChainScraper;

/// When importing votes we will check via the `ordering` module, whether or not we know of the
/// candidate to be included somewhere. If not, the votes might be spam, in this case we want to
/// limit the amount of locally imported votes, to prevent DoS attacks/resource exhaustion. The
/// `spam_slots` module helps keeping track of unconfirmed disputes per validators, if a spam slot
/// gets full, we will drop any further potential spam votes from that validator and report back
/// that the import failed. Which will lead to any honest validator to retry, thus the spam slots
/// can be relatively small, as a drop is not fatal.
mod spam_slots;

/// Handling of participation requests via `Participation`.
///
/// `Participation` provides an API (`Participation::queue_participation`) for queuing of dispute
/// participations and will process those participation requests, such that most important/urgent
/// disputes will be resolved and processed first and more importantly it will order requests in a
/// way so disputes will get resolved, even if there are lots of them.
pub(crate) mod participation;

/// Pure processing of vote imports.
pub(crate) mod import;

/// Metrics types.
mod metrics;

/// Status tracking of disputes (`DisputeStatus`).
mod status;

use crate::status::Clock;

#[cfg(test)]
mod tests;

/// Log target used by all tracing in this subsystem.
pub(crate) const LOG_TARGET: &str = "parachain::dispute-coordinator";

/// An implementation of the dispute coordinator subsystem.
pub struct DisputeCoordinatorSubsystem {
	/// Subsystem configuration (which database column holds dispute data).
	config: Config,
	/// Handle to the database storing dispute votes (wrapped by `DbBackend` on start).
	store: Arc<dyn Database>,
	/// Local keystore; used to determine which validator indices we control
	/// (see `ControlledValidatorIndices::new` in `handle_startup`).
	keystore: Arc<LocalKeystore>,
	/// Prometheus metrics for this subsystem.
	metrics: Metrics,
}
127
/// Configuration for the dispute coordinator subsystem.
///
/// Converted into the database layout via [`Config::column_config`].
#[derive(Debug, Clone, Copy)]
pub struct Config {
	/// The data column in the store to use for dispute data.
	pub col_dispute_data: u32,
}
134
135impl Config {
136	fn column_config(&self) -> db::v1::ColumnConfiguration {
137		db::v1::ColumnConfiguration { col_dispute_data: self.col_dispute_data }
138	}
139}
140
141#[overseer::subsystem(DisputeCoordinator, error=SubsystemError, prefix=self::overseer)]
142impl<Context: Send> DisputeCoordinatorSubsystem {
143	fn start(self, ctx: Context) -> SpawnedSubsystem {
144		let future = async {
145			let backend = DbBackend::new(
146				self.store.clone(),
147				self.config.column_config(),
148				self.metrics.clone(),
149			);
150			self.run(ctx, backend, Box::new(SystemClock))
151				.await
152				.map_err(|e| SubsystemError::with_origin("dispute-coordinator", e))
153		}
154		.boxed();
155
156		SpawnedSubsystem { name: "dispute-coordinator-subsystem", future }
157	}
158}
159
#[overseer::contextbounds(DisputeCoordinator, prefix = self::overseer)]
impl DisputeCoordinatorSubsystem {
	/// Create a new instance of the subsystem.
	pub fn new(
		store: Arc<dyn Database>,
		config: Config,
		keystore: Arc<LocalKeystore>,
		metrics: Metrics,
	) -> Self {
		Self { store, config, keystore, metrics }
	}

	/// Initialize and afterwards run `Initialized::run`.
	///
	/// Returns `Ok(())` without running the main loop if a `Conclude` signal
	/// arrived during initialization (`initialize` returned `None`).
	async fn run<B, Context>(
		self,
		mut ctx: Context,
		backend: B,
		clock: Box<dyn Clock>,
	) -> FatalResult<()>
	where
		B: Backend + 'static,
	{
		let res = self.initialize(&mut ctx, backend, &*clock).await?;

		let (participations, votes, first_leaf, initialized, backend) = match res {
			// Concluded:
			None => return Ok(()),
			Some(r) => r,
		};

		initialized
			.run(ctx, backend, Some(InitialData { participations, votes, leaf: first_leaf }), clock)
			.await
	}

	/// Make sure to recover participations properly on startup.
	///
	/// Loops until `handle_startup` succeeds for some active leaf. Fatal errors
	/// propagate via `?` (after `split()`); non-fatal ones are logged and the
	/// next leaf is awaited. `None` is returned if a `Conclude` signal ended
	/// the subsystem before the first active leaf arrived.
	async fn initialize<B, Context>(
		self,
		ctx: &mut Context,
		mut backend: B,
		clock: &(dyn Clock),
	) -> FatalResult<
		Option<(
			Vec<(ParticipationPriority, ParticipationRequest)>,
			Vec<ScrapedOnChainVotes>,
			ActivatedLeaf,
			Initialized,
			B,
		)>,
	>
	where
		B: Backend + 'static,
	{
		loop {
			let first_leaf = match wait_for_first_leaf(ctx).await {
				Ok(Some(activated_leaf)) => activated_leaf,
				Ok(None) => continue,
				Err(e) => {
					e.split()?.log();
					continue
				},
			};

			// `RuntimeInfo` cache should match `DISPUTE_WINDOW` so that we can
			// keep all sessions for a dispute window
			let mut runtime_info = RuntimeInfo::new_with_config(RuntimeInfoConfig {
				keystore: None,
				session_cache_lru_size: DISPUTE_WINDOW.get(),
			});
			let mut overlay_db = OverlayedBackend::new(&mut backend);
			let (
				participations,
				votes,
				spam_slots,
				ordering_provider,
				highest_session_seen,
				gaps_in_cache,
				offchain_disabled_validators,
				controlled_validator_indices,
			) = match self
				.handle_startup(ctx, first_leaf.clone(), &mut runtime_info, &mut overlay_db, clock)
				.await
			{
				Ok(v) => v,
				Err(e) => {
					e.split()?.log();
					continue
				},
			};
			// Persist whatever `handle_startup` wrote to the overlay (e.g. the
			// pruning of obsolete sessions) before entering the main loop.
			if !overlay_db.is_empty() {
				let ops = overlay_db.into_write_ops();
				backend.write(ops)?;
			}

			return Ok(Some((
				participations,
				votes,
				first_leaf,
				Initialized::new(
					self,
					runtime_info,
					spam_slots,
					ordering_provider,
					highest_session_seen,
					gaps_in_cache,
					offchain_disabled_validators,
					controlled_validator_indices,
				),
				backend,
			)))
		}
	}

	// Restores the subsystem's state before proceeding with the main event loop.
	//
	// - Prune any old disputes.
	// - Find disputes we need to participate in.
	// - Initialize spam slots & OrderingProvider.
	//
	// Returns (in order): participation requests to queue, on-chain votes found
	// while scraping, recovered spam slots, the chain scraper, the highest
	// session seen, whether any session failed to cache, the recovered set of
	// offchain-disabled validators, and our controlled validator indices.
	async fn handle_startup<Context>(
		&self,
		ctx: &mut Context,
		initial_head: ActivatedLeaf,
		runtime_info: &mut RuntimeInfo,
		overlay_db: &mut OverlayedBackend<'_, impl Backend>,
		clock: &dyn Clock,
	) -> Result<(
		Vec<(ParticipationPriority, ParticipationRequest)>,
		Vec<ScrapedOnChainVotes>,
		SpamSlots,
		ChainScraper,
		SessionIndex,
		bool,
		initialized::OffchainDisabledValidators,
		ControlledValidatorIndices,
	)> {
		let now = clock.now();

		// We assume the highest session is the passed leaf. If we can't get the session index
		// we can't initialize the subsystem so we'll wait for a new leaf
		let highest_session = runtime_info
			.get_session_index_for_child(ctx.sender(), initial_head.hash)
			.await?;
		// Sessions older than this are outside the dispute window and get pruned.
		let earliest_session = highest_session.saturating_sub(DISPUTE_WINDOW.get() - 1);

		// Load recent disputes from the database
		let recent_disputes = match overlay_db.load_recent_disputes() {
			Ok(disputes) => disputes.unwrap_or_default(),
			Err(e) => {
				gum::error!(target: LOG_TARGET, "Failed initial load of recent disputes: {:?}", e);
				return Err(e.into())
			},
		};

		// Initialize offchain disabled validators from recent disputes
		let offchain_disabled_validators = initialized::OffchainDisabledValidators::new_from_state(
			&recent_disputes,
			|session, candidate_hash| match overlay_db.load_candidate_votes(session, candidate_hash)
			{
				Ok(Some(votes)) => Some(votes.into()),
				_ => None,
			},
			earliest_session,
		);

		let active_disputes = get_active_with_status(recent_disputes.into_iter(), now);

		let mut gap_in_cache = false;
		// Cache the sessions. A failure to fetch a session here is not that critical so we
		// won't abort the initialization
		for idx in earliest_session..=highest_session {
			// Print disabled validators on startup if any
			let disabled: Vec<u32> = offchain_disabled_validators.iter(idx).map(|i| i.0).collect();
			if !disabled.is_empty() {
				gum::info!(
					target: LOG_TARGET,
					disabled = ?disabled,
					session = idx,
					"Detected disabled validators on startup",
				);
			}

			if let Err(e) = runtime_info
				.get_session_info_by_index(ctx.sender(), initial_head.hash, idx)
				.await
			{
				gum::debug!(
					target: LOG_TARGET,
					leaf_hash = ?initial_head.hash,
					session_idx = idx,
					err = ?e,
					"Can't cache SessionInfo during subsystem initialization. Skipping session."
				);
				// Remember the gap so the initialized subsystem can retry later.
				gap_in_cache = true;
				continue
			};
		}

		// Prune obsolete disputes:
		db::v1::note_earliest_session(overlay_db, earliest_session)?;

		let mut participation_requests = Vec::new();
		let mut spam_disputes: UnconfirmedDisputes = UnconfirmedDisputes::new();
		let mut controlled_indices =
			ControlledValidatorIndices::new(self.keystore.clone(), DISPUTE_WINDOW.get());
		let leaf_hash = initial_head.hash;
		let (scraper, votes) = ChainScraper::new(ctx.sender(), initial_head).await?;
		for ((session, ref candidate_hash), _) in active_disputes {
			// NOTE(review): the environment is created for `highest_session`, not the
			// dispute's own `session` — confirm this is intended for disputes from
			// older sessions.
			let env = match CandidateEnvironment::new(
				ctx,
				runtime_info,
				highest_session,
				leaf_hash,
				offchain_disabled_validators.iter(session),
				&mut controlled_indices,
			)
			.await
			{
				None => {
					gum::warn!(
						target: LOG_TARGET,
						session,
						"We are lacking a `SessionInfo` for handling db votes on startup."
					);

					continue
				},
				Some(env) => env,
			};

			let votes: CandidateVotes =
				match overlay_db.load_candidate_votes(session, candidate_hash) {
					Ok(Some(votes)) => votes.into(),
					Ok(None) => continue,
					Err(e) => {
						gum::error!(
							target: LOG_TARGET,
							"Failed initial load of candidate votes: {:?}",
							e
						);
						continue
					},
				};
			let vote_state = CandidateVoteState::new(votes, &env, now);
			let is_disabled = |v: &ValidatorIndex| env.disabled_indices().contains(v);
			let potential_spam =
				is_potential_spam(&scraper, &vote_state, candidate_hash, is_disabled);
			let is_included =
				scraper.is_candidate_included(&vote_state.votes().candidate_receipt.hash());

			if potential_spam {
				gum::trace!(
					target: LOG_TARGET,
					?session,
					?candidate_hash,
					"Found potential spam dispute on startup"
				);
				// Track the voters so their spam slots can be recovered below.
				spam_disputes
					.insert((session, *candidate_hash), vote_state.votes().voted_indices());
			} else {
				// Participate if need be:
				if vote_state.own_vote_missing() {
					gum::trace!(
						target: LOG_TARGET,
						?session,
						?candidate_hash,
						"Found valid dispute, with no vote from us on startup - participating."
					);
					let request_timer = self.metrics.time_participation_pipeline();
					participation_requests.push((
						ParticipationPriority::with_priority_if(is_included),
						ParticipationRequest::new(
							vote_state.votes().candidate_receipt.clone(),
							session,
							env.executor_params().clone(),
							request_timer,
						),
					));
				}
				// Else make sure our own vote is distributed:
				else {
					gum::trace!(
						target: LOG_TARGET,
						?session,
						?candidate_hash,
						"Found valid dispute, with vote from us on startup - send vote."
					);
					send_dispute_messages(ctx, &env, &vote_state).await;
				}
			}
		}

		Ok((
			participation_requests,
			votes,
			SpamSlots::recover_from_state(spam_disputes),
			scraper,
			highest_session,
			gap_in_cache,
			offchain_disabled_validators,
			controlled_indices,
		))
	}
}
463
464/// Wait for `ActiveLeavesUpdate`, returns `None` if `Conclude` signal came first.
465#[overseer::contextbounds(DisputeCoordinator, prefix = self::overseer)]
466async fn wait_for_first_leaf<Context>(ctx: &mut Context) -> Result<Option<ActivatedLeaf>> {
467	loop {
468		match ctx.recv().await.map_err(FatalError::SubsystemReceive)? {
469			FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(None),
470			FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => {
471				if let Some(activated) = update.activated {
472					return Ok(Some(activated))
473				}
474			},
475			FromOrchestra::Signal(OverseerSignal::BlockFinalized(_, _)) => {},
476			FromOrchestra::Communication { msg } =>
477			// NOTE: We could technically actually handle a couple of message types, even if
478			// not initialized (e.g. all requests that only query the database). The problem
479			// is, we would deliver potentially outdated information, especially in the event
480			// of bugs where initialization fails for a while (e.g. `SessionInfo`s are not
481			// available). So instead of telling subsystems, everything is fine, because of an
482			// hour old database state, we should rather cancel contained oneshots and delay
483			// finality until we are fully functional.
484			{
485				gum::warn!(
486					target: LOG_TARGET,
487					?msg,
488					"Received msg before first active leaves update. This is not expected - message will be dropped."
489				)
490			},
491		}
492	}
493}
494
495/// Check whether a dispute for the given candidate could be spam.
496///
497/// That is the candidate could be made up.
498pub fn is_potential_spam(
499	scraper: &ChainScraper,
500	vote_state: &CandidateVoteState<CandidateVotes>,
501	candidate_hash: &CandidateHash,
502	is_disabled: impl FnMut(&ValidatorIndex) -> bool,
503) -> bool {
504	let is_disputed = vote_state.is_disputed();
505	let is_included = scraper.is_candidate_included(candidate_hash);
506	let is_backed = scraper.is_candidate_backed(candidate_hash);
507	let is_confirmed = vote_state.is_confirmed();
508	let all_invalid_votes_disabled = vote_state.invalid_votes_all_disabled(is_disabled);
509	let ignore_disabled = !is_confirmed && all_invalid_votes_disabled;
510
511	gum::trace!(
512		target: LOG_TARGET,
513		?candidate_hash,
514		?is_disputed,
515		?is_included,
516		?is_backed,
517		?is_confirmed,
518		?all_invalid_votes_disabled,
519		?ignore_disabled,
520		"Checking for potential spam"
521	);
522
523	(is_disputed && !is_included && !is_backed && !is_confirmed) || ignore_disabled
524}
525
526/// Tell dispute-distribution to send all our votes.
527///
528/// Should be called on startup for all active disputes where there are votes from us already.
529#[overseer::contextbounds(DisputeCoordinator, prefix = self::overseer)]
530async fn send_dispute_messages<Context>(
531	ctx: &mut Context,
532	env: &CandidateEnvironment<'_>,
533	vote_state: &CandidateVoteState<CandidateVotes>,
534) {
535	for own_vote in vote_state.own_votes().into_iter().flatten() {
536		let (validator_index, (kind, sig)) = own_vote;
537		let public_key = if let Some(key) = env.session_info().validators.get(*validator_index) {
538			key.clone()
539		} else {
540			gum::error!(
541				target: LOG_TARGET,
542				?validator_index,
543				session_index = ?env.session_index(),
544				"Could not find our own key in `SessionInfo`"
545			);
546			continue
547		};
548		let our_vote_signed = SignedDisputeStatement::new_checked(
549			kind.clone(),
550			vote_state.votes().candidate_receipt.hash(),
551			env.session_index(),
552			public_key,
553			sig.clone(),
554		);
555		let our_vote_signed = match our_vote_signed {
556			Ok(signed) => signed,
557			Err(()) => {
558				gum::error!(
559					target: LOG_TARGET,
560					"Checking our own signature failed - db corruption?"
561				);
562				continue
563			},
564		};
565		let dispute_message = match make_dispute_message(
566			env.session_info(),
567			vote_state.votes(),
568			our_vote_signed,
569			*validator_index,
570		) {
571			Err(err) => {
572				gum::debug!(target: LOG_TARGET, ?err, "Creating dispute message failed.");
573				continue
574			},
575			Ok(dispute_message) => dispute_message,
576		};
577
578		ctx.send_message(DisputeDistributionMessage::SendDispute(dispute_message)).await;
579	}
580}
581
/// Reasons creating a [`DisputeMessage`] from local votes can fail.
#[derive(Debug, thiserror::Error)]
pub enum DisputeMessageCreationError {
	/// A dispute message needs both a valid and an invalid vote, but no vote of
	/// the opposite kind to ours was found.
	#[error("There was no opposite vote available")]
	NoOppositeVote,
	/// The opposing vote referenced a validator index not present in the session's
	/// validator set.
	#[error("Found vote had an invalid validator index that could not be found")]
	InvalidValidatorIndex,
	/// The opposing vote's signature failed verification (possible db corruption).
	#[error("Statement found in votes had invalid signature.")]
	InvalidStoredStatement,
	/// The two statements could not be combined into a `DisputeMessage`.
	#[error(transparent)]
	InvalidStatementCombination(DisputeMessageCheckError),
}
593
594/// Create a `DisputeMessage` to be sent to `DisputeDistribution`.
595pub fn make_dispute_message(
596	info: &SessionInfo,
597	votes: &CandidateVotes,
598	our_vote: SignedDisputeStatement,
599	our_index: ValidatorIndex,
600) -> std::result::Result<DisputeMessage, DisputeMessageCreationError> {
601	let validators = &info.validators;
602
603	let (valid_statement, valid_index, invalid_statement, invalid_index) =
604		if let DisputeStatement::Valid(_) = our_vote.statement() {
605			let (validator_index, (statement_kind, validator_signature)) =
606				votes.invalid.iter().next().ok_or(DisputeMessageCreationError::NoOppositeVote)?;
607			let other_vote = SignedDisputeStatement::new_checked(
608				DisputeStatement::Invalid(*statement_kind),
609				*our_vote.candidate_hash(),
610				our_vote.session_index(),
611				validators
612					.get(*validator_index)
613					.ok_or(DisputeMessageCreationError::InvalidValidatorIndex)?
614					.clone(),
615				validator_signature.clone(),
616			)
617			.map_err(|()| DisputeMessageCreationError::InvalidStoredStatement)?;
618			(our_vote, our_index, other_vote, *validator_index)
619		} else {
620			let (validator_index, (statement_kind, validator_signature)) = votes
621				.valid
622				.raw()
623				.iter()
624				.next()
625				.ok_or(DisputeMessageCreationError::NoOppositeVote)?;
626			let other_vote = SignedDisputeStatement::new_checked(
627				DisputeStatement::Valid(statement_kind.clone()),
628				*our_vote.candidate_hash(),
629				our_vote.session_index(),
630				validators
631					.get(*validator_index)
632					.ok_or(DisputeMessageCreationError::InvalidValidatorIndex)?
633					.clone(),
634				validator_signature.clone(),
635			)
636			.map_err(|()| DisputeMessageCreationError::InvalidStoredStatement)?;
637			(other_vote, *validator_index, our_vote, our_index)
638		};
639
640	DisputeMessage::from_signed_statements(
641		valid_statement,
642		valid_index,
643		invalid_statement,
644		invalid_index,
645		votes.candidate_receipt.clone(),
646		info,
647	)
648	.map_err(DisputeMessageCreationError::InvalidStatementCombination)
649}