referrerpolicy=no-referrer-when-downgrade

polkadot_node_core_dispute_coordinator/
initialized.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17//! Dispute coordinator subsystem in initialized state (after first active leaf is received).
18
19use std::{
20	collections::{BTreeMap, VecDeque},
21	sync::Arc,
22};
23
24use futures::{
25	channel::{mpsc, oneshot},
26	FutureExt, StreamExt,
27};
28
29use sc_keystore::LocalKeystore;
30
31use polkadot_node_primitives::{
32	disputes::ValidCandidateVotes, CandidateVotes, DisputeStatus, SignedDisputeStatement,
33	Timestamp, DISPUTE_WINDOW,
34};
35use polkadot_node_subsystem::{
36	messages::{
37		ApprovalVotingParallelMessage, BlockDescription, ChainSelectionMessage,
38		DisputeCoordinatorMessage, DisputeDistributionMessage, ImportStatementsResult,
39	},
40	overseer, ActivatedLeaf, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, RuntimeApiError,
41};
42use polkadot_node_subsystem_util::{
43	runtime::{self, key_ownership_proof, submit_report_dispute_lost, RuntimeInfo},
44	ControlledValidatorIndices,
45};
46use polkadot_primitives::{
47	slashing, BlockNumber, CandidateHash, CandidateReceiptV2 as CandidateReceipt, CompactStatement,
48	DisputeStatement, DisputeStatementSet, Hash, ScrapedOnChainVotes, SessionIndex,
49	ValidDisputeStatementKind, ValidatorId, ValidatorIndex,
50};
51use schnellru::{LruMap, UnlimitedCompact};
52
53use crate::{
54	db::{self, v1::RecentDisputes},
55	error::{log_error, FatalError, FatalResult, JfyiError, JfyiResult, Result},
56	import::{CandidateEnvironment, CandidateVoteState},
57	is_potential_spam,
58	metrics::Metrics,
59	scraping::ScrapedUpdates,
60	status::{get_active_with_status, Clock},
61	DisputeCoordinatorSubsystem, LOG_TARGET,
62};
63
64use super::{
65	backend::Backend,
66	make_dispute_message,
67	participation::{
68		self, Participation, ParticipationPriority, ParticipationRequest, ParticipationStatement,
69		WorkerMessageReceiver,
70	},
71	scraping::ChainScraper,
72	spam_slots::SpamSlots,
73	OverlayedBackend,
74};
75
/// Maximum number of blocks whose votes get imported during a single leaf update.
///
/// Importing votes is comparatively slow. Capping the work done per leaf update (which matters
/// most right after startup) keeps the dispute coordinator from being considered stalled.
const CHAIN_IMPORT_MAX_BATCH_SIZE: usize = 8;
81
82// Initial data for `dispute-coordinator`. It is provided only at first start.
83pub struct InitialData {
84	pub participations: Vec<(ParticipationPriority, ParticipationRequest)>,
85	pub votes: Vec<ScrapedOnChainVotes>,
86	pub leaf: ActivatedLeaf,
87}
88
89/// After the first active leaves update we transition to `Initialized` state.
90///
91/// Before the first active leaves update we can't really do much. We cannot check incoming
92/// statements for validity, we cannot query orderings, we have no valid `SessionInfo`,
93/// ...
94pub(crate) struct Initialized {
95	keystore: Arc<LocalKeystore>,
96	runtime_info: RuntimeInfo,
97	/// We have the onchain state of disabled validators as well as the offchain
98	/// state that is based on the lost disputes.
99	offchain_disabled_validators: OffchainDisabledValidators,
100	/// The indices of the controlled validators, cached by session.
101	controlled_validator_indices: ControlledValidatorIndices,
102	/// This is the highest `SessionIndex` seen via `ActiveLeavesUpdate`. It doesn't matter if it
103	/// was cached successfully or not. It is used to detect ancient disputes.
104	highest_session_seen: SessionIndex,
105	/// Will be set to `true` if an error occurred during the last caching attempt
106	gaps_in_cache: bool,
107	spam_slots: SpamSlots,
108	participation: Participation,
109	scraper: ChainScraper,
110	participation_receiver: WorkerMessageReceiver,
111	/// Backlog of still to be imported votes from chain.
112	///
113	/// For some reason importing votes is relatively slow, if there is a large finality lag (~50
114	/// blocks) we will be too slow importing all votes from unfinalized chains on startup
115	/// (dispute-coordinator gets killed because of unresponsiveness).
116	///
117	/// https://github.com/paritytech/polkadot/issues/6912
118	///
119	/// To resolve this, we limit the amount of votes imported at once to
120	/// `CHAIN_IMPORT_MAX_BATCH_SIZE` and put the rest here for later processing.
121	chain_import_backlog: VecDeque<ScrapedOnChainVotes>,
122	metrics: Metrics,
123}
124
125#[overseer::contextbounds(DisputeCoordinator, prefix = self::overseer)]
126impl Initialized {
127	/// Make initialized subsystem, ready to `run`.
128	pub fn new(
129		subsystem: DisputeCoordinatorSubsystem,
130		runtime_info: RuntimeInfo,
131		spam_slots: SpamSlots,
132		scraper: ChainScraper,
133		highest_session_seen: SessionIndex,
134		gaps_in_cache: bool,
135		offchain_disabled_validators: OffchainDisabledValidators,
136		controlled_validator_indices: ControlledValidatorIndices,
137	) -> Self {
138		let DisputeCoordinatorSubsystem { config: _, store: _, keystore, metrics } = subsystem;
139
140		let (participation_sender, participation_receiver) = mpsc::channel(1);
141		let participation = Participation::new(participation_sender, metrics.clone());
142
143		Self {
144			keystore,
145			runtime_info,
146			offchain_disabled_validators,
147			controlled_validator_indices,
148			highest_session_seen,
149			gaps_in_cache,
150			spam_slots,
151			scraper,
152			participation,
153			participation_receiver,
154			chain_import_backlog: VecDeque::new(),
155			metrics,
156		}
157	}
158
159	/// Run the initialized subsystem.
160	///
161	/// `initial_data` is optional. It is passed on first start and is `None` on subsystem restarts.
162	pub async fn run<B, Context>(
163		mut self,
164		mut ctx: Context,
165		mut backend: B,
166		mut initial_data: Option<InitialData>,
167		clock: Box<dyn Clock>,
168	) -> FatalResult<()>
169	where
170		B: Backend,
171	{
172		loop {
173			let res =
174				self.run_until_error(&mut ctx, &mut backend, &mut initial_data, &*clock).await;
175			if let Ok(()) = res {
176				gum::info!(target: LOG_TARGET, "received `Conclude` signal, exiting");
177				return Ok(())
178			}
179			log_error(res)?;
180		}
181	}
182
183	// Run the subsystem until an error is encountered or a `conclude` signal is received.
184	// Most errors are non-fatal and should lead to another call to this function.
185	//
186	// A return value of `Ok` indicates that an exit should be made, while non-fatal errors
187	// lead to another call to this function.
188	async fn run_until_error<B, Context>(
189		&mut self,
190		ctx: &mut Context,
191		backend: &mut B,
192		initial_data: &mut Option<InitialData>,
193		clock: &dyn Clock,
194	) -> Result<()>
195	where
196		B: Backend,
197	{
198		if let Some(InitialData { participations, votes: on_chain_votes, leaf: first_leaf }) =
199			initial_data.take()
200		{
201			for (priority, request) in participations {
202				self.participation.queue_participation(ctx, priority, request).await?;
203			}
204
205			let mut overlay_db = OverlayedBackend::new(backend);
206
207			self.process_chain_import_backlog(
208				ctx,
209				&mut overlay_db,
210				on_chain_votes,
211				clock.now(),
212				first_leaf.hash,
213			)
214			.await;
215
216			if !overlay_db.is_empty() {
217				let ops = overlay_db.into_write_ops();
218				backend.write(ops)?;
219			}
220
221			// Also provide first leaf to participation for good measure.
222			self.participation
223				.process_active_leaves_update(ctx, &ActiveLeavesUpdate::start_work(first_leaf))
224				.await?;
225		}
226
227		loop {
228			gum::trace!(target: LOG_TARGET, "Waiting for message");
229			let mut overlay_db = OverlayedBackend::new(backend);
230			let default_confirm = Box::new(|| Ok(()));
231			let confirm_write =
232				match MuxedMessage::receive(ctx, &mut self.participation_receiver).await? {
233					MuxedMessage::Participation(msg) => {
234						gum::trace!(target: LOG_TARGET, "MuxedMessage::Participation");
235						let ParticipationStatement {
236							session,
237							candidate_hash,
238							candidate_receipt,
239							outcome,
240						} = self.participation.get_participation_result(ctx, msg).await?;
241						if let Some(valid) = outcome.validity() {
242							gum::trace!(
243								target: LOG_TARGET,
244								?session,
245								?candidate_hash,
246								?valid,
247								"Issuing local statement based on participation outcome."
248							);
249							self.issue_local_statement(
250								ctx,
251								&mut overlay_db,
252								candidate_hash,
253								candidate_receipt,
254								session,
255								valid,
256								clock.now(),
257							)
258							.await?;
259						} else {
260							gum::warn!(target: LOG_TARGET, ?outcome, "Dispute participation failed");
261						}
262						default_confirm
263					},
264					MuxedMessage::Subsystem(msg) => match msg {
265						FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()),
266						FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => {
267							gum::trace!(target: LOG_TARGET, "OverseerSignal::ActiveLeaves");
268							self.process_active_leaves_update(
269								ctx,
270								&mut overlay_db,
271								update,
272								clock.now(),
273							)
274							.await?;
275							default_confirm
276						},
277						FromOrchestra::Signal(OverseerSignal::BlockFinalized(_, n)) => {
278							gum::trace!(target: LOG_TARGET, "OverseerSignal::BlockFinalized");
279							self.scraper.process_finalized_block(&n);
280							default_confirm
281						},
282						FromOrchestra::Communication { msg } =>
283							self.handle_incoming(ctx, &mut overlay_db, msg, clock.now()).await?,
284					},
285				};
286
287			if !overlay_db.is_empty() {
288				let ops = overlay_db.into_write_ops();
289				backend.write(ops)?;
290			}
291			// even if the changeset was empty,
292			// otherwise the caller will error.
293			confirm_write()?;
294		}
295	}
296
297	async fn process_active_leaves_update<Context>(
298		&mut self,
299		ctx: &mut Context,
300		overlay_db: &mut OverlayedBackend<'_, impl Backend>,
301		update: ActiveLeavesUpdate,
302		now: u64,
303	) -> Result<()> {
304		gum::trace!(target: LOG_TARGET, timestamp = now, "Processing ActiveLeavesUpdate");
305		let scraped_updates =
306			self.scraper.process_active_leaves_update(ctx.sender(), &update).await?;
307		log_error(
308			self.participation
309				.bump_to_priority_for_candidates(ctx, &scraped_updates.included_receipts)
310				.await,
311		)?;
312		self.participation.process_active_leaves_update(ctx, &update).await?;
313
314		if let Some(new_leaf) = update.activated {
315			gum::trace!(
316				target: LOG_TARGET,
317				leaf_hash = ?new_leaf.hash,
318				block_number = new_leaf.number,
319				"Processing ActivatedLeaf"
320			);
321
322			let session_idx =
323				self.runtime_info.get_session_index_for_child(ctx.sender(), new_leaf.hash).await;
324
325			match session_idx {
326				Ok(session_idx)
327					if self.gaps_in_cache || session_idx > self.highest_session_seen =>
328				{
329					// Pin the block from the new session.
330					self.runtime_info.pin_block(session_idx, new_leaf.unpin_handle);
331					// Fetch the last `DISPUTE_WINDOW` number of sessions unless there are no gaps
332					// in cache and we are not missing too many `SessionInfo`s
333					let prune_up_to = session_idx.saturating_sub(DISPUTE_WINDOW.get() - 1);
334					let fetch_lower_bound =
335						if !self.gaps_in_cache && self.highest_session_seen > prune_up_to {
336							self.highest_session_seen + 1
337						} else {
338							prune_up_to
339						};
340
341					// There is a new session. Perform a dummy fetch to cache it.
342					for idx in fetch_lower_bound..=session_idx {
343						if let Err(err) = self
344							.runtime_info
345							.get_session_info_by_index(ctx.sender(), new_leaf.hash, idx)
346							.await
347						{
348							gum::debug!(
349								target: LOG_TARGET,
350								session_idx,
351								leaf_hash = ?new_leaf.hash,
352								?err,
353								"Error caching SessionInfo on ActiveLeaves update"
354							);
355							self.gaps_in_cache = true;
356						}
357					}
358
359					self.highest_session_seen = session_idx;
360
361					db::v1::note_earliest_session(overlay_db, prune_up_to)?;
362					self.spam_slots.prune_old(prune_up_to);
363					self.offchain_disabled_validators.prune_old(prune_up_to);
364				},
365				Ok(_) => { /* no new session => nothing to cache */ },
366				Err(err) => {
367					gum::debug!(
368						target: LOG_TARGET,
369						?err,
370						"Failed to update session cache for disputes - can't fetch session index",
371					);
372				},
373			}
374
375			let ScrapedUpdates { unapplied_slashes, on_chain_votes, .. } = scraped_updates;
376
377			self.process_unapplied_slashes(ctx, new_leaf.hash, unapplied_slashes).await;
378
379			gum::trace!(
380				target: LOG_TARGET,
381				timestamp = now,
382				"Will process {} onchain votes",
383				on_chain_votes.len()
384			);
385
386			self.process_chain_import_backlog(ctx, overlay_db, on_chain_votes, now, new_leaf.hash)
387				.await;
388		}
389
390		gum::trace!(target: LOG_TARGET, timestamp = now, "Done processing ActiveLeavesUpdate");
391		Ok(())
392	}
393
394	/// For each unapplied (past-session) slash, report an unsigned extrinsic
395	/// to the runtime.
396	async fn process_unapplied_slashes<Context>(
397		&mut self,
398		ctx: &mut Context,
399		relay_parent: Hash,
400		unapplied_slashes: Vec<(SessionIndex, CandidateHash, slashing::PendingSlashes)>,
401	) {
402		for (session_index, candidate_hash, pending) in unapplied_slashes {
403			gum::info!(
404				target: LOG_TARGET,
405				?session_index,
406				?candidate_hash,
407				n_slashes = pending.keys.len(),
408				"Processing unapplied validator slashes",
409			);
410
411			let pinned_hash = self.runtime_info.get_block_in_session(session_index);
412			let inclusions = self.scraper.get_blocks_including_candidate(&candidate_hash);
413			if pinned_hash.is_none() && inclusions.is_empty() {
414				gum::info!(
415					target: LOG_TARGET,
416					?session_index,
417					"Couldn't find blocks in the session for an unapplied slash",
418				);
419				return
420			}
421
422			// Find a relay block that we can use
423			// to generate key ownership proof on.
424			// We use inclusion parents as a fallback.
425			let mut key_ownership_proofs = Vec::new();
426			let mut dispute_proofs = Vec::new();
427
428			let blocks_in_the_session =
429				pinned_hash.into_iter().chain(inclusions.into_iter().map(|(_n, h)| h));
430			for hash in blocks_in_the_session {
431				for (validator_index, validator_id) in pending.keys.iter() {
432					let res = key_ownership_proof(ctx.sender(), hash, validator_id.clone()).await;
433
434					match res {
435						Ok(Some(key_ownership_proof)) => {
436							key_ownership_proofs.push(key_ownership_proof);
437							let time_slot =
438								slashing::DisputesTimeSlot::new(session_index, candidate_hash);
439							let dispute_proof = slashing::DisputeProof {
440								time_slot,
441								kind: pending.kind,
442								validator_index: *validator_index,
443								validator_id: validator_id.clone(),
444							};
445							dispute_proofs.push(dispute_proof);
446						},
447						Ok(None) => {},
448						Err(runtime::Error::RuntimeRequest(RuntimeApiError::NotSupported {
449							..
450						})) => {
451							gum::debug!(
452								target: LOG_TARGET,
453								?session_index,
454								?candidate_hash,
455								?validator_id,
456								"Key ownership proof not yet supported.",
457							);
458						},
459						Err(error) => {
460							gum::warn!(
461								target: LOG_TARGET,
462								?error,
463								?session_index,
464								?candidate_hash,
465								?validator_id,
466								"Could not generate key ownership proof",
467							);
468						},
469					}
470				}
471
472				if !key_ownership_proofs.is_empty() {
473					// If we found a parent that we can use, stop searching.
474					// If one key ownership was resolved successfully, all of them should be.
475					debug_assert_eq!(key_ownership_proofs.len(), pending.keys.len());
476					break
477				}
478			}
479
480			let expected_keys = pending.keys.len();
481			let resolved_keys = key_ownership_proofs.len();
482			if resolved_keys < expected_keys {
483				gum::warn!(
484					target: LOG_TARGET,
485					?session_index,
486					?candidate_hash,
487					"Could not generate key ownership proofs for {} keys",
488					expected_keys - resolved_keys,
489				);
490			}
491			debug_assert_eq!(resolved_keys, dispute_proofs.len());
492
493			for (key_ownership_proof, dispute_proof) in
494				key_ownership_proofs.into_iter().zip(dispute_proofs.into_iter())
495			{
496				let validator_id = dispute_proof.validator_id.clone();
497
498				gum::info!(
499					target: LOG_TARGET,
500					?session_index,
501					?candidate_hash,
502					key_ownership_proof_len = key_ownership_proof.len(),
503					"Trying to submit a slashing report",
504				);
505
506				let res = submit_report_dispute_lost(
507					ctx.sender(),
508					relay_parent,
509					dispute_proof,
510					key_ownership_proof,
511				)
512				.await;
513
514				match res {
515					Err(runtime::Error::RuntimeRequest(RuntimeApiError::NotSupported {
516						..
517					})) => {
518						gum::debug!(
519							target: LOG_TARGET,
520							?session_index,
521							?candidate_hash,
522							"Reporting pending slash not yet supported",
523						);
524					},
525					Err(error) => {
526						gum::warn!(
527							target: LOG_TARGET,
528							?error,
529							?session_index,
530							?candidate_hash,
531							"Error reporting pending slash",
532						);
533					},
534					Ok(Some(())) => {
535						gum::info!(
536							target: LOG_TARGET,
537							?session_index,
538							?candidate_hash,
539							?validator_id,
540							"Successfully reported pending slash",
541						);
542					},
543					Ok(None) => {
544						gum::debug!(
545							target: LOG_TARGET,
546							?session_index,
547							?candidate_hash,
548							?validator_id,
549							"Duplicate pending slash report",
550						);
551					},
552				}
553			}
554		}
555	}
556
557	/// Process one batch of our `chain_import_backlog`.
558	///
559	/// `new_votes` will be appended beforehand.
560	async fn process_chain_import_backlog<Context>(
561		&mut self,
562		ctx: &mut Context,
563		overlay_db: &mut OverlayedBackend<'_, impl Backend>,
564		new_votes: Vec<ScrapedOnChainVotes>,
565		now: u64,
566		block_hash: Hash,
567	) {
568		let mut chain_import_backlog = std::mem::take(&mut self.chain_import_backlog);
569		chain_import_backlog.extend(new_votes);
570		let import_range =
571			0..std::cmp::min(CHAIN_IMPORT_MAX_BATCH_SIZE, chain_import_backlog.len());
572		// The `runtime-api` subsystem has an internal queue which serializes the execution,
573		// so there is no point in running these in parallel
574		for votes in chain_import_backlog.drain(import_range) {
575			let res = self.process_on_chain_votes(ctx, overlay_db, votes, now, block_hash).await;
576			match res {
577				Ok(()) => {},
578				Err(error) => {
579					gum::warn!(target: LOG_TARGET, ?error, "Skipping scraping block due to error",);
580				},
581			};
582		}
583		self.chain_import_backlog = chain_import_backlog;
584	}
585
586	/// Scrapes on-chain votes (backing votes and concluded disputes) for a active leaf of the
587	/// relay chain.
588	async fn process_on_chain_votes<Context>(
589		&mut self,
590		ctx: &mut Context,
591		overlay_db: &mut OverlayedBackend<'_, impl Backend>,
592		votes: ScrapedOnChainVotes,
593		now: u64,
594		block_hash: Hash,
595	) -> Result<()> {
596		let ScrapedOnChainVotes { session, backing_validators_per_candidate, disputes } = votes;
597
598		if backing_validators_per_candidate.is_empty() && disputes.is_empty() {
599			return Ok(())
600		}
601
602		// Scraped on-chain backing votes for the candidates with
603		// the new active leaf as if we received them via gossip.
604		for (candidate_receipt, backers) in backing_validators_per_candidate {
605			// Obtain the session info, for sake of `ValidatorId`s
606			let relay_parent = candidate_receipt.descriptor.relay_parent();
607			let session_info = match self
608				.runtime_info
609				.get_session_info_by_index(ctx.sender(), relay_parent, session)
610				.await
611			{
612				Ok(extended_session_info) => &extended_session_info.session_info,
613				Err(err) => {
614					gum::warn!(
615						target: LOG_TARGET,
616						?session,
617						?err,
618						"Could not retrieve session info from RuntimeInfo",
619					);
620					return Ok(())
621				},
622			};
623
624			let candidate_hash = candidate_receipt.hash();
625			gum::trace!(
626				target: LOG_TARGET,
627				?candidate_hash,
628				?relay_parent,
629				"Importing backing votes from chain for candidate"
630			);
631			let statements = backers
632				.into_iter()
633				.filter_map(|(validator_index, attestation)| {
634					let validator_public: ValidatorId = session_info
635						.validators
636						.get(validator_index)
637						.or_else(|| {
638							gum::error!(
639								target: LOG_TARGET,
640								?session,
641								?validator_index,
642								"Missing public key for validator",
643							);
644							None
645						})
646						.cloned()?;
647					let validator_signature = attestation.signature().clone();
648					let valid_statement_kind =
649						match attestation.to_compact_statement(candidate_hash) {
650							CompactStatement::Seconded(_) =>
651								ValidDisputeStatementKind::BackingSeconded(relay_parent),
652							CompactStatement::Valid(_) =>
653								ValidDisputeStatementKind::BackingValid(relay_parent),
654						};
655					debug_assert!(
656						SignedDisputeStatement::new_checked(
657							DisputeStatement::Valid(valid_statement_kind.clone()),
658							candidate_hash,
659							session,
660							validator_public.clone(),
661							validator_signature.clone(),
662						).is_ok(),
663						"Scraped backing votes had invalid signature! candidate: {:?}, session: {:?}, validator_public: {:?}, validator_index: {}",
664						candidate_hash,
665						session,
666						validator_public,
667						validator_index.0,
668					);
669					let signed_dispute_statement =
670						SignedDisputeStatement::new_unchecked_from_trusted_source(
671							DisputeStatement::Valid(valid_statement_kind.clone()),
672							candidate_hash,
673							session,
674							validator_public,
675							validator_signature,
676						);
677					Some((signed_dispute_statement, validator_index))
678				})
679				.collect();
680
681			// Importantly, handling import statements for backing votes also
682			// clears spam slots for any newly backed candidates
683			let import_result = self
684				.handle_import_statements(
685					ctx,
686					overlay_db,
687					MaybeCandidateReceipt::Provides(candidate_receipt),
688					session,
689					statements,
690					now,
691				)
692				.await?;
693			match import_result {
694				ImportStatementsResult::ValidImport => gum::trace!(
695					target: LOG_TARGET,
696					?relay_parent,
697					?session,
698					"Imported backing votes from chain"
699				),
700				ImportStatementsResult::InvalidImport => gum::warn!(
701					target: LOG_TARGET,
702					?relay_parent,
703					?session,
704					"Attempted import of on-chain backing votes failed"
705				),
706			}
707		}
708
709		// Import disputes from on-chain, this already went through a vote so it's assumed
710		// as verified. This will only be stored, gossiping it is not necessary.
711		for DisputeStatementSet { candidate_hash, session, statements } in disputes {
712			gum::trace!(
713				target: LOG_TARGET,
714				?candidate_hash,
715				?session,
716				"Importing dispute votes from chain for candidate"
717			);
718			let session_info = match self
719				.runtime_info
720				.get_session_info_by_index(ctx.sender(), block_hash, session)
721				.await
722			{
723				Ok(extended_session_info) => &extended_session_info.session_info,
724				Err(err) => {
725					gum::warn!(
726						target: LOG_TARGET,
727						?candidate_hash,
728						?session,
729						?err,
730						"Could not retrieve session info for recently concluded dispute"
731					);
732					continue
733				},
734			};
735
736			let statements = statements
737				.into_iter()
738				.filter_map(|(dispute_statement, validator_index, validator_signature)| {
739					let validator_public: ValidatorId = session_info
740						.validators
741						.get(validator_index)
742						.or_else(|| {
743							gum::error!(
744								target: LOG_TARGET,
745								?candidate_hash,
746								?session,
747								"Missing public key for validator {:?} that participated in concluded dispute",
748								&validator_index
749							);
750							None
751						})
752						.cloned()?;
753
754					Some((
755						SignedDisputeStatement::new_unchecked_from_trusted_source(
756							dispute_statement,
757							candidate_hash,
758							session,
759							validator_public,
760							validator_signature,
761						),
762						validator_index,
763					))
764				})
765				.collect::<Vec<_>>();
766			if statements.is_empty() {
767				gum::debug!(target: LOG_TARGET, "Skipping empty from chain dispute import");
768				continue
769			}
770			let import_result = self
771				.handle_import_statements(
772					ctx,
773					overlay_db,
774					// TODO <https://github.com/paritytech/polkadot/issues/4011>
775					MaybeCandidateReceipt::AssumeBackingVotePresent(candidate_hash),
776					session,
777					statements,
778					now,
779				)
780				.await?;
781			match import_result {
782				ImportStatementsResult::ValidImport => gum::trace!(
783					target: LOG_TARGET,
784					?candidate_hash,
785					?session,
786					"Imported statement of dispute from on-chain"
787				),
788				ImportStatementsResult::InvalidImport => gum::warn!(
789					target: LOG_TARGET,
790					?candidate_hash,
791					?session,
792					"Attempted import of on-chain statement of dispute failed"
793				),
794			}
795		}
796
797		Ok(())
798	}
799
800	async fn handle_incoming<Context>(
801		&mut self,
802		ctx: &mut Context,
803		overlay_db: &mut OverlayedBackend<'_, impl Backend>,
804		message: DisputeCoordinatorMessage,
805		now: Timestamp,
806	) -> Result<Box<dyn FnOnce() -> JfyiResult<()>>> {
807		match message {
808			DisputeCoordinatorMessage::ImportStatements {
809				candidate_receipt,
810				session,
811				statements,
812				pending_confirmation,
813			} => {
814				gum::trace!(
815					target: LOG_TARGET,
816					candidate_hash = ?candidate_receipt.hash(),
817					?session,
818					"DisputeCoordinatorMessage::ImportStatements"
819				);
820				let outcome = self
821					.handle_import_statements(
822						ctx,
823						overlay_db,
824						MaybeCandidateReceipt::Provides(candidate_receipt),
825						session,
826						statements,
827						now,
828					)
829					.await?;
830				let report = move || match pending_confirmation {
831					Some(pending_confirmation) => pending_confirmation
832						.send(outcome)
833						.map_err(|_| JfyiError::DisputeImportOneshotSend),
834					None => Ok(()),
835				};
836
837				match outcome {
838					ImportStatementsResult::InvalidImport => {
839						report()?;
840					},
841					// In case of valid import, delay confirmation until actual disk write:
842					ImportStatementsResult::ValidImport => return Ok(Box::new(report)),
843				}
844			},
845			DisputeCoordinatorMessage::RecentDisputes(tx) => {
846				gum::trace!(target: LOG_TARGET, "Loading recent disputes from db");
847				let recent_disputes = if let Some(disputes) = overlay_db.load_recent_disputes()? {
848					disputes
849				} else {
850					BTreeMap::new()
851				};
852				gum::trace!(target: LOG_TARGET, "Loaded recent disputes from db");
853
854				let _ = tx.send(recent_disputes);
855			},
856			DisputeCoordinatorMessage::ActiveDisputes(tx) => {
857				gum::trace!(target: LOG_TARGET, "DisputeCoordinatorMessage::ActiveDisputes");
858				let recent_disputes = if let Some(disputes) = overlay_db.load_recent_disputes()? {
859					disputes
860				} else {
861					BTreeMap::new()
862				};
863
864				let _ = tx.send(
865					get_active_with_status(recent_disputes.into_iter(), now)
866						.collect::<BTreeMap<_, _>>(),
867				);
868			},
869			DisputeCoordinatorMessage::QueryCandidateVotes(query, tx) => {
870				gum::trace!(target: LOG_TARGET, "DisputeCoordinatorMessage::QueryCandidateVotes");
871				let mut query_output = Vec::new();
872				for (session_index, candidate_hash) in query {
873					if let Some(v) =
874						overlay_db.load_candidate_votes(session_index, &candidate_hash)?
875					{
876						query_output.push((session_index, candidate_hash, v.into()));
877					} else {
878						gum::debug!(
879							target: LOG_TARGET,
880							session_index,
881							"No votes found for candidate",
882						);
883					}
884				}
885				let _ = tx.send(query_output);
886			},
887			DisputeCoordinatorMessage::IssueLocalStatement(
888				session,
889				candidate_hash,
890				candidate_receipt,
891				valid,
892			) => {
893				gum::trace!(target: LOG_TARGET, "DisputeCoordinatorMessage::IssueLocalStatement");
894				self.issue_local_statement(
895					ctx,
896					overlay_db,
897					candidate_hash,
898					candidate_receipt,
899					session,
900					valid,
901					now,
902				)
903				.await?;
904			},
905			DisputeCoordinatorMessage::DetermineUndisputedChain {
906				base: (base_number, base_hash),
907				block_descriptions,
908				tx,
909			} => {
910				gum::trace!(
911					target: LOG_TARGET,
912					"DisputeCoordinatorMessage::DetermineUndisputedChain"
913				);
914
915				let undisputed_chain = determine_undisputed_chain(
916					overlay_db,
917					base_number,
918					base_hash,
919					block_descriptions,
920				)?;
921
922				let _ = tx.send(undisputed_chain);
923			},
924		}
925
926		Ok(Box::new(|| Ok(())))
927	}
928
929	// We use fatal result rather than result here. Reason being, We for example increase
930	// spam slots in this function. If then the import fails for some non fatal and
931	// unrelated reason, we should likely actually decrement previously incremented spam
932	// slots again, for non fatal errors - which is cumbersome and actually not needed
933	async fn handle_import_statements<Context>(
934		&mut self,
935		ctx: &mut Context,
936		overlay_db: &mut OverlayedBackend<'_, impl Backend>,
937		candidate_receipt: MaybeCandidateReceipt,
938		session: SessionIndex,
939		statements: Vec<(SignedDisputeStatement, ValidatorIndex)>,
940		now: Timestamp,
941	) -> FatalResult<ImportStatementsResult> {
942		gum::trace!(target: LOG_TARGET, ?statements, "In handle import statements");
943		if self.session_is_ancient(session) {
944			// It is not valid to participate in an ancient dispute (spam?) or too new.
945			return Ok(ImportStatementsResult::InvalidImport)
946		}
947
948		let candidate_hash = candidate_receipt.hash();
949		let votes_in_db = overlay_db.load_candidate_votes(session, &candidate_hash)?;
950		let relay_parent = match &candidate_receipt {
951			MaybeCandidateReceipt::Provides(candidate_receipt) =>
952				candidate_receipt.descriptor().relay_parent(),
953			MaybeCandidateReceipt::AssumeBackingVotePresent(candidate_hash) => match &votes_in_db {
954				Some(votes) => votes.candidate_receipt.descriptor().relay_parent(),
955				None => {
956					gum::warn!(
957						target: LOG_TARGET,
958						session,
959						?candidate_hash,
960						"Cannot obtain relay parent without `CandidateReceipt` available!"
961					);
962					return Ok(ImportStatementsResult::InvalidImport)
963				},
964			},
965		};
966
967		let env = match CandidateEnvironment::new(
968			ctx,
969			&mut self.runtime_info,
970			session,
971			relay_parent,
972			self.offchain_disabled_validators.iter(session),
973			&mut self.controlled_validator_indices,
974		)
975		.await
976		{
977			None => {
978				gum::warn!(
979					target: LOG_TARGET,
980					session,
981					"We are lacking a `SessionInfo` for handling import of statements."
982				);
983
984				return Ok(ImportStatementsResult::InvalidImport)
985			},
986			Some(env) => env,
987		};
988
989		let n_validators = env.validators().len();
990
991		gum::trace!(
992			target: LOG_TARGET,
993			?candidate_hash,
994			?session,
995			?n_validators,
996			"Number of validators"
997		);
998
999		// In case we are not provided with a candidate receipt
1000		// we operate under the assumption, that a previous vote
1001		// which included a `CandidateReceipt` was seen.
1002		// This holds since every block is preceded by the `Backing`-phase.
1003		//
1004		// There is one exception: A sufficiently sophisticated attacker could prevent
1005		// us from seeing the backing votes by withholding arbitrary blocks, and hence we do
1006		// not have a `CandidateReceipt` available.
1007		let old_state = match votes_in_db.map(CandidateVotes::from) {
1008			Some(votes) => CandidateVoteState::new(votes, &env, now),
1009			None =>
1010				if let MaybeCandidateReceipt::Provides(candidate_receipt) = candidate_receipt {
1011					CandidateVoteState::new_from_receipt(candidate_receipt)
1012				} else {
1013					gum::warn!(
1014						target: LOG_TARGET,
1015						session,
1016						?candidate_hash,
1017						"Cannot import votes, without `CandidateReceipt` available!"
1018					);
1019					return Ok(ImportStatementsResult::InvalidImport)
1020				},
1021		};
1022
1023		gum::trace!(target: LOG_TARGET, ?candidate_hash, ?session, "Loaded votes");
1024
1025		let controlled_indices = env.controlled_indices();
1026		let own_statements = statements
1027			.iter()
1028			.filter(|(statement, validator_index)| {
1029				controlled_indices.contains(validator_index) &&
1030					*statement.candidate_hash() == candidate_hash
1031			})
1032			.cloned()
1033			.collect::<Vec<_>>();
1034
1035		let import_result = {
1036			let intermediate_result = old_state.import_statements(&env, statements, now);
1037
1038			// Handle approval vote import:
1039			//
1040			// See guide: We import on fresh disputes to maximize likelihood of fetching votes for
1041			// dead forks and once concluded to maximize time for approval votes to trickle in.
1042			if intermediate_result.is_freshly_disputed() ||
1043				intermediate_result.is_freshly_concluded()
1044			{
1045				gum::trace!(
1046					target: LOG_TARGET,
1047					?candidate_hash,
1048					?session,
1049					"Requesting approval signatures"
1050				);
1051				let (tx, rx) = oneshot::channel();
1052				// Use of unbounded channels justified because:
1053				// 1. Only triggered twice per dispute.
1054				// 2. Raising a dispute is costly (requires validation + recovery) by honest nodes,
1055				// dishonest nodes are limited by spam slots.
1056				// 3. Concluding a dispute is even more costly.
1057				// Therefore it is reasonable to expect a simple vote request to succeed way faster
1058				// than disputes are raised.
1059				// 4. We are waiting (and blocking the whole subsystem) on a response right after -
1060				// therefore even with all else failing we will never have more than
1061				// one message in flight at any given time.
1062				ctx.send_unbounded_message(
1063					ApprovalVotingParallelMessage::GetApprovalSignaturesForCandidate(
1064						candidate_hash,
1065						tx,
1066					),
1067				);
1068
1069				match rx.await {
1070					Err(_) => {
1071						gum::warn!(
1072							target: LOG_TARGET,
1073							"Fetch for approval votes got cancelled, only expected during shutdown!"
1074						);
1075						intermediate_result
1076					},
1077					Ok(votes) => {
1078						gum::trace!(
1079							target: LOG_TARGET,
1080							count = votes.len(),
1081							"Successfully received approval votes."
1082						);
1083						intermediate_result.import_approval_votes(&env, votes, now)
1084					},
1085				}
1086			} else {
1087				gum::trace!(
1088					target: LOG_TARGET,
1089					?candidate_hash,
1090					?session,
1091					"Not requested approval signatures"
1092				);
1093				intermediate_result
1094			}
1095		};
1096
1097		gum::trace!(
1098			target: LOG_TARGET,
1099			?candidate_hash,
1100			?session,
1101			?n_validators,
1102			"Import result ready"
1103		);
1104
1105		let new_state = import_result.new_state();
1106
1107		let is_included = self.scraper.is_candidate_included(&candidate_hash);
1108		let is_backed = self.scraper.is_candidate_backed(&candidate_hash);
1109		let own_vote_missing = new_state.own_vote_missing();
1110		let is_disputed = new_state.is_disputed();
1111		let is_confirmed = new_state.is_confirmed();
1112		let is_disabled = |v: &ValidatorIndex| env.disabled_indices().contains(v);
1113		let potential_spam =
1114			is_potential_spam(&self.scraper, &new_state, &candidate_hash, is_disabled);
1115		let allow_participation = !potential_spam;
1116
1117		gum::trace!(
1118			target: LOG_TARGET,
1119			?own_vote_missing,
1120			?potential_spam,
1121			?is_included,
1122			?candidate_hash,
1123			confirmed = ?new_state.is_confirmed(),
1124			has_invalid_voters = ?!import_result.new_invalid_voters().is_empty(),
1125			n_disabled_validators = ?env.disabled_indices().len(),
1126			"Is spam?"
1127		);
1128
1129		// This check is responsible for all clearing of spam slots. It runs
1130		// whenever a vote is imported from on or off chain, and decrements
1131		// slots whenever a candidate is newly backed, confirmed, or has our
1132		// own vote.
1133		if !potential_spam {
1134			self.spam_slots.clear(&(session, candidate_hash));
1135
1136		// Potential spam:
1137		} else if !import_result.new_invalid_voters().is_empty() {
1138			let mut free_spam_slots_available = false;
1139			// Only allow import if at least one validator voting invalid, has not exceeded
1140			// its spam slots:
1141			for index in import_result.new_invalid_voters() {
1142				// Disputes can only be triggered via an invalidity stating vote, thus we only
1143				// need to increase spam slots on invalid votes. (If we did not, we would also
1144				// increase spam slots for backing validators for example - as validators have to
1145				// provide some opposing vote for dispute-distribution).
1146				free_spam_slots_available |=
1147					self.spam_slots.add_unconfirmed(session, candidate_hash, *index);
1148			}
1149			if !free_spam_slots_available {
1150				gum::debug!(
1151					target: LOG_TARGET,
1152					?candidate_hash,
1153					?session,
1154					invalid_voters = ?import_result.new_invalid_voters(),
1155					"Rejecting import because of full spam slots."
1156				);
1157				return Ok(ImportStatementsResult::InvalidImport)
1158			}
1159		}
1160
1161		// Participate in dispute if we did not cast a vote before and actually have keys to cast a
1162		// local vote. Disputes should fall in one of the categories below, otherwise we will
1163		// refrain from participation:
1164		// - `is_included` lands in prioritised queue
1165		// - `is_confirmed` | `is_backed` lands in best effort queue
1166		// We don't participate in disputes on finalized candidates.
1167		if own_vote_missing && is_disputed && allow_participation {
1168			let priority = ParticipationPriority::with_priority_if(is_included);
1169			gum::trace!(
1170				target: LOG_TARGET,
1171				?candidate_hash,
1172				?priority,
1173				"Queuing participation for candidate"
1174			);
1175			if priority.is_priority() {
1176				self.metrics.on_queued_priority_participation();
1177			} else {
1178				self.metrics.on_queued_best_effort_participation();
1179			}
1180			let request_timer = self.metrics.time_participation_pipeline();
1181			let r = self
1182				.participation
1183				.queue_participation(
1184					ctx,
1185					priority,
1186					ParticipationRequest::new(
1187						new_state.candidate_receipt().clone(),
1188						session,
1189						env.executor_params().clone(),
1190						request_timer,
1191					),
1192				)
1193				.await;
1194			log_error(r)?;
1195		} else {
1196			gum::trace!(
1197				target: LOG_TARGET,
1198				?candidate_hash,
1199				?is_confirmed,
1200				?own_vote_missing,
1201				?is_disputed,
1202				?allow_participation,
1203				?is_included,
1204				?is_backed,
1205				"Will not queue participation for candidate"
1206			);
1207
1208			if !allow_participation {
1209				self.metrics.on_refrained_participation();
1210			}
1211		}
1212
1213		// Also send any already existing approval vote on new disputes:
1214		if import_result.is_freshly_disputed() {
1215			let our_approval_votes = new_state.own_approval_votes().into_iter().flatten();
1216			for (validator_index, sig) in our_approval_votes {
1217				let pub_key = match env.validators().get(validator_index) {
1218					None => {
1219						gum::error!(
1220							target: LOG_TARGET,
1221							?validator_index,
1222							?session,
1223							"Could not find pub key in `SessionInfo` for our own approval vote!"
1224						);
1225						continue
1226					},
1227					Some(k) => k,
1228				};
1229				let statement = SignedDisputeStatement::new_unchecked_from_trusted_source(
1230					DisputeStatement::Valid(ValidDisputeStatementKind::ApprovalChecking),
1231					candidate_hash,
1232					session,
1233					pub_key.clone(),
1234					sig.clone(),
1235				);
1236				gum::trace!(
1237					target: LOG_TARGET,
1238					?candidate_hash,
1239					?session,
1240					?validator_index,
1241					"Sending out own approval vote"
1242				);
1243				match make_dispute_message(
1244					env.session_info(),
1245					&new_state.votes(),
1246					statement,
1247					validator_index,
1248				) {
1249					Err(err) => {
1250						gum::error!(
1251							target: LOG_TARGET,
1252							?err,
1253							"No ongoing dispute, but we checked there is one!"
1254						);
1255					},
1256					Ok(dispute_message) => {
1257						ctx.send_message(DisputeDistributionMessage::SendDispute(dispute_message))
1258							.await;
1259					},
1260				};
1261			}
1262		}
1263
1264		// All good, update recent disputes if state has changed:
1265		if let Some(new_status) = new_state.dispute_status() {
1266			// Only bother with db access, if there was an actual change.
1267			if import_result.dispute_state_changed() {
1268				let mut recent_disputes = overlay_db.load_recent_disputes()?.unwrap_or_default();
1269
1270				let status =
1271					recent_disputes.entry((session, candidate_hash)).or_insert_with(|| {
1272						gum::info!(
1273							target: LOG_TARGET,
1274							?candidate_hash,
1275							session,
1276							"New dispute initiated for candidate.",
1277						);
1278						DisputeStatus::active()
1279					});
1280
1281				*status = *new_status;
1282
1283				gum::trace!(
1284					target: LOG_TARGET,
1285					?candidate_hash,
1286					?status,
1287					has_concluded_for = ?new_state.has_concluded_for(),
1288					has_concluded_against = ?new_state.has_concluded_against(),
1289					"Writing recent disputes with updates for candidate"
1290				);
1291				overlay_db.write_recent_disputes(recent_disputes);
1292			}
1293		}
1294
1295		// Notify ChainSelection if a dispute has concluded against a candidate. ChainSelection
1296		// will need to mark the candidate's relay parent as reverted.
1297		if import_result.has_fresh_byzantine_threshold_against() {
1298			let blocks_including = self.scraper.get_blocks_including_candidate(&candidate_hash);
1299			for (parent_block_number, parent_block_hash) in &blocks_including {
1300				gum::warn!(
1301					target: LOG_TARGET,
1302					?candidate_hash,
1303					?parent_block_number,
1304					?parent_block_hash,
1305					"Dispute has just concluded against the candidate hash noted. Its parent will be marked as reverted."
1306				);
1307			}
1308			if blocks_including.len() > 0 {
1309				ctx.send_message(ChainSelectionMessage::RevertBlocks(blocks_including)).await;
1310			} else {
1311				gum::warn!(
1312					target: LOG_TARGET,
1313					?candidate_hash,
1314					?session,
1315					"Could not find an including block for candidate against which a dispute has concluded."
1316				);
1317			}
1318		}
1319
1320		// Update metrics:
1321		if import_result.is_freshly_disputed() {
1322			self.metrics.on_open();
1323		}
1324		self.metrics.on_valid_votes(import_result.imported_valid_votes());
1325		self.metrics.on_invalid_votes(import_result.imported_invalid_votes());
1326		gum::trace!(
1327			target: LOG_TARGET,
1328			?candidate_hash,
1329			?session,
1330			imported_approval_votes = ?import_result.imported_approval_votes(),
1331			imported_valid_votes = ?import_result.imported_valid_votes(),
1332			imported_invalid_votes = ?import_result.imported_invalid_votes(),
1333			total_valid_votes = ?import_result.new_state().votes().valid.raw().len(),
1334			total_invalid_votes = ?import_result.new_state().votes().invalid.len(),
1335			confirmed = ?import_result.new_state().is_confirmed(),
1336			"Import summary"
1337		);
1338
1339		self.metrics.on_approval_votes(import_result.imported_approval_votes());
1340		if import_result.is_freshly_concluded_for() {
1341			gum::info!(
1342				target: LOG_TARGET,
1343				?candidate_hash,
1344				session,
1345				"Dispute on candidate concluded with 'valid' result",
1346			);
1347			for (statement, validator_index) in own_statements.iter() {
1348				if statement.statement().indicates_invalidity() {
1349					gum::warn!(
1350						target: LOG_TARGET,
1351						?candidate_hash,
1352						?validator_index,
1353						"Voted against a candidate that was concluded valid.",
1354					);
1355				}
1356			}
1357			for validator_index in new_state.votes().invalid.keys() {
1358				gum::info!(
1359					target: LOG_TARGET,
1360					?candidate_hash,
1361					?validator_index,
1362					?session,
1363					"Disabled offchain for voting invalid against a valid candidate",
1364				);
1365				self.offchain_disabled_validators
1366					.insert_against_valid(session, *validator_index);
1367			}
1368			self.metrics.on_concluded_valid();
1369		}
1370		if import_result.is_freshly_concluded_against() {
1371			gum::info!(
1372				target: LOG_TARGET,
1373				?candidate_hash,
1374				session,
1375				"Dispute on candidate concluded with 'invalid' result",
1376			);
1377			for (statement, validator_index) in own_statements.iter() {
1378				if statement.statement().indicates_validity() {
1379					gum::warn!(
1380						target: LOG_TARGET,
1381						?candidate_hash,
1382						?validator_index,
1383						"Voted for a candidate that was concluded invalid.",
1384					);
1385				}
1386			}
1387			for (validator_index, (kind, _sig)) in new_state.votes().valid.raw() {
1388				let is_backer = kind.is_backing();
1389				gum::info!(
1390					target: LOG_TARGET,
1391					?candidate_hash,
1392					?validator_index,
1393					?session,
1394					?is_backer,
1395					"Disabled offchain for voting valid for an invalid candidate",
1396				);
1397				self.offchain_disabled_validators.insert_for_invalid(
1398					session,
1399					*validator_index,
1400					is_backer,
1401				);
1402			}
1403			self.metrics.on_concluded_invalid();
1404		}
1405
1406		// After validators are disabled, revisit active disputes to unactivate those where all
1407		// raising parties are now disabled
1408		if import_result.is_freshly_concluded_for() || import_result.is_freshly_concluded_against()
1409		{
1410			self.revisit_active_disputes_after_disabling(overlay_db, session)?;
1411		}
1412
1413		// Only write when votes have changed.
1414		if let Some(votes) = import_result.into_updated_votes() {
1415			overlay_db.write_candidate_votes(session, candidate_hash, votes.into());
1416		}
1417
1418		Ok(ImportStatementsResult::ValidImport)
1419	}
1420
1421	async fn issue_local_statement<Context>(
1422		&mut self,
1423		ctx: &mut Context,
1424		overlay_db: &mut OverlayedBackend<'_, impl Backend>,
1425		candidate_hash: CandidateHash,
1426		candidate_receipt: CandidateReceipt,
1427		session: SessionIndex,
1428		valid: bool,
1429		now: Timestamp,
1430	) -> Result<()> {
1431		gum::trace!(
1432			target: LOG_TARGET,
1433			?candidate_hash,
1434			?session,
1435			?valid,
1436			?now,
1437			"Issuing local statement for candidate!"
1438		);
1439
1440		// Load environment:
1441		let env = match CandidateEnvironment::new(
1442			ctx,
1443			&mut self.runtime_info,
1444			session,
1445			candidate_receipt.descriptor.relay_parent(),
1446			self.offchain_disabled_validators.iter(session),
1447			&mut self.controlled_validator_indices,
1448		)
1449		.await
1450		{
1451			None => {
1452				gum::warn!(
1453					target: LOG_TARGET,
1454					session,
1455					"Missing info for session which has an active dispute",
1456				);
1457
1458				return Ok(())
1459			},
1460			Some(env) => env,
1461		};
1462
1463		let votes = overlay_db
1464			.load_candidate_votes(session, &candidate_hash)?
1465			.map(CandidateVotes::from)
1466			.unwrap_or_else(|| CandidateVotes {
1467				candidate_receipt: candidate_receipt.clone(),
1468				valid: ValidCandidateVotes::new(),
1469				invalid: BTreeMap::new(),
1470			});
1471
1472		// Sign a statement for each validator index we control which has
1473		// not already voted. This should generally be maximum 1 statement.
1474		let voted_indices = votes.voted_indices();
1475		let mut statements = Vec::new();
1476
1477		let controlled_indices = env.controlled_indices();
1478		for index in controlled_indices {
1479			if voted_indices.contains(&index) {
1480				continue
1481			}
1482
1483			let keystore = self.keystore.clone() as Arc<_>;
1484			let res = SignedDisputeStatement::sign_explicit(
1485				&keystore,
1486				valid,
1487				candidate_hash,
1488				session,
1489				env.validators()
1490					.get(*index)
1491					.expect("`controlled_indices` are derived from `validators`; qed")
1492					.clone(),
1493			);
1494
1495			match res {
1496				Ok(Some(signed_dispute_statement)) => {
1497					statements.push((signed_dispute_statement, *index));
1498				},
1499				Ok(None) => {},
1500				Err(err) => {
1501					gum::error!(
1502						target: LOG_TARGET,
1503						?err,
1504						"Encountered keystore error while signing dispute statement",
1505					);
1506				},
1507			}
1508		}
1509
1510		// Get our message out:
1511		for (statement, index) in &statements {
1512			let dispute_message =
1513				match make_dispute_message(env.session_info(), &votes, statement.clone(), *index) {
1514					Err(err) => {
1515						gum::debug!(target: LOG_TARGET, ?err, "Creating dispute message failed.");
1516						continue
1517					},
1518					Ok(dispute_message) => dispute_message,
1519				};
1520
1521			ctx.send_message(DisputeDistributionMessage::SendDispute(dispute_message)).await;
1522		}
1523
1524		// Do import
1525		if !statements.is_empty() {
1526			match self
1527				.handle_import_statements(
1528					ctx,
1529					overlay_db,
1530					MaybeCandidateReceipt::Provides(candidate_receipt),
1531					session,
1532					statements,
1533					now,
1534				)
1535				.await?
1536			{
1537				ImportStatementsResult::InvalidImport => {
1538					gum::error!(
1539						target: LOG_TARGET,
1540						?candidate_hash,
1541						?session,
1542						"`handle_import_statements` considers our own votes invalid!"
1543					);
1544				},
1545				ImportStatementsResult::ValidImport => {
1546					gum::trace!(
1547						target: LOG_TARGET,
1548						?candidate_hash,
1549						?session,
1550						"`handle_import_statements` successfully imported our vote!"
1551					);
1552				},
1553			}
1554		}
1555
1556		Ok(())
1557	}
1558
1559	fn session_is_ancient(&self, session_idx: SessionIndex) -> bool {
1560		return session_idx < self.highest_session_seen.saturating_sub(DISPUTE_WINDOW.get() - 1)
1561	}
1562
1563	/// Revisit active non-confirmed disputes after validators have been disabled.
1564	/// Unactivates disputes where all raising parties (invalid voters) are now disabled.
1565	fn revisit_active_disputes_after_disabling(
1566		&mut self,
1567		overlay_db: &mut OverlayedBackend<'_, impl Backend>,
1568		session: SessionIndex,
1569	) -> FatalResult<()> {
1570		let mut recent_disputes = overlay_db.load_recent_disputes()?.unwrap_or_default();
1571		let mut disputes_to_remove = Vec::new();
1572
1573		// Create session bounds for efficient iteration
1574		let session_start = (session, CandidateHash(Hash::zero()));
1575		let session_end = (session + 1, CandidateHash(Hash::zero()));
1576
1577		for ((dispute_session, candidate_hash), status) in
1578			recent_disputes.range(&session_start..&session_end)
1579		{
1580			debug_assert_eq!(session, *dispute_session);
1581			// Only check unconfirmed
1582			if status.is_confirmed_concluded() {
1583				continue
1584			}
1585			let Some(votes) = overlay_db.load_candidate_votes(*dispute_session, candidate_hash)?
1586			else {
1587				continue
1588			};
1589			// Check if all invalid voters (raising parties) are disabled
1590			if !votes.invalid.is_empty() &&
1591				votes.invalid.iter().all(|(_, validator_index, _)| {
1592					self.offchain_disabled_validators.is_disabled(session, *validator_index)
1593				}) {
1594				disputes_to_remove.push((*dispute_session, *candidate_hash));
1595
1596				gum::info!(
1597					target: LOG_TARGET,
1598					session = dispute_session,
1599					?candidate_hash,
1600					invalid_voters = ?votes.invalid.iter().map(|(_, idx, _)| *idx).collect::<Vec<_>>(),
1601					"Unactivating dispute where all raising parties are now disabled"
1602				);
1603			}
1604		}
1605
1606		// Remove them from RecentDisputes (setting status to inactive)
1607		if !disputes_to_remove.is_empty() {
1608			for key in disputes_to_remove {
1609				recent_disputes.remove(&key);
1610				self.metrics.on_unactivated_dispute();
1611			}
1612			overlay_db.write_recent_disputes(recent_disputes);
1613		}
1614
1615		Ok(())
1616	}
1617}
1618
/// Messages to be handled in this subsystem.
///
/// Multiplexes the two input sources `MuxedMessage::receive` selects over.
enum MuxedMessage {
	/// Messages from other subsystems.
	Subsystem(FromOrchestra<DisputeCoordinatorMessage>),
	/// Messages from participation workers.
	Participation(participation::WorkerMessage),
}
1626
1627#[overseer::contextbounds(DisputeCoordinator, prefix = self::overseer)]
1628impl MuxedMessage {
1629	async fn receive<Context>(
1630		ctx: &mut Context,
1631		from_sender: &mut participation::WorkerMessageReceiver,
1632	) -> FatalResult<Self> {
1633		// We are only fusing here to make `select` happy, in reality we will quit if the stream
1634		// ends.
1635		let from_overseer = ctx.recv().fuse();
1636		futures::pin_mut!(from_overseer, from_sender);
1637		futures::select!(
1638			msg = from_overseer => Ok(Self::Subsystem(msg.map_err(FatalError::SubsystemReceive)?)),
1639			msg = from_sender.next() => Ok(Self::Participation(msg.ok_or(FatalError::ParticipationWorkerReceiverExhausted)?)),
1640		)
1641	}
1642}
1643
/// Either a full candidate receipt, or only the candidate hash for cases where the receipt is
/// expected to be known already.
#[derive(Debug, Clone)]
enum MaybeCandidateReceipt {
	/// Directly provides the candidate receipt.
	Provides(CandidateReceipt),
	/// Assumes it was seen before by means of seconded message.
	///
	/// `handle_import_statements` then relies on the votes already stored for the candidate
	/// to obtain the receipt and rejects the import if there are none.
	AssumeBackingVotePresent(CandidateHash),
}
1651
1652impl MaybeCandidateReceipt {
1653	/// Retrieve `CandidateHash` for the corresponding candidate.
1654	pub fn hash(&self) -> CandidateHash {
1655		match self {
1656			Self::Provides(receipt) => receipt.hash(),
1657			Self::AssumeBackingVotePresent(hash) => *hash,
1658		}
1659	}
1660}
1661
1662/// Determine the best block and its block number.
1663/// Assumes `block_descriptions` are sorted from the one
1664/// with the lowest `BlockNumber` to the highest.
1665fn determine_undisputed_chain(
1666	overlay_db: &mut OverlayedBackend<'_, impl Backend>,
1667	base_number: BlockNumber,
1668	base_hash: Hash,
1669	block_descriptions: Vec<BlockDescription>,
1670) -> Result<(BlockNumber, Hash)> {
1671	let last = block_descriptions
1672		.last()
1673		.map(|e| (base_number + block_descriptions.len() as BlockNumber, e.block_hash))
1674		.unwrap_or((base_number, base_hash));
1675
1676	// Fast path for no disputes.
1677	let recent_disputes = match overlay_db.load_recent_disputes()? {
1678		None => return Ok(last),
1679		Some(a) if a.is_empty() => return Ok(last),
1680		Some(a) => a,
1681	};
1682
1683	let is_possibly_invalid = |session, candidate_hash| {
1684		recent_disputes
1685			.get(&(session, candidate_hash))
1686			.map_or(false, |status| status.is_possibly_invalid())
1687	};
1688
1689	for (i, BlockDescription { session, candidates, .. }) in block_descriptions.iter().enumerate() {
1690		if candidates.iter().any(|c| is_possibly_invalid(*session, *c)) {
1691			if i == 0 {
1692				return Ok((base_number, base_hash))
1693			} else {
1694				return Ok((base_number + i as BlockNumber, block_descriptions[i - 1].block_hash))
1695			}
1696		}
1697	}
1698
1699	Ok(last)
1700}
1701
/// Ideally, we want to use the top `byzantine_threshold` offenders here based on the amount of
/// stake slashed. However, given that slashing might be applied with a delay, we want to have
/// some list of offenders as soon as disputes conclude offchain. This list only approximates
/// the top offenders and only accounts for lost disputes. But that should be good enough to
/// prevent spam attacks.
#[derive(Default)]
pub struct OffchainDisabledValidators {
	// Validators that lost disputes, tracked separately for each session index.
	per_session: BTreeMap<SessionIndex, LostSessionDisputes>,
}
1711
/// Validators that lost disputes within a single session, grouped by the kind of vote they
/// lost with.
struct LostSessionDisputes {
	// We separate lost disputes to prioritize "for invalid" offenders. And among those, we
	// prioritize backing votes the most. There's no need to limit the size of these sets, as they
	// are already limited by the number of validators in the session. We use `LruMap` to ensure
	// the iteration order prioritizes the most recently lost disputes over older ones in case we
	// reach the limit.
	backers_for_invalid: LruMap<ValidatorIndex, (), UnlimitedCompact>,
	for_invalid: LruMap<ValidatorIndex, (), UnlimitedCompact>,
	against_valid: LruMap<ValidatorIndex, (), UnlimitedCompact>,
}
1722
1723impl Default for LostSessionDisputes {
1724	fn default() -> Self {
1725		Self {
1726			backers_for_invalid: LruMap::new(UnlimitedCompact),
1727			for_invalid: LruMap::new(UnlimitedCompact),
1728			against_valid: LruMap::new(UnlimitedCompact),
1729		}
1730	}
1731}
1732
1733impl OffchainDisabledValidators {
1734	/// Creates a new instance populated from concluded disputes
1735	pub fn new_from_state(
1736		disputes: &RecentDisputes,
1737		load_candidate_votes: impl Fn(SessionIndex, &CandidateHash) -> Option<CandidateVotes>,
1738		earliest_session: SessionIndex,
1739	) -> Self {
1740		let mut disabled_validators = Self::default();
1741
1742		// Process concluded disputes to identify validators that should be disabled
1743		for ((session, candidate_hash), dispute_status) in disputes {
1744			let session = *session;
1745			// Only process concluded disputes
1746			if dispute_status.concluded_at().is_none() {
1747				continue
1748			}
1749			if session < earliest_session {
1750				continue
1751			}
1752
1753			// Get votes for this dispute
1754			let votes = match load_candidate_votes(session, candidate_hash) {
1755				Some(votes) => votes,
1756				None => continue,
1757			};
1758
1759			// Process votes based on dispute outcome
1760			if dispute_status.has_concluded_for() {
1761				// Dispute concluded with candidate being valid - track validators that voted
1762				// against
1763				for (validator_index, _) in votes.invalid.iter() {
1764					disabled_validators.insert_against_valid(session, *validator_index);
1765				}
1766			} else if dispute_status.has_concluded_against() {
1767				// Dispute concluded with candidate being invalid - track validators that voted for
1768				for (validator_index, (kind, _)) in votes.valid.raw().iter() {
1769					let is_backer = kind.is_backing();
1770					disabled_validators.insert_for_invalid(session, *validator_index, is_backer);
1771				}
1772			}
1773		}
1774
1775		disabled_validators
1776	}
1777
1778	/// Prune state for ancient disputes.
1779	pub fn prune_old(&mut self, up_to_excluding: SessionIndex) {
1780		// split_off returns everything after the given key, including the key.
1781		let mut relevant = self.per_session.split_off(&up_to_excluding);
1782		std::mem::swap(&mut relevant, &mut self.per_session);
1783	}
1784
1785	/// Disable a validator who voted for an invalid candidate.
1786	pub fn insert_for_invalid(
1787		&mut self,
1788		session_index: SessionIndex,
1789		validator_index: ValidatorIndex,
1790		is_backer: bool,
1791	) {
1792		let entry = self.per_session.entry(session_index).or_default();
1793		if is_backer {
1794			entry.backers_for_invalid.insert(validator_index, ());
1795		} else {
1796			entry.for_invalid.insert(validator_index, ());
1797		}
1798	}
1799
1800	/// Disable a validator who voted against a valid candidate.
1801	pub fn insert_against_valid(
1802		&mut self,
1803		session_index: SessionIndex,
1804		validator_index: ValidatorIndex,
1805	) {
1806		self.per_session
1807			.entry(session_index)
1808			.or_default()
1809			.against_valid
1810			.insert(validator_index, ());
1811	}
1812
1813	/// Iterate over all validators that are offchain disabled.
1814	/// The order of iteration prioritizes `for_invalid` offenders (and backers among those) over
1815	/// `against_valid` offenders. And most recently lost disputes over older ones.
1816	/// NOTE: the iterator might contain duplicates.
1817	pub fn iter(&self, session_index: SessionIndex) -> impl Iterator<Item = ValidatorIndex> + '_ {
1818		self.per_session.get(&session_index).into_iter().flat_map(|e| {
1819			e.backers_for_invalid
1820				.iter()
1821				.chain(e.for_invalid.iter())
1822				.chain(e.against_valid.iter())
1823				.map(|(i, _)| *i)
1824		})
1825	}
1826
1827	/// Check if a validator is disabled for a given session.
1828	pub fn is_disabled(
1829		&self,
1830		session_index: SessionIndex,
1831		validator_index: ValidatorIndex,
1832	) -> bool {
1833		self.per_session
1834			.get(&session_index)
1835			.map(|session_disputes| {
1836				session_disputes.backers_for_invalid.peek(&validator_index).is_some() ||
1837					session_disputes.for_invalid.peek(&validator_index).is_some() ||
1838					session_disputes.against_valid.peek(&validator_index).is_some()
1839			})
1840			.unwrap_or(false)
1841	}
1842}