1#[cfg(feature = "std")]
25use futures::ready;
26use futures::{
27 channel::mpsc,
28 prelude::*,
29 stream::{self, futures_unordered::FuturesUnordered},
30 task,
31};
32#[cfg(feature = "std")]
33use log::{debug, trace};
34
35use std::{
36 cmp,
37 collections::HashMap,
38 pin::Pin,
39 task::{Context, Poll},
40};
41
42use super::{voting_round::VotingRound, Environment};
43use crate::{BlockNumberOps, Commit, LOG_TARGET};
44
45struct BackgroundRound<H, N, E: Environment<H, N>>
50where
51 H: Clone + Eq + Ord + ::std::fmt::Debug,
52 N: Copy + BlockNumberOps + ::std::fmt::Debug,
53{
54 inner: VotingRound<H, N, E>,
55 waker: Option<task::Waker>,
56 finalized_number: N,
57 round_committer: Option<RoundCommitter<H, N, E>>,
58}
59
60impl<H, N, E: Environment<H, N>> BackgroundRound<H, N, E>
61where
62 H: Clone + Eq + Ord + ::std::fmt::Debug,
63 N: Copy + BlockNumberOps + ::std::fmt::Debug,
64{
65 fn round_number(&self) -> u64 {
66 self.inner.round_number()
67 }
68
69 fn voting_round(&self) -> &VotingRound<H, N, E> {
70 &self.inner
71 }
72
73 fn is_done(&self) -> bool {
74 self.round_committer.is_none() &&
81 self.inner.round_state().estimate.map_or(true, |x| x.1 <= self.finalized_number)
82 }
83
84 fn update_finalized(&mut self, new_finalized: N) {
85 self.finalized_number = cmp::max(self.finalized_number, new_finalized);
86
87 if self.is_done() {
89 if let Some(ref waker) = self.waker {
90 waker.wake_by_ref();
91 }
92 }
93 }
94}
95
96enum BackgroundRoundChange<H, N, E: Environment<H, N>>
97where
98 H: Clone + Eq + Ord + ::std::fmt::Debug,
99 N: Copy + BlockNumberOps + ::std::fmt::Debug,
100{
101 Concluded(u64),
103 Committed(Commit<H, N, E::Signature, E::Id>),
106}
107
108impl<H, N, E: Environment<H, N>> Future for BackgroundRound<H, N, E>
109where
110 H: Clone + Eq + Ord + ::std::fmt::Debug,
111 N: Copy + BlockNumberOps + ::std::fmt::Debug,
112{
113 type Output = Result<BackgroundRoundChange<H, N, E>, E::Error>;
114
115 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
116 self.waker = Some(cx.waker().clone());
117
118 let _ = self.inner.poll(cx)?;
119
120 self.round_committer = match self.round_committer.take() {
121 None => None,
122 Some(mut committer) => match committer.commit(cx, &mut self.inner)? {
123 Poll::Ready(None) => None,
124 Poll::Ready(Some(commit)) =>
125 return Poll::Ready(Ok(BackgroundRoundChange::Committed(commit))),
126 Poll::Pending => Some(committer),
127 },
128 };
129
130 if self.is_done() {
131 Poll::Ready(Ok(BackgroundRoundChange::Concluded(self.round_number())))
134 } else {
135 Poll::Pending
136 }
137 }
138}
139
140impl<H, N, E: Environment<H, N>> Unpin for BackgroundRound<H, N, E>
141where
142 H: Clone + Eq + Ord + ::std::fmt::Debug,
143 N: Copy + BlockNumberOps + ::std::fmt::Debug,
144{
145}
146
147struct RoundCommitter<H, N, E: Environment<H, N>>
148where
149 H: Clone + Eq + Ord + ::std::fmt::Debug,
150 N: Copy + BlockNumberOps + ::std::fmt::Debug,
151{
152 commit_timer: E::Timer,
153 import_commits: stream::Fuse<mpsc::UnboundedReceiver<Commit<H, N, E::Signature, E::Id>>>,
154 last_commit: Option<Commit<H, N, E::Signature, E::Id>>,
155}
156
157impl<H, N, E: Environment<H, N>> RoundCommitter<H, N, E>
158where
159 H: Clone + Eq + Ord + ::std::fmt::Debug,
160 N: Copy + BlockNumberOps + ::std::fmt::Debug,
161{
162 fn new(
163 commit_timer: E::Timer,
164 commit_receiver: mpsc::UnboundedReceiver<Commit<H, N, E::Signature, E::Id>>,
165 ) -> Self {
166 RoundCommitter { commit_timer, import_commits: commit_receiver.fuse(), last_commit: None }
167 }
168
169 fn import_commit(
170 &mut self,
171 voting_round: &mut VotingRound<H, N, E>,
172 commit: Commit<H, N, E::Signature, E::Id>,
173 ) -> Result<bool, E::Error> {
174 if commit.target_number < voting_round.finalized().map_or_else(N::zero, |(_, n)| *n) {
176 return Ok(true)
177 }
178
179 if voting_round.check_and_import_from_commit(&commit)?.is_none() {
180 return Ok(false)
181 }
182
183 self.last_commit = Some(commit);
184
185 Ok(true)
186 }
187
188 fn commit(
189 &mut self,
190 cx: &mut Context,
191 voting_round: &mut VotingRound<H, N, E>,
192 ) -> Poll<Result<Option<Commit<H, N, E::Signature, E::Id>>, E::Error>> {
193 while let Poll::Ready(Some(commit)) =
194 Stream::poll_next(Pin::new(&mut self.import_commits), cx)
195 {
196 if !self.import_commit(voting_round, commit)? {
197 trace!(target: LOG_TARGET, "Ignoring invalid commit");
198 }
199 }
200
201 ready!(self.commit_timer.poll_unpin(cx))?;
202
203 match (self.last_commit.take(), voting_round.finalized()) {
204 (None, Some(_)) => Poll::Ready(Ok(voting_round.finalizing_commit().cloned())),
205 (Some(Commit { target_number, .. }), Some((_, finalized_number)))
206 if target_number < *finalized_number =>
207 Poll::Ready(Ok(voting_round.finalizing_commit().cloned())),
208 _ => Poll::Ready(Ok(None)),
209 }
210 }
211}
212
/// Wraps a future so that, when it completes, the future itself is handed
/// back together with its output.
struct SelfReturningFuture<F> {
    /// The wrapped future; `None` once it has completed and been returned.
    pub inner: Option<F>,
}

impl<F> From<F> for SelfReturningFuture<F> {
    /// Wraps `f`, which has not yet completed.
    fn from(f: F) -> Self {
        Self { inner: Some(f) }
    }
}

impl<F> SelfReturningFuture<F> {
    /// Runs `x` on the wrapped future if it is still held; otherwise does
    /// nothing.
    fn mutate<X: FnOnce(&mut F)>(&mut self, x: X) {
        if let Some(held) = self.inner.as_mut() {
            x(held);
        }
    }
}

impl<F: Future + Unpin> Future for SelfReturningFuture<F> {
    type Output = (F::Output, F);

    /// Polls the wrapped future, resolving to `(output, future)` when it
    /// completes.
    ///
    /// # Panics
    ///
    /// Panics if polled again after having resolved.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        // The future is moved out while being polled and only stored back if
        // it is still pending.
        let mut fut =
            self.inner.take().expect("poll after return is not done in this module; qed");

        match Future::poll(Pin::new(&mut fut), cx) {
            Poll::Ready(output) => Poll::Ready((output, fut)),
            Poll::Pending => {
                self.inner = Some(fut);
                Poll::Pending
            },
        }
    }
}
247
248pub(super) struct PastRounds<H, N, E: Environment<H, N>>
251where
252 H: Clone + Eq + Ord + ::std::fmt::Debug,
253 N: Copy + BlockNumberOps + ::std::fmt::Debug,
254{
255 past_rounds: FuturesUnordered<SelfReturningFuture<BackgroundRound<H, N, E>>>,
256 commit_senders: HashMap<u64, mpsc::UnboundedSender<Commit<H, N, E::Signature, E::Id>>>,
257}
258
259impl<H, N, E: Environment<H, N>> PastRounds<H, N, E>
260where
261 H: Clone + Eq + Ord + ::std::fmt::Debug,
262 N: Copy + BlockNumberOps + ::std::fmt::Debug,
263{
264 pub(super) fn new() -> Self {
266 PastRounds { past_rounds: FuturesUnordered::new(), commit_senders: HashMap::new() }
267 }
268
269 pub(super) fn push(&mut self, env: &E, round: VotingRound<H, N, E>) {
271 let round_number = round.round_number();
272 let (tx, rx) = mpsc::unbounded();
273 let background = BackgroundRound {
274 inner: round,
275 waker: None,
276 finalized_number: N::zero(),
278 round_committer: Some(RoundCommitter::new(env.round_commit_timer(), rx)),
279 };
280 self.past_rounds.push(background.into());
281 self.commit_senders.insert(round_number, tx);
282 }
283
284 pub(super) fn update_finalized(&mut self, f_num: N) {
287 for bg in self.past_rounds.iter_mut() {
290 bg.mutate(|f| f.update_finalized(f_num));
291 }
292 }
293
294 pub(super) fn voting_rounds(&self) -> impl Iterator<Item = &VotingRound<H, N, E>> {
296 self.past_rounds
297 .iter()
298 .filter_map(|self_returning_future| self_returning_future.inner.as_ref())
299 .map(|background_round| background_round.voting_round())
300 }
301
302 pub(super) fn import_commit(
305 &self,
306 round_number: u64,
307 commit: Commit<H, N, E::Signature, E::Id>,
308 ) -> Option<Commit<H, N, E::Signature, E::Id>> {
309 if let Some(sender) = self.commit_senders.get(&round_number) {
310 sender.unbounded_send(commit).map_err(|e| e.into_inner()).err()
311 } else {
312 Some(commit)
313 }
314 }
315}
316
317impl<H, N, E: Environment<H, N>> Stream for PastRounds<H, N, E>
318where
319 H: Clone + Eq + Ord + ::std::fmt::Debug,
320 N: Copy + BlockNumberOps + ::std::fmt::Debug,
321{
322 type Item = Result<(u64, Commit<H, N, E::Signature, E::Id>), E::Error>;
323
324 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
325 loop {
326 match Stream::poll_next(Pin::new(&mut self.past_rounds), cx) {
327 Poll::Ready(Some((Ok(BackgroundRoundChange::Concluded(number)), round))) => {
328 let round = &round.inner;
329 round.env().concluded(
330 round.round_number(),
331 round.round_state(),
332 round.dag_base(),
333 round.historical_votes(),
334 )?;
335
336 self.commit_senders.remove(&number);
337 },
338 Poll::Ready(Some((Ok(BackgroundRoundChange::Committed(commit)), round))) => {
339 let number = round.round_number();
340
341 self.past_rounds.push(round.into());
343
344 debug!(
345 target: LOG_TARGET,
346 "Committing: round_number = {}, \
347 target_number = {:?}, target_hash = {:?}",
348 number,
349 commit.target_number,
350 commit.target_hash,
351 );
352
353 return Poll::Ready(Some(Ok((number, commit))))
354 },
355 Poll::Ready(Some((Err(err), _))) => return Poll::Ready(Some(Err(err))),
356 Poll::Ready(None) => return Poll::Ready(None),
357 Poll::Pending => return Poll::Pending,
358 }
359 }
360 }
361}