1use std::{
20 collections::{BTreeMap, VecDeque},
21 sync::Arc,
22};
23
24use futures::{
25 channel::{mpsc, oneshot},
26 FutureExt, StreamExt,
27};
28
29use sc_keystore::LocalKeystore;
30
31use polkadot_node_primitives::{
32 disputes::ValidCandidateVotes, CandidateVotes, DisputeStatus, SignedDisputeStatement,
33 Timestamp, DISPUTE_WINDOW,
34};
35use polkadot_node_subsystem::{
36 messages::{
37 ApprovalVotingParallelMessage, BlockDescription, ChainSelectionMessage,
38 DisputeCoordinatorMessage, DisputeDistributionMessage, ImportStatementsResult,
39 },
40 overseer, ActivatedLeaf, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, RuntimeApiError,
41};
42use polkadot_node_subsystem_util::{
43 runtime::{self, key_ownership_proof, submit_report_dispute_lost, RuntimeInfo},
44 ControlledValidatorIndices,
45};
46use polkadot_primitives::{
47 slashing, BlockNumber, CandidateHash, CandidateReceiptV2 as CandidateReceipt, CompactStatement,
48 DisputeStatement, DisputeStatementSet, Hash, ScrapedOnChainVotes, SessionIndex,
49 ValidDisputeStatementKind, ValidatorId, ValidatorIndex,
50};
51use schnellru::{LruMap, UnlimitedCompact};
52
53use crate::{
54 db::{self, v1::RecentDisputes},
55 error::{log_error, FatalError, FatalResult, JfyiError, JfyiResult, Result},
56 import::{CandidateEnvironment, CandidateVoteState},
57 is_potential_spam,
58 metrics::Metrics,
59 scraping::ScrapedUpdates,
60 status::{get_active_with_status, Clock},
61 DisputeCoordinatorSubsystem, LOG_TARGET,
62};
63
64use super::{
65 backend::Backend,
66 make_dispute_message,
67 participation::{
68 self, Participation, ParticipationPriority, ParticipationRequest, ParticipationStatement,
69 WorkerMessageReceiver,
70 },
71 scraping::ChainScraper,
72 spam_slots::SpamSlots,
73 OverlayedBackend,
74};
75
76const CHAIN_IMPORT_MAX_BATCH_SIZE: usize = 8;
81
82pub struct InitialData {
84 pub participations: Vec<(ParticipationPriority, ParticipationRequest)>,
85 pub votes: Vec<ScrapedOnChainVotes>,
86 pub leaf: ActivatedLeaf,
87}
88
89pub(crate) struct Initialized {
95 keystore: Arc<LocalKeystore>,
96 runtime_info: RuntimeInfo,
97 offchain_disabled_validators: OffchainDisabledValidators,
100 controlled_validator_indices: ControlledValidatorIndices,
102 highest_session_seen: SessionIndex,
105 gaps_in_cache: bool,
107 spam_slots: SpamSlots,
108 participation: Participation,
109 scraper: ChainScraper,
110 participation_receiver: WorkerMessageReceiver,
111 chain_import_backlog: VecDeque<ScrapedOnChainVotes>,
122 metrics: Metrics,
123}
124
125#[overseer::contextbounds(DisputeCoordinator, prefix = self::overseer)]
126impl Initialized {
127 pub fn new(
129 subsystem: DisputeCoordinatorSubsystem,
130 runtime_info: RuntimeInfo,
131 spam_slots: SpamSlots,
132 scraper: ChainScraper,
133 highest_session_seen: SessionIndex,
134 gaps_in_cache: bool,
135 offchain_disabled_validators: OffchainDisabledValidators,
136 controlled_validator_indices: ControlledValidatorIndices,
137 ) -> Self {
138 let DisputeCoordinatorSubsystem { config: _, store: _, keystore, metrics } = subsystem;
139
140 let (participation_sender, participation_receiver) = mpsc::channel(1);
141 let participation = Participation::new(participation_sender, metrics.clone());
142
143 Self {
144 keystore,
145 runtime_info,
146 offchain_disabled_validators,
147 controlled_validator_indices,
148 highest_session_seen,
149 gaps_in_cache,
150 spam_slots,
151 scraper,
152 participation,
153 participation_receiver,
154 chain_import_backlog: VecDeque::new(),
155 metrics,
156 }
157 }
158
159 pub async fn run<B, Context>(
163 mut self,
164 mut ctx: Context,
165 mut backend: B,
166 mut initial_data: Option<InitialData>,
167 clock: Box<dyn Clock>,
168 ) -> FatalResult<()>
169 where
170 B: Backend,
171 {
172 loop {
173 let res =
174 self.run_until_error(&mut ctx, &mut backend, &mut initial_data, &*clock).await;
175 if let Ok(()) = res {
176 gum::info!(target: LOG_TARGET, "received `Conclude` signal, exiting");
177 return Ok(())
178 }
179 log_error(res)?;
180 }
181 }
182
183 async fn run_until_error<B, Context>(
189 &mut self,
190 ctx: &mut Context,
191 backend: &mut B,
192 initial_data: &mut Option<InitialData>,
193 clock: &dyn Clock,
194 ) -> Result<()>
195 where
196 B: Backend,
197 {
198 if let Some(InitialData { participations, votes: on_chain_votes, leaf: first_leaf }) =
199 initial_data.take()
200 {
201 for (priority, request) in participations {
202 self.participation.queue_participation(ctx, priority, request).await?;
203 }
204
205 let mut overlay_db = OverlayedBackend::new(backend);
206
207 self.process_chain_import_backlog(
208 ctx,
209 &mut overlay_db,
210 on_chain_votes,
211 clock.now(),
212 first_leaf.hash,
213 )
214 .await;
215
216 if !overlay_db.is_empty() {
217 let ops = overlay_db.into_write_ops();
218 backend.write(ops)?;
219 }
220
221 self.participation
223 .process_active_leaves_update(ctx, &ActiveLeavesUpdate::start_work(first_leaf))
224 .await?;
225 }
226
227 loop {
228 gum::trace!(target: LOG_TARGET, "Waiting for message");
229 let mut overlay_db = OverlayedBackend::new(backend);
230 let default_confirm = Box::new(|| Ok(()));
231 let confirm_write =
232 match MuxedMessage::receive(ctx, &mut self.participation_receiver).await? {
233 MuxedMessage::Participation(msg) => {
234 gum::trace!(target: LOG_TARGET, "MuxedMessage::Participation");
235 let ParticipationStatement {
236 session,
237 candidate_hash,
238 candidate_receipt,
239 outcome,
240 } = self.participation.get_participation_result(ctx, msg).await?;
241 if let Some(valid) = outcome.validity() {
242 gum::trace!(
243 target: LOG_TARGET,
244 ?session,
245 ?candidate_hash,
246 ?valid,
247 "Issuing local statement based on participation outcome."
248 );
249 self.issue_local_statement(
250 ctx,
251 &mut overlay_db,
252 candidate_hash,
253 candidate_receipt,
254 session,
255 valid,
256 clock.now(),
257 )
258 .await?;
259 } else {
260 gum::warn!(target: LOG_TARGET, ?outcome, "Dispute participation failed");
261 }
262 default_confirm
263 },
264 MuxedMessage::Subsystem(msg) => match msg {
265 FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()),
266 FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => {
267 gum::trace!(target: LOG_TARGET, "OverseerSignal::ActiveLeaves");
268 self.process_active_leaves_update(
269 ctx,
270 &mut overlay_db,
271 update,
272 clock.now(),
273 )
274 .await?;
275 default_confirm
276 },
277 FromOrchestra::Signal(OverseerSignal::BlockFinalized(_, n)) => {
278 gum::trace!(target: LOG_TARGET, "OverseerSignal::BlockFinalized");
279 self.scraper.process_finalized_block(&n);
280 default_confirm
281 },
282 FromOrchestra::Communication { msg } =>
283 self.handle_incoming(ctx, &mut overlay_db, msg, clock.now()).await?,
284 },
285 };
286
287 if !overlay_db.is_empty() {
288 let ops = overlay_db.into_write_ops();
289 backend.write(ops)?;
290 }
291 confirm_write()?;
294 }
295 }
296
297 async fn process_active_leaves_update<Context>(
298 &mut self,
299 ctx: &mut Context,
300 overlay_db: &mut OverlayedBackend<'_, impl Backend>,
301 update: ActiveLeavesUpdate,
302 now: u64,
303 ) -> Result<()> {
304 gum::trace!(target: LOG_TARGET, timestamp = now, "Processing ActiveLeavesUpdate");
305 let scraped_updates =
306 self.scraper.process_active_leaves_update(ctx.sender(), &update).await?;
307 log_error(
308 self.participation
309 .bump_to_priority_for_candidates(ctx, &scraped_updates.included_receipts)
310 .await,
311 )?;
312 self.participation.process_active_leaves_update(ctx, &update).await?;
313
314 if let Some(new_leaf) = update.activated {
315 gum::trace!(
316 target: LOG_TARGET,
317 leaf_hash = ?new_leaf.hash,
318 block_number = new_leaf.number,
319 "Processing ActivatedLeaf"
320 );
321
322 let session_idx =
323 self.runtime_info.get_session_index_for_child(ctx.sender(), new_leaf.hash).await;
324
325 match session_idx {
326 Ok(session_idx)
327 if self.gaps_in_cache || session_idx > self.highest_session_seen =>
328 {
329 self.runtime_info.pin_block(session_idx, new_leaf.unpin_handle);
331 let prune_up_to = session_idx.saturating_sub(DISPUTE_WINDOW.get() - 1);
334 let fetch_lower_bound =
335 if !self.gaps_in_cache && self.highest_session_seen > prune_up_to {
336 self.highest_session_seen + 1
337 } else {
338 prune_up_to
339 };
340
341 for idx in fetch_lower_bound..=session_idx {
343 if let Err(err) = self
344 .runtime_info
345 .get_session_info_by_index(ctx.sender(), new_leaf.hash, idx)
346 .await
347 {
348 gum::debug!(
349 target: LOG_TARGET,
350 session_idx,
351 leaf_hash = ?new_leaf.hash,
352 ?err,
353 "Error caching SessionInfo on ActiveLeaves update"
354 );
355 self.gaps_in_cache = true;
356 }
357 }
358
359 self.highest_session_seen = session_idx;
360
361 db::v1::note_earliest_session(overlay_db, prune_up_to)?;
362 self.spam_slots.prune_old(prune_up_to);
363 self.offchain_disabled_validators.prune_old(prune_up_to);
364 },
365 Ok(_) => { },
366 Err(err) => {
367 gum::debug!(
368 target: LOG_TARGET,
369 ?err,
370 "Failed to update session cache for disputes - can't fetch session index",
371 );
372 },
373 }
374
375 let ScrapedUpdates { unapplied_slashes, on_chain_votes, .. } = scraped_updates;
376
377 self.process_unapplied_slashes(ctx, new_leaf.hash, unapplied_slashes).await;
378
379 gum::trace!(
380 target: LOG_TARGET,
381 timestamp = now,
382 "Will process {} onchain votes",
383 on_chain_votes.len()
384 );
385
386 self.process_chain_import_backlog(ctx, overlay_db, on_chain_votes, now, new_leaf.hash)
387 .await;
388 }
389
390 gum::trace!(target: LOG_TARGET, timestamp = now, "Done processing ActiveLeavesUpdate");
391 Ok(())
392 }
393
394 async fn process_unapplied_slashes<Context>(
397 &mut self,
398 ctx: &mut Context,
399 relay_parent: Hash,
400 unapplied_slashes: Vec<(SessionIndex, CandidateHash, slashing::PendingSlashes)>,
401 ) {
402 for (session_index, candidate_hash, pending) in unapplied_slashes {
403 gum::info!(
404 target: LOG_TARGET,
405 ?session_index,
406 ?candidate_hash,
407 n_slashes = pending.keys.len(),
408 "Processing unapplied validator slashes",
409 );
410
411 let pinned_hash = self.runtime_info.get_block_in_session(session_index);
412 let inclusions = self.scraper.get_blocks_including_candidate(&candidate_hash);
413 if pinned_hash.is_none() && inclusions.is_empty() {
414 gum::info!(
415 target: LOG_TARGET,
416 ?session_index,
417 "Couldn't find blocks in the session for an unapplied slash",
418 );
419 return
420 }
421
422 let mut key_ownership_proofs = Vec::new();
426 let mut dispute_proofs = Vec::new();
427
428 let blocks_in_the_session =
429 pinned_hash.into_iter().chain(inclusions.into_iter().map(|(_n, h)| h));
430 for hash in blocks_in_the_session {
431 for (validator_index, validator_id) in pending.keys.iter() {
432 let res = key_ownership_proof(ctx.sender(), hash, validator_id.clone()).await;
433
434 match res {
435 Ok(Some(key_ownership_proof)) => {
436 key_ownership_proofs.push(key_ownership_proof);
437 let time_slot =
438 slashing::DisputesTimeSlot::new(session_index, candidate_hash);
439 let dispute_proof = slashing::DisputeProof {
440 time_slot,
441 kind: pending.kind,
442 validator_index: *validator_index,
443 validator_id: validator_id.clone(),
444 };
445 dispute_proofs.push(dispute_proof);
446 },
447 Ok(None) => {},
448 Err(runtime::Error::RuntimeRequest(RuntimeApiError::NotSupported {
449 ..
450 })) => {
451 gum::debug!(
452 target: LOG_TARGET,
453 ?session_index,
454 ?candidate_hash,
455 ?validator_id,
456 "Key ownership proof not yet supported.",
457 );
458 },
459 Err(error) => {
460 gum::warn!(
461 target: LOG_TARGET,
462 ?error,
463 ?session_index,
464 ?candidate_hash,
465 ?validator_id,
466 "Could not generate key ownership proof",
467 );
468 },
469 }
470 }
471
472 if !key_ownership_proofs.is_empty() {
473 debug_assert_eq!(key_ownership_proofs.len(), pending.keys.len());
476 break
477 }
478 }
479
480 let expected_keys = pending.keys.len();
481 let resolved_keys = key_ownership_proofs.len();
482 if resolved_keys < expected_keys {
483 gum::warn!(
484 target: LOG_TARGET,
485 ?session_index,
486 ?candidate_hash,
487 "Could not generate key ownership proofs for {} keys",
488 expected_keys - resolved_keys,
489 );
490 }
491 debug_assert_eq!(resolved_keys, dispute_proofs.len());
492
493 for (key_ownership_proof, dispute_proof) in
494 key_ownership_proofs.into_iter().zip(dispute_proofs.into_iter())
495 {
496 let validator_id = dispute_proof.validator_id.clone();
497
498 gum::info!(
499 target: LOG_TARGET,
500 ?session_index,
501 ?candidate_hash,
502 key_ownership_proof_len = key_ownership_proof.len(),
503 "Trying to submit a slashing report",
504 );
505
506 let res = submit_report_dispute_lost(
507 ctx.sender(),
508 relay_parent,
509 dispute_proof,
510 key_ownership_proof,
511 )
512 .await;
513
514 match res {
515 Err(runtime::Error::RuntimeRequest(RuntimeApiError::NotSupported {
516 ..
517 })) => {
518 gum::debug!(
519 target: LOG_TARGET,
520 ?session_index,
521 ?candidate_hash,
522 "Reporting pending slash not yet supported",
523 );
524 },
525 Err(error) => {
526 gum::warn!(
527 target: LOG_TARGET,
528 ?error,
529 ?session_index,
530 ?candidate_hash,
531 "Error reporting pending slash",
532 );
533 },
534 Ok(Some(())) => {
535 gum::info!(
536 target: LOG_TARGET,
537 ?session_index,
538 ?candidate_hash,
539 ?validator_id,
540 "Successfully reported pending slash",
541 );
542 },
543 Ok(None) => {
544 gum::debug!(
545 target: LOG_TARGET,
546 ?session_index,
547 ?candidate_hash,
548 ?validator_id,
549 "Duplicate pending slash report",
550 );
551 },
552 }
553 }
554 }
555 }
556
557 async fn process_chain_import_backlog<Context>(
561 &mut self,
562 ctx: &mut Context,
563 overlay_db: &mut OverlayedBackend<'_, impl Backend>,
564 new_votes: Vec<ScrapedOnChainVotes>,
565 now: u64,
566 block_hash: Hash,
567 ) {
568 let mut chain_import_backlog = std::mem::take(&mut self.chain_import_backlog);
569 chain_import_backlog.extend(new_votes);
570 let import_range =
571 0..std::cmp::min(CHAIN_IMPORT_MAX_BATCH_SIZE, chain_import_backlog.len());
572 for votes in chain_import_backlog.drain(import_range) {
575 let res = self.process_on_chain_votes(ctx, overlay_db, votes, now, block_hash).await;
576 match res {
577 Ok(()) => {},
578 Err(error) => {
579 gum::warn!(target: LOG_TARGET, ?error, "Skipping scraping block due to error",);
580 },
581 };
582 }
583 self.chain_import_backlog = chain_import_backlog;
584 }
585
586 async fn process_on_chain_votes<Context>(
589 &mut self,
590 ctx: &mut Context,
591 overlay_db: &mut OverlayedBackend<'_, impl Backend>,
592 votes: ScrapedOnChainVotes,
593 now: u64,
594 block_hash: Hash,
595 ) -> Result<()> {
596 let ScrapedOnChainVotes { session, backing_validators_per_candidate, disputes } = votes;
597
598 if backing_validators_per_candidate.is_empty() && disputes.is_empty() {
599 return Ok(())
600 }
601
602 for (candidate_receipt, backers) in backing_validators_per_candidate {
605 let relay_parent = candidate_receipt.descriptor.relay_parent();
607 let session_info = match self
608 .runtime_info
609 .get_session_info_by_index(ctx.sender(), relay_parent, session)
610 .await
611 {
612 Ok(extended_session_info) => &extended_session_info.session_info,
613 Err(err) => {
614 gum::warn!(
615 target: LOG_TARGET,
616 ?session,
617 ?err,
618 "Could not retrieve session info from RuntimeInfo",
619 );
620 return Ok(())
621 },
622 };
623
624 let candidate_hash = candidate_receipt.hash();
625 gum::trace!(
626 target: LOG_TARGET,
627 ?candidate_hash,
628 ?relay_parent,
629 "Importing backing votes from chain for candidate"
630 );
631 let statements = backers
632 .into_iter()
633 .filter_map(|(validator_index, attestation)| {
634 let validator_public: ValidatorId = session_info
635 .validators
636 .get(validator_index)
637 .or_else(|| {
638 gum::error!(
639 target: LOG_TARGET,
640 ?session,
641 ?validator_index,
642 "Missing public key for validator",
643 );
644 None
645 })
646 .cloned()?;
647 let validator_signature = attestation.signature().clone();
648 let valid_statement_kind =
649 match attestation.to_compact_statement(candidate_hash) {
650 CompactStatement::Seconded(_) =>
651 ValidDisputeStatementKind::BackingSeconded(relay_parent),
652 CompactStatement::Valid(_) =>
653 ValidDisputeStatementKind::BackingValid(relay_parent),
654 };
655 debug_assert!(
656 SignedDisputeStatement::new_checked(
657 DisputeStatement::Valid(valid_statement_kind.clone()),
658 candidate_hash,
659 session,
660 validator_public.clone(),
661 validator_signature.clone(),
662 ).is_ok(),
663 "Scraped backing votes had invalid signature! candidate: {:?}, session: {:?}, validator_public: {:?}, validator_index: {}",
664 candidate_hash,
665 session,
666 validator_public,
667 validator_index.0,
668 );
669 let signed_dispute_statement =
670 SignedDisputeStatement::new_unchecked_from_trusted_source(
671 DisputeStatement::Valid(valid_statement_kind.clone()),
672 candidate_hash,
673 session,
674 validator_public,
675 validator_signature,
676 );
677 Some((signed_dispute_statement, validator_index))
678 })
679 .collect();
680
681 let import_result = self
684 .handle_import_statements(
685 ctx,
686 overlay_db,
687 MaybeCandidateReceipt::Provides(candidate_receipt),
688 session,
689 statements,
690 now,
691 )
692 .await?;
693 match import_result {
694 ImportStatementsResult::ValidImport => gum::trace!(
695 target: LOG_TARGET,
696 ?relay_parent,
697 ?session,
698 "Imported backing votes from chain"
699 ),
700 ImportStatementsResult::InvalidImport => gum::warn!(
701 target: LOG_TARGET,
702 ?relay_parent,
703 ?session,
704 "Attempted import of on-chain backing votes failed"
705 ),
706 }
707 }
708
709 for DisputeStatementSet { candidate_hash, session, statements } in disputes {
712 gum::trace!(
713 target: LOG_TARGET,
714 ?candidate_hash,
715 ?session,
716 "Importing dispute votes from chain for candidate"
717 );
718 let session_info = match self
719 .runtime_info
720 .get_session_info_by_index(ctx.sender(), block_hash, session)
721 .await
722 {
723 Ok(extended_session_info) => &extended_session_info.session_info,
724 Err(err) => {
725 gum::warn!(
726 target: LOG_TARGET,
727 ?candidate_hash,
728 ?session,
729 ?err,
730 "Could not retrieve session info for recently concluded dispute"
731 );
732 continue
733 },
734 };
735
736 let statements = statements
737 .into_iter()
738 .filter_map(|(dispute_statement, validator_index, validator_signature)| {
739 let validator_public: ValidatorId = session_info
740 .validators
741 .get(validator_index)
742 .or_else(|| {
743 gum::error!(
744 target: LOG_TARGET,
745 ?candidate_hash,
746 ?session,
747 "Missing public key for validator {:?} that participated in concluded dispute",
748 &validator_index
749 );
750 None
751 })
752 .cloned()?;
753
754 Some((
755 SignedDisputeStatement::new_unchecked_from_trusted_source(
756 dispute_statement,
757 candidate_hash,
758 session,
759 validator_public,
760 validator_signature,
761 ),
762 validator_index,
763 ))
764 })
765 .collect::<Vec<_>>();
766 if statements.is_empty() {
767 gum::debug!(target: LOG_TARGET, "Skipping empty from chain dispute import");
768 continue
769 }
770 let import_result = self
771 .handle_import_statements(
772 ctx,
773 overlay_db,
774 MaybeCandidateReceipt::AssumeBackingVotePresent(candidate_hash),
776 session,
777 statements,
778 now,
779 )
780 .await?;
781 match import_result {
782 ImportStatementsResult::ValidImport => gum::trace!(
783 target: LOG_TARGET,
784 ?candidate_hash,
785 ?session,
786 "Imported statement of dispute from on-chain"
787 ),
788 ImportStatementsResult::InvalidImport => gum::warn!(
789 target: LOG_TARGET,
790 ?candidate_hash,
791 ?session,
792 "Attempted import of on-chain statement of dispute failed"
793 ),
794 }
795 }
796
797 Ok(())
798 }
799
800 async fn handle_incoming<Context>(
801 &mut self,
802 ctx: &mut Context,
803 overlay_db: &mut OverlayedBackend<'_, impl Backend>,
804 message: DisputeCoordinatorMessage,
805 now: Timestamp,
806 ) -> Result<Box<dyn FnOnce() -> JfyiResult<()>>> {
807 match message {
808 DisputeCoordinatorMessage::ImportStatements {
809 candidate_receipt,
810 session,
811 statements,
812 pending_confirmation,
813 } => {
814 gum::trace!(
815 target: LOG_TARGET,
816 candidate_hash = ?candidate_receipt.hash(),
817 ?session,
818 "DisputeCoordinatorMessage::ImportStatements"
819 );
820 let outcome = self
821 .handle_import_statements(
822 ctx,
823 overlay_db,
824 MaybeCandidateReceipt::Provides(candidate_receipt),
825 session,
826 statements,
827 now,
828 )
829 .await?;
830 let report = move || match pending_confirmation {
831 Some(pending_confirmation) => pending_confirmation
832 .send(outcome)
833 .map_err(|_| JfyiError::DisputeImportOneshotSend),
834 None => Ok(()),
835 };
836
837 match outcome {
838 ImportStatementsResult::InvalidImport => {
839 report()?;
840 },
841 ImportStatementsResult::ValidImport => return Ok(Box::new(report)),
843 }
844 },
845 DisputeCoordinatorMessage::RecentDisputes(tx) => {
846 gum::trace!(target: LOG_TARGET, "Loading recent disputes from db");
847 let recent_disputes = if let Some(disputes) = overlay_db.load_recent_disputes()? {
848 disputes
849 } else {
850 BTreeMap::new()
851 };
852 gum::trace!(target: LOG_TARGET, "Loaded recent disputes from db");
853
854 let _ = tx.send(recent_disputes);
855 },
856 DisputeCoordinatorMessage::ActiveDisputes(tx) => {
857 gum::trace!(target: LOG_TARGET, "DisputeCoordinatorMessage::ActiveDisputes");
858 let recent_disputes = if let Some(disputes) = overlay_db.load_recent_disputes()? {
859 disputes
860 } else {
861 BTreeMap::new()
862 };
863
864 let _ = tx.send(
865 get_active_with_status(recent_disputes.into_iter(), now)
866 .collect::<BTreeMap<_, _>>(),
867 );
868 },
869 DisputeCoordinatorMessage::QueryCandidateVotes(query, tx) => {
870 gum::trace!(target: LOG_TARGET, "DisputeCoordinatorMessage::QueryCandidateVotes");
871 let mut query_output = Vec::new();
872 for (session_index, candidate_hash) in query {
873 if let Some(v) =
874 overlay_db.load_candidate_votes(session_index, &candidate_hash)?
875 {
876 query_output.push((session_index, candidate_hash, v.into()));
877 } else {
878 gum::debug!(
879 target: LOG_TARGET,
880 session_index,
881 "No votes found for candidate",
882 );
883 }
884 }
885 let _ = tx.send(query_output);
886 },
887 DisputeCoordinatorMessage::IssueLocalStatement(
888 session,
889 candidate_hash,
890 candidate_receipt,
891 valid,
892 ) => {
893 gum::trace!(target: LOG_TARGET, "DisputeCoordinatorMessage::IssueLocalStatement");
894 self.issue_local_statement(
895 ctx,
896 overlay_db,
897 candidate_hash,
898 candidate_receipt,
899 session,
900 valid,
901 now,
902 )
903 .await?;
904 },
905 DisputeCoordinatorMessage::DetermineUndisputedChain {
906 base: (base_number, base_hash),
907 block_descriptions,
908 tx,
909 } => {
910 gum::trace!(
911 target: LOG_TARGET,
912 "DisputeCoordinatorMessage::DetermineUndisputedChain"
913 );
914
915 let undisputed_chain = determine_undisputed_chain(
916 overlay_db,
917 base_number,
918 base_hash,
919 block_descriptions,
920 )?;
921
922 let _ = tx.send(undisputed_chain);
923 },
924 }
925
926 Ok(Box::new(|| Ok(())))
927 }
928
929 async fn handle_import_statements<Context>(
934 &mut self,
935 ctx: &mut Context,
936 overlay_db: &mut OverlayedBackend<'_, impl Backend>,
937 candidate_receipt: MaybeCandidateReceipt,
938 session: SessionIndex,
939 statements: Vec<(SignedDisputeStatement, ValidatorIndex)>,
940 now: Timestamp,
941 ) -> FatalResult<ImportStatementsResult> {
942 gum::trace!(target: LOG_TARGET, ?statements, "In handle import statements");
943 if self.session_is_ancient(session) {
944 return Ok(ImportStatementsResult::InvalidImport)
946 }
947
948 let candidate_hash = candidate_receipt.hash();
949 let votes_in_db = overlay_db.load_candidate_votes(session, &candidate_hash)?;
950 let relay_parent = match &candidate_receipt {
951 MaybeCandidateReceipt::Provides(candidate_receipt) =>
952 candidate_receipt.descriptor().relay_parent(),
953 MaybeCandidateReceipt::AssumeBackingVotePresent(candidate_hash) => match &votes_in_db {
954 Some(votes) => votes.candidate_receipt.descriptor().relay_parent(),
955 None => {
956 gum::warn!(
957 target: LOG_TARGET,
958 session,
959 ?candidate_hash,
960 "Cannot obtain relay parent without `CandidateReceipt` available!"
961 );
962 return Ok(ImportStatementsResult::InvalidImport)
963 },
964 },
965 };
966
967 let env = match CandidateEnvironment::new(
968 ctx,
969 &mut self.runtime_info,
970 session,
971 relay_parent,
972 self.offchain_disabled_validators.iter(session),
973 &mut self.controlled_validator_indices,
974 )
975 .await
976 {
977 None => {
978 gum::warn!(
979 target: LOG_TARGET,
980 session,
981 "We are lacking a `SessionInfo` for handling import of statements."
982 );
983
984 return Ok(ImportStatementsResult::InvalidImport)
985 },
986 Some(env) => env,
987 };
988
989 let n_validators = env.validators().len();
990
991 gum::trace!(
992 target: LOG_TARGET,
993 ?candidate_hash,
994 ?session,
995 ?n_validators,
996 "Number of validators"
997 );
998
999 let old_state = match votes_in_db.map(CandidateVotes::from) {
1008 Some(votes) => CandidateVoteState::new(votes, &env, now),
1009 None =>
1010 if let MaybeCandidateReceipt::Provides(candidate_receipt) = candidate_receipt {
1011 CandidateVoteState::new_from_receipt(candidate_receipt)
1012 } else {
1013 gum::warn!(
1014 target: LOG_TARGET,
1015 session,
1016 ?candidate_hash,
1017 "Cannot import votes, without `CandidateReceipt` available!"
1018 );
1019 return Ok(ImportStatementsResult::InvalidImport)
1020 },
1021 };
1022
1023 gum::trace!(target: LOG_TARGET, ?candidate_hash, ?session, "Loaded votes");
1024
1025 let controlled_indices = env.controlled_indices();
1026 let own_statements = statements
1027 .iter()
1028 .filter(|(statement, validator_index)| {
1029 controlled_indices.contains(validator_index) &&
1030 *statement.candidate_hash() == candidate_hash
1031 })
1032 .cloned()
1033 .collect::<Vec<_>>();
1034
1035 let import_result = {
1036 let intermediate_result = old_state.import_statements(&env, statements, now);
1037
1038 if intermediate_result.is_freshly_disputed() ||
1043 intermediate_result.is_freshly_concluded()
1044 {
1045 gum::trace!(
1046 target: LOG_TARGET,
1047 ?candidate_hash,
1048 ?session,
1049 "Requesting approval signatures"
1050 );
1051 let (tx, rx) = oneshot::channel();
1052 ctx.send_unbounded_message(
1063 ApprovalVotingParallelMessage::GetApprovalSignaturesForCandidate(
1064 candidate_hash,
1065 tx,
1066 ),
1067 );
1068
1069 match rx.await {
1070 Err(_) => {
1071 gum::warn!(
1072 target: LOG_TARGET,
1073 "Fetch for approval votes got cancelled, only expected during shutdown!"
1074 );
1075 intermediate_result
1076 },
1077 Ok(votes) => {
1078 gum::trace!(
1079 target: LOG_TARGET,
1080 count = votes.len(),
1081 "Successfully received approval votes."
1082 );
1083 intermediate_result.import_approval_votes(&env, votes, now)
1084 },
1085 }
1086 } else {
1087 gum::trace!(
1088 target: LOG_TARGET,
1089 ?candidate_hash,
1090 ?session,
1091 "Not requested approval signatures"
1092 );
1093 intermediate_result
1094 }
1095 };
1096
1097 gum::trace!(
1098 target: LOG_TARGET,
1099 ?candidate_hash,
1100 ?session,
1101 ?n_validators,
1102 "Import result ready"
1103 );
1104
1105 let new_state = import_result.new_state();
1106
1107 let is_included = self.scraper.is_candidate_included(&candidate_hash);
1108 let is_backed = self.scraper.is_candidate_backed(&candidate_hash);
1109 let own_vote_missing = new_state.own_vote_missing();
1110 let is_disputed = new_state.is_disputed();
1111 let is_confirmed = new_state.is_confirmed();
1112 let is_disabled = |v: &ValidatorIndex| env.disabled_indices().contains(v);
1113 let potential_spam =
1114 is_potential_spam(&self.scraper, &new_state, &candidate_hash, is_disabled);
1115 let allow_participation = !potential_spam;
1116
1117 gum::trace!(
1118 target: LOG_TARGET,
1119 ?own_vote_missing,
1120 ?potential_spam,
1121 ?is_included,
1122 ?candidate_hash,
1123 confirmed = ?new_state.is_confirmed(),
1124 has_invalid_voters = ?!import_result.new_invalid_voters().is_empty(),
1125 n_disabled_validators = ?env.disabled_indices().len(),
1126 "Is spam?"
1127 );
1128
1129 if !potential_spam {
1134 self.spam_slots.clear(&(session, candidate_hash));
1135
1136 } else if !import_result.new_invalid_voters().is_empty() {
1138 let mut free_spam_slots_available = false;
1139 for index in import_result.new_invalid_voters() {
1142 free_spam_slots_available |=
1147 self.spam_slots.add_unconfirmed(session, candidate_hash, *index);
1148 }
1149 if !free_spam_slots_available {
1150 gum::debug!(
1151 target: LOG_TARGET,
1152 ?candidate_hash,
1153 ?session,
1154 invalid_voters = ?import_result.new_invalid_voters(),
1155 "Rejecting import because of full spam slots."
1156 );
1157 return Ok(ImportStatementsResult::InvalidImport)
1158 }
1159 }
1160
1161 if own_vote_missing && is_disputed && allow_participation {
1168 let priority = ParticipationPriority::with_priority_if(is_included);
1169 gum::trace!(
1170 target: LOG_TARGET,
1171 ?candidate_hash,
1172 ?priority,
1173 "Queuing participation for candidate"
1174 );
1175 if priority.is_priority() {
1176 self.metrics.on_queued_priority_participation();
1177 } else {
1178 self.metrics.on_queued_best_effort_participation();
1179 }
1180 let request_timer = self.metrics.time_participation_pipeline();
1181 let r = self
1182 .participation
1183 .queue_participation(
1184 ctx,
1185 priority,
1186 ParticipationRequest::new(
1187 new_state.candidate_receipt().clone(),
1188 session,
1189 env.executor_params().clone(),
1190 request_timer,
1191 ),
1192 )
1193 .await;
1194 log_error(r)?;
1195 } else {
1196 gum::trace!(
1197 target: LOG_TARGET,
1198 ?candidate_hash,
1199 ?is_confirmed,
1200 ?own_vote_missing,
1201 ?is_disputed,
1202 ?allow_participation,
1203 ?is_included,
1204 ?is_backed,
1205 "Will not queue participation for candidate"
1206 );
1207
1208 if !allow_participation {
1209 self.metrics.on_refrained_participation();
1210 }
1211 }
1212
1213 if import_result.is_freshly_disputed() {
1215 let our_approval_votes = new_state.own_approval_votes().into_iter().flatten();
1216 for (validator_index, sig) in our_approval_votes {
1217 let pub_key = match env.validators().get(validator_index) {
1218 None => {
1219 gum::error!(
1220 target: LOG_TARGET,
1221 ?validator_index,
1222 ?session,
1223 "Could not find pub key in `SessionInfo` for our own approval vote!"
1224 );
1225 continue
1226 },
1227 Some(k) => k,
1228 };
1229 let statement = SignedDisputeStatement::new_unchecked_from_trusted_source(
1230 DisputeStatement::Valid(ValidDisputeStatementKind::ApprovalChecking),
1231 candidate_hash,
1232 session,
1233 pub_key.clone(),
1234 sig.clone(),
1235 );
1236 gum::trace!(
1237 target: LOG_TARGET,
1238 ?candidate_hash,
1239 ?session,
1240 ?validator_index,
1241 "Sending out own approval vote"
1242 );
1243 match make_dispute_message(
1244 env.session_info(),
1245 &new_state.votes(),
1246 statement,
1247 validator_index,
1248 ) {
1249 Err(err) => {
1250 gum::error!(
1251 target: LOG_TARGET,
1252 ?err,
1253 "No ongoing dispute, but we checked there is one!"
1254 );
1255 },
1256 Ok(dispute_message) => {
1257 ctx.send_message(DisputeDistributionMessage::SendDispute(dispute_message))
1258 .await;
1259 },
1260 };
1261 }
1262 }
1263
1264 if let Some(new_status) = new_state.dispute_status() {
1266 if import_result.dispute_state_changed() {
1268 let mut recent_disputes = overlay_db.load_recent_disputes()?.unwrap_or_default();
1269
1270 let status =
1271 recent_disputes.entry((session, candidate_hash)).or_insert_with(|| {
1272 gum::info!(
1273 target: LOG_TARGET,
1274 ?candidate_hash,
1275 session,
1276 "New dispute initiated for candidate.",
1277 );
1278 DisputeStatus::active()
1279 });
1280
1281 *status = *new_status;
1282
1283 gum::trace!(
1284 target: LOG_TARGET,
1285 ?candidate_hash,
1286 ?status,
1287 has_concluded_for = ?new_state.has_concluded_for(),
1288 has_concluded_against = ?new_state.has_concluded_against(),
1289 "Writing recent disputes with updates for candidate"
1290 );
1291 overlay_db.write_recent_disputes(recent_disputes);
1292 }
1293 }
1294
1295 if import_result.has_fresh_byzantine_threshold_against() {
1298 let blocks_including = self.scraper.get_blocks_including_candidate(&candidate_hash);
1299 for (parent_block_number, parent_block_hash) in &blocks_including {
1300 gum::warn!(
1301 target: LOG_TARGET,
1302 ?candidate_hash,
1303 ?parent_block_number,
1304 ?parent_block_hash,
1305 "Dispute has just concluded against the candidate hash noted. Its parent will be marked as reverted."
1306 );
1307 }
1308 if blocks_including.len() > 0 {
1309 ctx.send_message(ChainSelectionMessage::RevertBlocks(blocks_including)).await;
1310 } else {
1311 gum::warn!(
1312 target: LOG_TARGET,
1313 ?candidate_hash,
1314 ?session,
1315 "Could not find an including block for candidate against which a dispute has concluded."
1316 );
1317 }
1318 }
1319
1320 if import_result.is_freshly_disputed() {
1322 self.metrics.on_open();
1323 }
1324 self.metrics.on_valid_votes(import_result.imported_valid_votes());
1325 self.metrics.on_invalid_votes(import_result.imported_invalid_votes());
1326 gum::trace!(
1327 target: LOG_TARGET,
1328 ?candidate_hash,
1329 ?session,
1330 imported_approval_votes = ?import_result.imported_approval_votes(),
1331 imported_valid_votes = ?import_result.imported_valid_votes(),
1332 imported_invalid_votes = ?import_result.imported_invalid_votes(),
1333 total_valid_votes = ?import_result.new_state().votes().valid.raw().len(),
1334 total_invalid_votes = ?import_result.new_state().votes().invalid.len(),
1335 confirmed = ?import_result.new_state().is_confirmed(),
1336 "Import summary"
1337 );
1338
1339 self.metrics.on_approval_votes(import_result.imported_approval_votes());
1340 if import_result.is_freshly_concluded_for() {
1341 gum::info!(
1342 target: LOG_TARGET,
1343 ?candidate_hash,
1344 session,
1345 "Dispute on candidate concluded with 'valid' result",
1346 );
1347 for (statement, validator_index) in own_statements.iter() {
1348 if statement.statement().indicates_invalidity() {
1349 gum::warn!(
1350 target: LOG_TARGET,
1351 ?candidate_hash,
1352 ?validator_index,
1353 "Voted against a candidate that was concluded valid.",
1354 );
1355 }
1356 }
1357 for validator_index in new_state.votes().invalid.keys() {
1358 gum::info!(
1359 target: LOG_TARGET,
1360 ?candidate_hash,
1361 ?validator_index,
1362 ?session,
1363 "Disabled offchain for voting invalid against a valid candidate",
1364 );
1365 self.offchain_disabled_validators
1366 .insert_against_valid(session, *validator_index);
1367 }
1368 self.metrics.on_concluded_valid();
1369 }
1370 if import_result.is_freshly_concluded_against() {
1371 gum::info!(
1372 target: LOG_TARGET,
1373 ?candidate_hash,
1374 session,
1375 "Dispute on candidate concluded with 'invalid' result",
1376 );
1377 for (statement, validator_index) in own_statements.iter() {
1378 if statement.statement().indicates_validity() {
1379 gum::warn!(
1380 target: LOG_TARGET,
1381 ?candidate_hash,
1382 ?validator_index,
1383 "Voted for a candidate that was concluded invalid.",
1384 );
1385 }
1386 }
1387 for (validator_index, (kind, _sig)) in new_state.votes().valid.raw() {
1388 let is_backer = kind.is_backing();
1389 gum::info!(
1390 target: LOG_TARGET,
1391 ?candidate_hash,
1392 ?validator_index,
1393 ?session,
1394 ?is_backer,
1395 "Disabled offchain for voting valid for an invalid candidate",
1396 );
1397 self.offchain_disabled_validators.insert_for_invalid(
1398 session,
1399 *validator_index,
1400 is_backer,
1401 );
1402 }
1403 self.metrics.on_concluded_invalid();
1404 }
1405
1406 if import_result.is_freshly_concluded_for() || import_result.is_freshly_concluded_against()
1409 {
1410 self.revisit_active_disputes_after_disabling(overlay_db, session)?;
1411 }
1412
1413 if let Some(votes) = import_result.into_updated_votes() {
1415 overlay_db.write_candidate_votes(session, candidate_hash, votes.into());
1416 }
1417
1418 Ok(ImportStatementsResult::ValidImport)
1419 }
1420
1421 async fn issue_local_statement<Context>(
1422 &mut self,
1423 ctx: &mut Context,
1424 overlay_db: &mut OverlayedBackend<'_, impl Backend>,
1425 candidate_hash: CandidateHash,
1426 candidate_receipt: CandidateReceipt,
1427 session: SessionIndex,
1428 valid: bool,
1429 now: Timestamp,
1430 ) -> Result<()> {
1431 gum::trace!(
1432 target: LOG_TARGET,
1433 ?candidate_hash,
1434 ?session,
1435 ?valid,
1436 ?now,
1437 "Issuing local statement for candidate!"
1438 );
1439
1440 let env = match CandidateEnvironment::new(
1442 ctx,
1443 &mut self.runtime_info,
1444 session,
1445 candidate_receipt.descriptor.relay_parent(),
1446 self.offchain_disabled_validators.iter(session),
1447 &mut self.controlled_validator_indices,
1448 )
1449 .await
1450 {
1451 None => {
1452 gum::warn!(
1453 target: LOG_TARGET,
1454 session,
1455 "Missing info for session which has an active dispute",
1456 );
1457
1458 return Ok(())
1459 },
1460 Some(env) => env,
1461 };
1462
1463 let votes = overlay_db
1464 .load_candidate_votes(session, &candidate_hash)?
1465 .map(CandidateVotes::from)
1466 .unwrap_or_else(|| CandidateVotes {
1467 candidate_receipt: candidate_receipt.clone(),
1468 valid: ValidCandidateVotes::new(),
1469 invalid: BTreeMap::new(),
1470 });
1471
1472 let voted_indices = votes.voted_indices();
1475 let mut statements = Vec::new();
1476
1477 let controlled_indices = env.controlled_indices();
1478 for index in controlled_indices {
1479 if voted_indices.contains(&index) {
1480 continue
1481 }
1482
1483 let keystore = self.keystore.clone() as Arc<_>;
1484 let res = SignedDisputeStatement::sign_explicit(
1485 &keystore,
1486 valid,
1487 candidate_hash,
1488 session,
1489 env.validators()
1490 .get(*index)
1491 .expect("`controlled_indices` are derived from `validators`; qed")
1492 .clone(),
1493 );
1494
1495 match res {
1496 Ok(Some(signed_dispute_statement)) => {
1497 statements.push((signed_dispute_statement, *index));
1498 },
1499 Ok(None) => {},
1500 Err(err) => {
1501 gum::error!(
1502 target: LOG_TARGET,
1503 ?err,
1504 "Encountered keystore error while signing dispute statement",
1505 );
1506 },
1507 }
1508 }
1509
1510 for (statement, index) in &statements {
1512 let dispute_message =
1513 match make_dispute_message(env.session_info(), &votes, statement.clone(), *index) {
1514 Err(err) => {
1515 gum::debug!(target: LOG_TARGET, ?err, "Creating dispute message failed.");
1516 continue
1517 },
1518 Ok(dispute_message) => dispute_message,
1519 };
1520
1521 ctx.send_message(DisputeDistributionMessage::SendDispute(dispute_message)).await;
1522 }
1523
1524 if !statements.is_empty() {
1526 match self
1527 .handle_import_statements(
1528 ctx,
1529 overlay_db,
1530 MaybeCandidateReceipt::Provides(candidate_receipt),
1531 session,
1532 statements,
1533 now,
1534 )
1535 .await?
1536 {
1537 ImportStatementsResult::InvalidImport => {
1538 gum::error!(
1539 target: LOG_TARGET,
1540 ?candidate_hash,
1541 ?session,
1542 "`handle_import_statements` considers our own votes invalid!"
1543 );
1544 },
1545 ImportStatementsResult::ValidImport => {
1546 gum::trace!(
1547 target: LOG_TARGET,
1548 ?candidate_hash,
1549 ?session,
1550 "`handle_import_statements` successfully imported our vote!"
1551 );
1552 },
1553 }
1554 }
1555
1556 Ok(())
1557 }
1558
1559 fn session_is_ancient(&self, session_idx: SessionIndex) -> bool {
1560 return session_idx < self.highest_session_seen.saturating_sub(DISPUTE_WINDOW.get() - 1)
1561 }
1562
1563 fn revisit_active_disputes_after_disabling(
1566 &mut self,
1567 overlay_db: &mut OverlayedBackend<'_, impl Backend>,
1568 session: SessionIndex,
1569 ) -> FatalResult<()> {
1570 let mut recent_disputes = overlay_db.load_recent_disputes()?.unwrap_or_default();
1571 let mut disputes_to_remove = Vec::new();
1572
1573 let session_start = (session, CandidateHash(Hash::zero()));
1575 let session_end = (session + 1, CandidateHash(Hash::zero()));
1576
1577 for ((dispute_session, candidate_hash), status) in
1578 recent_disputes.range(&session_start..&session_end)
1579 {
1580 debug_assert_eq!(session, *dispute_session);
1581 if status.is_confirmed_concluded() {
1583 continue
1584 }
1585 let Some(votes) = overlay_db.load_candidate_votes(*dispute_session, candidate_hash)?
1586 else {
1587 continue
1588 };
1589 if !votes.invalid.is_empty() &&
1591 votes.invalid.iter().all(|(_, validator_index, _)| {
1592 self.offchain_disabled_validators.is_disabled(session, *validator_index)
1593 }) {
1594 disputes_to_remove.push((*dispute_session, *candidate_hash));
1595
1596 gum::info!(
1597 target: LOG_TARGET,
1598 session = dispute_session,
1599 ?candidate_hash,
1600 invalid_voters = ?votes.invalid.iter().map(|(_, idx, _)| *idx).collect::<Vec<_>>(),
1601 "Unactivating dispute where all raising parties are now disabled"
1602 );
1603 }
1604 }
1605
1606 if !disputes_to_remove.is_empty() {
1608 for key in disputes_to_remove {
1609 recent_disputes.remove(&key);
1610 self.metrics.on_unactivated_dispute();
1611 }
1612 overlay_db.write_recent_disputes(recent_disputes);
1613 }
1614
1615 Ok(())
1616 }
1617}
1618
1619enum MuxedMessage {
1621 Subsystem(FromOrchestra<DisputeCoordinatorMessage>),
1623 Participation(participation::WorkerMessage),
1625}
1626
1627#[overseer::contextbounds(DisputeCoordinator, prefix = self::overseer)]
1628impl MuxedMessage {
1629 async fn receive<Context>(
1630 ctx: &mut Context,
1631 from_sender: &mut participation::WorkerMessageReceiver,
1632 ) -> FatalResult<Self> {
1633 let from_overseer = ctx.recv().fuse();
1636 futures::pin_mut!(from_overseer, from_sender);
1637 futures::select!(
1638 msg = from_overseer => Ok(Self::Subsystem(msg.map_err(FatalError::SubsystemReceive)?)),
1639 msg = from_sender.next() => Ok(Self::Participation(msg.ok_or(FatalError::ParticipationWorkerReceiverExhausted)?)),
1640 )
1641 }
1642}
1643
1644#[derive(Debug, Clone)]
1645enum MaybeCandidateReceipt {
1646 Provides(CandidateReceipt),
1648 AssumeBackingVotePresent(CandidateHash),
1650}
1651
1652impl MaybeCandidateReceipt {
1653 pub fn hash(&self) -> CandidateHash {
1655 match self {
1656 Self::Provides(receipt) => receipt.hash(),
1657 Self::AssumeBackingVotePresent(hash) => *hash,
1658 }
1659 }
1660}
1661
1662fn determine_undisputed_chain(
1666 overlay_db: &mut OverlayedBackend<'_, impl Backend>,
1667 base_number: BlockNumber,
1668 base_hash: Hash,
1669 block_descriptions: Vec<BlockDescription>,
1670) -> Result<(BlockNumber, Hash)> {
1671 let last = block_descriptions
1672 .last()
1673 .map(|e| (base_number + block_descriptions.len() as BlockNumber, e.block_hash))
1674 .unwrap_or((base_number, base_hash));
1675
1676 let recent_disputes = match overlay_db.load_recent_disputes()? {
1678 None => return Ok(last),
1679 Some(a) if a.is_empty() => return Ok(last),
1680 Some(a) => a,
1681 };
1682
1683 let is_possibly_invalid = |session, candidate_hash| {
1684 recent_disputes
1685 .get(&(session, candidate_hash))
1686 .map_or(false, |status| status.is_possibly_invalid())
1687 };
1688
1689 for (i, BlockDescription { session, candidates, .. }) in block_descriptions.iter().enumerate() {
1690 if candidates.iter().any(|c| is_possibly_invalid(*session, *c)) {
1691 if i == 0 {
1692 return Ok((base_number, base_hash))
1693 } else {
1694 return Ok((base_number + i as BlockNumber, block_descriptions[i - 1].block_hash))
1695 }
1696 }
1697 }
1698
1699 Ok(last)
1700}
1701
1702#[derive(Default)]
1708pub struct OffchainDisabledValidators {
1709 per_session: BTreeMap<SessionIndex, LostSessionDisputes>,
1710}
1711
1712struct LostSessionDisputes {
1713 backers_for_invalid: LruMap<ValidatorIndex, (), UnlimitedCompact>,
1719 for_invalid: LruMap<ValidatorIndex, (), UnlimitedCompact>,
1720 against_valid: LruMap<ValidatorIndex, (), UnlimitedCompact>,
1721}
1722
1723impl Default for LostSessionDisputes {
1724 fn default() -> Self {
1725 Self {
1726 backers_for_invalid: LruMap::new(UnlimitedCompact),
1727 for_invalid: LruMap::new(UnlimitedCompact),
1728 against_valid: LruMap::new(UnlimitedCompact),
1729 }
1730 }
1731}
1732
1733impl OffchainDisabledValidators {
1734 pub fn new_from_state(
1736 disputes: &RecentDisputes,
1737 load_candidate_votes: impl Fn(SessionIndex, &CandidateHash) -> Option<CandidateVotes>,
1738 earliest_session: SessionIndex,
1739 ) -> Self {
1740 let mut disabled_validators = Self::default();
1741
1742 for ((session, candidate_hash), dispute_status) in disputes {
1744 let session = *session;
1745 if dispute_status.concluded_at().is_none() {
1747 continue
1748 }
1749 if session < earliest_session {
1750 continue
1751 }
1752
1753 let votes = match load_candidate_votes(session, candidate_hash) {
1755 Some(votes) => votes,
1756 None => continue,
1757 };
1758
1759 if dispute_status.has_concluded_for() {
1761 for (validator_index, _) in votes.invalid.iter() {
1764 disabled_validators.insert_against_valid(session, *validator_index);
1765 }
1766 } else if dispute_status.has_concluded_against() {
1767 for (validator_index, (kind, _)) in votes.valid.raw().iter() {
1769 let is_backer = kind.is_backing();
1770 disabled_validators.insert_for_invalid(session, *validator_index, is_backer);
1771 }
1772 }
1773 }
1774
1775 disabled_validators
1776 }
1777
1778 pub fn prune_old(&mut self, up_to_excluding: SessionIndex) {
1780 let mut relevant = self.per_session.split_off(&up_to_excluding);
1782 std::mem::swap(&mut relevant, &mut self.per_session);
1783 }
1784
1785 pub fn insert_for_invalid(
1787 &mut self,
1788 session_index: SessionIndex,
1789 validator_index: ValidatorIndex,
1790 is_backer: bool,
1791 ) {
1792 let entry = self.per_session.entry(session_index).or_default();
1793 if is_backer {
1794 entry.backers_for_invalid.insert(validator_index, ());
1795 } else {
1796 entry.for_invalid.insert(validator_index, ());
1797 }
1798 }
1799
1800 pub fn insert_against_valid(
1802 &mut self,
1803 session_index: SessionIndex,
1804 validator_index: ValidatorIndex,
1805 ) {
1806 self.per_session
1807 .entry(session_index)
1808 .or_default()
1809 .against_valid
1810 .insert(validator_index, ());
1811 }
1812
1813 pub fn iter(&self, session_index: SessionIndex) -> impl Iterator<Item = ValidatorIndex> + '_ {
1818 self.per_session.get(&session_index).into_iter().flat_map(|e| {
1819 e.backers_for_invalid
1820 .iter()
1821 .chain(e.for_invalid.iter())
1822 .chain(e.against_valid.iter())
1823 .map(|(i, _)| *i)
1824 })
1825 }
1826
1827 pub fn is_disabled(
1829 &self,
1830 session_index: SessionIndex,
1831 validator_index: ValidatorIndex,
1832 ) -> bool {
1833 self.per_session
1834 .get(&session_index)
1835 .map(|session_disputes| {
1836 session_disputes.backers_for_invalid.peek(&validator_index).is_some() ||
1837 session_disputes.for_invalid.peek(&validator_index).is_some() ||
1838 session_disputes.against_valid.peek(&validator_index).is_some()
1839 })
1840 .unwrap_or(false)
1841 }
1842}