1use futures::{
29 channel::mpsc::{self, UnboundedReceiver},
30 prelude::*,
31 ready,
32};
33#[cfg(feature = "std")]
34use log::trace;
35
36use parking_lot::Mutex;
37
38use std::{
39 collections::VecDeque,
40 hash::Hash,
41 pin::Pin,
42 sync::Arc,
43 task::{Context, Poll},
44};
45
46use crate::{
47 round::State as RoundState, validate_commit, voter_set::VoterSet, weights::VoteWeight,
48 BlockNumberOps, CatchUp, Chain, Commit, CommitValidationResult, CompactCommit, Equivocation,
49 HistoricalVotes, Message, Precommit, Prevote, PrimaryPropose, SignedMessage, LOG_TARGET,
50};
51use past_rounds::PastRounds;
52use voting_round::{State as VotingRoundState, VotingRound};
53
54mod past_rounds;
55mod voting_round;
56
/// The environment a [`Voter`] runs in.
///
/// It extends [`Chain`] and supplies the voter with per-round communication
/// channels and timers (see [`RoundData`]). It also receives callbacks from the
/// voter; in this module the voter invokes `completed` and `finalize_block`.
pub trait Environment<H: Eq, N: BlockNumberOps>: Chain<H, N> {
    /// Timer future type; returned by `round_commit_timer` and stored in
    /// `RoundData::prevote_timer` / `RoundData::precommit_timer`.
    type Timer: Future<Output = Result<(), Self::Error>> + Unpin;
    /// Future returned by `best_chain_containing`, resolving to an optional
    /// `(hash, number)` pair.
    type BestChain: Future<Output = Result<Option<(H, N)>, Self::Error>> + Send + Unpin;
    /// Identifier of a voter; used as the key type of the `VoterSet`.
    type Id: Clone + Eq + Ord + std::fmt::Debug;
    /// Signature type carried by signed messages.
    type Signature: Eq + Clone;
    /// Stream of incoming signed round messages (see `RoundData::incoming`).
    type In: Stream<Item = Result<SignedMessage<H, N, Self::Signature, Self::Id>, Self::Error>>
        + Unpin;
    /// Sink for outgoing round messages (see `RoundData::outgoing`).
    type Out: Sink<Message<H, N>, Error = Self::Error> + Unpin;
    /// Error type shared by all fallible environment operations.
    type Error: From<crate::Error> + ::std::error::Error;

    /// Returns a future resolving to `Ok(Some((hash, number)))`, `Ok(None)` or an error.
    ///
    /// NOTE(review): presumably the best block of the chain containing `base`;
    /// not called from the code visible in this module — confirm in `voting_round`.
    fn best_chain_containing(&self, base: H) -> Self::BestChain;

    /// Produces the data needed to take part in round `round`: the optional local
    /// voter id, the prevote/precommit timers and the message channels.
    fn round_data(&self, round: u64) -> RoundData<Self::Id, Self::Timer, Self::In, Self::Out>;

    /// Returns a timer.
    ///
    /// NOTE(review): presumably used to delay broadcasting a commit — confirm
    /// against the caller in `voting_round`.
    fn round_commit_timer(&self) -> Self::Timer;

    /// Callback for a primary proposal in `round`.
    ///
    /// NOTE(review): presumably invoked when the local voter proposes — TODO confirm.
    fn proposed(&self, round: u64, propose: PrimaryPropose<H, N>) -> Result<(), Self::Error>;

    /// Callback for a prevote in `round`.
    ///
    /// NOTE(review): presumably invoked when the local voter prevotes — TODO confirm.
    fn prevoted(&self, round: u64, prevote: Prevote<H, N>) -> Result<(), Self::Error>;

    /// Callback for a precommit in `round`.
    ///
    /// NOTE(review): presumably invoked when the local voter precommits — TODO confirm.
    fn precommitted(&self, round: u64, precommit: Precommit<H, N>) -> Result<(), Self::Error>;

    /// Called by the voter when a round has been completed.
    ///
    /// In this module it is invoked both when the current best round completes
    /// locally and when a round is imported from a valid catch-up message. It
    /// receives the round number, the round state, the round's base block and
    /// the round's historical votes.
    fn completed(
        &self,
        round: u64,
        state: RoundState<H, N>,
        base: (H, N),
        votes: &HistoricalVotes<H, N, Self::Signature, Self::Id>,
    ) -> Result<(), Self::Error>;

    /// Same argument shape as `completed`.
    ///
    /// NOTE(review): not called from the code visible here; presumably signals
    /// that a background round is fully concluded — confirm in `past_rounds`.
    fn concluded(
        &self,
        round: u64,
        state: RoundState<H, N>,
        base: (H, N),
        votes: &HistoricalVotes<H, N, Self::Signature, Self::Id>,
    ) -> Result<(), Self::Error>;

    /// Called by the voter when a block is finalized.
    ///
    /// In this module it is only invoked when the finalized number is strictly
    /// greater than the last finalized number the voter has seen, either after a
    /// finalization notification from a round or after importing a valid commit.
    fn finalize_block(
        &self,
        hash: H,
        number: N,
        round: u64,
        commit: Commit<H, N, Self::Signature, Self::Id>,
    ) -> Result<(), Self::Error>;

    /// Reports a prevote equivocation observed in `round`.
    ///
    /// NOTE(review): not called from the code visible here — confirm the caller.
    fn prevote_equivocation(
        &self,
        round: u64,
        equivocation: Equivocation<Self::Id, Prevote<H, N>, Self::Signature>,
    );
    /// Reports a precommit equivocation observed in `round`.
    ///
    /// NOTE(review): not called from the code visible here — confirm the caller.
    fn precommit_equivocation(
        &self,
        round: u64,
        equivocation: Equivocation<Self::Id, Precommit<H, N>, Self::Signature>,
    );
}
169
/// Outgoing global (not round-localized) communication.
///
/// Values of this type are pushed onto the voter's global output sink.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CommunicationOut<H, N, S, Id> {
    /// A commit together with a `u64` tag; the voter forwards the
    /// `(number, commit)` pairs yielded by its background rounds.
    /// NOTE(review): the number is presumably the round number — confirm in `past_rounds`.
    Commit(u64, Commit<H, N, S, Id>),
}
176
/// Outcome reported through a [`Callback`] after the voter processed an
/// incoming commit.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CommitProcessingOutcome {
    /// The commit was accepted: either it validated successfully or the
    /// background rounds did not hand it back for validation.
    Good(GoodCommit),
    /// The commit failed validation; the payload carries the counters
    /// produced by the validation step.
    Bad(BadCommit),
}
185
/// Convenience constructors, compiled only for tests or with the
/// `test-helpers` feature.
#[cfg(any(test, feature = "test-helpers"))]
impl CommitProcessingOutcome {
    /// Builds a `Good` outcome.
    pub fn good() -> CommitProcessingOutcome {
        Self::Good(GoodCommit::new())
    }

    /// Builds a `Bad` outcome from a default validation result.
    pub fn bad() -> CommitProcessingOutcome {
        let validation = CommitValidationResult::default();
        Self::Bad(BadCommit::from(validation))
    }
}
198
/// Marker payload for a successfully processed commit.
///
/// The private unit field prevents construction outside this crate.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GoodCommit {
    _priv: (),
}

impl GoodCommit {
    /// Creates the marker value (crate-internal).
    pub(crate) fn new() -> Self {
        Self { _priv: () }
    }
}
210
/// Payload for a commit that failed validation.
///
/// Holds the counters copied from a `CommitValidationResult`; the private
/// unit field prevents construction outside this crate.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BadCommit {
    // Prevents construction from outside the crate.
    _priv: (),
    // Counter copied from `CommitValidationResult::num_precommits`.
    num_precommits: usize,
    // Counter copied from `CommitValidationResult::num_duplicated_precommits`.
    num_duplicated_precommits: usize,
    // Counter copied from `CommitValidationResult::num_equivocations`.
    num_equivocations: usize,
    // Counter copied from `CommitValidationResult::num_invalid_voters`.
    num_invalid_voters: usize,
}

impl BadCommit {
    /// Returns the stored precommit count.
    pub fn num_precommits(&self) -> usize {
        self.num_precommits
    }

    /// Returns the stored duplicated-precommit count.
    pub fn num_duplicated(&self) -> usize {
        self.num_duplicated_precommits
    }

    /// Returns the stored equivocation count.
    pub fn num_equivocations(&self) -> usize {
        self.num_equivocations
    }

    /// Returns the stored invalid-voter count.
    pub fn num_invalid_voters(&self) -> usize {
        self.num_invalid_voters
    }
}
242
243impl From<CommitValidationResult> for BadCommit {
244 fn from(r: CommitValidationResult) -> Self {
245 BadCommit {
246 num_precommits: r.num_precommits,
247 num_duplicated_precommits: r.num_duplicated_precommits,
248 num_equivocations: r.num_equivocations,
249 num_invalid_voters: r.num_invalid_voters,
250 _priv: (),
251 }
252 }
253}
254
/// Outcome reported through a [`Callback`] after the voter processed an
/// incoming catch-up message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CatchUpProcessingOutcome {
    /// The catch-up passed validation and the voter advanced to a new best round.
    Good(GoodCatchUp),
    /// The catch-up failed validation.
    Bad(BadCatchUp),
    /// NOTE(review): never produced by the code visible in this module;
    /// presumably for catch-ups that are valid but bring nothing new — confirm.
    Useless,
}
267
/// Convenience constructors, compiled only for tests or with the
/// `test-helpers` feature.
#[cfg(any(test, feature = "test-helpers"))]
impl CatchUpProcessingOutcome {
    /// Builds a `Bad` outcome.
    pub fn bad() -> CatchUpProcessingOutcome {
        Self::Bad(BadCatchUp::new())
    }

    /// Builds a `Good` outcome.
    pub fn good() -> CatchUpProcessingOutcome {
        Self::Good(GoodCatchUp::new())
    }
}
280
/// Marker payload for a successfully processed catch-up message.
///
/// The private unit field prevents construction outside this crate.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GoodCatchUp {
    _priv: (),
}

impl GoodCatchUp {
    /// Creates the marker value (crate-internal).
    pub(crate) fn new() -> Self {
        Self { _priv: () }
    }
}
292
/// Marker payload for a catch-up message that failed validation.
///
/// The private unit field prevents construction outside this crate.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BadCatchUp {
    _priv: (),
}

impl BadCatchUp {
    /// Creates the marker value (crate-internal).
    pub(crate) fn new() -> Self {
        Self { _priv: () }
    }
}
304
/// A hook invoked with the outcome of processing an incoming message.
pub enum Callback<O> {
    /// Do nothing when run.
    Blank,
    /// Call the boxed closure with the outcome when run.
    Work(Box<dyn FnMut(O) + Send>),
}

/// Test-only `Clone`: the boxed closure cannot be duplicated, so a clone is
/// always `Blank` regardless of the source variant.
#[cfg(any(test, feature = "test-helpers"))]
impl<O> Clone for Callback<O> {
    fn clone(&self) -> Self {
        Self::Blank
    }
}

impl<O> Callback<O> {
    /// Runs the callback with `o`; a `Blank` callback drops `o` silently.
    pub fn run(&mut self, o: O) {
        if let Callback::Work(work) = self {
            work(o)
        }
    }
}
331
/// Incoming global (not round-localized) communication.
///
/// `Clone` is derived only for tests or with the `test-helpers` feature.
#[cfg_attr(any(test, feature = "test-helpers"), derive(Clone))]
pub enum CommunicationIn<H, N, S, Id> {
    /// A compact commit tagged with a round number, plus a callback that is
    /// run with the processing outcome.
    Commit(u64, CompactCommit<H, N, S, Id>, Callback<CommitProcessingOutcome>),
    /// A catch-up message, plus a callback that is run with the processing outcome.
    CatchUp(CatchUp<H, N, S, Id>, Callback<CatchUpProcessingOutcome>),
}
340
// `CommunicationIn` is declared `Unpin` unconditionally, for any type parameters.
impl<H, N, S, Id> Unpin for CommunicationIn<H, N, S, Id> {}
342
/// Data an [`Environment`] hands out for participating in a single round
/// (returned by `Environment::round_data`).
pub struct RoundData<Id, Timer, Input, Output> {
    /// Local voter id, if any.
    /// NOTE(review): `None` presumably means the node only observes — confirm.
    pub voter_id: Option<Id>,
    /// Timer associated with the prevote step.
    pub prevote_timer: Timer,
    /// Timer associated with the precommit step.
    pub precommit_timer: Timer,
    /// Incoming messages for the round.
    pub incoming: Input,
    /// Outgoing messages for the round.
    pub outgoing: Output,
}
357
/// A sink wrapper that queues items in memory until the inner sink is ready
/// to accept them.
struct Buffered<S, I> {
    // The wrapped sink.
    inner: S,
    // Items waiting to be handed to `inner`, in FIFO order.
    buffer: VecDeque<I>,
}
362
363impl<S: Sink<I> + Unpin, I> Buffered<S, I> {
364 fn new(inner: S) -> Buffered<S, I> {
365 Buffered { buffer: VecDeque::new(), inner }
366 }
367
368 fn push(&mut self, item: I) {
371 self.buffer.push_back(item);
372 }
373
374 fn poll(&mut self, cx: &mut Context) -> Poll<Result<(), S::Error>> {
376 let polled = self.schedule_all(cx)?;
377
378 match polled {
379 Poll::Ready(()) => Sink::poll_flush(Pin::new(&mut self.inner), cx),
380 Poll::Pending => {
381 ready!(Sink::poll_flush(Pin::new(&mut self.inner), cx))?;
382 Poll::Pending
383 },
384 }
385 }
386
387 fn schedule_all(&mut self, cx: &mut Context) -> Poll<Result<(), S::Error>> {
388 while !self.buffer.is_empty() {
389 ready!(Sink::poll_ready(Pin::new(&mut self.inner), cx))?;
390
391 let item = self
392 .buffer
393 .pop_front()
394 .expect("we checked self.buffer.is_empty() just above; qed");
395 Sink::start_send(Pin::new(&mut self.inner), item)?;
396 }
397
398 Poll::Ready(Ok(()))
399 }
400}
401
/// Message sent from voting rounds to the voter when a block is finalized:
/// `(hash, number, round, commit)`, as destructured in `prune_background_rounds`.
type FinalizedNotification<H, N, E> =
    (H, N, u64, Commit<H, N, <E as Environment<H, N>>::Signature, <E as Environment<H, N>>::Id>);
404
405fn instantiate_last_round<H, N, E: Environment<H, N>>(
413 voters: VoterSet<E::Id>,
414 last_round_votes: Vec<SignedMessage<H, N, E::Signature, E::Id>>,
415 last_round_number: u64,
416 last_round_base: (H, N),
417 finalized_sender: mpsc::UnboundedSender<FinalizedNotification<H, N, E>>,
418 env: Arc<E>,
419) -> Option<VotingRound<H, N, E>>
420where
421 H: Clone + Eq + Ord + ::std::fmt::Debug,
422 N: Copy + BlockNumberOps + ::std::fmt::Debug,
423{
424 let last_round_tracker = crate::round::Round::new(crate::round::RoundParams {
425 voters,
426 base: last_round_base,
427 round_number: last_round_number,
428 });
429
430 let mut last_round = VotingRound::completed(last_round_tracker, finalized_sender, None, env);
432
433 for vote in last_round_votes {
434 last_round.handle_vote(vote).ok()?;
436 }
437
438 if last_round.round_state().completable {
439 Some(last_round)
440 } else {
441 None
442 }
443}
444
/// The round state of a voter, kept behind an `Arc<Mutex<..>>` so it can be
/// shared between the `Voter` and `SharedVoterState` handles.
struct InnerVoterState<H, N, E>
where
    H: Clone + Ord + std::fmt::Debug,
    N: BlockNumberOps,
    E: Environment<H, N>,
{
    // The round currently being driven by `process_best_round`.
    best_round: VotingRound<H, N, E>,
    // Earlier rounds that are still polled in the background.
    past_rounds: PastRounds<H, N, E>,
}
457
/// A future that drives voting: it processes global incoming messages, polls
/// background rounds, flushes global outgoing messages and advances the best
/// round (see the `Future` implementation).
pub struct Voter<H, N, E: Environment<H, N>, GlobalIn, GlobalOut>
where
    H: Clone + Eq + Ord + ::std::fmt::Debug,
    N: Copy + BlockNumberOps + ::std::fmt::Debug,
    GlobalIn: Stream<Item = Result<CommunicationIn<H, N, E::Signature, E::Id>, E::Error>> + Unpin,
    GlobalOut: Sink<CommunicationOut<H, N, E::Signature, E::Id>, Error = E::Error> + Unpin,
{
    // Environment shared with every voting round.
    env: Arc<E>,
    // The voter set used for all rounds created by this voter.
    voters: VoterSet<E::Id>,
    // Best round and background rounds; shared with `SharedVoterState`.
    inner: Arc<Mutex<InnerVoterState<H, N, E>>>,
    // Receiving end of the channel on which rounds report finalized blocks.
    finalized_notifications: UnboundedReceiver<FinalizedNotification<H, N, E>>,
    // Highest block number passed to `Environment::finalize_block` so far
    // (initialised from the `last_finalized` constructor argument).
    last_finalized_number: N,
    // Global incoming messages (commits and catch-ups).
    global_in: GlobalIn,
    // Global outgoing messages, buffered until the sink accepts them.
    global_out: Buffered<GlobalOut, CommunicationOut<H, N, E::Signature, E::Id>>,
    // Latest block finalized by a round; handed to every newly created round
    // as its third constructor argument.
    last_finalized_in_rounds: (H, N),
}
495
496impl<'a, H: 'a, N, E: 'a, GlobalIn, GlobalOut> Voter<H, N, E, GlobalIn, GlobalOut>
497where
498 H: Clone + Ord + ::std::fmt::Debug + Sync + Send,
499 N: BlockNumberOps + Sync + Send,
500 E: Environment<H, N> + Sync + Send,
501 GlobalIn: Stream<Item = Result<CommunicationIn<H, N, E::Signature, E::Id>, E::Error>> + Unpin,
502 GlobalOut: Sink<CommunicationOut<H, N, E::Signature, E::Id>, Error = E::Error> + Unpin,
503{
504 pub fn voter_state(&self) -> Box<dyn VoterState<E::Id> + 'a + Send + Sync>
506 where
507 <E as Environment<H, N>>::Signature: Send,
508 <E as Environment<H, N>>::Id: Hash + Send,
509 <E as Environment<H, N>>::Timer: Send,
510 <E as Environment<H, N>>::Out: Send,
511 <E as Environment<H, N>>::In: Send,
512 {
513 Box::new(SharedVoterState(self.inner.clone()))
514 }
515}
516
517impl<H, N, E: Environment<H, N>, GlobalIn, GlobalOut> Voter<H, N, E, GlobalIn, GlobalOut>
518where
519 H: Clone + Eq + Ord + ::std::fmt::Debug,
520 N: Copy + BlockNumberOps + ::std::fmt::Debug,
521 GlobalIn: Stream<Item = Result<CommunicationIn<H, N, E::Signature, E::Id>, E::Error>> + Unpin,
522 GlobalOut: Sink<CommunicationOut<H, N, E::Signature, E::Id>, Error = E::Error> + Unpin,
523{
524 pub fn new(
536 env: Arc<E>,
537 voters: VoterSet<E::Id>,
538 global_comms: (GlobalIn, GlobalOut),
539 last_round_number: u64,
540 last_round_votes: Vec<SignedMessage<H, N, E::Signature, E::Id>>,
541 last_round_base: (H, N),
542 last_finalized: (H, N),
543 ) -> Self {
544 let (finalized_sender, finalized_notifications) = mpsc::unbounded();
545 let last_finalized_number = last_finalized.1;
546
547 let mut past_rounds = PastRounds::new();
551 let mut last_round_state =
552 crate::bridge_state::bridge_state(RoundState::genesis(last_round_base.clone())).1;
553
554 if last_round_number > 0 {
555 let maybe_completed_last_round = instantiate_last_round(
556 voters.clone(),
557 last_round_votes,
558 last_round_number,
559 last_round_base,
560 finalized_sender.clone(),
561 env.clone(),
562 );
563
564 if let Some(mut last_round) = maybe_completed_last_round {
565 last_round_state = last_round.bridge_state();
566 past_rounds.push(&*env, last_round);
567 }
568
569 }
574
575 let best_round = VotingRound::new(
576 last_round_number + 1,
577 voters.clone(),
578 last_finalized.clone(),
579 Some(last_round_state),
580 finalized_sender,
581 env.clone(),
582 );
583
584 let (global_in, global_out) = global_comms;
585
586 let inner = Arc::new(Mutex::new(InnerVoterState { best_round, past_rounds }));
587
588 Voter {
589 env,
590 voters,
591 inner,
592 finalized_notifications,
593 last_finalized_number,
594 last_finalized_in_rounds: last_finalized,
595 global_in,
596 global_out: Buffered::new(global_out),
597 }
598 }
599
600 fn prune_background_rounds(&mut self, cx: &mut Context) -> Result<(), E::Error> {
601 {
602 let mut inner = self.inner.lock();
603
604 while let Poll::Ready(Some(item)) =
606 Stream::poll_next(Pin::new(&mut inner.past_rounds), cx)
607 {
608 let (number, commit) = item?;
609 self.global_out.push(CommunicationOut::Commit(number, commit));
610 }
611 }
612
613 while let Poll::Ready(res) =
614 Stream::poll_next(Pin::new(&mut self.finalized_notifications), cx)
615 {
616 let inner = self.inner.clone();
617 let mut inner = inner.lock();
618
619 let (f_hash, f_num, round, commit) =
620 res.expect("one sender always kept alive in self.best_round; qed");
621
622 inner.past_rounds.update_finalized(f_num);
623
624 if self.set_last_finalized_number(f_num) {
625 self.env.finalize_block(f_hash.clone(), f_num, round, commit)?;
626 }
627
628 if f_num > self.last_finalized_in_rounds.1 {
629 self.last_finalized_in_rounds = (f_hash, f_num);
630 }
631 }
632
633 Ok(())
634 }
635
636 fn process_incoming(&mut self, cx: &mut Context) -> Result<(), E::Error> {
645 while let Poll::Ready(Some(item)) = Stream::poll_next(Pin::new(&mut self.global_in), cx) {
646 match item? {
647 CommunicationIn::Commit(round_number, commit, mut process_commit_outcome) => {
648 trace!(
649 target: LOG_TARGET,
650 "Got commit for round_number {:?}: target_number: {:?}, target_hash: {:?}",
651 round_number,
652 commit.target_number,
653 commit.target_hash,
654 );
655
656 let commit: Commit<_, _, _, _> = commit.into();
657
658 let mut inner = self.inner.lock();
659
660 if let Some(commit) = inner.past_rounds.import_commit(round_number, commit) {
663 let validation_result = validate_commit(&commit, &self.voters, &*self.env)?;
666
667 if validation_result.is_valid() {
668 let last_finalized_number = &mut self.last_finalized_number;
672
673 inner.past_rounds.update_finalized(commit.target_number);
675
676 if commit.target_number > *last_finalized_number {
677 *last_finalized_number = commit.target_number;
678 self.env.finalize_block(
679 commit.target_hash.clone(),
680 commit.target_number,
681 round_number,
682 commit,
683 )?;
684 }
685
686 process_commit_outcome
687 .run(CommitProcessingOutcome::Good(GoodCommit::new()));
688 } else {
689 process_commit_outcome.run(CommitProcessingOutcome::Bad(
691 BadCommit::from(validation_result),
692 ));
693 }
694 } else {
695 process_commit_outcome
697 .run(CommitProcessingOutcome::Good(GoodCommit::new()));
698 }
699 },
700 CommunicationIn::CatchUp(catch_up, mut process_catch_up_outcome) => {
701 trace!(
702 target: LOG_TARGET,
703 "Got catch-up message for round {}",
704 catch_up.round_number
705 );
706
707 let mut inner = self.inner.lock();
708
709 let round = if let Some(round) = validate_catch_up(
710 catch_up,
711 &*self.env,
712 &self.voters,
713 inner.best_round.round_number(),
714 ) {
715 round
716 } else {
717 process_catch_up_outcome
718 .run(CatchUpProcessingOutcome::Bad(BadCatchUp::new()));
719 return Ok(())
720 };
721
722 let state = round.state();
723
724 let mut just_completed = VotingRound::completed(
727 round,
728 inner.best_round.finalized_sender(),
729 None,
730 self.env.clone(),
731 );
732
733 let new_best = VotingRound::new(
734 just_completed.round_number() + 1,
735 self.voters.clone(),
736 self.last_finalized_in_rounds.clone(),
737 Some(just_completed.bridge_state()),
738 inner.best_round.finalized_sender(),
739 self.env.clone(),
740 );
741
742 if let Some((f_hash, f_num)) = state.finalized.clone() {
745 if f_num > self.last_finalized_in_rounds.1 {
746 self.last_finalized_in_rounds = (f_hash, f_num);
747 }
748 }
749
750 self.env.completed(
751 just_completed.round_number(),
752 just_completed.round_state(),
753 just_completed.dag_base(),
754 just_completed.historical_votes(),
755 )?;
756
757 inner.past_rounds.push(&*self.env, just_completed);
758
759 let old_best = std::mem::replace(&mut inner.best_round, new_best);
760 inner.past_rounds.push(&*self.env, old_best);
761
762 process_catch_up_outcome
763 .run(CatchUpProcessingOutcome::Good(GoodCatchUp::new()));
764 },
765 }
766 }
767
768 Ok(())
769 }
770
771 fn process_best_round(&mut self, cx: &mut Context) -> Poll<Result<(), E::Error>> {
773 {
776 let mut inner = self.inner.lock();
777
778 let should_start_next = {
779 let completable = match inner.best_round.poll(cx)? {
780 Poll::Ready(()) => true,
781 Poll::Pending => false,
782 };
783
784 let precommitted =
786 matches!(inner.best_round.state(), Some(&VotingRoundState::Precommitted));
787
788 completable && precommitted
789 };
790
791 if !should_start_next {
792 return Poll::Pending
793 }
794
795 trace!(
796 target: LOG_TARGET,
797 "Best round at {} has become completable. Starting new best round at {}",
798 inner.best_round.round_number(),
799 inner.best_round.round_number() + 1,
800 );
801 }
802
803 self.completed_best_round()?;
804
805 self.poll_unpin(cx)
807 }
808
809 fn completed_best_round(&mut self) -> Result<(), E::Error> {
810 let mut inner = self.inner.lock();
811
812 self.env.completed(
813 inner.best_round.round_number(),
814 inner.best_round.round_state(),
815 inner.best_round.dag_base(),
816 inner.best_round.historical_votes(),
817 )?;
818
819 let old_round_number = inner.best_round.round_number();
820
821 let next_round = VotingRound::new(
822 old_round_number + 1,
823 self.voters.clone(),
824 self.last_finalized_in_rounds.clone(),
825 Some(inner.best_round.bridge_state()),
826 inner.best_round.finalized_sender(),
827 self.env.clone(),
828 );
829
830 let old_round = ::std::mem::replace(&mut inner.best_round, next_round);
831 inner.past_rounds.push(&*self.env, old_round);
832 Ok(())
833 }
834
835 fn set_last_finalized_number(&mut self, finalized_number: N) -> bool {
836 let last_finalized_number = &mut self.last_finalized_number;
837 if finalized_number > *last_finalized_number {
838 *last_finalized_number = finalized_number;
839 return true
840 }
841 false
842 }
843}
844
845impl<H, N, E: Environment<H, N>, GlobalIn, GlobalOut> Future for Voter<H, N, E, GlobalIn, GlobalOut>
846where
847 H: Clone + Eq + Ord + ::std::fmt::Debug,
848 N: Copy + BlockNumberOps + ::std::fmt::Debug,
849 GlobalIn: Stream<Item = Result<CommunicationIn<H, N, E::Signature, E::Id>, E::Error>> + Unpin,
850 GlobalOut: Sink<CommunicationOut<H, N, E::Signature, E::Id>, Error = E::Error> + Unpin,
851{
852 type Output = Result<(), E::Error>;
853
854 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), E::Error>> {
855 self.process_incoming(cx)?;
856 self.prune_background_rounds(cx)?;
857 let _ = self.global_out.poll(cx)?;
858
859 self.process_best_round(cx)
860 }
861}
862
// `Voter` is declared `Unpin` under the same bounds as the struct itself,
// which lets the `Future` implementation poll it through `poll_unpin`.
impl<H, N, E: Environment<H, N>, GlobalIn, GlobalOut> Unpin for Voter<H, N, E, GlobalIn, GlobalOut>
where
    H: Clone + Eq + Ord + ::std::fmt::Debug,
    N: Copy + BlockNumberOps + ::std::fmt::Debug,
    GlobalIn: Stream<Item = Result<CommunicationIn<H, N, E::Signature, E::Id>, E::Error>> + Unpin,
    GlobalOut: Sink<CommunicationOut<H, N, E::Signature, E::Id>, Error = E::Error> + Unpin,
{
}
871
/// A handle for inspecting the state of a voter.
pub trait VoterState<Id: Eq + std::hash::Hash> {
    /// Returns a report of the voter's best round and background rounds.
    fn get(&self) -> report::VoterState<Id>;
}
879
/// Plain-data report types returned by [`VoterState::get`].
pub mod report {
    use crate::weights::{VoteWeight, VoterWeight};
    use std::collections::{HashMap, HashSet};

    /// Summary of a single voting round.
    #[derive(PartialEq, Eq, Clone)]
    #[cfg_attr(test, derive(Debug))]
    pub struct RoundState<Id: Eq + std::hash::Hash> {
        /// Total weight of the round's voter set.
        pub total_weight: VoterWeight,
        /// Threshold weight of the round's voter set.
        pub threshold_weight: VoterWeight,

        /// Prevote weight reported by the round.
        pub prevote_current_weight: VoteWeight,
        /// Ids reported by the round's `prevote_ids()`.
        pub prevote_ids: HashSet<Id>,

        /// Precommit weight reported by the round.
        pub precommit_current_weight: VoteWeight,
        /// Ids reported by the round's `precommit_ids()`.
        pub precommit_ids: HashSet<Id>,
    }

    /// Summary of a voter: its background rounds and its best round.
    #[derive(PartialEq, Eq)]
    #[cfg_attr(test, derive(Debug))]
    pub struct VoterState<Id: Eq + std::hash::Hash> {
        /// Background rounds, keyed by round number.
        pub background_rounds: HashMap<u64, RoundState<Id>>,
        /// The best round as `(round number, state)`.
        pub best_round: (u64, RoundState<Id>),
    }
}
916
/// Handle onto a voter's inner state; created by `Voter::voter_state` and
/// exposed to callers only through the `VoterState` trait.
struct SharedVoterState<H, N, E>(Arc<Mutex<InnerVoterState<H, N, E>>>)
where
    H: Clone + Ord + std::fmt::Debug,
    N: BlockNumberOps,
    E: Environment<H, N>;
922
923impl<H, N, E> VoterState<E::Id> for SharedVoterState<H, N, E>
924where
925 H: Clone + Eq + Ord + std::fmt::Debug,
926 N: BlockNumberOps,
927 E: Environment<H, N>,
928 <E as Environment<H, N>>::Id: Hash,
929{
930 fn get(&self) -> report::VoterState<E::Id> {
931 let to_round_state = |voting_round: &VotingRound<H, N, E>| {
932 (
933 voting_round.round_number(),
934 report::RoundState {
935 total_weight: voting_round.voters().total_weight(),
936 threshold_weight: voting_round.voters().threshold(),
937 prevote_current_weight: voting_round.prevote_weight(),
938 prevote_ids: voting_round.prevote_ids().collect(),
939 precommit_current_weight: voting_round.precommit_weight(),
940 precommit_ids: voting_round.precommit_ids().collect(),
941 },
942 )
943 };
944
945 let inner = self.0.lock();
946 let best_round = to_round_state(&inner.best_round);
947 let background_rounds = inner.past_rounds.voting_rounds().map(to_round_state).collect();
948
949 report::VoterState { best_round, background_rounds }
950 }
951}
952
953fn validate_catch_up<H, N, S, I, E>(
957 catch_up: CatchUp<H, N, S, I>,
958 env: &E,
959 voters: &VoterSet<I>,
960 best_round_number: u64,
961) -> Option<crate::round::Round<I, H, N, S>>
962where
963 H: Clone + Eq + Ord + std::fmt::Debug,
964 N: BlockNumberOps + std::fmt::Debug,
965 S: Clone + Eq,
966 I: Clone + Eq + std::fmt::Debug + Ord,
967 E: Environment<H, N>,
968{
969 if catch_up.round_number <= best_round_number {
970 trace!(target: LOG_TARGET, "Ignoring because best round number is {}", best_round_number);
971
972 return None
973 }
974
975 {
977 let mut map = std::collections::BTreeMap::new();
978
979 for prevote in &catch_up.prevotes {
980 if !voters.contains(&prevote.id) {
981 trace!(
982 target: LOG_TARGET,
983 "Ignoring invalid catch up, invalid voter: {:?}",
984 prevote.id,
985 );
986
987 return None
988 }
989
990 map.entry(prevote.id.clone()).or_insert((false, false)).0 = true;
991 }
992
993 for precommit in &catch_up.precommits {
994 if !voters.contains(&precommit.id) {
995 trace!(
996 target: LOG_TARGET,
997 "Ignoring invalid catch up, invalid voter: {:?}",
998 precommit.id,
999 );
1000
1001 return None
1002 }
1003
1004 map.entry(precommit.id.clone()).or_insert((false, false)).1 = true;
1005 }
1006
1007 let (pv, pc) = map.into_iter().fold(
1008 (VoteWeight(0), VoteWeight(0)),
1009 |(mut pv, mut pc), (id, (prevoted, precommitted))| {
1010 if let Some(v) = voters.get(&id) {
1011 if prevoted {
1012 pv = pv + v.weight();
1013 }
1014
1015 if precommitted {
1016 pc = pc + v.weight();
1017 }
1018 }
1019
1020 (pv, pc)
1021 },
1022 );
1023
1024 let threshold = voters.threshold();
1025 if pv < threshold || pc < threshold {
1026 trace!(target: LOG_TARGET, "Ignoring invalid catch up, missing voter threshold");
1027
1028 return None
1029 }
1030 }
1031
1032 let mut round = crate::round::Round::new(crate::round::RoundParams {
1033 round_number: catch_up.round_number,
1034 voters: voters.clone(),
1035 base: (catch_up.base_hash.clone(), catch_up.base_number),
1036 });
1037
1038 for crate::SignedPrevote { prevote, id, signature } in catch_up.prevotes {
1040 match round.import_prevote(env, prevote, id, signature) {
1041 Ok(_) => {},
1042 Err(e) => {
1043 trace!(
1044 target: LOG_TARGET,
1045 "Ignoring invalid catch up, error importing prevote: {:?}",
1046 e,
1047 );
1048
1049 return None
1050 },
1051 }
1052 }
1053
1054 for crate::SignedPrecommit { precommit, id, signature } in catch_up.precommits {
1056 match round.import_precommit(env, precommit, id, signature) {
1057 Ok(_) => {},
1058 Err(e) => {
1059 trace!(
1060 target: LOG_TARGET,
1061 "Ignoring invalid catch up, error importing precommit: {:?}",
1062 e,
1063 );
1064
1065 return None
1066 },
1067 }
1068 }
1069
1070 let state = round.state();
1071 if !state.completable {
1072 return None
1073 }
1074
1075 Some(round)
1076}
1077
1078#[cfg(test)]
1079mod tests {
1080 use super::*;
1081 use crate::{
1082 testing::{
1083 self,
1084 chain::GENESIS_HASH,
1085 environment::{Environment, Id, Signature},
1086 },
1087 weights::{VoteWeight, VoterWeight},
1088 SignedPrecommit,
1089 };
1090 use futures::{executor::LocalPool, task::SpawnExt};
1091 use futures_timer::Delay;
1092 use std::{collections::HashSet, iter, time::Duration};
1093
1094 #[test]
1095 fn talking_to_myself() {
1096 let local_id = Id(5);
1097 let voters = VoterSet::new(std::iter::once((local_id, 100))).unwrap();
1098
1099 let (network, routing_task) = testing::environment::make_network();
1100
1101 let global_comms = network.make_global_comms();
1102 let env = Arc::new(Environment::new(network, local_id));
1103
1104 let last_finalized = env.with_chain(|chain| {
1106 chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]);
1107 chain.last_finalized()
1108 });
1109
1110 let finalized = env.finalized_stream();
1112 let voter = Voter::new(
1113 env.clone(),
1114 voters,
1115 global_comms,
1116 0,
1117 Vec::new(),
1118 last_finalized,
1119 last_finalized,
1120 );
1121
1122 let mut pool = LocalPool::new();
1123 pool.spawner().spawn(voter.map(|v| v.expect("Error voting"))).unwrap();
1124 pool.spawner().spawn(routing_task).unwrap();
1125
1126 pool.run_until(
1128 finalized
1129 .take_while(|&(_, n, _)| future::ready(n < 6))
1130 .for_each(|_| future::ready(())),
1131 )
1132 }
1133
1134 #[test]
1135 fn finalizing_at_fault_threshold() {
1136 let voters = VoterSet::new((0..10).map(|i| (Id(i), 1))).expect("nonempty");
1138
1139 let (network, routing_task) = testing::environment::make_network();
1140 let mut pool = LocalPool::new();
1141
1142 let finalized_streams = (0..7)
1144 .map(|i| {
1145 let local_id = Id(i);
1146 let env = Arc::new(Environment::new(network.clone(), local_id));
1148 let last_finalized = env.with_chain(|chain| {
1149 chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]);
1150 chain.last_finalized()
1151 });
1152
1153 let finalized = env.finalized_stream();
1155 let voter = Voter::new(
1156 env.clone(),
1157 voters.clone(),
1158 network.make_global_comms(),
1159 0,
1160 Vec::new(),
1161 last_finalized,
1162 last_finalized,
1163 );
1164
1165 pool.spawner().spawn(voter.map(|v| v.expect("Error voting"))).unwrap();
1166
1167 finalized
1169 .take_while(|&(_, n, _)| future::ready(n < 6))
1170 .for_each(|_| future::ready(()))
1171 })
1172 .collect::<Vec<_>>();
1173
1174 pool.spawner().spawn(routing_task.map(|_| ())).unwrap();
1175
1176 pool.run_until(future::join_all(finalized_streams.into_iter()));
1177 }
1178
1179 #[test]
1180 fn exposing_voter_state() {
1181 let num_voters = 10;
1182 let voters_online = 7;
1183 let voters = VoterSet::new((0..num_voters).map(|i| (Id(i), 1))).expect("nonempty");
1184
1185 let (network, routing_task) = testing::environment::make_network();
1186 let mut pool = LocalPool::new();
1187
1188 let (finalized_streams, voter_states): (Vec<_>, Vec<_>) = (0..voters_online)
1190 .map(|i| {
1191 let local_id = Id(i);
1192 let env = Arc::new(Environment::new(network.clone(), local_id));
1194 let last_finalized = env.with_chain(|chain| {
1195 chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]);
1196 chain.last_finalized()
1197 });
1198
1199 let finalized = env.finalized_stream();
1201 let voter = Voter::new(
1202 env.clone(),
1203 voters.clone(),
1204 network.make_global_comms(),
1205 0,
1206 Vec::new(),
1207 last_finalized,
1208 last_finalized,
1209 );
1210 let voter_state = voter.voter_state();
1211
1212 pool.spawner().spawn(voter.map(|v| v.expect("Error voting"))).unwrap();
1213
1214 (
1215 finalized
1217 .take_while(|&(_, n, _)| future::ready(n < 6))
1218 .for_each(|_| future::ready(())),
1219 voter_state,
1220 )
1221 })
1222 .unzip();
1223
1224 let voter_state = &voter_states[0];
1225 voter_states.iter().all(|vs| vs.get() == voter_state.get());
1226
1227 let expected_round_state = report::RoundState::<Id> {
1228 total_weight: VoterWeight::new(num_voters.into()).expect("nonzero"),
1229 threshold_weight: VoterWeight::new(voters_online.into()).expect("nonzero"),
1230 prevote_current_weight: VoteWeight(0),
1231 prevote_ids: Default::default(),
1232 precommit_current_weight: VoteWeight(0),
1233 precommit_ids: Default::default(),
1234 };
1235
1236 assert_eq!(
1237 voter_state.get(),
1238 report::VoterState {
1239 background_rounds: Default::default(),
1240 best_round: (1, expected_round_state.clone()),
1241 }
1242 );
1243
1244 pool.spawner().spawn(routing_task.map(|_| ())).unwrap();
1245 pool.run_until(future::join_all(finalized_streams.into_iter()));
1246
1247 assert_eq!(voter_state.get().best_round, (2, expected_round_state.clone()));
1248 }
1249
1250 #[test]
1251 fn broadcast_commit() {
1252 let local_id = Id(5);
1253 let voters = VoterSet::new([(local_id, 100)].iter().cloned()).expect("nonempty");
1254
1255 let (network, routing_task) = testing::environment::make_network();
1256 let (commits, _) = network.make_global_comms();
1257
1258 let global_comms = network.make_global_comms();
1259 let env = Arc::new(Environment::new(network, local_id));
1260
1261 let last_finalized = env.with_chain(|chain| {
1263 chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]);
1264 chain.last_finalized()
1265 });
1266
1267 let voter = Voter::new(
1269 env.clone(),
1270 voters.clone(),
1271 global_comms,
1272 0,
1273 Vec::new(),
1274 last_finalized,
1275 last_finalized,
1276 );
1277
1278 let mut pool = LocalPool::new();
1279 pool.spawner().spawn(voter.map(|v| v.expect("Error voting"))).unwrap();
1280 pool.spawner().spawn(routing_task).unwrap();
1281
1282 pool.run_until(commits.take(1).for_each(|_| future::ready(())))
1284 }
1285
1286 #[test]
1287 fn broadcast_commit_only_if_newer() {
1288 let local_id = Id(5);
1289 let test_id = Id(42);
1290 let voters =
1291 VoterSet::new([(local_id, 100), (test_id, 201)].iter().cloned()).expect("nonempty");
1292
1293 let (network, routing_task) = testing::environment::make_network();
1294 let (commits_stream, commits_sink) = network.make_global_comms();
1295 let (round_stream, round_sink) = network.make_round_comms(1, test_id);
1296
1297 let prevote = Message::Prevote(Prevote { target_hash: "E", target_number: 6 });
1298
1299 let precommit = Message::Precommit(Precommit { target_hash: "E", target_number: 6 });
1300
1301 let commit = (
1302 1,
1303 Commit {
1304 target_hash: "E",
1305 target_number: 6,
1306 precommits: vec![SignedPrecommit {
1307 precommit: Precommit { target_hash: "E", target_number: 6 },
1308 signature: Signature(test_id.0),
1309 id: test_id,
1310 }],
1311 },
1312 );
1313
1314 let global_comms = network.make_global_comms();
1315 let env = Arc::new(Environment::new(network, local_id));
1316
1317 let last_finalized = env.with_chain(|chain| {
1319 chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]);
1320 chain.last_finalized()
1321 });
1322
1323 let voter = Voter::new(
1325 env.clone(),
1326 voters.clone(),
1327 global_comms,
1328 0,
1329 Vec::new(),
1330 last_finalized,
1331 last_finalized,
1332 );
1333
1334 let mut pool = LocalPool::new();
1335 pool.spawner().spawn(voter.map(|v| v.expect("Error voting: {:?}"))).unwrap();
1336 pool.spawner().spawn(routing_task.map(|_| ())).unwrap();
1337
1338 pool.spawner()
1339 .spawn(
1340 round_stream
1341 .into_future()
1342 .then(|(value, stream)| {
1343 assert!(match value {
1345 Some(Ok(SignedMessage {
1346 message: Message::Prevote(_),
1347 id: Id(5),
1348 ..
1349 })) => true,
1350 _ => false,
1351 });
1352 let votes = vec![prevote, precommit].into_iter().map(Result::Ok);
1353 futures::stream::iter(votes).forward(round_sink).map(|_| stream) })
1355 .then(|stream| {
1356 stream
1357 .take_while(|value| match value {
1358 Ok(SignedMessage {
1360 message: Message::Precommit(_),
1361 id: Id(5),
1362 ..
1363 }) => future::ready(false),
1364 _ => future::ready(true),
1365 })
1366 .for_each(|_| future::ready(()))
1367 })
1368 .then(move |_| {
1369 stream::iter(iter::once(Ok(CommunicationOut::Commit(commit.0, commit.1))))
1371 .forward(commits_sink)
1372 })
1373 .map(|_| ()),
1374 )
1375 .unwrap();
1376
1377 let res = pool.run_until(
1378 commits_stream.into_future().then(|(_, stream)| {
1380 let await_second = stream.take(1).for_each(|_| future::ready(()));
1382 let delay = Delay::new(Duration::from_millis(500));
1383 future::select(await_second, delay)
1384 }),
1385 );
1386
1387 match res {
1388 future::Either::Right(((), _work)) => {
1389 },
1391 _ => panic!("Unexpected result"),
1392 }
1393 }
1394
/// A commit tagged with round `0` is pushed onto the global channel while the
/// voter is constructed with round parameter `1`; the test passes when that
/// exact commit shows up as the first item of the environment's finalized
/// stream.
#[test]
fn import_commit_for_any_round() {
    let local_id = Id(5);
    let remote_id = Id(42);

    // The remote voter holds 201 of the 301 total weight.
    let weights = [(local_id, 100), (remote_id, 201)];
    let voter_set = VoterSet::new(weights.iter().cloned()).expect("nonempty");

    let (network, routing_task) = testing::environment::make_network();
    // Only the sending half is needed here; it is used to inject the commit.
    let (_, commits_sink) = network.make_global_comms();

    // A commit for block "E" (number 6) justified by a single precommit
    // signed by the remote voter.
    let remote_precommit = SignedPrecommit {
        precommit: Precommit { target_hash: "E", target_number: 6 },
        signature: Signature(remote_id.0),
        id: remote_id,
    };
    let expected_commit =
        Commit { target_hash: "E", target_number: 6, precommits: vec![remote_precommit] };

    // A second pair of global channels, this one handed to the voter itself.
    let voter_comms = network.make_global_comms();
    let env = Arc::new(Environment::new(network, local_id));

    // Extend the chain with blocks A..=E on top of genesis and remember what
    // the chain reports as finalized.
    let finalized_base = env.with_chain(|chain| {
        chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]);
        chain.last_finalized()
    });

    let voter = Voter::new(
        Arc::clone(&env),
        voter_set,
        voter_comms,
        1,
        Vec::new(),
        finalized_base,
        finalized_base,
    );

    // Drive the voter and the network router in the background.
    let mut pool = LocalPool::new();
    let spawner = pool.spawner();
    spawner.spawn(voter.map(|outcome| outcome.expect("Error voting"))).unwrap();
    spawner.spawn(routing_task.map(|_| ())).unwrap();

    // Inject the commit, tagged with round 0.
    let outgoing = CommunicationOut::Commit(0, expected_commit.clone());
    let send_commit = stream::iter(iter::once(Ok(outgoing))).forward(commits_sink).map(|_| ());
    spawner.spawn(send_commit).unwrap();

    // Block until the first finality notification and keep only its commit.
    let first_finalized = env.finalized_stream().into_future().map(|(item, _rest)| {
        let (_hash, _number, imported) = item.unwrap();
        imported
    });
    let finalized = pool.run_until(first_finalized);

    assert_eq!(finalized, expected_commit);
}
1455
/// Feeds a round-5 catch-up message to a voter constructed with round
/// parameter `0` and checks that it ends up reporting round 6 as its best
/// round, with round 5 listed among the background rounds carrying all three
/// voters' prevotes and precommits.
#[test]
fn skips_to_latest_round_after_catch_up() {
    // Three voters (ids 0, 1, 2), each with weight 1.
    let voters = VoterSet::new((0..3).map(|i| (Id(i), 1u64))).expect("nonempty");
    let total_weight = voters.total_weight();
    let threshold_weight = voters.threshold();
    let voter_ids: HashSet<Id> = (0..3).map(Id).collect();

    let (network, routing_task) = testing::environment::make_network();
    let mut pool = LocalPool::new();
    let spawner = pool.spawner();

    spawner.spawn(routing_task.map(|_| ())).unwrap();

    // The local node uses id 4, which is not one of the three voter ids.
    let env = Arc::new(Environment::new(network.clone(), Id(4)));
    let last_finalized = env.with_chain(|chain| {
        chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]);
        chain.last_finalized()
    });

    // Built now, but only spawned after the catch-up message has been sent.
    let unsynced_voter = Voter::new(
        env.clone(),
        voters.clone(),
        network.make_global_comms(),
        0,
        Vec::new(),
        last_finalized,
        last_finalized,
    );

    // Every vote in the catch-up targets block "C" (number 4).
    let signed_prevote = |id| crate::SignedPrevote {
        prevote: crate::Prevote { target_hash: "C", target_number: 4 },
        id: Id(id),
        signature: Signature(99),
    };
    let signed_precommit = |id| crate::SignedPrecommit {
        precommit: crate::Precommit { target_hash: "C", target_number: 4 },
        id: Id(id),
        signature: Signature(99),
    };

    // Catch-up for round 5, based on genesis, with votes from all three voters.
    let catch_up = CatchUp {
        base_number: 1,
        base_hash: GENESIS_HASH,
        round_number: 5,
        prevotes: (0..3).map(signed_prevote).collect(),
        precommits: (0..3).map(signed_precommit).collect(),
    };
    network.send_message(CommunicationIn::CatchUp(catch_up, Callback::Blank));

    // Nothing is known about round 5 before the voter has been polled.
    let voter_state = unsynced_voter.voter_state();
    assert_eq!(voter_state.get().background_rounds.get(&5), None);

    spawner.spawn(unsynced_voter.map(|_| ())).unwrap();

    // Resolves once the voter reports round 6 as its best round.
    // NOTE(review): this poll never registers a waker, so it depends on the
    // executor re-polling the main future when other tasks make progress —
    // confirm against `LocalPool::run_until`.
    let caught_up = future::poll_fn(|_cx| {
        let best_round_number = voter_state.get().best_round.0;
        if best_round_number == 6 {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    });

    // After catching up, also wait for one finality notification.
    let first_finalized = env.finalized_stream().take(1).into_future();

    pool.run_until(caught_up.then(|_| first_finalized.map(|_| ())));

    // Round 6 has no votes recorded yet.
    let fresh_round = report::RoundState::<Id> {
        total_weight,
        threshold_weight,
        prevote_current_weight: VoteWeight(0),
        prevote_ids: Default::default(),
        precommit_current_weight: VoteWeight(0),
        precommit_ids: Default::default(),
    };
    assert_eq!(voter_state.get().best_round, (6, fresh_round));

    // Round 5 carries the three prevotes and three precommits from the catch-up.
    let caught_up_round = report::RoundState::<Id> {
        total_weight,
        threshold_weight,
        prevote_current_weight: VoteWeight(3),
        prevote_ids: voter_ids.clone(),
        precommit_current_weight: VoteWeight(3),
        precommit_ids: voter_ids,
    };
    assert_eq!(voter_state.get().background_rounds.get(&5), Some(&caught_up_round));
}
1564
/// Constructs a single-voter `Voter` with round parameter `10` and no recorded
/// votes, then drains the finalized stream until it yields an item whose block
/// number is no longer below 6.
#[test]
fn pick_up_from_prior_without_grandparent_state() {
    let local_id = Id(5);

    // The local node is the only voter.
    let sole_voter = std::iter::once((local_id, 100));
    let voters = VoterSet::new(sole_voter).expect("nonempty");

    let (network, routing_task) = testing::environment::make_network();

    let global_comms = network.make_global_comms();
    let env = Arc::new(Environment::new(network, local_id));

    // Extend the chain with blocks A..=E on top of genesis.
    let last_finalized = env.with_chain(|chain| {
        chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]);
        chain.last_finalized()
    });

    // An empty vote list is supplied alongside round parameter 10.
    let voter = Voter::new(
        Arc::clone(&env),
        voters,
        global_comms,
        10,
        Vec::new(),
        last_finalized,
        last_finalized,
    );

    // Drive the voter and the network router in the background.
    let mut pool = LocalPool::new();
    let spawner = pool.spawner();
    spawner.spawn(voter.map(|outcome| outcome.expect("Error voting"))).unwrap();
    spawner.spawn(routing_task.map(|_| ())).unwrap();

    // Consume finality notifications while the finalized number stays below 6;
    // the future completes at the first notification that reaches 6 or more.
    let until_target_finalized = env
        .finalized_stream()
        .take_while(|item| future::ready(item.1 < 6))
        .for_each(|_| future::ready(()));
    pool.run_until(until_target_finalized);
}
1603
/// Constructs a voter with round parameter `1` and a set of previously
/// recorded round-1 votes, replays the same votes live on round 2, injects one
/// additional round-1 precommit for "D", and waits until the local voter is
/// observed prevoting on round 3. Finally checks that the environment reports
/// `(2, 1)` from `last_completed_and_concluded()`.
#[test]
fn pick_up_from_prior_with_grandparent_state() {
    let local_id = Id(99);
    // 100 voters (ids 0..=99), each with weight 1; the local node is one of them.
    let voters = VoterSet::new((0..100).map(|i| (Id(i), 1))).expect("nonempty");

    let (network, routing_task) = testing::environment::make_network();

    let global_comms = network.make_global_comms();
    let env = Arc::new(Environment::new(network.clone(), local_id));
    let outer_env = env.clone();

    // Extend the chain with blocks A..=E on top of genesis.
    let last_finalized = env.with_chain(|chain| {
        chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]);
        chain.last_finalized()
    });

    let mut pool = LocalPool::new();
    let mut last_round_votes = Vec::new();

    // Voters 0..=66 each prevote "E" (number 6). The first 66 of them
    // precommit "D" (number 5); voter 66 precommits "E".
    for id in 0..67 {
        let prevote = Message::Prevote(Prevote { target_hash: "E", target_number: 6 });
        let precommit = if id < 66 {
            Message::Precommit(Precommit { target_hash: "D", target_number: 5 })
        } else {
            Message::Precommit(Precommit { target_hash: "E", target_number: 6 })
        };

        // Recorded votes that are handed to `Voter::new` below.
        last_round_votes.push(SignedMessage {
            message: prevote.clone(),
            signature: Signature(id),
            id: Id(id),
        });

        last_round_votes.push(SignedMessage {
            message: precommit.clone(),
            signature: Signature(id),
            id: Id(id),
        });

        // The same pair of votes is also sent live on round 2's channel,
        // on behalf of the same voter.
        let (_, round_sink) = network.make_round_comms(2, Id(id));
        let msgs = stream::iter(iter::once(Ok(prevote)).chain(iter::once(Ok(precommit))));
        pool.spawner().spawn(msgs.forward(round_sink).map(|r| r.unwrap())).unwrap();
    }

    // One more live precommit for "D" on round 1, from a voter (id 67) that
    // has no recorded vote above. "D" is block number 5, the same number the
    // loop above uses for its "D" precommits.
    let sender = Id(67);
    let (_, round_sink) = network.make_round_comms(1, sender);
    let last_precommit = Message::Precommit(Precommit { target_hash: "D", target_number: 5 });
    pool.spawner()
        .spawn(
            stream::iter(iter::once(Ok(last_precommit)))
                .forward(round_sink)
                .map(|r| r.unwrap()),
        )
        .unwrap();

    // The voter receives round parameter 1 together with the recorded votes.
    let voter = Voter::new(
        env.clone(),
        voters,
        global_comms,
        1,
        last_round_votes,
        last_finalized,
        last_finalized,
    );

    // Drive the voter and the network router in the background; any voter
    // error fails the test.
    pool.spawner()
        .spawn(voter.map_err(|_| panic!("Error voting")).map(|_| ()))
        .unwrap();
    pool.spawner().spawn(routing_task.map(|_| ())).unwrap();

    // Listen on round 3 under an id outside the voter set and skip every
    // message until a prevote signed by the local voter appears.
    let (round_stream, _) = network.make_round_comms(3, Id(1000));
    pool.run_until(
        round_stream
            .skip_while(move |v| {
                let v = v.as_ref().unwrap();
                if let Message::Prevote(_) = v.message {
                    future::ready(v.id != local_id)
                } else {
                    future::ready(true)
                }
            })
            .into_future()
            .map(|_| ()),
    );

    assert_eq!(outer_env.last_completed_and_concluded(), (2, 1));
}
1704}