finality_grandpa/voter/
mod.rs

1// Copyright 2018-2019 Parity Technologies (UK) Ltd
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A voter in GRANDPA. This transitions between rounds and casts votes.
16//!
17//! Voters rely on some external context to function:
18//!   - setting timers to cast votes.
19//!   - incoming vote streams.
20//!   - providing voter weights.
21//!   - getting the local voter id.
22//!
23//!  The local voter id is used to check whether to cast votes for a given
24//!  round. If no local id is defined or if it's not part of the voter set then
25//!  votes will not be pushed to the sink. The protocol state machine still
26//!  transitions state as if the votes had been pushed out.
27
28use futures::{
29	channel::mpsc::{self, UnboundedReceiver},
30	prelude::*,
31	ready,
32};
33#[cfg(feature = "std")]
34use log::trace;
35
36use parking_lot::Mutex;
37
38use std::{
39	collections::VecDeque,
40	hash::Hash,
41	pin::Pin,
42	sync::Arc,
43	task::{Context, Poll},
44};
45
46use crate::{
47	round::State as RoundState, validate_commit, voter_set::VoterSet, weights::VoteWeight,
48	BlockNumberOps, CatchUp, Chain, Commit, CommitValidationResult, CompactCommit, Equivocation,
49	HistoricalVotes, Message, Precommit, Prevote, PrimaryPropose, SignedMessage, LOG_TARGET,
50};
51use past_rounds::PastRounds;
52use voting_round::{State as VotingRoundState, VotingRound};
53
54mod past_rounds;
55mod voting_round;
56
57/// Necessary environment for a voter.
58///
59/// This encapsulates the database and networking layers of the chain.
60pub trait Environment<H: Eq, N: BlockNumberOps>: Chain<H, N> {
61	/// Associated timer type for the environment. See also [`Self::round_data`] and
62	/// [`Self::round_commit_timer`].
63	type Timer: Future<Output = Result<(), Self::Error>> + Unpin;
64	/// Associated future type for the environment used when asynchronously computing the
65	/// best chain to vote on. See also [`Self::best_chain_containing`].
66	type BestChain: Future<Output = Result<Option<(H, N)>, Self::Error>> + Send + Unpin;
67	/// The associated Id for the Environment.
68	type Id: Clone + Eq + Ord + std::fmt::Debug;
69	/// The associated Signature type for the Environment.
70	type Signature: Eq + Clone;
71	/// The input stream used to communicate with the outside world.
72	type In: Stream<Item = Result<SignedMessage<H, N, Self::Signature, Self::Id>, Self::Error>>
73		+ Unpin;
74	/// The output stream used to communicate with the outside world.
75	type Out: Sink<Message<H, N>, Error = Self::Error> + Unpin;
76	/// The associated Error type.
77	type Error: From<crate::Error> + ::std::error::Error;
78
79	/// Return a future that will resolve to the hash of the best block whose chain
80	/// contains the given block hash, even if that block is `base` itself.
81	///
82	/// If `base` is unknown the future outputs `None`.
83	fn best_chain_containing(&self, base: H) -> Self::BestChain;
84
85	/// Produce data necessary to start a round of voting. This may also be called
86	/// with the round number of the most recently completed round, in which case
87	/// it should yield a valid input stream.
88	///
89	/// The input stream should provide messages which correspond to known blocks
90	/// only.
91	///
92	/// The voting logic will push unsigned messages over-eagerly into the
93	/// output stream. It is the job of this stream to determine if those messages
94	/// should be sent (for example, if the process actually controls a permissioned key)
95	/// and then to sign the message, multicast it to peers, and schedule it to be
96	/// returned by the `In` stream.
97	///
98	/// This allows the voting logic to maintain the invariant that only incoming messages
99	/// may alter the state, and the logic remains the same regardless of whether a node
100	/// is a regular voter, the proposer, or simply an observer.
101	///
102	/// Furthermore, this means that actual logic of creating and verifying
103	/// signatures is flexible and can be maintained outside this crate.
104	fn round_data(&self, round: u64) -> RoundData<Self::Id, Self::Timer, Self::In, Self::Out>;
105
106	/// Return a timer that will be used to delay the broadcast of a commit
107	/// message. This delay should not be static to minimize the amount of
108	/// commit messages that are sent (e.g. random value in [0, 1] seconds).
109	fn round_commit_timer(&self) -> Self::Timer;
110
111	/// Note that we've done a primary proposal in the given round.
112	fn proposed(&self, round: u64, propose: PrimaryPropose<H, N>) -> Result<(), Self::Error>;
113
114	/// Note that we have prevoted in the given round.
115	fn prevoted(&self, round: u64, prevote: Prevote<H, N>) -> Result<(), Self::Error>;
116
117	/// Note that we have precommitted in the given round.
118	fn precommitted(&self, round: u64, precommit: Precommit<H, N>) -> Result<(), Self::Error>;
119
120	/// Note that a round is completed. This is called when a round has been
121	/// voted in and the next round can start. The round may continue to be run
122	/// in the background until _concluded_.
123	/// Should return an error when something fatal occurs.
124	fn completed(
125		&self,
126		round: u64,
127		state: RoundState<H, N>,
128		base: (H, N),
129		votes: &HistoricalVotes<H, N, Self::Signature, Self::Id>,
130	) -> Result<(), Self::Error>;
131
132	/// Note that a round has concluded. This is called when a round has been
133	/// `completed` and additionally, the round's estimate has been finalized.
134	///
135	/// There may be more votes than when `completed`, and it is the responsibility
136	/// of the `Environment` implementation to deduplicate. However, the caller guarantees
137	/// that the votes passed to `completed` for this round are a prefix of the votes passed here.
138	fn concluded(
139		&self,
140		round: u64,
141		state: RoundState<H, N>,
142		base: (H, N),
143		votes: &HistoricalVotes<H, N, Self::Signature, Self::Id>,
144	) -> Result<(), Self::Error>;
145
146	/// Called when a block should be finalized.
147	// TODO: make this a future that resolves when it's e.g. written to disk?
148	fn finalize_block(
149		&self,
150		hash: H,
151		number: N,
152		round: u64,
153		commit: Commit<H, N, Self::Signature, Self::Id>,
154	) -> Result<(), Self::Error>;
155
156	/// Note that an equivocation in prevotes has occurred.
157	fn prevote_equivocation(
158		&self,
159		round: u64,
160		equivocation: Equivocation<Self::Id, Prevote<H, N>, Self::Signature>,
161	);
162	/// Note that an equivocation in precommits has occurred.
163	fn precommit_equivocation(
164		&self,
165		round: u64,
166		equivocation: Equivocation<Self::Id, Precommit<H, N>, Self::Signature>,
167	);
168}
169
170/// Communication between nodes that is not round-localized.
171#[derive(Debug, Clone, PartialEq, Eq)]
172pub enum CommunicationOut<H, N, S, Id> {
173	/// A commit message.
174	Commit(u64, Commit<H, N, S, Id>),
175}
176
177/// The outcome of processing a commit.
178#[derive(Debug, Clone, PartialEq, Eq)]
179pub enum CommitProcessingOutcome {
180	/// It was beneficial to process this commit.
181	Good(GoodCommit),
182	/// It wasn't beneficial to process this commit. We wasted resources.
183	Bad(BadCommit),
184}
185
186#[cfg(any(test, feature = "test-helpers"))]
187impl CommitProcessingOutcome {
188	/// Returns a `Good` instance of commit processing outcome's opaque type. Useful for testing.
189	pub fn good() -> CommitProcessingOutcome {
190		CommitProcessingOutcome::Good(GoodCommit::new())
191	}
192
193	/// Returns a `Bad` instance of commit processing outcome's opaque type. Useful for testing.
194	pub fn bad() -> CommitProcessingOutcome {
195		CommitProcessingOutcome::Bad(CommitValidationResult::default().into())
196	}
197}
198
199/// The result of processing for a good commit.
200#[derive(Debug, Clone, PartialEq, Eq)]
201pub struct GoodCommit {
202	_priv: (), // lets us add stuff without breaking API.
203}
204
205impl GoodCommit {
206	pub(crate) fn new() -> Self {
207		GoodCommit { _priv: () }
208	}
209}
210
211/// The result of processing for a bad commit
212#[derive(Debug, Clone, PartialEq, Eq)]
213pub struct BadCommit {
214	_priv: (), // lets us add stuff without breaking API.
215	num_precommits: usize,
216	num_duplicated_precommits: usize,
217	num_equivocations: usize,
218	num_invalid_voters: usize,
219}
220
221impl BadCommit {
222	/// Get the number of precommits
223	pub fn num_precommits(&self) -> usize {
224		self.num_precommits
225	}
226
227	/// Get the number of duplicated precommits
228	pub fn num_duplicated(&self) -> usize {
229		self.num_duplicated_precommits
230	}
231
232	/// Get the number of equivocations in the precommits
233	pub fn num_equivocations(&self) -> usize {
234		self.num_equivocations
235	}
236
237	/// Get the number of invalid voters in the precommits
238	pub fn num_invalid_voters(&self) -> usize {
239		self.num_invalid_voters
240	}
241}
242
243impl From<CommitValidationResult> for BadCommit {
244	fn from(r: CommitValidationResult) -> Self {
245		BadCommit {
246			num_precommits: r.num_precommits,
247			num_duplicated_precommits: r.num_duplicated_precommits,
248			num_equivocations: r.num_equivocations,
249			num_invalid_voters: r.num_invalid_voters,
250			_priv: (),
251		}
252	}
253}
254
255/// The outcome of processing a catch up.
256#[derive(Debug, Clone, PartialEq, Eq)]
257pub enum CatchUpProcessingOutcome {
258	/// It was beneficial to process this catch up.
259	Good(GoodCatchUp),
260	/// It wasn't beneficial to process this catch up, it is invalid and we
261	/// wasted resources.
262	Bad(BadCatchUp),
263	/// The catch up wasn't processed because it is useless, e.g. it is for a
264	/// round lower than we're currently in.
265	Useless,
266}
267
268#[cfg(any(test, feature = "test-helpers"))]
269impl CatchUpProcessingOutcome {
270	/// Returns a `Bad` instance of catch up processing outcome's opaque type. Useful for testing.
271	pub fn bad() -> CatchUpProcessingOutcome {
272		CatchUpProcessingOutcome::Bad(BadCatchUp::new())
273	}
274
275	/// Returns a `Good` instance of catch up processing outcome's opaque type. Useful for testing.
276	pub fn good() -> CatchUpProcessingOutcome {
277		CatchUpProcessingOutcome::Good(GoodCatchUp::new())
278	}
279}
280
281/// The result of processing for a good catch up.
282#[derive(Debug, Clone, PartialEq, Eq)]
283pub struct GoodCatchUp {
284	_priv: (), // lets us add stuff without breaking API.
285}
286
287impl GoodCatchUp {
288	pub(crate) fn new() -> Self {
289		GoodCatchUp { _priv: () }
290	}
291}
292
293/// The result of processing for a bad catch up.
294#[derive(Debug, Clone, PartialEq, Eq)]
295pub struct BadCatchUp {
296	_priv: (), // lets us add stuff without breaking API.
297}
298
299impl BadCatchUp {
300	pub(crate) fn new() -> Self {
301		BadCatchUp { _priv: () }
302	}
303}
304
305/// Callback used to pass information about the outcome of importing a given
306/// message (e.g. vote, commit, catch up). Useful to propagate data to the
307/// network after making sure the import is successful.
308pub enum Callback<O> {
309	/// Default value.
310	Blank,
311	/// Callback to execute given a processing outcome.
312	Work(Box<dyn FnMut(O) + Send>),
313}
314
315#[cfg(any(test, feature = "test-helpers"))]
316impl<O> Clone for Callback<O> {
317	fn clone(&self) -> Self {
318		Callback::Blank
319	}
320}
321
322impl<O> Callback<O> {
323	/// Do the work associated with the callback, if any.
324	pub fn run(&mut self, o: O) {
325		match self {
326			Callback::Blank => {},
327			Callback::Work(cb) => cb(o),
328		}
329	}
330}
331
332/// Communication between nodes that is not round-localized.
333#[cfg_attr(any(test, feature = "test-helpers"), derive(Clone))]
334pub enum CommunicationIn<H, N, S, Id> {
335	/// A commit message.
336	Commit(u64, CompactCommit<H, N, S, Id>, Callback<CommitProcessingOutcome>),
337	/// A catch up message.
338	CatchUp(CatchUp<H, N, S, Id>, Callback<CatchUpProcessingOutcome>),
339}
340
341impl<H, N, S, Id> Unpin for CommunicationIn<H, N, S, Id> {}
342
343/// Data necessary to participate in a round.
344pub struct RoundData<Id, Timer, Input, Output> {
345	/// Local voter id (if any.)
346	pub voter_id: Option<Id>,
347	/// Timer before prevotes can be cast. This should be Start + 2T
348	/// where T is the gossip time estimate.
349	pub prevote_timer: Timer,
350	/// Timer before precommits can be cast. This should be Start + 4T
351	pub precommit_timer: Timer,
352	/// Incoming messages.
353	pub incoming: Input,
354	/// Outgoing messages.
355	pub outgoing: Output,
356}
357
358struct Buffered<S, I> {
359	inner: S,
360	buffer: VecDeque<I>,
361}
362
363impl<S: Sink<I> + Unpin, I> Buffered<S, I> {
364	fn new(inner: S) -> Buffered<S, I> {
365		Buffered { buffer: VecDeque::new(), inner }
366	}
367
368	// push an item into the buffered sink.
369	// the sink _must_ be driven to completion with `poll` afterwards.
370	fn push(&mut self, item: I) {
371		self.buffer.push_back(item);
372	}
373
374	// returns ready when the sink and the buffer are completely flushed.
375	fn poll(&mut self, cx: &mut Context) -> Poll<Result<(), S::Error>> {
376		let polled = self.schedule_all(cx)?;
377
378		match polled {
379			Poll::Ready(()) => Sink::poll_flush(Pin::new(&mut self.inner), cx),
380			Poll::Pending => {
381				ready!(Sink::poll_flush(Pin::new(&mut self.inner), cx))?;
382				Poll::Pending
383			},
384		}
385	}
386
387	fn schedule_all(&mut self, cx: &mut Context) -> Poll<Result<(), S::Error>> {
388		while !self.buffer.is_empty() {
389			ready!(Sink::poll_ready(Pin::new(&mut self.inner), cx))?;
390
391			let item = self
392				.buffer
393				.pop_front()
394				.expect("we checked self.buffer.is_empty() just above; qed");
395			Sink::start_send(Pin::new(&mut self.inner), item)?;
396		}
397
398		Poll::Ready(Ok(()))
399	}
400}
401
402type FinalizedNotification<H, N, E> =
403	(H, N, u64, Commit<H, N, <E as Environment<H, N>>::Signature, <E as Environment<H, N>>::Id>);
404
405// Instantiates the given last round, to be backgrounded until its estimate is finalized.
406//
407// This round must be completable based on the passed votes (and if not, `None` will be returned),
408// but it may be the case that there are some more votes to propagate in order to push
409// the estimate backwards and conclude the round (i.e. finalize its estimate).
410//
411// may only be called with non-zero last round.
412fn instantiate_last_round<H, N, E: Environment<H, N>>(
413	voters: VoterSet<E::Id>,
414	last_round_votes: Vec<SignedMessage<H, N, E::Signature, E::Id>>,
415	last_round_number: u64,
416	last_round_base: (H, N),
417	finalized_sender: mpsc::UnboundedSender<FinalizedNotification<H, N, E>>,
418	env: Arc<E>,
419) -> Option<VotingRound<H, N, E>>
420where
421	H: Clone + Eq + Ord + ::std::fmt::Debug,
422	N: Copy + BlockNumberOps + ::std::fmt::Debug,
423{
424	let last_round_tracker = crate::round::Round::new(crate::round::RoundParams {
425		voters,
426		base: last_round_base,
427		round_number: last_round_number,
428	});
429
430	// start as completed so we don't cast votes.
431	let mut last_round = VotingRound::completed(last_round_tracker, finalized_sender, None, env);
432
433	for vote in last_round_votes {
434		// bail if any votes are bad.
435		last_round.handle_vote(vote).ok()?;
436	}
437
438	if last_round.round_state().completable {
439		Some(last_round)
440	} else {
441		None
442	}
443}
444
445// The inner state of a voter aggregating the currently running round state
446// (i.e. best and background rounds). This state exists separately since it's
447// useful to wrap in a `Arc<Mutex<_>>` for sharing.
448struct InnerVoterState<H, N, E>
449where
450	H: Clone + Ord + std::fmt::Debug,
451	N: BlockNumberOps,
452	E: Environment<H, N>,
453{
454	best_round: VotingRound<H, N, E>,
455	past_rounds: PastRounds<H, N, E>,
456}
457
458/// A future that maintains and multiplexes between different rounds,
459/// and caches votes.
460///
461/// This voter also implements the commit protocol.
462/// The commit protocol allows a node to broadcast a message that finalizes a
463/// given block and includes a set of precommits as proof.
464///
465/// - When a round is completable and we precommitted we start a commit timer
466/// and start accepting commit messages;
467/// - When we receive a commit message if it targets a block higher than what
468/// we've finalized we validate it and import its precommits if valid;
469/// - When our commit timer triggers we check if we've received any commit
470/// message for a block equal to what we've finalized, if we haven't then we
471/// broadcast a commit.
472///
473/// Additionally, we also listen to commit messages from rounds that aren't
474/// currently running, we validate the commit and dispatch a finalization
475/// notification (if any) to the environment.
476pub struct Voter<H, N, E: Environment<H, N>, GlobalIn, GlobalOut>
477where
478	H: Clone + Eq + Ord + ::std::fmt::Debug,
479	N: Copy + BlockNumberOps + ::std::fmt::Debug,
480	GlobalIn: Stream<Item = Result<CommunicationIn<H, N, E::Signature, E::Id>, E::Error>> + Unpin,
481	GlobalOut: Sink<CommunicationOut<H, N, E::Signature, E::Id>, Error = E::Error> + Unpin,
482{
483	env: Arc<E>,
484	voters: VoterSet<E::Id>,
485	inner: Arc<Mutex<InnerVoterState<H, N, E>>>,
486	finalized_notifications: UnboundedReceiver<FinalizedNotification<H, N, E>>,
487	last_finalized_number: N,
488	global_in: GlobalIn,
489	global_out: Buffered<GlobalOut, CommunicationOut<H, N, E::Signature, E::Id>>,
490	// the commit protocol might finalize further than the current round (if we're
491	// behind), we keep track of last finalized in round so we don't violate any
492	// assumptions from round-to-round.
493	last_finalized_in_rounds: (H, N),
494}
495
496impl<'a, H: 'a, N, E: 'a, GlobalIn, GlobalOut> Voter<H, N, E, GlobalIn, GlobalOut>
497where
498	H: Clone + Ord + ::std::fmt::Debug + Sync + Send,
499	N: BlockNumberOps + Sync + Send,
500	E: Environment<H, N> + Sync + Send,
501	GlobalIn: Stream<Item = Result<CommunicationIn<H, N, E::Signature, E::Id>, E::Error>> + Unpin,
502	GlobalOut: Sink<CommunicationOut<H, N, E::Signature, E::Id>, Error = E::Error> + Unpin,
503{
504	/// Returns an object allowing to query the voter state.
505	pub fn voter_state(&self) -> Box<dyn VoterState<E::Id> + 'a + Send + Sync>
506	where
507		<E as Environment<H, N>>::Signature: Send,
508		<E as Environment<H, N>>::Id: Hash + Send,
509		<E as Environment<H, N>>::Timer: Send,
510		<E as Environment<H, N>>::Out: Send,
511		<E as Environment<H, N>>::In: Send,
512	{
513		Box::new(SharedVoterState(self.inner.clone()))
514	}
515}
516
517impl<H, N, E: Environment<H, N>, GlobalIn, GlobalOut> Voter<H, N, E, GlobalIn, GlobalOut>
518where
519	H: Clone + Eq + Ord + ::std::fmt::Debug,
520	N: Copy + BlockNumberOps + ::std::fmt::Debug,
521	GlobalIn: Stream<Item = Result<CommunicationIn<H, N, E::Signature, E::Id>, E::Error>> + Unpin,
522	GlobalOut: Sink<CommunicationOut<H, N, E::Signature, E::Id>, Error = E::Error> + Unpin,
523{
524	/// Create new `Voter` tracker with given round number and base block.
525	///
526	/// Provide data about the last completed round. If there is no
527	/// known last completed round, the genesis state (round number 0, no votes, genesis base),
528	/// should be provided. When available, all messages required to complete
529	/// the last round should be provided.
530	///
531	/// The input stream for commit messages should provide commits which
532	/// correspond to known blocks only (including all its precommits). It
533	/// is also responsible for validating the signature data in commit
534	/// messages.
535	pub fn new(
536		env: Arc<E>,
537		voters: VoterSet<E::Id>,
538		global_comms: (GlobalIn, GlobalOut),
539		last_round_number: u64,
540		last_round_votes: Vec<SignedMessage<H, N, E::Signature, E::Id>>,
541		last_round_base: (H, N),
542		last_finalized: (H, N),
543	) -> Self {
544		let (finalized_sender, finalized_notifications) = mpsc::unbounded();
545		let last_finalized_number = last_finalized.1;
546
547		// re-start the last round and queue all messages to be processed on first poll.
548		// keep it in the background so we can push the estimate backwards until finalized
549		// by actually waiting for more messages.
550		let mut past_rounds = PastRounds::new();
551		let mut last_round_state =
552			crate::bridge_state::bridge_state(RoundState::genesis(last_round_base.clone())).1;
553
554		if last_round_number > 0 {
555			let maybe_completed_last_round = instantiate_last_round(
556				voters.clone(),
557				last_round_votes,
558				last_round_number,
559				last_round_base,
560				finalized_sender.clone(),
561				env.clone(),
562			);
563
564			if let Some(mut last_round) = maybe_completed_last_round {
565				last_round_state = last_round.bridge_state();
566				past_rounds.push(&*env, last_round);
567			}
568
569			// when there is no information about the last completed round,
570			// the best we can do is assume that the estimate == the given base
571			// and that it is finalized. This is always the case for the genesis
572			// round of a set.
573		}
574
575		let best_round = VotingRound::new(
576			last_round_number + 1,
577			voters.clone(),
578			last_finalized.clone(),
579			Some(last_round_state),
580			finalized_sender,
581			env.clone(),
582		);
583
584		let (global_in, global_out) = global_comms;
585
586		let inner = Arc::new(Mutex::new(InnerVoterState { best_round, past_rounds }));
587
588		Voter {
589			env,
590			voters,
591			inner,
592			finalized_notifications,
593			last_finalized_number,
594			last_finalized_in_rounds: last_finalized,
595			global_in,
596			global_out: Buffered::new(global_out),
597		}
598	}
599
600	fn prune_background_rounds(&mut self, cx: &mut Context) -> Result<(), E::Error> {
601		{
602			let mut inner = self.inner.lock();
603
604			// Do work on all background rounds, broadcasting any commits generated.
605			while let Poll::Ready(Some(item)) =
606				Stream::poll_next(Pin::new(&mut inner.past_rounds), cx)
607			{
608				let (number, commit) = item?;
609				self.global_out.push(CommunicationOut::Commit(number, commit));
610			}
611		}
612
613		while let Poll::Ready(res) =
614			Stream::poll_next(Pin::new(&mut self.finalized_notifications), cx)
615		{
616			let inner = self.inner.clone();
617			let mut inner = inner.lock();
618
619			let (f_hash, f_num, round, commit) =
620				res.expect("one sender always kept alive in self.best_round; qed");
621
622			inner.past_rounds.update_finalized(f_num);
623
624			if self.set_last_finalized_number(f_num) {
625				self.env.finalize_block(f_hash.clone(), f_num, round, commit)?;
626			}
627
628			if f_num > self.last_finalized_in_rounds.1 {
629				self.last_finalized_in_rounds = (f_hash, f_num);
630			}
631		}
632
633		Ok(())
634	}
635
636	/// Process all incoming messages from other nodes.
637	///
638	/// Commit messages are handled with extra care. If a commit message references
639	/// a currently backgrounded round, we send it to that round so that when we commit
640	/// on that round, our commit message will be informed by those that we've seen.
641	///
642	/// Otherwise, we will simply handle the commit and issue a finalization command
643	/// to the environment.
644	fn process_incoming(&mut self, cx: &mut Context) -> Result<(), E::Error> {
645		while let Poll::Ready(Some(item)) = Stream::poll_next(Pin::new(&mut self.global_in), cx) {
646			match item? {
647				CommunicationIn::Commit(round_number, commit, mut process_commit_outcome) => {
648					trace!(
649						target: LOG_TARGET,
650						"Got commit for round_number {:?}: target_number: {:?}, target_hash: {:?}",
651						round_number,
652						commit.target_number,
653						commit.target_hash,
654					);
655
656					let commit: Commit<_, _, _, _> = commit.into();
657
658					let mut inner = self.inner.lock();
659
660					// if the commit is for a background round dispatch to round committer.
661					// that returns Some if there wasn't one.
662					if let Some(commit) = inner.past_rounds.import_commit(round_number, commit) {
663						// otherwise validate the commit and signal the finalized block from the
664						// commit to the environment (if valid and higher than current finalized)
665						let validation_result = validate_commit(&commit, &self.voters, &*self.env)?;
666
667						if validation_result.is_valid() {
668							// this can't be moved to a function because the compiler
669							// will complain about getting two mutable borrows to self
670							// (due to the call to `self.rounds.get_mut`).
671							let last_finalized_number = &mut self.last_finalized_number;
672
673							// clean up any background rounds
674							inner.past_rounds.update_finalized(commit.target_number);
675
676							if commit.target_number > *last_finalized_number {
677								*last_finalized_number = commit.target_number;
678								self.env.finalize_block(
679									commit.target_hash.clone(),
680									commit.target_number,
681									round_number,
682									commit,
683								)?;
684							}
685
686							process_commit_outcome
687								.run(CommitProcessingOutcome::Good(GoodCommit::new()));
688						} else {
689							// Failing validation of a commit is bad.
690							process_commit_outcome.run(CommitProcessingOutcome::Bad(
691								BadCommit::from(validation_result),
692							));
693						}
694					} else {
695						// Import to backgrounded round is good.
696						process_commit_outcome
697							.run(CommitProcessingOutcome::Good(GoodCommit::new()));
698					}
699				},
700				CommunicationIn::CatchUp(catch_up, mut process_catch_up_outcome) => {
701					trace!(
702						target: LOG_TARGET,
703						"Got catch-up message for round {}",
704						catch_up.round_number
705					);
706
707					let mut inner = self.inner.lock();
708
709					let round = if let Some(round) = validate_catch_up(
710						catch_up,
711						&*self.env,
712						&self.voters,
713						inner.best_round.round_number(),
714					) {
715						round
716					} else {
717						process_catch_up_outcome
718							.run(CatchUpProcessingOutcome::Bad(BadCatchUp::new()));
719						return Ok(())
720					};
721
722					let state = round.state();
723
724					// beyond this point, we set this round to the past and
725					// start voting in the next round.
726					let mut just_completed = VotingRound::completed(
727						round,
728						inner.best_round.finalized_sender(),
729						None,
730						self.env.clone(),
731					);
732
733					let new_best = VotingRound::new(
734						just_completed.round_number() + 1,
735						self.voters.clone(),
736						self.last_finalized_in_rounds.clone(),
737						Some(just_completed.bridge_state()),
738						inner.best_round.finalized_sender(),
739						self.env.clone(),
740					);
741
742					// update last-finalized in rounds _after_ starting new round.
743					// otherwise the base could be too eagerly set forward.
744					if let Some((f_hash, f_num)) = state.finalized.clone() {
745						if f_num > self.last_finalized_in_rounds.1 {
746							self.last_finalized_in_rounds = (f_hash, f_num);
747						}
748					}
749
750					self.env.completed(
751						just_completed.round_number(),
752						just_completed.round_state(),
753						just_completed.dag_base(),
754						just_completed.historical_votes(),
755					)?;
756
757					inner.past_rounds.push(&*self.env, just_completed);
758
759					let old_best = std::mem::replace(&mut inner.best_round, new_best);
760					inner.past_rounds.push(&*self.env, old_best);
761
762					process_catch_up_outcome
763						.run(CatchUpProcessingOutcome::Good(GoodCatchUp::new()));
764				},
765			}
766		}
767
768		Ok(())
769	}
770
771	// process the logic of the best round.
772	fn process_best_round(&mut self, cx: &mut Context) -> Poll<Result<(), E::Error>> {
773		// If the current `best_round` is completable and we've already precommitted,
774		// we start a new round at `best_round + 1`.
775		{
776			let mut inner = self.inner.lock();
777
778			let should_start_next = {
779				let completable = match inner.best_round.poll(cx)? {
780					Poll::Ready(()) => true,
781					Poll::Pending => false,
782				};
783
784				// start when we've cast all votes.
785				let precommitted =
786					matches!(inner.best_round.state(), Some(&VotingRoundState::Precommitted));
787
788				completable && precommitted
789			};
790
791			if !should_start_next {
792				return Poll::Pending
793			}
794
795			trace!(
796				target: LOG_TARGET,
797				"Best round at {} has become completable. Starting new best round at {}",
798				inner.best_round.round_number(),
799				inner.best_round.round_number() + 1,
800			);
801		}
802
803		self.completed_best_round()?;
804
805		// round has been updated. so we need to re-poll.
806		self.poll_unpin(cx)
807	}
808
809	fn completed_best_round(&mut self) -> Result<(), E::Error> {
810		let mut inner = self.inner.lock();
811
812		self.env.completed(
813			inner.best_round.round_number(),
814			inner.best_round.round_state(),
815			inner.best_round.dag_base(),
816			inner.best_round.historical_votes(),
817		)?;
818
819		let old_round_number = inner.best_round.round_number();
820
821		let next_round = VotingRound::new(
822			old_round_number + 1,
823			self.voters.clone(),
824			self.last_finalized_in_rounds.clone(),
825			Some(inner.best_round.bridge_state()),
826			inner.best_round.finalized_sender(),
827			self.env.clone(),
828		);
829
830		let old_round = ::std::mem::replace(&mut inner.best_round, next_round);
831		inner.past_rounds.push(&*self.env, old_round);
832		Ok(())
833	}
834
835	fn set_last_finalized_number(&mut self, finalized_number: N) -> bool {
836		let last_finalized_number = &mut self.last_finalized_number;
837		if finalized_number > *last_finalized_number {
838			*last_finalized_number = finalized_number;
839			return true
840		}
841		false
842	}
843}
844
845impl<H, N, E: Environment<H, N>, GlobalIn, GlobalOut> Future for Voter<H, N, E, GlobalIn, GlobalOut>
846where
847	H: Clone + Eq + Ord + ::std::fmt::Debug,
848	N: Copy + BlockNumberOps + ::std::fmt::Debug,
849	GlobalIn: Stream<Item = Result<CommunicationIn<H, N, E::Signature, E::Id>, E::Error>> + Unpin,
850	GlobalOut: Sink<CommunicationOut<H, N, E::Signature, E::Id>, Error = E::Error> + Unpin,
851{
852	type Output = Result<(), E::Error>;
853
854	fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), E::Error>> {
855		self.process_incoming(cx)?;
856		self.prune_background_rounds(cx)?;
857		let _ = self.global_out.poll(cx)?;
858
859		self.process_best_round(cx)
860	}
861}
862
863impl<H, N, E: Environment<H, N>, GlobalIn, GlobalOut> Unpin for Voter<H, N, E, GlobalIn, GlobalOut>
864where
865	H: Clone + Eq + Ord + ::std::fmt::Debug,
866	N: Copy + BlockNumberOps + ::std::fmt::Debug,
867	GlobalIn: Stream<Item = Result<CommunicationIn<H, N, E::Signature, E::Id>, E::Error>> + Unpin,
868	GlobalOut: Sink<CommunicationOut<H, N, E::Signature, E::Id>, Error = E::Error> + Unpin,
869{
870}
871
872/// Trait for querying the state of the voter. Used by `Voter` to return a queryable object
873/// without exposing too many data types.
874pub trait VoterState<Id: Eq + std::hash::Hash> {
875	/// Returns a plain data type, `report::VoterState`, describing the current state
876	/// of the voter relevant to the voting process.
877	fn get(&self) -> report::VoterState<Id>;
878}
879
880/// Contains a number of data transfer objects for reporting data to the outside world.
881pub mod report {
882	use crate::weights::{VoteWeight, VoterWeight};
883	use std::collections::{HashMap, HashSet};
884
885	/// Basic data struct for the state of a round.
886	#[derive(PartialEq, Eq, Clone)]
887	#[cfg_attr(test, derive(Debug))]
888	pub struct RoundState<Id: Eq + std::hash::Hash> {
889		/// Total weight of all votes.
890		pub total_weight: VoterWeight,
891		/// The threshold voter weight.
892		pub threshold_weight: VoterWeight,
893
894		/// Current weight of the prevotes.
895		pub prevote_current_weight: VoteWeight,
896		/// The identities of nodes that have cast prevotes so far.
897		pub prevote_ids: HashSet<Id>,
898
899		/// Current weight of the precommits.
900		pub precommit_current_weight: VoteWeight,
901		/// The identities of nodes that have cast precommits so far.
902		pub precommit_ids: HashSet<Id>,
903	}
904
905	/// Basic data struct for the current state of the voter in a form suitable
906	/// for passing on to other systems.
907	#[derive(PartialEq, Eq)]
908	#[cfg_attr(test, derive(Debug))]
909	pub struct VoterState<Id: Eq + std::hash::Hash> {
910		/// Voting rounds running in the background.
911		pub background_rounds: HashMap<u64, RoundState<Id>>,
912		/// The current best voting round.
913		pub best_round: (u64, RoundState<Id>),
914	}
915}
916
917struct SharedVoterState<H, N, E>(Arc<Mutex<InnerVoterState<H, N, E>>>)
918where
919	H: Clone + Ord + std::fmt::Debug,
920	N: BlockNumberOps,
921	E: Environment<H, N>;
922
923impl<H, N, E> VoterState<E::Id> for SharedVoterState<H, N, E>
924where
925	H: Clone + Eq + Ord + std::fmt::Debug,
926	N: BlockNumberOps,
927	E: Environment<H, N>,
928	<E as Environment<H, N>>::Id: Hash,
929{
930	fn get(&self) -> report::VoterState<E::Id> {
931		let to_round_state = |voting_round: &VotingRound<H, N, E>| {
932			(
933				voting_round.round_number(),
934				report::RoundState {
935					total_weight: voting_round.voters().total_weight(),
936					threshold_weight: voting_round.voters().threshold(),
937					prevote_current_weight: voting_round.prevote_weight(),
938					prevote_ids: voting_round.prevote_ids().collect(),
939					precommit_current_weight: voting_round.precommit_weight(),
940					precommit_ids: voting_round.precommit_ids().collect(),
941				},
942			)
943		};
944
945		let inner = self.0.lock();
946		let best_round = to_round_state(&inner.best_round);
947		let background_rounds = inner.past_rounds.voting_rounds().map(to_round_state).collect();
948
949		report::VoterState { best_round, background_rounds }
950	}
951}
952
953/// Validate the given catch up and return a completed round with all prevotes
954/// and precommits from the catch up imported. If the catch up is invalid `None`
955/// is returned instead.
956fn validate_catch_up<H, N, S, I, E>(
957	catch_up: CatchUp<H, N, S, I>,
958	env: &E,
959	voters: &VoterSet<I>,
960	best_round_number: u64,
961) -> Option<crate::round::Round<I, H, N, S>>
962where
963	H: Clone + Eq + Ord + std::fmt::Debug,
964	N: BlockNumberOps + std::fmt::Debug,
965	S: Clone + Eq,
966	I: Clone + Eq + std::fmt::Debug + Ord,
967	E: Environment<H, N>,
968{
969	if catch_up.round_number <= best_round_number {
970		trace!(target: LOG_TARGET, "Ignoring because best round number is {}", best_round_number);
971
972		return None
973	}
974
975	// check threshold support in prevotes and precommits.
976	{
977		let mut map = std::collections::BTreeMap::new();
978
979		for prevote in &catch_up.prevotes {
980			if !voters.contains(&prevote.id) {
981				trace!(
982					target: LOG_TARGET,
983					"Ignoring invalid catch up, invalid voter: {:?}",
984					prevote.id,
985				);
986
987				return None
988			}
989
990			map.entry(prevote.id.clone()).or_insert((false, false)).0 = true;
991		}
992
993		for precommit in &catch_up.precommits {
994			if !voters.contains(&precommit.id) {
995				trace!(
996					target: LOG_TARGET,
997					"Ignoring invalid catch up, invalid voter: {:?}",
998					precommit.id,
999				);
1000
1001				return None
1002			}
1003
1004			map.entry(precommit.id.clone()).or_insert((false, false)).1 = true;
1005		}
1006
1007		let (pv, pc) = map.into_iter().fold(
1008			(VoteWeight(0), VoteWeight(0)),
1009			|(mut pv, mut pc), (id, (prevoted, precommitted))| {
1010				if let Some(v) = voters.get(&id) {
1011					if prevoted {
1012						pv = pv + v.weight();
1013					}
1014
1015					if precommitted {
1016						pc = pc + v.weight();
1017					}
1018				}
1019
1020				(pv, pc)
1021			},
1022		);
1023
1024		let threshold = voters.threshold();
1025		if pv < threshold || pc < threshold {
1026			trace!(target: LOG_TARGET, "Ignoring invalid catch up, missing voter threshold");
1027
1028			return None
1029		}
1030	}
1031
1032	let mut round = crate::round::Round::new(crate::round::RoundParams {
1033		round_number: catch_up.round_number,
1034		voters: voters.clone(),
1035		base: (catch_up.base_hash.clone(), catch_up.base_number),
1036	});
1037
1038	// import prevotes first.
1039	for crate::SignedPrevote { prevote, id, signature } in catch_up.prevotes {
1040		match round.import_prevote(env, prevote, id, signature) {
1041			Ok(_) => {},
1042			Err(e) => {
1043				trace!(
1044					target: LOG_TARGET,
1045					"Ignoring invalid catch up, error importing prevote: {:?}",
1046					e,
1047				);
1048
1049				return None
1050			},
1051		}
1052	}
1053
1054	// then precommits.
1055	for crate::SignedPrecommit { precommit, id, signature } in catch_up.precommits {
1056		match round.import_precommit(env, precommit, id, signature) {
1057			Ok(_) => {},
1058			Err(e) => {
1059				trace!(
1060					target: LOG_TARGET,
1061					"Ignoring invalid catch up, error importing precommit: {:?}",
1062					e,
1063				);
1064
1065				return None
1066			},
1067		}
1068	}
1069
1070	let state = round.state();
1071	if !state.completable {
1072		return None
1073	}
1074
1075	Some(round)
1076}
1077
1078#[cfg(test)]
1079mod tests {
1080	use super::*;
1081	use crate::{
1082		testing::{
1083			self,
1084			chain::GENESIS_HASH,
1085			environment::{Environment, Id, Signature},
1086		},
1087		weights::{VoteWeight, VoterWeight},
1088		SignedPrecommit,
1089	};
1090	use futures::{executor::LocalPool, task::SpawnExt};
1091	use futures_timer::Delay;
1092	use std::{collections::HashSet, iter, time::Duration};
1093
1094	#[test]
1095	fn talking_to_myself() {
1096		let local_id = Id(5);
1097		let voters = VoterSet::new(std::iter::once((local_id, 100))).unwrap();
1098
1099		let (network, routing_task) = testing::environment::make_network();
1100
1101		let global_comms = network.make_global_comms();
1102		let env = Arc::new(Environment::new(network, local_id));
1103
1104		// initialize chain
1105		let last_finalized = env.with_chain(|chain| {
1106			chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]);
1107			chain.last_finalized()
1108		});
1109
1110		// run voter in background. scheduling it to shut down at the end.
1111		let finalized = env.finalized_stream();
1112		let voter = Voter::new(
1113			env.clone(),
1114			voters,
1115			global_comms,
1116			0,
1117			Vec::new(),
1118			last_finalized,
1119			last_finalized,
1120		);
1121
1122		let mut pool = LocalPool::new();
1123		pool.spawner().spawn(voter.map(|v| v.expect("Error voting"))).unwrap();
1124		pool.spawner().spawn(routing_task).unwrap();
1125
1126		// wait for the best block to finalize.
1127		pool.run_until(
1128			finalized
1129				.take_while(|&(_, n, _)| future::ready(n < 6))
1130				.for_each(|_| future::ready(())),
1131		)
1132	}
1133
1134	#[test]
1135	fn finalizing_at_fault_threshold() {
1136		// 10 voters
1137		let voters = VoterSet::new((0..10).map(|i| (Id(i), 1))).expect("nonempty");
1138
1139		let (network, routing_task) = testing::environment::make_network();
1140		let mut pool = LocalPool::new();
1141
1142		// 3 voters offline.
1143		let finalized_streams = (0..7)
1144			.map(|i| {
1145				let local_id = Id(i);
1146				// initialize chain
1147				let env = Arc::new(Environment::new(network.clone(), local_id));
1148				let last_finalized = env.with_chain(|chain| {
1149					chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]);
1150					chain.last_finalized()
1151				});
1152
1153				// run voter in background. scheduling it to shut down at the end.
1154				let finalized = env.finalized_stream();
1155				let voter = Voter::new(
1156					env.clone(),
1157					voters.clone(),
1158					network.make_global_comms(),
1159					0,
1160					Vec::new(),
1161					last_finalized,
1162					last_finalized,
1163				);
1164
1165				pool.spawner().spawn(voter.map(|v| v.expect("Error voting"))).unwrap();
1166
1167				// wait for the best block to be finalized by all honest voters
1168				finalized
1169					.take_while(|&(_, n, _)| future::ready(n < 6))
1170					.for_each(|_| future::ready(()))
1171			})
1172			.collect::<Vec<_>>();
1173
1174		pool.spawner().spawn(routing_task.map(|_| ())).unwrap();
1175
1176		pool.run_until(future::join_all(finalized_streams.into_iter()));
1177	}
1178
1179	#[test]
1180	fn exposing_voter_state() {
1181		let num_voters = 10;
1182		let voters_online = 7;
1183		let voters = VoterSet::new((0..num_voters).map(|i| (Id(i), 1))).expect("nonempty");
1184
1185		let (network, routing_task) = testing::environment::make_network();
1186		let mut pool = LocalPool::new();
1187
1188		// some voters offline
1189		let (finalized_streams, voter_states): (Vec<_>, Vec<_>) = (0..voters_online)
1190			.map(|i| {
1191				let local_id = Id(i);
1192				// initialize chain
1193				let env = Arc::new(Environment::new(network.clone(), local_id));
1194				let last_finalized = env.with_chain(|chain| {
1195					chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]);
1196					chain.last_finalized()
1197				});
1198
1199				// run voter in background. scheduling it to shut down at the end.
1200				let finalized = env.finalized_stream();
1201				let voter = Voter::new(
1202					env.clone(),
1203					voters.clone(),
1204					network.make_global_comms(),
1205					0,
1206					Vec::new(),
1207					last_finalized,
1208					last_finalized,
1209				);
1210				let voter_state = voter.voter_state();
1211
1212				pool.spawner().spawn(voter.map(|v| v.expect("Error voting"))).unwrap();
1213
1214				(
1215					// wait for the best block to be finalized by all honest voters
1216					finalized
1217						.take_while(|&(_, n, _)| future::ready(n < 6))
1218						.for_each(|_| future::ready(())),
1219					voter_state,
1220				)
1221			})
1222			.unzip();
1223
1224		let voter_state = &voter_states[0];
1225		voter_states.iter().all(|vs| vs.get() == voter_state.get());
1226
1227		let expected_round_state = report::RoundState::<Id> {
1228			total_weight: VoterWeight::new(num_voters.into()).expect("nonzero"),
1229			threshold_weight: VoterWeight::new(voters_online.into()).expect("nonzero"),
1230			prevote_current_weight: VoteWeight(0),
1231			prevote_ids: Default::default(),
1232			precommit_current_weight: VoteWeight(0),
1233			precommit_ids: Default::default(),
1234		};
1235
1236		assert_eq!(
1237			voter_state.get(),
1238			report::VoterState {
1239				background_rounds: Default::default(),
1240				best_round: (1, expected_round_state.clone()),
1241			}
1242		);
1243
1244		pool.spawner().spawn(routing_task.map(|_| ())).unwrap();
1245		pool.run_until(future::join_all(finalized_streams.into_iter()));
1246
1247		assert_eq!(voter_state.get().best_round, (2, expected_round_state.clone()));
1248	}
1249
1250	#[test]
1251	fn broadcast_commit() {
1252		let local_id = Id(5);
1253		let voters = VoterSet::new([(local_id, 100)].iter().cloned()).expect("nonempty");
1254
1255		let (network, routing_task) = testing::environment::make_network();
1256		let (commits, _) = network.make_global_comms();
1257
1258		let global_comms = network.make_global_comms();
1259		let env = Arc::new(Environment::new(network, local_id));
1260
1261		// initialize chain
1262		let last_finalized = env.with_chain(|chain| {
1263			chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]);
1264			chain.last_finalized()
1265		});
1266
1267		// run voter in background. scheduling it to shut down at the end.
1268		let voter = Voter::new(
1269			env.clone(),
1270			voters.clone(),
1271			global_comms,
1272			0,
1273			Vec::new(),
1274			last_finalized,
1275			last_finalized,
1276		);
1277
1278		let mut pool = LocalPool::new();
1279		pool.spawner().spawn(voter.map(|v| v.expect("Error voting"))).unwrap();
1280		pool.spawner().spawn(routing_task).unwrap();
1281
1282		// wait for the node to broadcast a commit message
1283		pool.run_until(commits.take(1).for_each(|_| future::ready(())))
1284	}
1285
1286	#[test]
1287	fn broadcast_commit_only_if_newer() {
1288		let local_id = Id(5);
1289		let test_id = Id(42);
1290		let voters =
1291			VoterSet::new([(local_id, 100), (test_id, 201)].iter().cloned()).expect("nonempty");
1292
1293		let (network, routing_task) = testing::environment::make_network();
1294		let (commits_stream, commits_sink) = network.make_global_comms();
1295		let (round_stream, round_sink) = network.make_round_comms(1, test_id);
1296
1297		let prevote = Message::Prevote(Prevote { target_hash: "E", target_number: 6 });
1298
1299		let precommit = Message::Precommit(Precommit { target_hash: "E", target_number: 6 });
1300
1301		let commit = (
1302			1,
1303			Commit {
1304				target_hash: "E",
1305				target_number: 6,
1306				precommits: vec![SignedPrecommit {
1307					precommit: Precommit { target_hash: "E", target_number: 6 },
1308					signature: Signature(test_id.0),
1309					id: test_id,
1310				}],
1311			},
1312		);
1313
1314		let global_comms = network.make_global_comms();
1315		let env = Arc::new(Environment::new(network, local_id));
1316
1317		// initialize chain
1318		let last_finalized = env.with_chain(|chain| {
1319			chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]);
1320			chain.last_finalized()
1321		});
1322
1323		// run voter in background. scheduling it to shut down at the end.
1324		let voter = Voter::new(
1325			env.clone(),
1326			voters.clone(),
1327			global_comms,
1328			0,
1329			Vec::new(),
1330			last_finalized,
1331			last_finalized,
1332		);
1333
1334		let mut pool = LocalPool::new();
1335		pool.spawner().spawn(voter.map(|v| v.expect("Error voting: {:?}"))).unwrap();
1336		pool.spawner().spawn(routing_task.map(|_| ())).unwrap();
1337
1338		pool.spawner()
1339			.spawn(
1340				round_stream
1341					.into_future()
1342					.then(|(value, stream)| {
1343						// wait for a prevote
1344						assert!(match value {
1345							Some(Ok(SignedMessage {
1346								message: Message::Prevote(_),
1347								id: Id(5),
1348								..
1349							})) => true,
1350							_ => false,
1351						});
1352						let votes = vec![prevote, precommit].into_iter().map(Result::Ok);
1353						futures::stream::iter(votes).forward(round_sink).map(|_| stream) // send our prevote
1354					})
1355					.then(|stream| {
1356						stream
1357							.take_while(|value| match value {
1358								// wait for a precommit
1359								Ok(SignedMessage {
1360									message: Message::Precommit(_),
1361									id: Id(5),
1362									..
1363								}) => future::ready(false),
1364								_ => future::ready(true),
1365							})
1366							.for_each(|_| future::ready(()))
1367					})
1368					.then(move |_| {
1369						// send our commit
1370						stream::iter(iter::once(Ok(CommunicationOut::Commit(commit.0, commit.1))))
1371							.forward(commits_sink)
1372					})
1373					.map(|_| ()),
1374			)
1375			.unwrap();
1376
1377		let res = pool.run_until(
1378			// wait for the first commit (ours)
1379			commits_stream.into_future().then(|(_, stream)| {
1380				// the second commit should never arrive
1381				let await_second = stream.take(1).for_each(|_| future::ready(()));
1382				let delay = Delay::new(Duration::from_millis(500));
1383				future::select(await_second, delay)
1384			}),
1385		);
1386
1387		match res {
1388			future::Either::Right(((), _work)) => {
1389				// the future timed out as expected
1390			},
1391			_ => panic!("Unexpected result"),
1392		}
1393	}
1394
1395	#[test]
1396	fn import_commit_for_any_round() {
1397		let local_id = Id(5);
1398		let test_id = Id(42);
1399		let voters =
1400			VoterSet::new([(local_id, 100), (test_id, 201)].iter().cloned()).expect("nonempty");
1401
1402		let (network, routing_task) = testing::environment::make_network();
1403		let (_, commits_sink) = network.make_global_comms();
1404
1405		// this is a commit for a previous round
1406		let commit = Commit {
1407			target_hash: "E",
1408			target_number: 6,
1409			precommits: vec![SignedPrecommit {
1410				precommit: Precommit { target_hash: "E", target_number: 6 },
1411				signature: Signature(test_id.0),
1412				id: test_id,
1413			}],
1414		};
1415
1416		let global_comms = network.make_global_comms();
1417		let env = Arc::new(Environment::new(network, local_id));
1418
1419		// initialize chain
1420		let last_finalized = env.with_chain(|chain| {
1421			chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]);
1422			chain.last_finalized()
1423		});
1424
1425		// run voter in background.
1426		let voter = Voter::new(
1427			env.clone(),
1428			voters.clone(),
1429			global_comms,
1430			1,
1431			Vec::new(),
1432			last_finalized,
1433			last_finalized,
1434		);
1435
1436		let mut pool = LocalPool::new();
1437		pool.spawner().spawn(voter.map(|v| v.expect("Error voting"))).unwrap();
1438		pool.spawner().spawn(routing_task.map(|_| ())).unwrap();
1439
1440		// Send the commit message.
1441		pool.spawner()
1442			.spawn(
1443				stream::iter(iter::once(Ok(CommunicationOut::Commit(0, commit.clone()))))
1444					.forward(commits_sink)
1445					.map(|_| ()),
1446			)
1447			.unwrap();
1448
1449		// Wait for the commit message to be processed.
1450		let finalized = pool
1451			.run_until(env.finalized_stream().into_future().map(move |(msg, _)| msg.unwrap().2));
1452
1453		assert_eq!(finalized, commit);
1454	}
1455
1456	#[test]
1457	fn skips_to_latest_round_after_catch_up() {
1458		// 3 voters
1459		let voters = VoterSet::new((0..3).map(|i| (Id(i), 1u64))).expect("nonempty");
1460		let total_weight = voters.total_weight();
1461		let threshold_weight = voters.threshold();
1462		let voter_ids: HashSet<Id> = (0..3).map(|i| Id(i)).collect();
1463
1464		let (network, routing_task) = testing::environment::make_network();
1465		let mut pool = LocalPool::new();
1466
1467		pool.spawner().spawn(routing_task.map(|_| ())).unwrap();
1468
1469		// initialize unsynced voter at round 0
1470		let (env, unsynced_voter) = {
1471			let local_id = Id(4);
1472
1473			let env = Arc::new(Environment::new(network.clone(), local_id));
1474			let last_finalized = env.with_chain(|chain| {
1475				chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]);
1476				chain.last_finalized()
1477			});
1478
1479			let voter = Voter::new(
1480				env.clone(),
1481				voters.clone(),
1482				network.make_global_comms(),
1483				0,
1484				Vec::new(),
1485				last_finalized,
1486				last_finalized,
1487			);
1488
1489			(env, voter)
1490		};
1491
1492		let pv = |id| crate::SignedPrevote {
1493			prevote: crate::Prevote { target_hash: "C", target_number: 4 },
1494			id: Id(id),
1495			signature: Signature(99),
1496		};
1497
1498		let pc = |id| crate::SignedPrecommit {
1499			precommit: crate::Precommit { target_hash: "C", target_number: 4 },
1500			id: Id(id),
1501			signature: Signature(99),
1502		};
1503
1504		// send in a catch-up message for round 5.
1505		network.send_message(CommunicationIn::CatchUp(
1506			CatchUp {
1507				base_number: 1,
1508				base_hash: GENESIS_HASH,
1509				round_number: 5,
1510				prevotes: vec![pv(0), pv(1), pv(2)],
1511				precommits: vec![pc(0), pc(1), pc(2)],
1512			},
1513			Callback::Blank,
1514		));
1515
1516		let voter_state = unsynced_voter.voter_state();
1517		assert_eq!(voter_state.get().background_rounds.get(&5), None);
1518
1519		// spawn the voter in the background
1520		pool.spawner().spawn(unsynced_voter.map(|_| ())).unwrap();
1521
1522		// wait until it's caught up, it should skip to round 6 and send a
1523		// finality notification for the block that was finalized by catching
1524		// up.
1525		let caught_up = future::poll_fn(|_| {
1526			if voter_state.get().best_round.0 == 6 {
1527				Poll::Ready(())
1528			} else {
1529				Poll::Pending
1530			}
1531		});
1532
1533		let finalized = env.finalized_stream().take(1).into_future();
1534
1535		pool.run_until(caught_up.then(|_| finalized.map(|_| ())));
1536
1537		assert_eq!(
1538			voter_state.get().best_round,
1539			(
1540				6,
1541				report::RoundState::<Id> {
1542					total_weight,
1543					threshold_weight,
1544					prevote_current_weight: VoteWeight(0),
1545					prevote_ids: Default::default(),
1546					precommit_current_weight: VoteWeight(0),
1547					precommit_ids: Default::default(),
1548				}
1549			)
1550		);
1551
1552		assert_eq!(
1553			voter_state.get().background_rounds.get(&5),
1554			Some(&report::RoundState::<Id> {
1555				total_weight,
1556				threshold_weight,
1557				prevote_current_weight: VoteWeight(3),
1558				prevote_ids: voter_ids.clone(),
1559				precommit_current_weight: VoteWeight(3),
1560				precommit_ids: voter_ids,
1561			})
1562		);
1563	}
1564
1565	#[test]
1566	fn pick_up_from_prior_without_grandparent_state() {
1567		let local_id = Id(5);
1568		let voters = VoterSet::new(std::iter::once((local_id, 100))).expect("nonempty");
1569
1570		let (network, routing_task) = testing::environment::make_network();
1571
1572		let global_comms = network.make_global_comms();
1573		let env = Arc::new(Environment::new(network, local_id));
1574
1575		// initialize chain
1576		let last_finalized = env.with_chain(|chain| {
1577			chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]);
1578			chain.last_finalized()
1579		});
1580
1581		// run voter in background. scheduling it to shut down at the end.
1582		let voter = Voter::new(
1583			env.clone(),
1584			voters,
1585			global_comms,
1586			10,
1587			Vec::new(),
1588			last_finalized,
1589			last_finalized,
1590		);
1591
1592		let mut pool = LocalPool::new();
1593		pool.spawner().spawn(voter.map(|v| v.expect("Error voting"))).unwrap();
1594		pool.spawner().spawn(routing_task.map(|_| ())).unwrap();
1595
1596		// wait for the best block to finalize.
1597		pool.run_until(
1598			env.finalized_stream()
1599				.take_while(|&(_, n, _)| future::ready(n < 6))
1600				.for_each(|_| future::ready(())),
1601		)
1602	}
1603
1604	#[test]
1605	fn pick_up_from_prior_with_grandparent_state() {
1606		let local_id = Id(99);
1607		let voters = VoterSet::new((0..100).map(|i| (Id(i), 1))).expect("nonempty");
1608
1609		let (network, routing_task) = testing::environment::make_network();
1610
1611		let global_comms = network.make_global_comms();
1612		let env = Arc::new(Environment::new(network.clone(), local_id));
1613		let outer_env = env.clone();
1614
1615		// initialize chain
1616		let last_finalized = env.with_chain(|chain| {
1617			chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]);
1618			chain.last_finalized()
1619		});
1620
1621		let mut pool = LocalPool::new();
1622		let mut last_round_votes = Vec::new();
1623
1624		// round 1 state on disk: 67 prevotes for "E". 66 precommits for "D". 1 precommit "E".
1625		// the round is completable, but the estimate ("E") is not finalized.
1626		for id in 0..67 {
1627			let prevote = Message::Prevote(Prevote { target_hash: "E", target_number: 6 });
1628			let precommit = if id < 66 {
1629				Message::Precommit(Precommit { target_hash: "D", target_number: 5 })
1630			} else {
1631				Message::Precommit(Precommit { target_hash: "E", target_number: 6 })
1632			};
1633
1634			last_round_votes.push(SignedMessage {
1635				message: prevote.clone(),
1636				signature: Signature(id),
1637				id: Id(id),
1638			});
1639
1640			last_round_votes.push(SignedMessage {
1641				message: precommit.clone(),
1642				signature: Signature(id),
1643				id: Id(id),
1644			});
1645
1646			// round 2 has the same votes.
1647			//
1648			// this means we wouldn't be able to start round 3 until
1649			// the estimate of round-1 moves backwards.
1650			let (_, round_sink) = network.make_round_comms(2, Id(id));
1651			let msgs = stream::iter(iter::once(Ok(prevote)).chain(iter::once(Ok(precommit))));
1652			pool.spawner().spawn(msgs.forward(round_sink).map(|r| r.unwrap())).unwrap();
1653		}
1654
1655		// round 1 fresh communication. we send one more precommit for "D" so the estimate
1656		// moves backwards.
1657		let sender = Id(67);
1658		let (_, round_sink) = network.make_round_comms(1, sender);
1659		let last_precommit = Message::Precommit(Precommit { target_hash: "D", target_number: 3 });
1660		pool.spawner()
1661			.spawn(
1662				stream::iter(iter::once(Ok(last_precommit)))
1663					.forward(round_sink)
1664					.map(|r| r.unwrap()),
1665			)
1666			.unwrap();
1667
1668		// run voter in background. scheduling it to shut down at the end.
1669		let voter = Voter::new(
1670			env.clone(),
1671			voters,
1672			global_comms,
1673			1,
1674			last_round_votes,
1675			last_finalized,
1676			last_finalized,
1677		);
1678
1679		pool.spawner()
1680			.spawn(voter.map_err(|_| panic!("Error voting")).map(|_| ()))
1681			.unwrap();
1682		pool.spawner().spawn(routing_task.map(|_| ())).unwrap();
1683
1684		// wait until we see a prevote on round 3 from our local ID,
1685		// indicating that the round 3 has started.
1686
1687		let (round_stream, _) = network.make_round_comms(3, Id(1000));
1688		pool.run_until(
1689			round_stream
1690				.skip_while(move |v| {
1691					let v = v.as_ref().unwrap();
1692					if let Message::Prevote(_) = v.message {
1693						future::ready(v.id != local_id)
1694					} else {
1695						future::ready(true)
1696					}
1697				})
1698				.into_future()
1699				.map(|_| ()),
1700		);
1701
1702		assert_eq!(outer_env.last_completed_and_concluded(), (2, 1));
1703	}
1704}