1use std::{
20 collections::{BTreeMap, HashMap},
21 marker::PhantomData,
22 pin::Pin,
23 sync::Arc,
24 time::Duration,
25};
26
27use codec::{Decode, Encode};
28use finality_grandpa::{
29 round::State as RoundState, voter, voter_set::VoterSet, BlockNumberOps, Error as GrandpaError,
30};
31use futures::prelude::*;
32use futures_timer::Delay;
33use log::{debug, warn};
34use parking_lot::RwLock;
35use prometheus_endpoint::{register, Counter, Gauge, PrometheusError, U64};
36
37use sc_client_api::{
38 backend::{apply_aux, Backend as BackendT},
39 utils::is_descendent_of,
40};
41use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_INFO};
42use sc_transaction_pool_api::OffchainTransactionPoolFactory;
43use sp_api::ApiExt;
44use sp_blockchain::HeaderMetadata;
45use sp_consensus::SelectChain as SelectChainT;
46use sp_consensus_grandpa::{
47 AuthorityId, AuthoritySignature, Equivocation, EquivocationProof, GrandpaApi, RoundNumber,
48 SetId, GRANDPA_ENGINE_ID,
49};
50use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor, Zero};
51
52use crate::{
53 authorities::{AuthoritySet, SharedAuthoritySet},
54 communication::{Network as NetworkT, Syncing as SyncingT},
55 justification::GrandpaJustification,
56 local_authority_id,
57 notification::GrandpaJustificationSender,
58 until_imported::UntilVoteTargetImported,
59 voting_rule::VotingRule as VotingRuleT,
60 ClientForGrandpa, CommandOrError, Commit, Config, Error, NewAuthoritySet, Precommit, Prevote,
61 PrimaryPropose, SignedMessage, VoterCommand, LOG_TARGET,
62};
63
64type HistoricalVotes<Block> = finality_grandpa::HistoricalVotes<
65 <Block as BlockT>::Hash,
66 NumberFor<Block>,
67 AuthoritySignature,
68 AuthorityId,
69>;
70
71#[derive(Debug, Clone, Decode, Encode, PartialEq)]
74pub struct CompletedRound<Block: BlockT> {
75 pub number: RoundNumber,
77 pub state: RoundState<Block::Hash, NumberFor<Block>>,
79 pub base: (Block::Hash, NumberFor<Block>),
81 pub votes: Vec<SignedMessage<Block::Header>>,
83}
84
85#[derive(Debug, Clone, PartialEq)]
89pub struct CompletedRounds<Block: BlockT> {
90 rounds: Vec<CompletedRound<Block>>,
91 set_id: SetId,
92 voters: Vec<AuthorityId>,
93}
94
95const NUM_LAST_COMPLETED_ROUNDS: usize = 2;
99
100impl<Block: BlockT> Encode for CompletedRounds<Block> {
101 fn encode(&self) -> Vec<u8> {
102 let v = Vec::from_iter(&self.rounds);
103 (&v, &self.set_id, &self.voters).encode()
104 }
105}
106
107impl<Block: BlockT> codec::EncodeLike for CompletedRounds<Block> {}
108
109impl<Block: BlockT> Decode for CompletedRounds<Block> {
110 fn decode<I: codec::Input>(value: &mut I) -> Result<Self, codec::Error> {
111 <(Vec<CompletedRound<Block>>, SetId, Vec<AuthorityId>)>::decode(value)
112 .map(|(rounds, set_id, voters)| CompletedRounds { rounds, set_id, voters })
113 }
114}
115
116impl<Block: BlockT> CompletedRounds<Block> {
117 pub(crate) fn new(
119 genesis: CompletedRound<Block>,
120 set_id: SetId,
121 voters: &AuthoritySet<Block::Hash, NumberFor<Block>>,
122 ) -> CompletedRounds<Block> {
123 let mut rounds = Vec::with_capacity(NUM_LAST_COMPLETED_ROUNDS);
124 rounds.push(genesis);
125
126 let voters = voters.current_authorities.iter().map(|(a, _)| a.clone()).collect();
127 CompletedRounds { rounds, set_id, voters }
128 }
129
130 pub fn set_info(&self) -> (SetId, &[AuthorityId]) {
132 (self.set_id, &self.voters[..])
133 }
134
135 pub fn iter(&self) -> impl Iterator<Item = &CompletedRound<Block>> {
137 self.rounds.iter().rev()
138 }
139
140 pub fn last(&self) -> &CompletedRound<Block> {
142 self.rounds
143 .first()
144 .expect("inner is never empty; always contains at least genesis; qed")
145 }
146
147 pub fn push(&mut self, completed_round: CompletedRound<Block>) {
150 use std::cmp::Reverse;
151
152 match self
153 .rounds
154 .binary_search_by_key(&Reverse(completed_round.number), |completed_round| {
155 Reverse(completed_round.number)
156 }) {
157 Ok(idx) => self.rounds[idx] = completed_round,
158 Err(idx) => self.rounds.insert(idx, completed_round),
159 };
160
161 if self.rounds.len() > NUM_LAST_COMPLETED_ROUNDS {
162 self.rounds.pop();
163 }
164 }
165}
166
167pub type CurrentRounds<Block> = BTreeMap<RoundNumber, HasVoted<<Block as BlockT>::Header>>;
170
171#[derive(Debug, Decode, Encode, PartialEq)]
177pub enum VoterSetState<Block: BlockT> {
178 Live {
180 completed_rounds: CompletedRounds<Block>,
182 current_rounds: CurrentRounds<Block>,
184 },
185 Paused {
187 completed_rounds: CompletedRounds<Block>,
189 },
190}
191
192impl<Block: BlockT> VoterSetState<Block> {
193 pub(crate) fn live(
197 set_id: SetId,
198 authority_set: &AuthoritySet<Block::Hash, NumberFor<Block>>,
199 genesis_state: (Block::Hash, NumberFor<Block>),
200 ) -> VoterSetState<Block> {
201 let state = RoundState::genesis((genesis_state.0, genesis_state.1));
202 let completed_rounds = CompletedRounds::new(
203 CompletedRound {
204 number: 0,
205 state,
206 base: (genesis_state.0, genesis_state.1),
207 votes: Vec::new(),
208 },
209 set_id,
210 authority_set,
211 );
212
213 let mut current_rounds = CurrentRounds::<Block>::new();
214 current_rounds.insert(1, HasVoted::No);
215
216 VoterSetState::Live { completed_rounds, current_rounds }
217 }
218
219 pub(crate) fn completed_rounds(&self) -> CompletedRounds<Block> {
221 match self {
222 VoterSetState::Live { completed_rounds, .. } => completed_rounds.clone(),
223 VoterSetState::Paused { completed_rounds } => completed_rounds.clone(),
224 }
225 }
226
227 pub(crate) fn last_completed_round(&self) -> CompletedRound<Block> {
229 match self {
230 VoterSetState::Live { completed_rounds, .. } => completed_rounds.last().clone(),
231 VoterSetState::Paused { completed_rounds } => completed_rounds.last().clone(),
232 }
233 }
234
235 pub fn with_current_round(
238 &self,
239 round: RoundNumber,
240 ) -> Result<(&CompletedRounds<Block>, &CurrentRounds<Block>), Error> {
241 if let VoterSetState::Live { completed_rounds, current_rounds } = self {
242 if current_rounds.contains_key(&round) {
243 Ok((completed_rounds, current_rounds))
244 } else {
245 let msg = "Voter acting on a live round we are not tracking.";
246 Err(Error::Safety(msg.to_string()))
247 }
248 } else {
249 let msg = "Voter acting while in paused state.";
250 Err(Error::Safety(msg.to_string()))
251 }
252 }
253}
254
255#[derive(Clone, Debug, Decode, Encode, PartialEq)]
257pub enum HasVoted<Header: HeaderT> {
258 No,
260 Yes(AuthorityId, Vote<Header>),
262}
263
264#[derive(Debug, Clone, Decode, Encode, PartialEq)]
266pub enum Vote<Header: HeaderT> {
267 Propose(PrimaryPropose<Header>),
269 Prevote(Option<PrimaryPropose<Header>>, Prevote<Header>),
271 Precommit(Option<PrimaryPropose<Header>>, Prevote<Header>, Precommit<Header>),
273}
274
275impl<Header: HeaderT> HasVoted<Header> {
276 pub fn propose(&self) -> Option<&PrimaryPropose<Header>> {
278 match self {
279 HasVoted::Yes(_, Vote::Propose(propose)) => Some(propose),
280 HasVoted::Yes(_, Vote::Prevote(propose, _)) |
281 HasVoted::Yes(_, Vote::Precommit(propose, _, _)) => propose.as_ref(),
282 _ => None,
283 }
284 }
285
286 pub fn prevote(&self) -> Option<&Prevote<Header>> {
288 match self {
289 HasVoted::Yes(_, Vote::Prevote(_, prevote)) |
290 HasVoted::Yes(_, Vote::Precommit(_, prevote, _)) => Some(prevote),
291 _ => None,
292 }
293 }
294
295 pub fn precommit(&self) -> Option<&Precommit<Header>> {
297 match self {
298 HasVoted::Yes(_, Vote::Precommit(_, _, precommit)) => Some(precommit),
299 _ => None,
300 }
301 }
302
303 pub fn can_propose(&self) -> bool {
305 self.propose().is_none()
306 }
307
308 pub fn can_prevote(&self) -> bool {
310 self.prevote().is_none()
311 }
312
313 pub fn can_precommit(&self) -> bool {
315 self.precommit().is_none()
316 }
317}
318
319#[derive(Clone)]
321pub struct SharedVoterSetState<Block: BlockT> {
322 inner: Arc<RwLock<VoterSetState<Block>>>,
324 voting: Arc<RwLock<HashMap<RoundNumber, AuthorityId>>>,
327}
328
329impl<Block: BlockT> From<VoterSetState<Block>> for SharedVoterSetState<Block> {
330 fn from(set_state: VoterSetState<Block>) -> Self {
331 SharedVoterSetState::new(set_state)
332 }
333}
334
335impl<Block: BlockT> SharedVoterSetState<Block> {
336 pub(crate) fn new(state: VoterSetState<Block>) -> Self {
338 SharedVoterSetState {
339 inner: Arc::new(RwLock::new(state)),
340 voting: Arc::new(RwLock::new(HashMap::new())),
341 }
342 }
343
344 pub(crate) fn read(&self) -> parking_lot::RwLockReadGuard<VoterSetState<Block>> {
346 self.inner.read()
347 }
348
349 pub(crate) fn voting_on(&self, round: RoundNumber) -> Option<AuthorityId> {
351 self.voting.read().get(&round).cloned()
352 }
353
354 pub(crate) fn started_voting_on(&self, round: RoundNumber, local_id: AuthorityId) {
356 self.voting.write().insert(round, local_id);
357 }
358
359 pub(crate) fn finished_voting_on(&self, round: RoundNumber) {
363 self.voting.write().remove(&round);
364 }
365
366 pub(crate) fn has_voted(&self, round: RoundNumber) -> HasVoted<Block::Header> {
368 match &*self.inner.read() {
369 VoterSetState::Live { current_rounds, .. } => current_rounds
370 .get(&round)
371 .and_then(|has_voted| match has_voted {
372 HasVoted::Yes(id, vote) => Some(HasVoted::Yes(id.clone(), vote.clone())),
373 _ => None,
374 })
375 .unwrap_or(HasVoted::No),
376 _ => HasVoted::No,
377 }
378 }
379
380 fn with<F, R>(&self, f: F) -> R
382 where
383 F: FnOnce(&mut VoterSetState<Block>) -> R,
384 {
385 f(&mut *self.inner.write())
386 }
387}
388
389#[derive(Clone)]
391pub(crate) struct Metrics {
392 finality_grandpa_round: Gauge<U64>,
393 finality_grandpa_prevotes: Counter<U64>,
394 finality_grandpa_precommits: Counter<U64>,
395}
396
397impl Metrics {
398 pub(crate) fn register(
399 registry: &prometheus_endpoint::Registry,
400 ) -> Result<Self, PrometheusError> {
401 Ok(Self {
402 finality_grandpa_round: register(
403 Gauge::new("substrate_finality_grandpa_round", "Highest completed GRANDPA round.")?,
404 registry,
405 )?,
406 finality_grandpa_prevotes: register(
407 Counter::new(
408 "substrate_finality_grandpa_prevotes_total",
409 "Total number of GRANDPA prevotes cast locally.",
410 )?,
411 registry,
412 )?,
413 finality_grandpa_precommits: register(
414 Counter::new(
415 "substrate_finality_grandpa_precommits_total",
416 "Total number of GRANDPA precommits cast locally.",
417 )?,
418 registry,
419 )?,
420 })
421 }
422}
423
424pub(crate) struct Environment<
426 Backend,
427 Block: BlockT,
428 C,
429 N: NetworkT<Block>,
430 S: SyncingT<Block>,
431 SC,
432 VR,
433> {
434 pub(crate) client: Arc<C>,
435 pub(crate) select_chain: SC,
436 pub(crate) voters: Arc<VoterSet<AuthorityId>>,
437 pub(crate) config: Config,
438 pub(crate) authority_set: SharedAuthoritySet<Block::Hash, NumberFor<Block>>,
439 pub(crate) network: crate::communication::NetworkBridge<Block, N, S>,
440 pub(crate) set_id: SetId,
441 pub(crate) voter_set_state: SharedVoterSetState<Block>,
442 pub(crate) voting_rule: VR,
443 pub(crate) metrics: Option<Metrics>,
444 pub(crate) justification_sender: Option<GrandpaJustificationSender<Block>>,
445 pub(crate) telemetry: Option<TelemetryHandle>,
446 pub(crate) offchain_tx_pool_factory: OffchainTransactionPoolFactory<Block>,
447 pub(crate) _phantom: PhantomData<Backend>,
448}
449
450impl<BE, Block: BlockT, C, N: NetworkT<Block>, S: SyncingT<Block>, SC, VR>
451 Environment<BE, Block, C, N, S, SC, VR>
452{
453 pub(crate) fn update_voter_set_state<F>(&self, f: F) -> Result<(), Error>
457 where
458 F: FnOnce(&VoterSetState<Block>) -> Result<Option<VoterSetState<Block>>, Error>,
459 {
460 self.voter_set_state.with(|voter_set_state| {
461 if let Some(set_state) = f(voter_set_state)? {
462 *voter_set_state = set_state;
463
464 if let Some(metrics) = self.metrics.as_ref() {
465 if let VoterSetState::Live { completed_rounds, .. } = voter_set_state {
466 let highest = completed_rounds
467 .rounds
468 .iter()
469 .map(|round| round.number)
470 .max()
471 .expect("There is always one completed round (genesis); qed");
472
473 metrics.finality_grandpa_round.set(highest);
474 }
475 }
476 }
477 Ok(())
478 })
479 }
480}
481
482impl<BE, Block, C, N, S, SC, VR> Environment<BE, Block, C, N, S, SC, VR>
483where
484 Block: BlockT,
485 BE: BackendT<Block>,
486 C: ClientForGrandpa<Block, BE>,
487 C::Api: GrandpaApi<Block>,
488 N: NetworkT<Block>,
489 S: SyncingT<Block>,
490 SC: SelectChainT<Block>,
491{
492 pub(crate) fn report_equivocation(
498 &self,
499 equivocation: Equivocation<Block::Hash, NumberFor<Block>>,
500 ) -> Result<(), Error> {
501 if let Some(local_id) = self.voter_set_state.voting_on(equivocation.round_number()) {
502 if *equivocation.offender() == local_id {
503 return Err(Error::Safety(
504 "Refraining from sending equivocation report for our own equivocation.".into(),
505 ))
506 }
507 }
508
509 let is_descendent_of = is_descendent_of(&*self.client, None);
510
511 let (best_block_hash, best_block_number) = {
512 let info = self.client.info();
515 (info.best_hash, info.best_number)
516 };
517
518 let authority_set = self.authority_set.inner();
519
520 let next_change = authority_set
523 .next_change(&best_block_hash, &is_descendent_of)
524 .map_err(|e| Error::Safety(e.to_string()))?;
525
526 let current_set_latest_hash = match next_change {
528 Some((_, n)) if n.is_zero() =>
529 return Err(Error::Safety("Authority set change signalled at genesis.".to_string())),
530 Some((_, n)) if n > best_block_number => best_block_hash,
534 Some((h, _)) => {
535 let header = self.client.header(h)?.expect(
537 "got block hash from registered pending change; \
538 pending changes are only registered on block import; qed.",
539 );
540
541 *header.parent_hash()
543 },
544 None => best_block_hash,
547 };
548
549 let key_owner_proof = match self
551 .client
552 .runtime_api()
553 .generate_key_ownership_proof(
554 current_set_latest_hash,
555 authority_set.set_id,
556 equivocation.offender().clone(),
557 )
558 .map_err(Error::RuntimeApi)?
559 {
560 Some(proof) => proof,
561 None => {
562 debug!(
563 target: LOG_TARGET,
564 "Equivocation offender is not part of the authority set."
565 );
566 return Ok(())
567 },
568 };
569
570 let equivocation_proof = EquivocationProof::new(authority_set.set_id, equivocation);
572
573 let mut runtime_api = self.client.runtime_api();
574
575 runtime_api.register_extension(
576 self.offchain_tx_pool_factory.offchain_transaction_pool(best_block_hash),
577 );
578
579 runtime_api
580 .submit_report_equivocation_unsigned_extrinsic(
581 best_block_hash,
582 equivocation_proof,
583 key_owner_proof,
584 )
585 .map_err(Error::RuntimeApi)?;
586
587 Ok(())
588 }
589}
590
591impl<BE, Block, C, N, S, SC, VR> finality_grandpa::Chain<Block::Hash, NumberFor<Block>>
592 for Environment<BE, Block, C, N, S, SC, VR>
593where
594 Block: BlockT,
595 BE: BackendT<Block>,
596 C: ClientForGrandpa<Block, BE>,
597 N: NetworkT<Block>,
598 S: SyncingT<Block>,
599 SC: SelectChainT<Block>,
600 VR: VotingRuleT<Block, C>,
601 NumberFor<Block>: BlockNumberOps,
602{
603 fn ancestry(
604 &self,
605 base: Block::Hash,
606 block: Block::Hash,
607 ) -> Result<Vec<Block::Hash>, GrandpaError> {
608 ancestry(&self.client, base, block)
609 }
610}
611
612pub(crate) fn ancestry<Block: BlockT, Client>(
613 client: &Arc<Client>,
614 base: Block::Hash,
615 block: Block::Hash,
616) -> Result<Vec<Block::Hash>, GrandpaError>
617where
618 Client: HeaderMetadata<Block, Error = sp_blockchain::Error>,
619{
620 if base == block {
621 return Err(GrandpaError::NotDescendent)
622 }
623
624 let tree_route_res = sp_blockchain::tree_route(&**client, block, base);
625
626 let tree_route = match tree_route_res {
627 Ok(tree_route) => tree_route,
628 Err(e) => {
629 debug!(
630 target: LOG_TARGET,
631 "Encountered error computing ancestry between block {:?} and base {:?}: {}",
632 block,
633 base,
634 e
635 );
636
637 return Err(GrandpaError::NotDescendent)
638 },
639 };
640
641 if tree_route.common_block().hash != base {
642 return Err(GrandpaError::NotDescendent)
643 }
644
645 Ok(tree_route.retracted().iter().skip(1).map(|e| e.hash).collect())
648}
649
650impl<B, Block, C, N, S, SC, VR> voter::Environment<Block::Hash, NumberFor<Block>>
651 for Environment<B, Block, C, N, S, SC, VR>
652where
653 Block: BlockT,
654 B: BackendT<Block>,
655 C: ClientForGrandpa<Block, B> + 'static,
656 C::Api: GrandpaApi<Block>,
657 N: NetworkT<Block>,
658 S: SyncingT<Block>,
659 SC: SelectChainT<Block> + 'static,
660 VR: VotingRuleT<Block, C> + Clone + 'static,
661 NumberFor<Block>: BlockNumberOps,
662{
663 type Timer = Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>>;
664 type BestChain = Pin<
665 Box<
666 dyn Future<Output = Result<Option<(Block::Hash, NumberFor<Block>)>, Self::Error>>
667 + Send,
668 >,
669 >;
670
671 type Id = AuthorityId;
672 type Signature = AuthoritySignature;
673
674 type In = Pin<
676 Box<
677 dyn Stream<
678 Item = Result<
679 ::finality_grandpa::SignedMessage<
680 Block::Hash,
681 NumberFor<Block>,
682 Self::Signature,
683 Self::Id,
684 >,
685 Self::Error,
686 >,
687 > + Send,
688 >,
689 >;
690 type Out = Pin<
691 Box<
692 dyn Sink<
693 ::finality_grandpa::Message<Block::Hash, NumberFor<Block>>,
694 Error = Self::Error,
695 > + Send,
696 >,
697 >;
698
699 type Error = CommandOrError<Block::Hash, NumberFor<Block>>;
700
701 fn best_chain_containing(&self, block: Block::Hash) -> Self::BestChain {
702 let client = self.client.clone();
703 let authority_set = self.authority_set.clone();
704 let select_chain = self.select_chain.clone();
705 let voting_rule = self.voting_rule.clone();
706 let set_id = self.set_id;
707
708 Box::pin(async move {
709 if set_id != authority_set.set_id() {
714 return Ok(None)
715 }
716
717 best_chain_containing(block, client, authority_set, select_chain, voting_rule)
718 .await
719 .map_err(|e| e.into())
720 })
721 }
722
723 fn round_data(
724 &self,
725 round: RoundNumber,
726 ) -> voter::RoundData<Self::Id, Self::Timer, Self::In, Self::Out> {
727 let prevote_timer = Delay::new(self.config.gossip_duration * 2);
728 let precommit_timer = Delay::new(self.config.gossip_duration * 4);
729
730 let local_id = local_authority_id(&self.voters, self.config.keystore.as_ref());
731
732 let has_voted = match self.voter_set_state.has_voted(round) {
733 HasVoted::Yes(id, vote) =>
734 if local_id.as_ref().map(|k| k == &id).unwrap_or(false) {
735 HasVoted::Yes(id, vote)
736 } else {
737 HasVoted::No
738 },
739 HasVoted::No => HasVoted::No,
740 };
741
742 if let Some(id) = local_id.as_ref() {
750 self.voter_set_state.started_voting_on(round, id.clone());
751 }
752
753 let keystore = match (local_id.as_ref(), self.config.keystore.as_ref()) {
756 (Some(id), Some(keystore)) => Some((id.clone(), keystore.clone()).into()),
757 _ => None,
758 };
759
760 let (incoming, outgoing) = self.network.round_communication(
761 keystore,
762 crate::communication::Round(round),
763 crate::communication::SetId(self.set_id),
764 self.voters.clone(),
765 has_voted,
766 );
767
768 let incoming = Box::pin(
771 UntilVoteTargetImported::new(
772 self.client.import_notification_stream(),
773 self.network.clone(),
774 self.client.clone(),
775 incoming,
776 "round",
777 None,
778 )
779 .map_err(Into::into),
780 );
781
782 let outgoing = Box::pin(outgoing.sink_err_into());
784
785 voter::RoundData {
786 voter_id: local_id,
787 prevote_timer: Box::pin(prevote_timer.map(Ok)),
788 precommit_timer: Box::pin(precommit_timer.map(Ok)),
789 incoming,
790 outgoing,
791 }
792 }
793
794 fn proposed(
795 &self,
796 round: RoundNumber,
797 propose: PrimaryPropose<Block::Header>,
798 ) -> Result<(), Self::Error> {
799 let local_id = match self.voter_set_state.voting_on(round) {
800 Some(id) => id,
801 None => return Ok(()),
802 };
803
804 self.update_voter_set_state(|voter_set_state| {
805 let (completed_rounds, current_rounds) = voter_set_state.with_current_round(round)?;
806 let current_round = current_rounds
807 .get(&round)
808 .expect("checked in with_current_round that key exists; qed.");
809
810 if !current_round.can_propose() {
811 return Ok(None)
815 }
816
817 let mut current_rounds = current_rounds.clone();
818 let current_round = current_rounds
819 .get_mut(&round)
820 .expect("checked previously that key exists; qed.");
821
822 *current_round = HasVoted::Yes(local_id, Vote::Propose(propose));
823
824 let set_state = VoterSetState::<Block>::Live {
825 completed_rounds: completed_rounds.clone(),
826 current_rounds,
827 };
828
829 crate::aux_schema::write_voter_set_state(&*self.client, &set_state)?;
830
831 Ok(Some(set_state))
832 })?;
833
834 Ok(())
835 }
836
837 fn prevoted(
838 &self,
839 round: RoundNumber,
840 prevote: Prevote<Block::Header>,
841 ) -> Result<(), Self::Error> {
842 let local_id = match self.voter_set_state.voting_on(round) {
843 Some(id) => id,
844 None => return Ok(()),
845 };
846
847 let report_prevote_metrics = |prevote: &Prevote<Block::Header>| {
848 telemetry!(
849 self.telemetry;
850 CONSENSUS_DEBUG;
851 "afg.prevote_issued";
852 "round" => round,
853 "target_number" => ?prevote.target_number,
854 "target_hash" => ?prevote.target_hash,
855 );
856
857 if let Some(metrics) = self.metrics.as_ref() {
858 metrics.finality_grandpa_prevotes.inc();
859 }
860 };
861
862 self.update_voter_set_state(|voter_set_state| {
863 let (completed_rounds, current_rounds) = voter_set_state.with_current_round(round)?;
864 let current_round = current_rounds
865 .get(&round)
866 .expect("checked in with_current_round that key exists; qed.");
867
868 if !current_round.can_prevote() {
869 return Ok(None)
873 }
874
875 report_prevote_metrics(&prevote);
877
878 let propose = current_round.propose();
879
880 let mut current_rounds = current_rounds.clone();
881 let current_round = current_rounds
882 .get_mut(&round)
883 .expect("checked previously that key exists; qed.");
884
885 *current_round = HasVoted::Yes(local_id, Vote::Prevote(propose.cloned(), prevote));
886
887 let set_state = VoterSetState::<Block>::Live {
888 completed_rounds: completed_rounds.clone(),
889 current_rounds,
890 };
891
892 crate::aux_schema::write_voter_set_state(&*self.client, &set_state)?;
893
894 Ok(Some(set_state))
895 })?;
896
897 Ok(())
898 }
899
900 fn precommitted(
901 &self,
902 round: RoundNumber,
903 precommit: Precommit<Block::Header>,
904 ) -> Result<(), Self::Error> {
905 let local_id = match self.voter_set_state.voting_on(round) {
906 Some(id) => id,
907 None => return Ok(()),
908 };
909
910 let report_precommit_metrics = |precommit: &Precommit<Block::Header>| {
911 telemetry!(
912 self.telemetry;
913 CONSENSUS_DEBUG;
914 "afg.precommit_issued";
915 "round" => round,
916 "target_number" => ?precommit.target_number,
917 "target_hash" => ?precommit.target_hash,
918 );
919
920 if let Some(metrics) = self.metrics.as_ref() {
921 metrics.finality_grandpa_precommits.inc();
922 }
923 };
924
925 self.update_voter_set_state(|voter_set_state| {
926 let (completed_rounds, current_rounds) = voter_set_state.with_current_round(round)?;
927 let current_round = current_rounds
928 .get(&round)
929 .expect("checked in with_current_round that key exists; qed.");
930
931 if !current_round.can_precommit() {
932 return Ok(None)
936 }
937
938 report_precommit_metrics(&precommit);
940
941 let propose = current_round.propose();
942 let prevote = match current_round {
943 HasVoted::Yes(_, Vote::Prevote(_, prevote)) => prevote,
944 _ => {
945 let msg = "Voter precommitting before prevoting.";
946 return Err(Error::Safety(msg.to_string()))
947 },
948 };
949
950 let mut current_rounds = current_rounds.clone();
951 let current_round = current_rounds
952 .get_mut(&round)
953 .expect("checked previously that key exists; qed.");
954
955 *current_round = HasVoted::Yes(
956 local_id,
957 Vote::Precommit(propose.cloned(), prevote.clone(), precommit),
958 );
959
960 let set_state = VoterSetState::<Block>::Live {
961 completed_rounds: completed_rounds.clone(),
962 current_rounds,
963 };
964
965 crate::aux_schema::write_voter_set_state(&*self.client, &set_state)?;
966
967 Ok(Some(set_state))
968 })?;
969
970 Ok(())
971 }
972
973 fn completed(
974 &self,
975 round: RoundNumber,
976 state: RoundState<Block::Hash, NumberFor<Block>>,
977 base: (Block::Hash, NumberFor<Block>),
978 historical_votes: &HistoricalVotes<Block>,
979 ) -> Result<(), Self::Error> {
980 debug!(
981 target: LOG_TARGET,
982 "Voter {} completed round {} in set {}. Estimate = {:?}, Finalized in round = {:?}",
983 self.config.name(),
984 round,
985 self.set_id,
986 state.estimate.as_ref().map(|e| e.1),
987 state.finalized.as_ref().map(|e| e.1),
988 );
989
990 self.update_voter_set_state(|voter_set_state| {
991 let (completed_rounds, current_rounds) =
995 if let VoterSetState::Live { completed_rounds, current_rounds } = voter_set_state {
996 (completed_rounds, current_rounds)
997 } else {
998 let msg = "Voter acting while in paused state.";
999 return Err(Error::Safety(msg.to_string()))
1000 };
1001
1002 let mut completed_rounds = completed_rounds.clone();
1003
1004 let votes = historical_votes.seen().to_vec();
1006
1007 completed_rounds.push(CompletedRound {
1008 number: round,
1009 state: state.clone(),
1010 base,
1011 votes,
1012 });
1013
1014 let mut current_rounds = current_rounds.clone();
1016 current_rounds.remove(&round);
1017
1018 current_rounds.entry(round + 1).or_insert(HasVoted::No);
1021
1022 let set_state = VoterSetState::<Block>::Live { completed_rounds, current_rounds };
1023
1024 crate::aux_schema::write_voter_set_state(&*self.client, &set_state)?;
1025
1026 Ok(Some(set_state))
1027 })?;
1028
1029 self.voter_set_state.finished_voting_on(round);
1031
1032 Ok(())
1033 }
1034
1035 fn concluded(
1036 &self,
1037 round: RoundNumber,
1038 state: RoundState<Block::Hash, NumberFor<Block>>,
1039 _base: (Block::Hash, NumberFor<Block>),
1040 historical_votes: &HistoricalVotes<Block>,
1041 ) -> Result<(), Self::Error> {
1042 debug!(
1043 target: LOG_TARGET,
1044 "Voter {} concluded round {} in set {}. Estimate = {:?}, Finalized in round = {:?}",
1045 self.config.name(),
1046 round,
1047 self.set_id,
1048 state.estimate.as_ref().map(|e| e.1),
1049 state.finalized.as_ref().map(|e| e.1),
1050 );
1051
1052 self.update_voter_set_state(|voter_set_state| {
1053 let (completed_rounds, current_rounds) =
1056 if let VoterSetState::Live { completed_rounds, current_rounds } = voter_set_state {
1057 (completed_rounds, current_rounds)
1058 } else {
1059 let msg = "Voter acting while in paused state.";
1060 return Err(Error::Safety(msg.to_string()))
1061 };
1062
1063 let mut completed_rounds = completed_rounds.clone();
1064
1065 if let Some(already_completed) =
1066 completed_rounds.rounds.iter_mut().find(|r| r.number == round)
1067 {
1068 let n_existing_votes = already_completed.votes.len();
1069
1070 already_completed
1073 .votes
1074 .extend(historical_votes.seen().iter().skip(n_existing_votes).cloned());
1075 already_completed.state = state;
1076 crate::aux_schema::write_concluded_round(&*self.client, already_completed)?;
1077 }
1078
1079 let set_state = VoterSetState::<Block>::Live {
1080 completed_rounds,
1081 current_rounds: current_rounds.clone(),
1082 };
1083
1084 crate::aux_schema::write_voter_set_state(&*self.client, &set_state)?;
1085
1086 Ok(Some(set_state))
1087 })?;
1088
1089 Ok(())
1090 }
1091
1092 fn finalize_block(
1093 &self,
1094 hash: Block::Hash,
1095 number: NumberFor<Block>,
1096 round: RoundNumber,
1097 commit: Commit<Block::Header>,
1098 ) -> Result<(), Self::Error> {
1099 finalize_block(
1100 self.client.clone(),
1101 &self.authority_set,
1102 Some(self.config.justification_generation_period),
1103 hash,
1104 number,
1105 (round, commit).into(),
1106 false,
1107 self.justification_sender.as_ref(),
1108 self.telemetry.clone(),
1109 )
1110 }
1111
1112 fn round_commit_timer(&self) -> Self::Timer {
1113 use rand::{thread_rng, Rng};
1114
1115 let delay: u64 =
1117 thread_rng().gen_range(0..2 * self.config.gossip_duration.as_millis() as u64);
1118 Box::pin(Delay::new(Duration::from_millis(delay)).map(Ok))
1119 }
1120
1121 fn prevote_equivocation(
1122 &self,
1123 _round: RoundNumber,
1124 equivocation: finality_grandpa::Equivocation<
1125 Self::Id,
1126 Prevote<Block::Header>,
1127 Self::Signature,
1128 >,
1129 ) {
1130 warn!(
1131 target: LOG_TARGET,
1132 "Detected prevote equivocation in the finality worker: {:?}", equivocation
1133 );
1134 if let Err(err) = self.report_equivocation(equivocation.into()) {
1135 warn!(target: LOG_TARGET, "Error reporting prevote equivocation: {}", err);
1136 }
1137 }
1138
1139 fn precommit_equivocation(
1140 &self,
1141 _round: RoundNumber,
1142 equivocation: finality_grandpa::Equivocation<
1143 Self::Id,
1144 Precommit<Block::Header>,
1145 Self::Signature,
1146 >,
1147 ) {
1148 warn!(
1149 target: LOG_TARGET,
1150 "Detected precommit equivocation in the finality worker: {:?}", equivocation
1151 );
1152 if let Err(err) = self.report_equivocation(equivocation.into()) {
1153 warn!(target: LOG_TARGET, "Error reporting precommit equivocation: {}", err);
1154 }
1155 }
1156}
1157
1158pub(crate) enum JustificationOrCommit<Block: BlockT> {
1159 Justification(GrandpaJustification<Block>),
1160 Commit((RoundNumber, Commit<Block::Header>)),
1161}
1162
1163impl<Block: BlockT> From<(RoundNumber, Commit<Block::Header>)> for JustificationOrCommit<Block> {
1164 fn from(commit: (RoundNumber, Commit<Block::Header>)) -> JustificationOrCommit<Block> {
1165 JustificationOrCommit::Commit(commit)
1166 }
1167}
1168
1169impl<Block: BlockT> From<GrandpaJustification<Block>> for JustificationOrCommit<Block> {
1170 fn from(justification: GrandpaJustification<Block>) -> JustificationOrCommit<Block> {
1171 JustificationOrCommit::Justification(justification)
1172 }
1173}
1174
1175async fn best_chain_containing<Block, Backend, Client, SelectChain, VotingRule>(
1176 block: Block::Hash,
1177 client: Arc<Client>,
1178 authority_set: SharedAuthoritySet<Block::Hash, NumberFor<Block>>,
1179 select_chain: SelectChain,
1180 voting_rule: VotingRule,
1181) -> Result<Option<(Block::Hash, NumberFor<Block>)>, Error>
1182where
1183 Backend: BackendT<Block>,
1184 Block: BlockT,
1185 Client: ClientForGrandpa<Block, Backend>,
1186 SelectChain: SelectChainT<Block> + 'static,
1187 VotingRule: VotingRuleT<Block, Client>,
1188{
1189 let base_header = match client.header(block)? {
1190 Some(h) => h,
1191 None => {
1192 warn!(
1193 target: LOG_TARGET,
1194 "Encountered error finding best chain containing {:?}: couldn't find base block",
1195 block,
1196 );
1197
1198 return Ok(None)
1199 },
1200 };
1201
1202 let limit = authority_set.current_limit(*base_header.number());
1207 debug!(
1208 target: LOG_TARGET,
1209 "Finding best chain containing block {:?} with number limit {:?}", block, limit
1210 );
1211
1212 let mut target_header = match select_chain.finality_target(block, None).await {
1213 Ok(target_hash) => client
1214 .header(target_hash)?
1215 .expect("Header known to exist after `finality_target` call; qed"),
1216 Err(err) => {
1217 debug!(
1218 target: LOG_TARGET,
1219 "Encountered error finding best chain containing {:?}: couldn't find target block: {}",
1220 block,
1221 err,
1222 );
1223
1224 base_header.clone()
1231 },
1232 };
1233
1234 let mut best_header = match select_chain.best_chain().await {
1238 Ok(best_header) => best_header,
1239 Err(err) => {
1240 warn!(
1241 target: LOG_TARGET,
1242 "Encountered error finding best chain containing {:?}: couldn't find best block: {}",
1243 block,
1244 err,
1245 );
1246
1247 return Ok(None)
1248 },
1249 };
1250
1251 let is_descendent_of = is_descendent_of(&*client, None);
1252
1253 if target_header.number() > best_header.number() ||
1254 target_header.number() == best_header.number() &&
1255 target_header.hash() != best_header.hash() ||
1256 !is_descendent_of(&target_header.hash(), &best_header.hash())?
1257 {
1258 debug!(
1259 target: LOG_TARGET,
1260 "SelectChain returned a finality target inconsistent with its best block. Restricting best block to target block"
1261 );
1262
1263 best_header = target_header.clone();
1264 }
1265
1266 debug!(
1267 target: LOG_TARGET,
1268 "SelectChain: finality target: #{} ({}), best block: #{} ({})",
1269 target_header.number(),
1270 target_header.hash(),
1271 best_header.number(),
1272 best_header.hash(),
1273 );
1274
1275 if let Some(target_number) = limit.filter(|limit| limit < target_header.number()) {
1278 loop {
1280 if *target_header.number() < target_number {
1281 unreachable!(
1282 "we are traversing backwards from a known block; \
1283 blocks are stored contiguously; \
1284 qed"
1285 );
1286 }
1287
1288 if *target_header.number() == target_number {
1289 break
1290 }
1291
1292 target_header = client
1293 .header(*target_header.parent_hash())?
1294 .expect("Header known to exist after `finality_target` call; qed");
1295 }
1296
1297 debug!(
1298 target: LOG_TARGET,
1299 "Finality target restricted to #{} ({}) due to pending authority set change",
1300 target_header.number(),
1301 target_header.hash()
1302 )
1303 }
1304
1305 Ok(voting_rule
1312 .restrict_vote(client.clone(), &base_header, &best_header, &target_header)
1313 .await
1314 .filter(|(_, restricted_number)| {
1315 restricted_number >= base_header.number() && restricted_number < target_header.number()
1317 })
1318 .or_else(|| Some((target_header.hash(), *target_header.number()))))
1319}
1320
1321pub(crate) fn should_process_justification<BE, Block, Client>(
1329 client: &Client,
1330 justification_period: u32,
1331 number: NumberFor<Block>,
1332 enacts_change: bool,
1333) -> bool
1334where
1335 Block: BlockT,
1336 BE: BackendT<Block>,
1337 Client: ClientForGrandpa<Block, BE>,
1338{
1339 if enacts_change {
1340 return true
1341 }
1342
1343 let last_finalized_number = client.info().finalized_number;
1344
1345 if last_finalized_number.is_zero() {
1347 return true
1348 }
1349
1350 last_finalized_number / justification_period.into() != number / justification_period.into()
1351}
1352
1353pub(crate) fn finalize_block<BE, Block, Client>(
1358 client: Arc<Client>,
1359 authority_set: &SharedAuthoritySet<Block::Hash, NumberFor<Block>>,
1360 justification_generation_period: Option<u32>,
1361 hash: Block::Hash,
1362 number: NumberFor<Block>,
1363 justification_or_commit: JustificationOrCommit<Block>,
1364 initial_sync: bool,
1365 justification_sender: Option<&GrandpaJustificationSender<Block>>,
1366 telemetry: Option<TelemetryHandle>,
1367) -> Result<(), CommandOrError<Block::Hash, NumberFor<Block>>>
1368where
1369 Block: BlockT,
1370 BE: BackendT<Block>,
1371 Client: ClientForGrandpa<Block, BE>,
1372{
1373 let mut authority_set = authority_set.inner();
1377
1378 let status = client.info();
1379
1380 if number <= status.finalized_number && client.hash(number)? == Some(hash) {
1381 warn!(target: LOG_TARGET, "Re-finalized block #{:?} ({:?}) in the canonical chain, current best finalized is #{:?}",
1385 hash,
1386 number,
1387 status.finalized_number,
1388 );
1389
1390 return Ok(())
1391 }
1392
1393 let old_authority_set = authority_set.clone();
1395
1396 let update_res: Result<_, Error> = client.lock_import_and_run(|import_op| {
1397 let status = authority_set
1398 .apply_standard_changes(
1399 hash,
1400 number,
1401 &is_descendent_of::<Block, _>(&*client, None),
1402 initial_sync,
1403 None,
1404 )
1405 .map_err(|e| Error::Safety(e.to_string()))?;
1406
1407 fn notify_justification<Block: BlockT>(
1409 justification_sender: Option<&GrandpaJustificationSender<Block>>,
1410 justification: impl FnOnce() -> Result<GrandpaJustification<Block>, Error>,
1411 ) {
1412 if let Some(sender) = justification_sender {
1413 if let Err(err) = sender.notify(justification) {
1414 warn!(
1415 target: LOG_TARGET,
1416 "Error creating justification for subscriber: {}", err
1417 );
1418 }
1419 }
1420 }
1421
1422 let (justification_required, justification) = match justification_or_commit {
1429 JustificationOrCommit::Justification(justification) => (true, justification),
1430 JustificationOrCommit::Commit((round_number, commit)) => {
1431 let enacts_change = status.new_set_block.is_some();
1432
1433 let justification_required = justification_generation_period
1434 .map(|period| {
1435 should_process_justification(&*client, period, number, enacts_change)
1436 })
1437 .unwrap_or(enacts_change);
1438
1439 let justification =
1440 GrandpaJustification::from_commit(&client, round_number, commit)?;
1441
1442 (justification_required, justification)
1443 },
1444 };
1445
1446 notify_justification(justification_sender, || Ok(justification.clone()));
1447
1448 let persisted_justification = if justification_required {
1449 Some((GRANDPA_ENGINE_ID, justification.encode()))
1450 } else {
1451 None
1452 };
1453
1454 client
1457 .apply_finality(import_op, hash, persisted_justification, true)
1458 .map_err(|e| {
1459 warn!(
1460 target: LOG_TARGET,
1461 "Error applying finality to block {:?}: {}",
1462 (hash, number),
1463 e
1464 );
1465 e
1466 })?;
1467
1468 debug!(target: LOG_TARGET, "Finalizing blocks up to ({:?}, {})", number, hash);
1469
1470 telemetry!(
1471 telemetry;
1472 CONSENSUS_INFO;
1473 "afg.finalized_blocks_up_to";
1474 "number" => ?number, "hash" => ?hash,
1475 );
1476
1477 crate::aux_schema::update_best_justification(&justification, |insert| {
1478 apply_aux(import_op, insert, &[])
1479 })?;
1480
1481 let new_authorities = if let Some((canon_hash, canon_number)) = status.new_set_block {
1482 let (new_id, set_ref) = authority_set.current();
1484
1485 if set_ref.len() > 16 {
1486 grandpa_log!(
1487 initial_sync,
1488 "👴 Applying GRANDPA set change to new set with {} authorities",
1489 set_ref.len(),
1490 );
1491 } else {
1492 grandpa_log!(
1493 initial_sync,
1494 "👴 Applying GRANDPA set change to new set {:?}",
1495 set_ref
1496 );
1497 }
1498
1499 telemetry!(
1500 telemetry;
1501 CONSENSUS_INFO;
1502 "afg.generating_new_authority_set";
1503 "number" => ?canon_number, "hash" => ?canon_hash,
1504 "authorities" => ?set_ref.to_vec(),
1505 "set_id" => ?new_id,
1506 );
1507 Some(NewAuthoritySet {
1508 canon_hash,
1509 canon_number,
1510 set_id: new_id,
1511 authorities: set_ref.to_vec(),
1512 })
1513 } else {
1514 None
1515 };
1516
1517 if status.changed {
1518 let write_result = crate::aux_schema::update_authority_set::<Block, _, _>(
1519 &authority_set,
1520 new_authorities.as_ref(),
1521 |insert| apply_aux(import_op, insert, &[]),
1522 );
1523
1524 if let Err(e) = write_result {
1525 warn!(
1526 target: LOG_TARGET,
1527 "Failed to write updated authority set to disk. Bailing."
1528 );
1529 warn!(target: LOG_TARGET, "Node is in a potentially inconsistent state.");
1530
1531 return Err(e.into())
1532 }
1533 }
1534
1535 Ok(new_authorities.map(VoterCommand::ChangeAuthorities))
1536 });
1537
1538 match update_res {
1539 Ok(Some(command)) => Err(CommandOrError::VoterCommand(command)),
1540 Ok(None) => Ok(()),
1541 Err(e) => {
1542 *authority_set = old_authority_set;
1543
1544 Err(CommandOrError::Error(e))
1545 },
1546 }
1547}