1#[cfg(feature = "std")]
18use futures::ready;
19use futures::{channel::mpsc::UnboundedSender, prelude::*};
20#[cfg(feature = "std")]
21use log::{debug, trace, warn};
22
23use std::{
24 pin::Pin,
25 sync::Arc,
26 task::{Context, Poll},
27};
28
29use super::{Buffered, Environment, FinalizedNotification};
30use crate::{
31 round::{Round, State as RoundState},
32 validate_commit,
33 voter_set::VoterSet,
34 weights::VoteWeight,
35 BlockNumberOps, Commit, HistoricalVotes, ImportResult, Message, Precommit, Prevote,
36 PrimaryPropose, SignedMessage, SignedPrecommit, LOG_TARGET,
37};
38
39pub(super) enum State<T, W> {
41 Start(T, T),
42 Proposed(T, T),
43 Prevoting(T, W),
44 Prevoted(T),
45 Precommitted,
46}
47
48impl<T, W> std::fmt::Debug for State<T, W> {
49 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
50 match self {
51 State::Start(..) => write!(f, "Start"),
52 State::Proposed(..) => write!(f, "Proposed"),
53 State::Prevoting(..) => write!(f, "Prevoting"),
54 State::Prevoted(_) => write!(f, "Prevoted"),
55 State::Precommitted => write!(f, "Precommitted"),
56 }
57 }
58}
59
60pub(super) struct VotingRound<H, N, E: Environment<H, N>>
62where
63 H: Clone + Eq + Ord + ::std::fmt::Debug,
64 N: Copy + BlockNumberOps + ::std::fmt::Debug,
65{
66 env: Arc<E>,
67 voting: Voting,
68 votes: Round<E::Id, H, N, E::Signature>,
69 incoming: E::In,
70 outgoing: Buffered<E::Out, Message<H, N>>,
71 state: Option<State<E::Timer, (H, E::BestChain)>>, bridged_round_state: Option<crate::bridge_state::PriorView<H, N>>, last_round_state: Option<crate::bridge_state::LatterView<H, N>>, primary_block: Option<(H, N)>, finalized_sender: UnboundedSender<FinalizedNotification<H, N, E>>,
76 best_finalized: Option<Commit<H, N, E::Signature, E::Id>>,
77}
78
79enum Voting {
81 No,
83 Yes,
85 Primary,
88}
89
90impl Voting {
91 fn is_active(&self) -> bool {
93 matches!(self, Voting::Yes | Voting::Primary)
94 }
95
96 fn is_primary(&self) -> bool {
98 matches!(self, Voting::Primary)
99 }
100}
101
102impl<H, N, E: Environment<H, N>> VotingRound<H, N, E>
103where
104 H: Clone + Eq + Ord + ::std::fmt::Debug,
105 N: Copy + BlockNumberOps + ::std::fmt::Debug,
106{
107 pub(super) fn new(
109 round_number: u64,
110 voters: VoterSet<E::Id>,
111 base: (H, N),
112 last_round_state: Option<crate::bridge_state::LatterView<H, N>>,
113 finalized_sender: UnboundedSender<FinalizedNotification<H, N, E>>,
114 env: Arc<E>,
115 ) -> VotingRound<H, N, E> {
116 let round_data = env.round_data(round_number);
117 let round_params = crate::round::RoundParams { voters, base, round_number };
118
119 let votes = Round::new(round_params);
120
121 let voting = if round_data.voter_id.as_ref() == Some(votes.primary_voter().0) {
122 Voting::Primary
123 } else if round_data.voter_id.as_ref().map_or(false, |id| votes.voters().contains(id)) {
124 Voting::Yes
125 } else {
126 Voting::No
127 };
128
129 VotingRound {
130 votes,
131 voting,
132 incoming: round_data.incoming,
133 outgoing: Buffered::new(round_data.outgoing),
134 state: Some(State::Start(round_data.prevote_timer, round_data.precommit_timer)),
135 bridged_round_state: None,
136 primary_block: None,
137 best_finalized: None,
138 env,
139 last_round_state,
140 finalized_sender,
141 }
142 }
143
144 pub(super) fn completed(
147 votes: Round<E::Id, H, N, E::Signature>,
148 finalized_sender: UnboundedSender<FinalizedNotification<H, N, E>>,
149 last_round_state: Option<crate::bridge_state::LatterView<H, N>>,
150 env: Arc<E>,
151 ) -> VotingRound<H, N, E> {
152 let round_data = env.round_data(votes.number());
153
154 VotingRound {
155 votes,
156 voting: Voting::No,
157 incoming: round_data.incoming,
158 outgoing: Buffered::new(round_data.outgoing),
159 state: None,
160 bridged_round_state: None,
161 primary_block: None,
162 env,
163 last_round_state,
164 finalized_sender,
165 best_finalized: None,
166 }
167 }
168
169 pub(super) fn poll(&mut self, cx: &mut Context) -> Poll<Result<(), E::Error>> {
172 trace!(
173 target: LOG_TARGET,
174 "Polling round {}, state = {:?}, step = {:?}",
175 self.votes.number(),
176 self.votes.state(),
177 self.state
178 );
179
180 let pre_state = self.votes.state();
181 self.process_incoming(cx)?;
182
183 let last_round_state = self.last_round_state.as_ref().map(|s| s.get(cx).clone());
187 if let Some(ref last_round_state) = last_round_state {
188 self.primary_propose(last_round_state)?;
189 self.prevote(cx, last_round_state)?;
190 self.precommit(cx, last_round_state)?;
191 }
192
193 ready!(self.outgoing.poll(cx))?;
194 self.process_incoming(cx)?; let post_state = self.votes.state();
198 self.notify(pre_state, post_state);
199
200 if !self.votes.completable() {
202 return Poll::Pending
203 }
204
205 let last_round_estimate_finalized = match last_round_state {
207 Some(RoundState {
208 estimate: Some((_, last_round_estimate)),
209 finalized: Some((_, last_round_finalized)),
210 ..
211 }) => {
212 let finalized_in_last_round = last_round_estimate <= last_round_finalized;
214
215 let finalized_in_current_round =
217 self.finalized().map_or(false, |(_, current_round_finalized)| {
218 last_round_estimate <= *current_round_finalized
219 });
220
221 finalized_in_last_round || finalized_in_current_round
222 },
223 None => {
224 true
228 },
229 _ => false,
230 };
231
232 if !last_round_estimate_finalized {
234 trace!(
235 target: LOG_TARGET,
236 "Round {} completable but estimate not finalized.",
237 self.round_number()
238 );
239 self.log_participation(log::Level::Trace);
240 return Poll::Pending
241 }
242
243 debug!(
244 target: LOG_TARGET,
245 "Completed round {}, state = {:?}, step = {:?}",
246 self.votes.number(),
247 self.votes.state(),
248 self.state
249 );
250
251 self.log_participation(log::Level::Debug);
252
253 Poll::Ready(Ok(()))
255 }
256
257 pub(super) fn state(&self) -> Option<&State<E::Timer, (H, E::BestChain)>> {
259 self.state.as_ref()
260 }
261
262 pub(super) fn env(&self) -> &E {
264 &self.env
265 }
266
267 pub(super) fn round_number(&self) -> u64 {
269 self.votes.number()
270 }
271
272 pub(super) fn round_state(&self) -> RoundState<H, N> {
274 self.votes.state()
275 }
276
277 pub(super) fn dag_base(&self) -> (H, N) {
279 self.votes.base()
280 }
281
282 pub(super) fn voters(&self) -> &VoterSet<E::Id> {
284 self.votes.voters()
285 }
286
287 pub(super) fn finalized(&self) -> Option<&(H, N)> {
289 self.votes.finalized()
290 }
291
292 pub(super) fn prevote_weight(&self) -> VoteWeight {
294 self.votes.prevote_participation().0
295 }
296
297 pub(super) fn precommit_weight(&self) -> VoteWeight {
299 self.votes.precommit_participation().0
300 }
301
302 pub(super) fn prevote_ids(&self) -> impl Iterator<Item = E::Id> {
304 self.votes.prevotes().into_iter().map(|pv| pv.0)
305 }
306
307 pub(super) fn precommit_ids(&self) -> impl Iterator<Item = E::Id> {
309 self.votes.precommits().into_iter().map(|pv| pv.0)
310 }
311
312 pub(super) fn check_and_import_from_commit(
315 &mut self,
316 commit: &Commit<H, N, E::Signature, E::Id>,
317 ) -> Result<Option<(H, N)>, E::Error> {
318 if !validate_commit(commit, self.voters(), &*self.env)?.is_valid() {
319 return Ok(None)
320 }
321
322 for SignedPrecommit { precommit, signature, id } in commit.precommits.iter().cloned() {
323 let import_result =
324 self.votes.import_precommit(&*self.env, precommit, id, signature)?;
325 if let ImportResult { equivocation: Some(e), .. } = import_result {
326 self.env.precommit_equivocation(self.round_number(), e);
327 }
328 }
329
330 Ok(Some((commit.target_hash.clone(), commit.target_number)))
331 }
332
333 pub(super) fn finalized_sender(&self) -> UnboundedSender<FinalizedNotification<H, N, E>> {
335 self.finalized_sender.clone()
336 }
337
338 pub(super) fn bridge_state(&mut self) -> crate::bridge_state::LatterView<H, N> {
341 let (prior_view, latter_view) = crate::bridge_state::bridge_state(self.votes.state());
342 if self.bridged_round_state.is_some() {
343 warn!(
344 target: LOG_TARGET,
345 "Bridged state from round {} more than once.",
346 self.votes.number()
347 );
348 }
349
350 self.bridged_round_state = Some(prior_view);
351 latter_view
352 }
353
354 pub(super) fn finalizing_commit(&self) -> Option<&Commit<H, N, E::Signature, E::Id>> {
356 self.best_finalized.as_ref()
357 }
358
359 pub(super) fn historical_votes(&self) -> &HistoricalVotes<H, N, E::Signature, E::Id> {
364 self.votes.historical_votes()
365 }
366
367 pub(super) fn handle_vote(
369 &mut self,
370 vote: SignedMessage<H, N, E::Signature, E::Id>,
371 ) -> Result<(), E::Error> {
372 let SignedMessage { message, signature, id } = vote;
373 if !self
374 .env
375 .is_equal_or_descendent_of(self.votes.base().0, message.target().0.clone())
376 {
377 trace!(
378 target: LOG_TARGET,
379 "Ignoring message targeting {:?} lower than round base {:?}",
380 message.target(),
381 self.votes.base(),
382 );
383 return Ok(())
384 }
385
386 match message {
387 Message::Prevote(prevote) => {
388 let import_result =
389 self.votes.import_prevote(&*self.env, prevote, id, signature)?;
390 if let ImportResult { equivocation: Some(e), .. } = import_result {
391 self.env.prevote_equivocation(self.votes.number(), e);
392 }
393 },
394 Message::Precommit(precommit) => {
395 let import_result =
396 self.votes.import_precommit(&*self.env, precommit, id, signature)?;
397 if let ImportResult { equivocation: Some(e), .. } = import_result {
398 self.env.precommit_equivocation(self.votes.number(), e);
399 }
400 },
401 Message::PrimaryPropose(primary) => {
402 let primary_id = self.votes.primary_voter().0.clone();
403 if id == primary_id {
406 self.primary_block = Some((primary.target_hash, primary.target_number));
407 }
408 },
409 }
410
411 Ok(())
412 }
413
414 fn log_participation(&self, log_level: log::Level) {
415 let total_weight = self.voters().total_weight();
416 let threshold = self.voters().threshold();
417 let n_voters = self.voters().len();
418 let number = self.round_number();
419
420 let (prevote_weight, n_prevotes) = self.votes.prevote_participation();
421 let (precommit_weight, n_precommits) = self.votes.precommit_participation();
422
423 log::log!(
424 target: LOG_TARGET,
425 log_level,
426 "Round {}: prevotes: {}/{}/{} weight, {}/{} actual",
427 number,
428 prevote_weight,
429 threshold,
430 total_weight,
431 n_prevotes,
432 n_voters
433 );
434
435 log::log!(
436 target: LOG_TARGET,
437 log_level,
438 "Round {}: precommits: {}/{}/{} weight, {}/{} actual",
439 number,
440 precommit_weight,
441 threshold,
442 total_weight,
443 n_precommits,
444 n_voters
445 );
446 }
447
448 fn process_incoming(&mut self, cx: &mut Context) -> Result<(), E::Error> {
449 while let Poll::Ready(Some(incoming)) = Stream::poll_next(Pin::new(&mut self.incoming), cx)
450 {
451 trace!(target: LOG_TARGET, "Round {}: Got incoming message", self.round_number());
452 self.handle_vote(incoming?)?;
453 }
454
455 Ok(())
456 }
457
458 fn primary_propose(&mut self, last_round_state: &RoundState<H, N>) -> Result<(), E::Error> {
459 match self.state.take() {
460 Some(State::Start(prevote_timer, precommit_timer)) => {
461 let maybe_estimate = last_round_state.estimate.clone();
462
463 match (maybe_estimate, self.voting.is_primary()) {
464 (Some(last_round_estimate), true) => {
465 let maybe_finalized = last_round_state.finalized.clone();
466
467 let should_send_primary =
469 maybe_finalized.map_or(true, |f| last_round_estimate.1 > f.1);
470 if should_send_primary {
471 debug!(
472 target: LOG_TARGET,
473 "Sending primary block hint for round {}",
474 self.votes.number()
475 );
476 let primary = PrimaryPropose {
477 target_hash: last_round_estimate.0,
478 target_number: last_round_estimate.1,
479 };
480 self.env.proposed(self.round_number(), primary.clone())?;
481 self.outgoing.push(Message::PrimaryPropose(primary));
482 self.state = Some(State::Proposed(prevote_timer, precommit_timer));
483
484 return Ok(())
485 } else {
486 debug!(
487 target: LOG_TARGET,
488 "Last round estimate has been finalized, \
489 not sending primary block hint for round {}",
490 self.votes.number()
491 );
492 }
493 },
494 (None, true) => {
495 debug!(
496 target: LOG_TARGET,
497 "Last round estimate does not exist, \
498 not sending primary block hint for round {}",
499 self.votes.number()
500 );
501 },
502 _ => {},
503 }
504
505 self.state = Some(State::Start(prevote_timer, precommit_timer));
506 },
507 x => {
508 self.state = x;
509 },
510 }
511
512 Ok(())
513 }
514
515 fn prevote(
516 &mut self,
517 cx: &mut Context,
518 last_round_state: &RoundState<H, N>,
519 ) -> Result<(), E::Error> {
520 let state = self.state.take();
521
522 let start_prevoting = |this: &mut Self,
523 mut prevote_timer: E::Timer,
524 precommit_timer: E::Timer,
525 proposed: bool,
526 cx: &mut Context| {
527 let should_prevote = match prevote_timer.poll_unpin(cx) {
528 Poll::Ready(Err(e)) => return Err(e),
529 Poll::Ready(Ok(())) => true,
530 Poll::Pending => this.votes.completable(),
531 };
532
533 if should_prevote {
534 if this.voting.is_active() {
535 debug!(
536 target: LOG_TARGET,
537 "Constructing prevote for round {}",
538 this.votes.number()
539 );
540
541 let (base, best_chain) = this.construct_prevote(last_round_state);
542
543 cx.waker().wake_by_ref();
548
549 this.state = Some(State::Prevoting(precommit_timer, (base, best_chain)));
550 } else {
551 this.state = Some(State::Prevoted(precommit_timer));
552 }
553 } else if proposed {
554 this.state = Some(State::Proposed(prevote_timer, precommit_timer));
555 } else {
556 this.state = Some(State::Start(prevote_timer, precommit_timer));
557 }
558
559 Ok(())
560 };
561
562 let finish_prevoting = |this: &mut Self,
563 precommit_timer: E::Timer,
564 base: H,
565 mut best_chain: E::BestChain,
566 cx: &mut Context| {
567 let best_chain = match best_chain.poll_unpin(cx) {
568 Poll::Ready(Err(e)) => return Err(e),
569 Poll::Ready(Ok(best_chain)) => best_chain,
570 Poll::Pending => {
571 this.state = Some(State::Prevoting(precommit_timer, (base, best_chain)));
572 return Ok(())
573 },
574 };
575
576 if let Some(target) = best_chain {
577 let prevote = Prevote { target_hash: target.0, target_number: target.1 };
578
579 debug!(target: LOG_TARGET, "Casting prevote for round {}", this.votes.number());
580 this.env.prevoted(this.round_number(), prevote.clone())?;
581 this.votes.set_prevoted_index();
582 this.outgoing.push(Message::Prevote(prevote));
583 this.state = Some(State::Prevoted(precommit_timer));
584 } else {
585 warn!(
588 target: LOG_TARGET,
589 "Could not cast prevote: previously known block {:?} has disappeared", base,
590 );
591
592 this.state = None;
594 this.voting = Voting::No;
595 }
596
597 Ok(())
598 };
599
600 match state {
601 Some(State::Start(prevote_timer, precommit_timer)) => {
602 start_prevoting(self, prevote_timer, precommit_timer, false, cx)?;
603 },
604 Some(State::Proposed(prevote_timer, precommit_timer)) => {
605 start_prevoting(self, prevote_timer, precommit_timer, true, cx)?;
606 },
607 Some(State::Prevoting(precommit_timer, (base, best_chain))) => {
608 finish_prevoting(self, precommit_timer, base, best_chain, cx)?;
609 },
610 x => {
611 self.state = x;
612 },
613 }
614
615 Ok(())
616 }
617
618 fn precommit(
619 &mut self,
620 cx: &mut Context,
621 last_round_state: &RoundState<H, N>,
622 ) -> Result<(), E::Error> {
623 match self.state.take() {
624 Some(State::Prevoted(mut precommit_timer)) => {
625 let last_round_estimate = last_round_state
626 .estimate
627 .clone()
628 .expect("Rounds only started when prior round completable; qed");
629
630 let should_precommit = {
631 self.votes.state().prevote_ghost.as_ref().map_or(false, |p_g| {
634 p_g == &last_round_estimate ||
635 self.env
636 .is_equal_or_descendent_of(last_round_estimate.0, p_g.0.clone())
637 })
638 } && match precommit_timer.poll_unpin(cx) {
639 Poll::Ready(Err(e)) => return Err(e),
640 Poll::Ready(Ok(())) => true,
641 Poll::Pending => self.votes.completable(),
642 };
643
644 if should_precommit {
645 if self.voting.is_active() {
646 debug!(
647 target: LOG_TARGET,
648 "Casting precommit for round {}",
649 self.votes.number()
650 );
651 let precommit = self.construct_precommit();
652 self.env.precommitted(self.round_number(), precommit.clone())?;
653 self.votes.set_precommitted_index();
654 self.outgoing.push(Message::Precommit(precommit));
655 }
656 self.state = Some(State::Precommitted);
657 } else {
658 self.state = Some(State::Prevoted(precommit_timer));
659 }
660 },
661 x => {
662 self.state = x;
663 },
664 }
665
666 Ok(())
667 }
668
669 fn construct_prevote(&self, last_round_state: &RoundState<H, N>) -> (H, E::BestChain) {
671 let last_round_estimate = last_round_state
672 .estimate
673 .clone()
674 .expect("Rounds only started when prior round completable; qed");
675
676 let find_descendent_of = match self.primary_block {
677 None => {
678 last_round_estimate.0
680 },
681 Some(ref primary_block) => {
682 let last_prevote_g = last_round_state
687 .prevote_ghost
688 .clone()
689 .expect("Rounds only started when prior round completable; qed");
690
691 if primary_block == &last_prevote_g {
693 primary_block.0.clone()
694 } else if primary_block.1 >= last_prevote_g.1 {
695 last_round_estimate.0
696 } else {
697 let &(ref p_hash, p_num) = primary_block;
702 match self.env.ancestry(last_round_estimate.0.clone(), last_prevote_g.0.clone())
703 {
704 Ok(ancestry) => {
705 let to_sub = p_num + N::one();
706
707 let offset: usize = if last_prevote_g.1 < to_sub {
708 0
709 } else {
710 (last_prevote_g.1 - to_sub).as_()
711 };
712
713 if ancestry.get(offset).map_or(false, |b| b == p_hash) {
714 p_hash.clone()
715 } else {
716 last_round_estimate.0
717 }
718 },
719 Err(crate::Error::NotDescendent) => {
720 warn!(
722 target: LOG_TARGET,
723 "Possible case of massive equivocation: \
724 last round prevote GHOST: {:?} is not a descendant of last round estimate: {:?}",
725 last_prevote_g,
726 last_round_estimate,
727 );
728
729 last_round_estimate.0
730 },
731 }
732 }
733 },
734 };
735
736 (find_descendent_of.clone(), self.env.best_chain_containing(find_descendent_of))
737 }
738
739 fn construct_precommit(&self) -> Precommit<H, N> {
741 let t = match self.votes.state().prevote_ghost {
742 Some(target) => target,
743 None => self.votes.base(),
744 };
745
746 Precommit { target_hash: t.0, target_number: t.1 }
747 }
748
749 fn notify(&mut self, last_state: RoundState<H, N>, new_state: RoundState<H, N>) {
751 if last_state != new_state {
752 if let Some(ref b) = self.bridged_round_state {
753 b.update(new_state.clone());
754 }
755 }
756
757 let state_changed = last_state.finalized != new_state.finalized;
765 let sent_finality_notifications = self.best_finalized.is_some();
766
767 if new_state.completable && (state_changed || !sent_finality_notifications) {
768 let precommitted = matches!(self.state, Some(State::Precommitted));
769 let cant_vote = self.last_round_state.is_none();
772
773 if precommitted || cant_vote {
774 if let Some((f_hash, f_number)) = new_state.finalized {
775 let commit = Commit {
776 target_hash: f_hash.clone(),
777 target_number: f_number,
778 precommits: self.votes.finalizing_precommits(&*self.env)
779 .expect("always returns none if something was finalized; this is checked above; qed")
780 .collect(),
781 };
782 let finalized = (f_hash, f_number, self.votes.number(), commit.clone());
783 let _ = self.finalized_sender.unbounded_send(finalized);
784 self.best_finalized = Some(commit);
785 }
786 }
787 }
788 }
789}