finality_grandpa/voter/
past_rounds.rs

1// Copyright 2018-2019 Parity Technologies (UK) Ltd
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Rounds that are not the current best round are run in the background.
16//!
17//! This module provides utilities for managing those rounds and producing commit
18//! messages from them. Any rounds that become irrelevant are dropped.
19//!
20//! Create a `PastRounds` struct, and drive it to completion while:
21//!   - Informing it of any new finalized block heights
22//!   - Passing it any validated commits (so backgrounded rounds don't produce conflicting ones)
23
24#[cfg(feature = "std")]
25use futures::ready;
26use futures::{
27	channel::mpsc,
28	prelude::*,
29	stream::{self, futures_unordered::FuturesUnordered},
30	task,
31};
32#[cfg(feature = "std")]
33use log::{debug, trace};
34
35use std::{
36	cmp,
37	collections::HashMap,
38	pin::Pin,
39	task::{Context, Poll},
40};
41
42use super::{voting_round::VotingRound, Environment};
43use crate::{BlockNumberOps, Commit, LOG_TARGET};
44
45// wraps a voting round with a new future that resolves when the round can
46// be discarded from the working set.
47//
48// that point is when the round-estimate is finalized.
49struct BackgroundRound<H, N, E: Environment<H, N>>
50where
51	H: Clone + Eq + Ord + ::std::fmt::Debug,
52	N: Copy + BlockNumberOps + ::std::fmt::Debug,
53{
54	inner: VotingRound<H, N, E>,
55	waker: Option<task::Waker>,
56	finalized_number: N,
57	round_committer: Option<RoundCommitter<H, N, E>>,
58}
59
60impl<H, N, E: Environment<H, N>> BackgroundRound<H, N, E>
61where
62	H: Clone + Eq + Ord + ::std::fmt::Debug,
63	N: Copy + BlockNumberOps + ::std::fmt::Debug,
64{
65	fn round_number(&self) -> u64 {
66		self.inner.round_number()
67	}
68
69	fn voting_round(&self) -> &VotingRound<H, N, E> {
70		&self.inner
71	}
72
73	fn is_done(&self) -> bool {
74		// no need to listen on a round anymore once the estimate is finalized.
75		//
76		// we map `None` to true because
77		//   - rounds are not backgrounded when incomplete unless we've skipped forward
78		//   - if we skipped forward we may never complete this round and we don't need
79		//     to keep it forever.
80		self.round_committer.is_none() &&
81			self.inner.round_state().estimate.map_or(true, |x| x.1 <= self.finalized_number)
82	}
83
84	fn update_finalized(&mut self, new_finalized: N) {
85		self.finalized_number = cmp::max(self.finalized_number, new_finalized);
86
87		// wake up the future to be polled if done.
88		if self.is_done() {
89			if let Some(ref waker) = self.waker {
90				waker.wake_by_ref();
91			}
92		}
93	}
94}
95
96enum BackgroundRoundChange<H, N, E: Environment<H, N>>
97where
98	H: Clone + Eq + Ord + ::std::fmt::Debug,
99	N: Copy + BlockNumberOps + ::std::fmt::Debug,
100{
101	/// Background round has fully concluded and can be discarded.
102	Concluded(u64),
103	/// Background round has a commit message to issue but should continue
104	/// being driven afterwards.
105	Committed(Commit<H, N, E::Signature, E::Id>),
106}
107
108impl<H, N, E: Environment<H, N>> Future for BackgroundRound<H, N, E>
109where
110	H: Clone + Eq + Ord + ::std::fmt::Debug,
111	N: Copy + BlockNumberOps + ::std::fmt::Debug,
112{
113	type Output = Result<BackgroundRoundChange<H, N, E>, E::Error>;
114
115	fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
116		self.waker = Some(cx.waker().clone());
117
118		let _ = self.inner.poll(cx)?;
119
120		self.round_committer = match self.round_committer.take() {
121			None => None,
122			Some(mut committer) => match committer.commit(cx, &mut self.inner)? {
123				Poll::Ready(None) => None,
124				Poll::Ready(Some(commit)) =>
125					return Poll::Ready(Ok(BackgroundRoundChange::Committed(commit))),
126				Poll::Pending => Some(committer),
127			},
128		};
129
130		if self.is_done() {
131			// if this is fully concluded (has committed _and_ estimate finalized)
132			// we bail for real.
133			Poll::Ready(Ok(BackgroundRoundChange::Concluded(self.round_number())))
134		} else {
135			Poll::Pending
136		}
137	}
138}
139
140impl<H, N, E: Environment<H, N>> Unpin for BackgroundRound<H, N, E>
141where
142	H: Clone + Eq + Ord + ::std::fmt::Debug,
143	N: Copy + BlockNumberOps + ::std::fmt::Debug,
144{
145}
146
147struct RoundCommitter<H, N, E: Environment<H, N>>
148where
149	H: Clone + Eq + Ord + ::std::fmt::Debug,
150	N: Copy + BlockNumberOps + ::std::fmt::Debug,
151{
152	commit_timer: E::Timer,
153	import_commits: stream::Fuse<mpsc::UnboundedReceiver<Commit<H, N, E::Signature, E::Id>>>,
154	last_commit: Option<Commit<H, N, E::Signature, E::Id>>,
155}
156
157impl<H, N, E: Environment<H, N>> RoundCommitter<H, N, E>
158where
159	H: Clone + Eq + Ord + ::std::fmt::Debug,
160	N: Copy + BlockNumberOps + ::std::fmt::Debug,
161{
162	fn new(
163		commit_timer: E::Timer,
164		commit_receiver: mpsc::UnboundedReceiver<Commit<H, N, E::Signature, E::Id>>,
165	) -> Self {
166		RoundCommitter { commit_timer, import_commits: commit_receiver.fuse(), last_commit: None }
167	}
168
169	fn import_commit(
170		&mut self,
171		voting_round: &mut VotingRound<H, N, E>,
172		commit: Commit<H, N, E::Signature, E::Id>,
173	) -> Result<bool, E::Error> {
174		// ignore commits for a block lower than we already finalized
175		if commit.target_number < voting_round.finalized().map_or_else(N::zero, |(_, n)| *n) {
176			return Ok(true)
177		}
178
179		if voting_round.check_and_import_from_commit(&commit)?.is_none() {
180			return Ok(false)
181		}
182
183		self.last_commit = Some(commit);
184
185		Ok(true)
186	}
187
188	fn commit(
189		&mut self,
190		cx: &mut Context,
191		voting_round: &mut VotingRound<H, N, E>,
192	) -> Poll<Result<Option<Commit<H, N, E::Signature, E::Id>>, E::Error>> {
193		while let Poll::Ready(Some(commit)) =
194			Stream::poll_next(Pin::new(&mut self.import_commits), cx)
195		{
196			if !self.import_commit(voting_round, commit)? {
197				trace!(target: LOG_TARGET, "Ignoring invalid commit");
198			}
199		}
200
201		ready!(self.commit_timer.poll_unpin(cx))?;
202
203		match (self.last_commit.take(), voting_round.finalized()) {
204			(None, Some(_)) => Poll::Ready(Ok(voting_round.finalizing_commit().cloned())),
205			(Some(Commit { target_number, .. }), Some((_, finalized_number)))
206				if target_number < *finalized_number =>
207				Poll::Ready(Ok(voting_round.finalizing_commit().cloned())),
208			_ => Poll::Ready(Ok(None)),
209		}
210	}
211}
212
213struct SelfReturningFuture<F> {
214	pub inner: Option<F>,
215}
216
217impl<F> From<F> for SelfReturningFuture<F> {
218	fn from(f: F) -> Self {
219		SelfReturningFuture { inner: Some(f) }
220	}
221}
222
223impl<F> SelfReturningFuture<F> {
224	fn mutate<X: FnOnce(&mut F)>(&mut self, x: X) {
225		if let Some(ref mut inner) = self.inner {
226			x(inner)
227		}
228	}
229}
230
231impl<F: Future + Unpin> Future for SelfReturningFuture<F> {
232	type Output = (F::Output, F);
233
234	fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
235		match self.inner.take() {
236			None => panic!("poll after return is not done in this module; qed"),
237			Some(mut f) => match f.poll_unpin(cx) {
238				Poll::Ready(item) => Poll::Ready((item, f)),
239				Poll::Pending => {
240					self.inner = Some(f);
241					Poll::Pending
242				},
243			},
244		}
245	}
246}
247
248/// A stream for past rounds, which produces any commit messages from those
249/// rounds and drives them to completion.
250pub(super) struct PastRounds<H, N, E: Environment<H, N>>
251where
252	H: Clone + Eq + Ord + ::std::fmt::Debug,
253	N: Copy + BlockNumberOps + ::std::fmt::Debug,
254{
255	past_rounds: FuturesUnordered<SelfReturningFuture<BackgroundRound<H, N, E>>>,
256	commit_senders: HashMap<u64, mpsc::UnboundedSender<Commit<H, N, E::Signature, E::Id>>>,
257}
258
259impl<H, N, E: Environment<H, N>> PastRounds<H, N, E>
260where
261	H: Clone + Eq + Ord + ::std::fmt::Debug,
262	N: Copy + BlockNumberOps + ::std::fmt::Debug,
263{
264	/// Create a new past rounds stream.
265	pub(super) fn new() -> Self {
266		PastRounds { past_rounds: FuturesUnordered::new(), commit_senders: HashMap::new() }
267	}
268
269	// push an old voting round onto this stream.
270	pub(super) fn push(&mut self, env: &E, round: VotingRound<H, N, E>) {
271		let round_number = round.round_number();
272		let (tx, rx) = mpsc::unbounded();
273		let background = BackgroundRound {
274			inner: round,
275			waker: None,
276			// https://github.com/paritytech/finality-grandpa/issues/50
277			finalized_number: N::zero(),
278			round_committer: Some(RoundCommitter::new(env.round_commit_timer(), rx)),
279		};
280		self.past_rounds.push(background.into());
281		self.commit_senders.insert(round_number, tx);
282	}
283
284	/// update the last finalized block. this will lead to
285	/// any irrelevant background rounds being pruned.
286	pub(super) fn update_finalized(&mut self, f_num: N) {
287		// have the task check if it should be pruned.
288		// if so, this future will be re-polled
289		for bg in self.past_rounds.iter_mut() {
290			bg.mutate(|f| f.update_finalized(f_num));
291		}
292	}
293
294	/// Get the underlying `VotingRound` items that are being run in the background.
295	pub(super) fn voting_rounds(&self) -> impl Iterator<Item = &VotingRound<H, N, E>> {
296		self.past_rounds
297			.iter()
298			.filter_map(|self_returning_future| self_returning_future.inner.as_ref())
299			.map(|background_round| background_round.voting_round())
300	}
301
302	// import the commit into the given backgrounded round. If not possible,
303	// just return and process the commit.
304	pub(super) fn import_commit(
305		&self,
306		round_number: u64,
307		commit: Commit<H, N, E::Signature, E::Id>,
308	) -> Option<Commit<H, N, E::Signature, E::Id>> {
309		if let Some(sender) = self.commit_senders.get(&round_number) {
310			sender.unbounded_send(commit).map_err(|e| e.into_inner()).err()
311		} else {
312			Some(commit)
313		}
314	}
315}
316
317impl<H, N, E: Environment<H, N>> Stream for PastRounds<H, N, E>
318where
319	H: Clone + Eq + Ord + ::std::fmt::Debug,
320	N: Copy + BlockNumberOps + ::std::fmt::Debug,
321{
322	type Item = Result<(u64, Commit<H, N, E::Signature, E::Id>), E::Error>;
323
324	fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
325		loop {
326			match Stream::poll_next(Pin::new(&mut self.past_rounds), cx) {
327				Poll::Ready(Some((Ok(BackgroundRoundChange::Concluded(number)), round))) => {
328					let round = &round.inner;
329					round.env().concluded(
330						round.round_number(),
331						round.round_state(),
332						round.dag_base(),
333						round.historical_votes(),
334					)?;
335
336					self.commit_senders.remove(&number);
337				},
338				Poll::Ready(Some((Ok(BackgroundRoundChange::Committed(commit)), round))) => {
339					let number = round.round_number();
340
341					// reschedule until irrelevant.
342					self.past_rounds.push(round.into());
343
344					debug!(
345						target: LOG_TARGET,
346						"Committing: round_number = {}, \
347						target_number = {:?}, target_hash = {:?}",
348						number,
349						commit.target_number,
350						commit.target_hash,
351					);
352
353					return Poll::Ready(Some(Ok((number, commit))))
354				},
355				Poll::Ready(Some((Err(err), _))) => return Poll::Ready(Some(Err(err))),
356				Poll::Ready(None) => return Poll::Ready(None),
357				Poll::Pending => return Poll::Pending,
358			}
359		}
360	}
361}