// sc_consensus_grandpa/environment.rs
1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use std::{
20	collections::{BTreeMap, HashMap},
21	marker::PhantomData,
22	pin::Pin,
23	sync::Arc,
24	time::Duration,
25};
26
27use codec::{Decode, Encode};
28use finality_grandpa::{
29	round::State as RoundState, voter, voter_set::VoterSet, BlockNumberOps, Error as GrandpaError,
30};
31use futures::prelude::*;
32use futures_timer::Delay;
33use log::{debug, warn};
34use parking_lot::RwLock;
35use prometheus_endpoint::{register, Counter, Gauge, PrometheusError, U64};
36
37use sc_client_api::{
38	backend::{apply_aux, Backend as BackendT},
39	utils::is_descendent_of,
40};
41use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_INFO};
42use sc_transaction_pool_api::OffchainTransactionPoolFactory;
43use sp_api::ApiExt;
44use sp_blockchain::HeaderMetadata;
45use sp_consensus::SelectChain as SelectChainT;
46use sp_consensus_grandpa::{
47	AuthorityId, AuthoritySignature, Equivocation, EquivocationProof, GrandpaApi, RoundNumber,
48	SetId, GRANDPA_ENGINE_ID,
49};
50use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor, Zero};
51
52use crate::{
53	authorities::{AuthoritySet, SharedAuthoritySet},
54	communication::{Network as NetworkT, Syncing as SyncingT},
55	justification::GrandpaJustification,
56	local_authority_id,
57	notification::GrandpaJustificationSender,
58	until_imported::UntilVoteTargetImported,
59	voting_rule::VotingRule as VotingRuleT,
60	ClientForGrandpa, CommandOrError, Commit, Config, Error, NewAuthoritySet, Precommit, Prevote,
61	PrimaryPropose, SignedMessage, VoterCommand, LOG_TARGET,
62};
63
/// Alias for `finality_grandpa::HistoricalVotes` instantiated with this
/// chain's block hash and number types and the GRANDPA authority id and
/// signature types.
type HistoricalVotes<Block> = finality_grandpa::HistoricalVotes<
	<Block as BlockT>::Hash,
	NumberFor<Block>,
	AuthoritySignature,
	AuthorityId,
>;
70
/// Data about a completed round. The set of votes that is stored must be
/// minimal, i.e. at most one equivocation is stored per voter.
///
/// NOTE: the `Encode`/`Decode` derives make the field order part of the
/// persisted format — do not reorder fields.
#[derive(Debug, Clone, Decode, Encode, PartialEq)]
pub struct CompletedRound<Block: BlockT> {
	/// The round number.
	pub number: RoundNumber,
	/// The round state (prevote ghost, estimate, finalized, etc.)
	pub state: RoundState<Block::Hash, NumberFor<Block>>,
	/// The target block base used for voting in the round.
	pub base: (Block::Hash, NumberFor<Block>),
	/// All the votes observed in the round.
	pub votes: Vec<SignedMessage<Block::Header>>,
}
84
/// Data about last completed rounds within a single voter set. Stores
/// NUM_LAST_COMPLETED_ROUNDS and always contains data about at least one round
/// (genesis).
#[derive(Debug, Clone, PartialEq)]
pub struct CompletedRounds<Block: BlockT> {
	// Completed rounds, kept sorted by descending round number (latest at
	// index 0) — see `push` and `last` below.
	rounds: Vec<CompletedRound<Block>>,
	// The id of the authority set these rounds were completed in.
	set_id: SetId,
	// The authority ids of the voter set (taken from `current_authorities`
	// in `new`).
	voters: Vec<AuthorityId>,
}

// NOTE: the current strategy for persisting completed rounds is very naive
// (update everything) and we also rely on cloning to do atomic updates,
// therefore this value should be kept small for now.
const NUM_LAST_COMPLETED_ROUNDS: usize = 2;
99
100impl<Block: BlockT> Encode for CompletedRounds<Block> {
101	fn encode(&self) -> Vec<u8> {
102		let v = Vec::from_iter(&self.rounds);
103		(&v, &self.set_id, &self.voters).encode()
104	}
105}
106
// Marker: the manual `Encode` impl above produces the type's canonical
// encoding, so a `CompletedRounds` may be used wherever an encode-alike
// value is accepted by `codec` APIs.
impl<Block: BlockT> codec::EncodeLike for CompletedRounds<Block> {}
108
109impl<Block: BlockT> Decode for CompletedRounds<Block> {
110	fn decode<I: codec::Input>(value: &mut I) -> Result<Self, codec::Error> {
111		<(Vec<CompletedRound<Block>>, SetId, Vec<AuthorityId>)>::decode(value)
112			.map(|(rounds, set_id, voters)| CompletedRounds { rounds, set_id, voters })
113	}
114}
115
116impl<Block: BlockT> CompletedRounds<Block> {
117	/// Create a new completed rounds tracker with NUM_LAST_COMPLETED_ROUNDS capacity.
118	pub(crate) fn new(
119		genesis: CompletedRound<Block>,
120		set_id: SetId,
121		voters: &AuthoritySet<Block::Hash, NumberFor<Block>>,
122	) -> CompletedRounds<Block> {
123		let mut rounds = Vec::with_capacity(NUM_LAST_COMPLETED_ROUNDS);
124		rounds.push(genesis);
125
126		let voters = voters.current_authorities.iter().map(|(a, _)| a.clone()).collect();
127		CompletedRounds { rounds, set_id, voters }
128	}
129
130	/// Get the set-id and voter set of the completed rounds.
131	pub fn set_info(&self) -> (SetId, &[AuthorityId]) {
132		(self.set_id, &self.voters[..])
133	}
134
135	/// Iterate over all completed rounds.
136	pub fn iter(&self) -> impl Iterator<Item = &CompletedRound<Block>> {
137		self.rounds.iter().rev()
138	}
139
140	/// Returns the last (latest) completed round.
141	pub fn last(&self) -> &CompletedRound<Block> {
142		self.rounds
143			.first()
144			.expect("inner is never empty; always contains at least genesis; qed")
145	}
146
147	/// Push a new completed round, oldest round is evicted if number of rounds
148	/// is higher than `NUM_LAST_COMPLETED_ROUNDS`.
149	pub fn push(&mut self, completed_round: CompletedRound<Block>) {
150		use std::cmp::Reverse;
151
152		match self
153			.rounds
154			.binary_search_by_key(&Reverse(completed_round.number), |completed_round| {
155				Reverse(completed_round.number)
156			}) {
157			Ok(idx) => self.rounds[idx] = completed_round,
158			Err(idx) => self.rounds.insert(idx, completed_round),
159		};
160
161		if self.rounds.len() > NUM_LAST_COMPLETED_ROUNDS {
162			self.rounds.pop();
163		}
164	}
165}
166
/// A map with voter status information for currently live rounds,
/// which votes have we cast and what are they.
///
/// Keyed by round number; each entry records the votes we have already cast
/// in that round (see [`HasVoted`]).
pub type CurrentRounds<Block> = BTreeMap<RoundNumber, HasVoted<<Block as BlockT>::Header>>;
170
/// The state of the current voter set, whether it is currently active or not
/// and information related to the previously completed rounds. Current round
/// voting status is used when restarting the voter, i.e. it will re-use the
/// previous votes for a given round if appropriate (same round and same local
/// key).
///
/// NOTE: this state is persisted (see the `aux_schema::write_voter_set_state`
/// call sites below), so the `Encode`/`Decode` derives define its stored
/// format.
#[derive(Debug, Decode, Encode, PartialEq)]
pub enum VoterSetState<Block: BlockT> {
	/// The voter is live, i.e. participating in rounds.
	Live {
		/// The previously completed rounds.
		completed_rounds: CompletedRounds<Block>,
		/// Voter status for the currently live rounds.
		current_rounds: CurrentRounds<Block>,
	},
	/// The voter is paused, i.e. not casting or importing any votes.
	Paused {
		/// The previously completed rounds.
		completed_rounds: CompletedRounds<Block>,
	},
}
191
192impl<Block: BlockT> VoterSetState<Block> {
193	/// Create a new live VoterSetState with round 0 as a completed round using
194	/// the given genesis state and the given authorities. Round 1 is added as a
195	/// current round (with state `HasVoted::No`).
196	pub(crate) fn live(
197		set_id: SetId,
198		authority_set: &AuthoritySet<Block::Hash, NumberFor<Block>>,
199		genesis_state: (Block::Hash, NumberFor<Block>),
200	) -> VoterSetState<Block> {
201		let state = RoundState::genesis((genesis_state.0, genesis_state.1));
202		let completed_rounds = CompletedRounds::new(
203			CompletedRound {
204				number: 0,
205				state,
206				base: (genesis_state.0, genesis_state.1),
207				votes: Vec::new(),
208			},
209			set_id,
210			authority_set,
211		);
212
213		let mut current_rounds = CurrentRounds::<Block>::new();
214		current_rounds.insert(1, HasVoted::No);
215
216		VoterSetState::Live { completed_rounds, current_rounds }
217	}
218
219	/// Returns the last completed rounds.
220	pub(crate) fn completed_rounds(&self) -> CompletedRounds<Block> {
221		match self {
222			VoterSetState::Live { completed_rounds, .. } => completed_rounds.clone(),
223			VoterSetState::Paused { completed_rounds } => completed_rounds.clone(),
224		}
225	}
226
227	/// Returns the last completed round.
228	pub(crate) fn last_completed_round(&self) -> CompletedRound<Block> {
229		match self {
230			VoterSetState::Live { completed_rounds, .. } => completed_rounds.last().clone(),
231			VoterSetState::Paused { completed_rounds } => completed_rounds.last().clone(),
232		}
233	}
234
235	/// Returns the voter set state validating that it includes the given round
236	/// in current rounds and that the voter isn't paused.
237	pub fn with_current_round(
238		&self,
239		round: RoundNumber,
240	) -> Result<(&CompletedRounds<Block>, &CurrentRounds<Block>), Error> {
241		if let VoterSetState::Live { completed_rounds, current_rounds } = self {
242			if current_rounds.contains_key(&round) {
243				Ok((completed_rounds, current_rounds))
244			} else {
245				let msg = "Voter acting on a live round we are not tracking.";
246				Err(Error::Safety(msg.to_string()))
247			}
248		} else {
249			let msg = "Voter acting while in paused state.";
250			Err(Error::Safety(msg.to_string()))
251		}
252	}
253}
254
/// Whether we've voted already during a prior run of the program.
#[derive(Clone, Debug, Decode, Encode, PartialEq)]
pub enum HasVoted<Header: HeaderT> {
	/// Has not voted already in this round.
	No,
	/// Has voted in this round, recording the authority id the vote was cast
	/// with and the vote(s) themselves.
	Yes(AuthorityId, Vote<Header>),
}
263
/// The votes cast by this voter already during a prior run of the program.
///
/// Variants form a progression: a later vote carries along the earlier
/// one(s) cast in the same round.
#[derive(Debug, Clone, Decode, Encode, PartialEq)]
pub enum Vote<Header: HeaderT> {
	/// Has cast a proposal.
	Propose(PrimaryPropose<Header>),
	/// Has cast a prevote.
	Prevote(Option<PrimaryPropose<Header>>, Prevote<Header>),
	/// Has cast a precommit (implies prevote.)
	Precommit(Option<PrimaryPropose<Header>>, Prevote<Header>, Precommit<Header>),
}
274
275impl<Header: HeaderT> HasVoted<Header> {
276	/// Returns the proposal we should vote with (if any.)
277	pub fn propose(&self) -> Option<&PrimaryPropose<Header>> {
278		match self {
279			HasVoted::Yes(_, Vote::Propose(propose)) => Some(propose),
280			HasVoted::Yes(_, Vote::Prevote(propose, _)) |
281			HasVoted::Yes(_, Vote::Precommit(propose, _, _)) => propose.as_ref(),
282			_ => None,
283		}
284	}
285
286	/// Returns the prevote we should vote with (if any.)
287	pub fn prevote(&self) -> Option<&Prevote<Header>> {
288		match self {
289			HasVoted::Yes(_, Vote::Prevote(_, prevote)) |
290			HasVoted::Yes(_, Vote::Precommit(_, prevote, _)) => Some(prevote),
291			_ => None,
292		}
293	}
294
295	/// Returns the precommit we should vote with (if any.)
296	pub fn precommit(&self) -> Option<&Precommit<Header>> {
297		match self {
298			HasVoted::Yes(_, Vote::Precommit(_, _, precommit)) => Some(precommit),
299			_ => None,
300		}
301	}
302
303	/// Returns true if the voter can still propose, false otherwise.
304	pub fn can_propose(&self) -> bool {
305		self.propose().is_none()
306	}
307
308	/// Returns true if the voter can still prevote, false otherwise.
309	pub fn can_prevote(&self) -> bool {
310		self.prevote().is_none()
311	}
312
313	/// Returns true if the voter can still precommit, false otherwise.
314	pub fn can_precommit(&self) -> bool {
315		self.precommit().is_none()
316	}
317}
318
/// A voter set state meant to be shared safely across multiple owners.
///
/// Cloning is cheap: both fields are `Arc`-wrapped, so clones share the same
/// underlying state.
#[derive(Clone)]
pub struct SharedVoterSetState<Block: BlockT> {
	/// The inner shared `VoterSetState`.
	inner: Arc<RwLock<VoterSetState<Block>>>,
	/// A tracker for the rounds that we are actively participating on (i.e. voting)
	/// and the authority id under which we are doing it.
	voting: Arc<RwLock<HashMap<RoundNumber, AuthorityId>>>,
}
328
329impl<Block: BlockT> From<VoterSetState<Block>> for SharedVoterSetState<Block> {
330	fn from(set_state: VoterSetState<Block>) -> Self {
331		SharedVoterSetState::new(set_state)
332	}
333}
334
335impl<Block: BlockT> SharedVoterSetState<Block> {
336	/// Create a new shared voter set tracker with the given state.
337	pub(crate) fn new(state: VoterSetState<Block>) -> Self {
338		SharedVoterSetState {
339			inner: Arc::new(RwLock::new(state)),
340			voting: Arc::new(RwLock::new(HashMap::new())),
341		}
342	}
343
344	/// Read the inner voter set state.
345	pub(crate) fn read(&self) -> parking_lot::RwLockReadGuard<VoterSetState<Block>> {
346		self.inner.read()
347	}
348
349	/// Get the authority id that we are using to vote on the given round, if any.
350	pub(crate) fn voting_on(&self, round: RoundNumber) -> Option<AuthorityId> {
351		self.voting.read().get(&round).cloned()
352	}
353
354	/// Note that we started voting on the give round with the given authority id.
355	pub(crate) fn started_voting_on(&self, round: RoundNumber, local_id: AuthorityId) {
356		self.voting.write().insert(round, local_id);
357	}
358
359	/// Note that we have finished voting on the given round. If we were voting on
360	/// the given round, the authority id that we were using to do it will be
361	/// cleared.
362	pub(crate) fn finished_voting_on(&self, round: RoundNumber) {
363		self.voting.write().remove(&round);
364	}
365
366	/// Return vote status information for the current round.
367	pub(crate) fn has_voted(&self, round: RoundNumber) -> HasVoted<Block::Header> {
368		match &*self.inner.read() {
369			VoterSetState::Live { current_rounds, .. } => current_rounds
370				.get(&round)
371				.and_then(|has_voted| match has_voted {
372					HasVoted::Yes(id, vote) => Some(HasVoted::Yes(id.clone(), vote.clone())),
373					_ => None,
374				})
375				.unwrap_or(HasVoted::No),
376			_ => HasVoted::No,
377		}
378	}
379
380	// NOTE: not exposed outside of this module intentionally.
381	fn with<F, R>(&self, f: F) -> R
382	where
383		F: FnOnce(&mut VoterSetState<Block>) -> R,
384	{
385		f(&mut *self.inner.write())
386	}
387}
388
/// Prometheus metrics for GRANDPA.
#[derive(Clone)]
pub(crate) struct Metrics {
	// Gauge `substrate_finality_grandpa_round`: highest completed GRANDPA round.
	finality_grandpa_round: Gauge<U64>,
	// Counter `substrate_finality_grandpa_prevotes_total`: prevotes cast locally.
	finality_grandpa_prevotes: Counter<U64>,
	// Counter `substrate_finality_grandpa_precommits_total`: precommits cast locally.
	finality_grandpa_precommits: Counter<U64>,
}
396
397impl Metrics {
398	pub(crate) fn register(
399		registry: &prometheus_endpoint::Registry,
400	) -> Result<Self, PrometheusError> {
401		Ok(Self {
402			finality_grandpa_round: register(
403				Gauge::new("substrate_finality_grandpa_round", "Highest completed GRANDPA round.")?,
404				registry,
405			)?,
406			finality_grandpa_prevotes: register(
407				Counter::new(
408					"substrate_finality_grandpa_prevotes_total",
409					"Total number of GRANDPA prevotes cast locally.",
410				)?,
411				registry,
412			)?,
413			finality_grandpa_precommits: register(
414				Counter::new(
415					"substrate_finality_grandpa_precommits_total",
416					"Total number of GRANDPA precommits cast locally.",
417				)?,
418				registry,
419			)?,
420		})
421	}
422}
423
/// The environment we run GRANDPA in.
pub(crate) struct Environment<
	Backend,
	Block: BlockT,
	C,
	N: NetworkT<Block>,
	S: SyncingT<Block>,
	SC,
	VR,
> {
	/// Client used for chain access (headers, import notifications, aux
	/// storage) and runtime API calls.
	pub(crate) client: Arc<C>,
	/// Chain selection strategy used when picking the block to vote on.
	pub(crate) select_chain: SC,
	/// The voters of the current set.
	pub(crate) voters: Arc<VoterSet<AuthorityId>>,
	/// Voter configuration (keystore, gossip duration, name, ...).
	pub(crate) config: Config,
	/// Shared view of the authority set and its pending changes.
	pub(crate) authority_set: SharedAuthoritySet<Block::Hash, NumberFor<Block>>,
	/// Network bridge used for round communication.
	pub(crate) network: crate::communication::NetworkBridge<Block, N, S>,
	/// The id of the authority set we are voting in.
	pub(crate) set_id: SetId,
	/// Shared (and persisted) voter set state: completed and live rounds.
	pub(crate) voter_set_state: SharedVoterSetState<Block>,
	/// Additional rule restricting what we vote on.
	pub(crate) voting_rule: VR,
	/// Optional Prometheus metrics.
	pub(crate) metrics: Option<Metrics>,
	/// Optional sender for GRANDPA justification notifications.
	pub(crate) justification_sender: Option<GrandpaJustificationSender<Block>>,
	/// Optional telemetry handle.
	pub(crate) telemetry: Option<TelemetryHandle>,
	/// Factory providing the offchain transaction pool used when submitting
	/// equivocation report extrinsics (see `report_equivocation`).
	pub(crate) offchain_tx_pool_factory: OffchainTransactionPoolFactory<Block>,
	// Only binds the otherwise-unused `Backend` type parameter.
	pub(crate) _phantom: PhantomData<Backend>,
}
449
450impl<BE, Block: BlockT, C, N: NetworkT<Block>, S: SyncingT<Block>, SC, VR>
451	Environment<BE, Block, C, N, S, SC, VR>
452{
453	/// Updates the voter set state using the given closure. The write lock is
454	/// held during evaluation of the closure and the environment's voter set
455	/// state is set to its result if successful.
456	pub(crate) fn update_voter_set_state<F>(&self, f: F) -> Result<(), Error>
457	where
458		F: FnOnce(&VoterSetState<Block>) -> Result<Option<VoterSetState<Block>>, Error>,
459	{
460		self.voter_set_state.with(|voter_set_state| {
461			if let Some(set_state) = f(voter_set_state)? {
462				*voter_set_state = set_state;
463
464				if let Some(metrics) = self.metrics.as_ref() {
465					if let VoterSetState::Live { completed_rounds, .. } = voter_set_state {
466						let highest = completed_rounds
467							.rounds
468							.iter()
469							.map(|round| round.number)
470							.max()
471							.expect("There is always one completed round (genesis); qed");
472
473						metrics.finality_grandpa_round.set(highest);
474					}
475				}
476			}
477			Ok(())
478		})
479	}
480}
481
impl<BE, Block, C, N, S, SC, VR> Environment<BE, Block, C, N, S, SC, VR>
where
	Block: BlockT,
	BE: BackendT<Block>,
	C: ClientForGrandpa<Block, BE>,
	C::Api: GrandpaApi<Block>,
	N: NetworkT<Block>,
	S: SyncingT<Block>,
	SC: SelectChainT<Block>,
{
	/// Report the given equivocation to the GRANDPA runtime module. This method
	/// generates a session membership proof of the offender and then submits an
	/// extrinsic to report the equivocation. In particular, the session membership
	/// proof must be generated at the block at which the given set was active which
	/// isn't necessarily the best block if there are pending authority set changes.
	pub(crate) fn report_equivocation(
		&self,
		equivocation: Equivocation<Block::Hash, NumberFor<Block>>,
	) -> Result<(), Error> {
		// never report an equivocation we produced ourselves (checked against
		// the authority id we are voting with on the equivocation's round).
		if let Some(local_id) = self.voter_set_state.voting_on(equivocation.round_number()) {
			if *equivocation.offender() == local_id {
				return Err(Error::Safety(
					"Refraining from sending equivocation report for our own equivocation.".into(),
				))
			}
		}

		let is_descendent_of = is_descendent_of(&*self.client, None);

		let (best_block_hash, best_block_number) = {
			// TODO [#9158]: Use SelectChain::best_chain() to get a potentially
			// more accurate best block
			let info = self.client.info();
			(info.best_hash, info.best_number)
		};

		let authority_set = self.authority_set.inner();

		// block hash and number of the next pending authority set change in the
		// given best chain.
		let next_change = authority_set
			.next_change(&best_block_hash, &is_descendent_of)
			.map_err(|e| Error::Safety(e.to_string()))?;

		// find the hash of the latest block in the current set
		let current_set_latest_hash = match next_change {
			Some((_, n)) if n.is_zero() =>
				return Err(Error::Safety("Authority set change signalled at genesis.".to_string())),
			// the next set starts at `n` so the current one lasts until `n - 1`. if
			// `n` is later than the best block, then the current set is still live
			// at best block.
			Some((_, n)) if n > best_block_number => best_block_hash,
			Some((h, _)) => {
				// this is the header at which the new set will start
				let header = self.client.header(h)?.expect(
					"got block hash from registered pending change; \
					 pending changes are only registered on block import; qed.",
				);

				// its parent block is the last block in the current set
				*header.parent_hash()
			},
			// there is no pending change, the latest block for the current set is
			// the best block.
			None => best_block_hash,
		};

		// generate key ownership proof at that block
		let key_owner_proof = match self
			.client
			.runtime_api()
			.generate_key_ownership_proof(
				current_set_latest_hash,
				authority_set.set_id,
				equivocation.offender().clone(),
			)
			.map_err(Error::RuntimeApi)?
		{
			Some(proof) => proof,
			None => {
				// without a membership proof there is nothing to report.
				debug!(
					target: LOG_TARGET,
					"Equivocation offender is not part of the authority set."
				);
				return Ok(())
			},
		};

		// submit equivocation report at **best** block
		let equivocation_proof = EquivocationProof::new(authority_set.set_id, equivocation);

		let mut runtime_api = self.client.runtime_api();

		// register the offchain transaction pool extension so the runtime is
		// able to actually submit the report extrinsic.
		runtime_api.register_extension(
			self.offchain_tx_pool_factory.offchain_transaction_pool(best_block_hash),
		);

		runtime_api
			.submit_report_equivocation_unsigned_extrinsic(
				best_block_hash,
				equivocation_proof,
				key_owner_proof,
			)
			.map_err(Error::RuntimeApi)?;

		Ok(())
	}
}
590
impl<BE, Block, C, N, S, SC, VR> finality_grandpa::Chain<Block::Hash, NumberFor<Block>>
	for Environment<BE, Block, C, N, S, SC, VR>
where
	Block: BlockT,
	BE: BackendT<Block>,
	C: ClientForGrandpa<Block, BE>,
	N: NetworkT<Block>,
	S: SyncingT<Block>,
	SC: SelectChainT<Block>,
	VR: VotingRuleT<Block, C>,
	NumberFor<Block>: BlockNumberOps,
{
	// Delegates to the free-standing `ancestry` helper below.
	fn ancestry(
		&self,
		base: Block::Hash,
		block: Block::Hash,
	) -> Result<Vec<Block::Hash>, GrandpaError> {
		ancestry(&self.client, base, block)
	}
}
611
612pub(crate) fn ancestry<Block: BlockT, Client>(
613	client: &Arc<Client>,
614	base: Block::Hash,
615	block: Block::Hash,
616) -> Result<Vec<Block::Hash>, GrandpaError>
617where
618	Client: HeaderMetadata<Block, Error = sp_blockchain::Error>,
619{
620	if base == block {
621		return Err(GrandpaError::NotDescendent)
622	}
623
624	let tree_route_res = sp_blockchain::tree_route(&**client, block, base);
625
626	let tree_route = match tree_route_res {
627		Ok(tree_route) => tree_route,
628		Err(e) => {
629			debug!(
630				target: LOG_TARGET,
631				"Encountered error computing ancestry between block {:?} and base {:?}: {}",
632				block,
633				base,
634				e
635			);
636
637			return Err(GrandpaError::NotDescendent)
638		},
639	};
640
641	if tree_route.common_block().hash != base {
642		return Err(GrandpaError::NotDescendent)
643	}
644
645	// skip one because our ancestry is meant to start from the parent of `block`,
646	// and `tree_route` includes it.
647	Ok(tree_route.retracted().iter().skip(1).map(|e| e.hash).collect())
648}
649
650impl<B, Block, C, N, S, SC, VR> voter::Environment<Block::Hash, NumberFor<Block>>
651	for Environment<B, Block, C, N, S, SC, VR>
652where
653	Block: BlockT,
654	B: BackendT<Block>,
655	C: ClientForGrandpa<Block, B> + 'static,
656	C::Api: GrandpaApi<Block>,
657	N: NetworkT<Block>,
658	S: SyncingT<Block>,
659	SC: SelectChainT<Block> + 'static,
660	VR: VotingRuleT<Block, C> + Clone + 'static,
661	NumberFor<Block>: BlockNumberOps,
662{
	// Timer future used for the prevote/precommit delays built in `round_data`.
	type Timer = Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>>;
	// Future resolving to the block to vote on; `Ok(None)` means voting is
	// skipped (e.g. our `set_id` is stale, see `best_chain_containing`).
	type BestChain = Pin<
		Box<
			dyn Future<Output = Result<Option<(Block::Hash, NumberFor<Block>)>, Self::Error>>
				+ Send,
		>,
	>;

	type Id = AuthorityId;
	type Signature = AuthoritySignature;

	// regular round message streams
	// incoming signed votes for a round, produced by the network bridge's
	// `round_communication` (see `round_data`).
	type In = Pin<
		Box<
			dyn Stream<
					Item = Result<
						::finality_grandpa::SignedMessage<
							Block::Hash,
							NumberFor<Block>,
							Self::Signature,
							Self::Id,
						>,
						Self::Error,
					>,
				> + Send,
		>,
	>;
	// outgoing sink for the votes we cast in a round.
	type Out = Pin<
		Box<
			dyn Sink<
					::finality_grandpa::Message<Block::Hash, NumberFor<Block>>,
					Error = Self::Error,
				> + Send,
		>,
	>;

	// errors are either real errors or commands for the voter — see
	// `CommandOrError`.
	type Error = CommandOrError<Block::Hash, NumberFor<Block>>;
700
701	fn best_chain_containing(&self, block: Block::Hash) -> Self::BestChain {
702		let client = self.client.clone();
703		let authority_set = self.authority_set.clone();
704		let select_chain = self.select_chain.clone();
705		let voting_rule = self.voting_rule.clone();
706		let set_id = self.set_id;
707
708		Box::pin(async move {
709			// NOTE: when we finalize an authority set change through the sync protocol the voter is
710			//       signaled asynchronously. therefore the voter could still vote in the next round
711			//       before activating the new set. the `authority_set` is updated immediately thus
712			//       we restrict the voter based on that.
713			if set_id != authority_set.set_id() {
714				return Ok(None)
715			}
716
717			best_chain_containing(block, client, authority_set, select_chain, voting_rule)
718				.await
719				.map_err(|e| e.into())
720		})
721	}
722
	fn round_data(
		&self,
		round: RoundNumber,
	) -> voter::RoundData<Self::Id, Self::Timer, Self::In, Self::Out> {
		// round timeouts are multiples of the configured gossip duration:
		// prevote fires after 2x, precommit after 4x.
		let prevote_timer = Delay::new(self.config.gossip_duration * 2);
		let precommit_timer = Delay::new(self.config.gossip_duration * 4);

		let local_id = local_authority_id(&self.voters, self.config.keystore.as_ref());

		// only reuse votes persisted from a previous run if they were cast
		// with the authority id we currently control.
		let has_voted = match self.voter_set_state.has_voted(round) {
			HasVoted::Yes(id, vote) =>
				if local_id.as_ref().map(|k| k == &id).unwrap_or(false) {
					HasVoted::Yes(id, vote)
				} else {
					HasVoted::No
				},
			HasVoted::No => HasVoted::No,
		};

		// NOTE: we cache the local authority id that we'll be using to vote on the
		// given round. this is done to make sure we only check for available keys
		// from the keystore in this method when beginning the round, otherwise if
		// the keystore state changed during the round (e.g. a key was removed) it
		// could lead to internal state inconsistencies in the voter environment
		// (e.g. we wouldn't update the voter set state after prevoting since there's
		// no local authority id).
		if let Some(id) = local_id.as_ref() {
			self.voter_set_state.started_voting_on(round, id.clone());
		}

		// we can only sign when we have a local key in the authority set
		// and we have a reference to the keystore.
		let keystore = match (local_id.as_ref(), self.config.keystore.as_ref()) {
			(Some(id), Some(keystore)) => Some((id.clone(), keystore.clone()).into()),
			_ => None,
		};

		let (incoming, outgoing) = self.network.round_communication(
			keystore,
			crate::communication::Round(round),
			crate::communication::SetId(self.set_id),
			self.voters.clone(),
			has_voted,
		);

		// schedule incoming messages from the network to be held until
		// corresponding blocks are imported.
		let incoming = Box::pin(
			UntilVoteTargetImported::new(
				self.client.import_notification_stream(),
				self.network.clone(),
				self.client.clone(),
				incoming,
				"round",
				None,
			)
			.map_err(Into::into),
		);

		// schedule network message cleanup when sink drops.
		let outgoing = Box::pin(outgoing.sink_err_into());

		voter::RoundData {
			voter_id: local_id,
			prevote_timer: Box::pin(prevote_timer.map(Ok)),
			precommit_timer: Box::pin(precommit_timer.map(Ok)),
			incoming,
			outgoing,
		}
	}
793
794	fn proposed(
795		&self,
796		round: RoundNumber,
797		propose: PrimaryPropose<Block::Header>,
798	) -> Result<(), Self::Error> {
799		let local_id = match self.voter_set_state.voting_on(round) {
800			Some(id) => id,
801			None => return Ok(()),
802		};
803
804		self.update_voter_set_state(|voter_set_state| {
805			let (completed_rounds, current_rounds) = voter_set_state.with_current_round(round)?;
806			let current_round = current_rounds
807				.get(&round)
808				.expect("checked in with_current_round that key exists; qed.");
809
810			if !current_round.can_propose() {
811				// we've already proposed in this round (in a previous run),
812				// ignore the given vote and don't update the voter set
813				// state
814				return Ok(None)
815			}
816
817			let mut current_rounds = current_rounds.clone();
818			let current_round = current_rounds
819				.get_mut(&round)
820				.expect("checked previously that key exists; qed.");
821
822			*current_round = HasVoted::Yes(local_id, Vote::Propose(propose));
823
824			let set_state = VoterSetState::<Block>::Live {
825				completed_rounds: completed_rounds.clone(),
826				current_rounds,
827			};
828
829			crate::aux_schema::write_voter_set_state(&*self.client, &set_state)?;
830
831			Ok(Some(set_state))
832		})?;
833
834		Ok(())
835	}
836
	fn prevoted(
		&self,
		round: RoundNumber,
		prevote: Prevote<Block::Header>,
	) -> Result<(), Self::Error> {
		// only persist the vote if we are actually voting on this round.
		let local_id = match self.voter_set_state.voting_on(round) {
			Some(id) => id,
			None => return Ok(()),
		};

		// reports the issued prevote to telemetry and prometheus.
		let report_prevote_metrics = |prevote: &Prevote<Block::Header>| {
			telemetry!(
				self.telemetry;
				CONSENSUS_DEBUG;
				"afg.prevote_issued";
				"round" => round,
				"target_number" => ?prevote.target_number,
				"target_hash" => ?prevote.target_hash,
			);

			if let Some(metrics) = self.metrics.as_ref() {
				metrics.finality_grandpa_prevotes.inc();
			}
		};

		self.update_voter_set_state(|voter_set_state| {
			let (completed_rounds, current_rounds) = voter_set_state.with_current_round(round)?;
			let current_round = current_rounds
				.get(&round)
				.expect("checked in with_current_round that key exists; qed.");

			if !current_round.can_prevote() {
				// we've already prevoted in this round (in a previous run),
				// ignore the given vote and don't update the voter set
				// state
				return Ok(None)
			}

			// report to telemetry and prometheus
			report_prevote_metrics(&prevote);

			// carry along any proposal cast earlier in this round.
			let propose = current_round.propose();

			let mut current_rounds = current_rounds.clone();
			let current_round = current_rounds
				.get_mut(&round)
				.expect("checked previously that key exists; qed.");

			*current_round = HasVoted::Yes(local_id, Vote::Prevote(propose.cloned(), prevote));

			let set_state = VoterSetState::<Block>::Live {
				completed_rounds: completed_rounds.clone(),
				current_rounds,
			};

			// persist before adopting the new state.
			crate::aux_schema::write_voter_set_state(&*self.client, &set_state)?;

			Ok(Some(set_state))
		})?;

		Ok(())
	}
899
	fn precommitted(
		&self,
		round: RoundNumber,
		precommit: Precommit<Block::Header>,
	) -> Result<(), Self::Error> {
		// only persist the vote if we are actually voting on this round.
		let local_id = match self.voter_set_state.voting_on(round) {
			Some(id) => id,
			None => return Ok(()),
		};

		// reports the issued precommit to telemetry and prometheus.
		let report_precommit_metrics = |precommit: &Precommit<Block::Header>| {
			telemetry!(
				self.telemetry;
				CONSENSUS_DEBUG;
				"afg.precommit_issued";
				"round" => round,
				"target_number" => ?precommit.target_number,
				"target_hash" => ?precommit.target_hash,
			);

			if let Some(metrics) = self.metrics.as_ref() {
				metrics.finality_grandpa_precommits.inc();
			}
		};

		self.update_voter_set_state(|voter_set_state| {
			let (completed_rounds, current_rounds) = voter_set_state.with_current_round(round)?;
			let current_round = current_rounds
				.get(&round)
				.expect("checked in with_current_round that key exists; qed.");

			if !current_round.can_precommit() {
				// we've already precommitted in this round (in a previous run),
				// ignore the given vote and don't update the voter set
				// state
				return Ok(None)
			}

			// report to telemetry and prometheus
			report_precommit_metrics(&precommit);

			// carry along the earlier votes; a precommit without a recorded
			// prevote is a safety violation.
			let propose = current_round.propose();
			let prevote = match current_round {
				HasVoted::Yes(_, Vote::Prevote(_, prevote)) => prevote,
				_ => {
					let msg = "Voter precommitting before prevoting.";
					return Err(Error::Safety(msg.to_string()))
				},
			};

			let mut current_rounds = current_rounds.clone();
			let current_round = current_rounds
				.get_mut(&round)
				.expect("checked previously that key exists; qed.");

			*current_round = HasVoted::Yes(
				local_id,
				Vote::Precommit(propose.cloned(), prevote.clone(), precommit),
			);

			let set_state = VoterSetState::<Block>::Live {
				completed_rounds: completed_rounds.clone(),
				current_rounds,
			};

			// persist before adopting the new state.
			crate::aux_schema::write_voter_set_state(&*self.client, &set_state)?;

			Ok(Some(set_state))
		})?;

		Ok(())
	}
972
	/// Voter callback: called when `round` has been completed from this
	/// voter's perspective.
	///
	/// Moves the round from the live rounds into the completed rounds of the
	/// persisted voter set state, starts tracking `round + 1`, and clears the
	/// cached local authority id for the completed round.
	fn completed(
		&self,
		round: RoundNumber,
		state: RoundState<Block::Hash, NumberFor<Block>>,
		base: (Block::Hash, NumberFor<Block>),
		historical_votes: &HistoricalVotes<Block>,
	) -> Result<(), Self::Error> {
		debug!(
			target: LOG_TARGET,
			"Voter {} completed round {} in set {}. Estimate = {:?}, Finalized in round = {:?}",
			self.config.name(),
			round,
			self.set_id,
			state.estimate.as_ref().map(|e| e.1),
			state.finalized.as_ref().map(|e| e.1),
		);

		self.update_voter_set_state(|voter_set_state| {
			// NOTE: we don't use `with_current_round` here, it is possible that
			// we are not currently tracking this round if it is a round we
			// caught up to.
			let (completed_rounds, current_rounds) =
				if let VoterSetState::Live { completed_rounds, current_rounds } = voter_set_state {
					(completed_rounds, current_rounds)
				} else {
					let msg = "Voter acting while in paused state.";
					return Err(Error::Safety(msg.to_string()))
				};

			let mut completed_rounds = completed_rounds.clone();

			// TODO: Future integration will store the prevote and precommit index. See #2611.
			let votes = historical_votes.seen().to_vec();

			// Archive the round with the state and votes observed so far;
			// `concluded` may later extend the votes for this same round.
			completed_rounds.push(CompletedRound {
				number: round,
				state: state.clone(),
				base,
				votes,
			});

			// remove the round from live rounds and start tracking the next round
			let mut current_rounds = current_rounds.clone();
			current_rounds.remove(&round);

			// NOTE: this entry should always exist as GRANDPA rounds are always
			// started in increasing order, still it's better to play it safe.
			current_rounds.entry(round + 1).or_insert(HasVoted::No);

			let set_state = VoterSetState::<Block>::Live { completed_rounds, current_rounds };

			// Persist to aux storage before returning the new in-memory state
			// so disk and memory cannot diverge.
			crate::aux_schema::write_voter_set_state(&*self.client, &set_state)?;

			Ok(Some(set_state))
		})?;

		// clear any cached local authority id associated with this round
		self.voter_set_state.finished_voting_on(round);

		Ok(())
	}
1034
	/// Voter callback: called when `round` is concluded, i.e. after all of its
	/// votes have been observed.
	///
	/// Extends the stored votes of the already-completed round with any votes
	/// seen since `completed` was called, updates its final state and persists
	/// the concluded round plus the voter set state. Rounds we never tracked
	/// as completed (e.g. from a catch-up) are left untouched.
	fn concluded(
		&self,
		round: RoundNumber,
		state: RoundState<Block::Hash, NumberFor<Block>>,
		_base: (Block::Hash, NumberFor<Block>),
		historical_votes: &HistoricalVotes<Block>,
	) -> Result<(), Self::Error> {
		debug!(
			target: LOG_TARGET,
			"Voter {} concluded round {} in set {}. Estimate = {:?}, Finalized in round = {:?}",
			self.config.name(),
			round,
			self.set_id,
			state.estimate.as_ref().map(|e| e.1),
			state.finalized.as_ref().map(|e| e.1),
		);

		self.update_voter_set_state(|voter_set_state| {
			// NOTE: we don't use `with_current_round` here, because a concluded
			// round is completed and cannot be current.
			let (completed_rounds, current_rounds) =
				if let VoterSetState::Live { completed_rounds, current_rounds } = voter_set_state {
					(completed_rounds, current_rounds)
				} else {
					let msg = "Voter acting while in paused state.";
					return Err(Error::Safety(msg.to_string()))
				};

			let mut completed_rounds = completed_rounds.clone();

			if let Some(already_completed) =
				completed_rounds.rounds.iter_mut().find(|r| r.number == round)
			{
				let n_existing_votes = already_completed.votes.len();

				// the interface of Environment guarantees that the previous `historical_votes`
				// from `completable` is a prefix of what is passed to `concluded`.
				// Therefore only the suffix beyond the already-stored votes is appended.
				already_completed
					.votes
					.extend(historical_votes.seen().iter().skip(n_existing_votes).cloned());
				already_completed.state = state;
				crate::aux_schema::write_concluded_round(&*self.client, already_completed)?;
			}

			let set_state = VoterSetState::<Block>::Live {
				completed_rounds,
				current_rounds: current_rounds.clone(),
			};

			// Persist to aux storage before returning the new in-memory state
			// so disk and memory cannot diverge.
			crate::aux_schema::write_voter_set_state(&*self.client, &set_state)?;

			Ok(Some(set_state))
		})?;

		Ok(())
	}
1091
1092	fn finalize_block(
1093		&self,
1094		hash: Block::Hash,
1095		number: NumberFor<Block>,
1096		round: RoundNumber,
1097		commit: Commit<Block::Header>,
1098	) -> Result<(), Self::Error> {
1099		finalize_block(
1100			self.client.clone(),
1101			&self.authority_set,
1102			Some(self.config.justification_generation_period),
1103			hash,
1104			number,
1105			(round, commit).into(),
1106			false,
1107			self.justification_sender.as_ref(),
1108			self.telemetry.clone(),
1109		)
1110	}
1111
1112	fn round_commit_timer(&self) -> Self::Timer {
1113		use rand::{thread_rng, Rng};
1114
1115		// random between `[0, 2 * gossip_duration]` seconds.
1116		let delay: u64 =
1117			thread_rng().gen_range(0..2 * self.config.gossip_duration.as_millis() as u64);
1118		Box::pin(Delay::new(Duration::from_millis(delay)).map(Ok))
1119	}
1120
1121	fn prevote_equivocation(
1122		&self,
1123		_round: RoundNumber,
1124		equivocation: finality_grandpa::Equivocation<
1125			Self::Id,
1126			Prevote<Block::Header>,
1127			Self::Signature,
1128		>,
1129	) {
1130		warn!(
1131			target: LOG_TARGET,
1132			"Detected prevote equivocation in the finality worker: {:?}", equivocation
1133		);
1134		if let Err(err) = self.report_equivocation(equivocation.into()) {
1135			warn!(target: LOG_TARGET, "Error reporting prevote equivocation: {}", err);
1136		}
1137	}
1138
1139	fn precommit_equivocation(
1140		&self,
1141		_round: RoundNumber,
1142		equivocation: finality_grandpa::Equivocation<
1143			Self::Id,
1144			Precommit<Block::Header>,
1145			Self::Signature,
1146		>,
1147	) {
1148		warn!(
1149			target: LOG_TARGET,
1150			"Detected precommit equivocation in the finality worker: {:?}", equivocation
1151		);
1152		if let Err(err) = self.report_equivocation(equivocation.into()) {
1153			warn!(target: LOG_TARGET, "Error reporting precommit equivocation: {}", err);
1154		}
1155	}
1156}
1157
/// Input to block finalization: either a ready-made justification (e.g. one
/// received from the network) or a raw `(round, commit)` pair from which a
/// justification can be built on demand.
pub(crate) enum JustificationOrCommit<Block: BlockT> {
	/// A complete, already-assembled GRANDPA justification.
	Justification(GrandpaJustification<Block>),
	/// The round number and commit message observed in that round.
	Commit((RoundNumber, Commit<Block::Header>)),
}
1162
1163impl<Block: BlockT> From<(RoundNumber, Commit<Block::Header>)> for JustificationOrCommit<Block> {
1164	fn from(commit: (RoundNumber, Commit<Block::Header>)) -> JustificationOrCommit<Block> {
1165		JustificationOrCommit::Commit(commit)
1166	}
1167}
1168
1169impl<Block: BlockT> From<GrandpaJustification<Block>> for JustificationOrCommit<Block> {
1170	fn from(justification: GrandpaJustification<Block>) -> JustificationOrCommit<Block> {
1171		JustificationOrCommit::Justification(justification)
1172	}
1173}
1174
/// Determine the block to vote for in a round whose base is `block`.
///
/// Asks the given `SelectChain` for a finality target (and best block)
/// descending from `block`, then restricts that target by:
/// 1. any pending authority set change limit — we never vote past a block
///    that would enact a scheduled change — and
/// 2. the supplied `voting_rule`, whose restriction is honoured only if it
///    stays within the `[base, target]` interval.
///
/// Returns `Ok(None)` when no vote target can be determined (unknown base
/// block, or `SelectChain` failed to provide a best block).
async fn best_chain_containing<Block, Backend, Client, SelectChain, VotingRule>(
	block: Block::Hash,
	client: Arc<Client>,
	authority_set: SharedAuthoritySet<Block::Hash, NumberFor<Block>>,
	select_chain: SelectChain,
	voting_rule: VotingRule,
) -> Result<Option<(Block::Hash, NumberFor<Block>)>, Error>
where
	Backend: BackendT<Block>,
	Block: BlockT,
	Client: ClientForGrandpa<Block, Backend>,
	SelectChain: SelectChainT<Block> + 'static,
	VotingRule: VotingRuleT<Block, Client>,
{
	// The round base must be a known block, otherwise we cannot vote at all.
	let base_header = match client.header(block)? {
		Some(h) => h,
		None => {
			warn!(
				target: LOG_TARGET,
				"Encountered error finding best chain containing {:?}: couldn't find base block",
				block,
			);

			return Ok(None)
		},
	};

	// we refuse to vote beyond the current limit number where transitions are scheduled to occur.
	// once blocks are finalized that make that transition irrelevant or activate it, we will
	// proceed onwards. most of the time there will be no pending transition.  the limit, if any, is
	// guaranteed to be higher than or equal to the given base number.
	let limit = authority_set.current_limit(*base_header.number());
	debug!(
		target: LOG_TARGET,
		"Finding best chain containing block {:?} with number limit {:?}", block, limit
	);

	let mut target_header = match select_chain.finality_target(block, None).await {
		Ok(target_hash) => client
			.header(target_hash)?
			.expect("Header known to exist after `finality_target` call; qed"),
		Err(err) => {
			debug!(
				target: LOG_TARGET,
				"Encountered error finding best chain containing {:?}: couldn't find target block: {}",
				block,
				err,
			);

			// NOTE: in case the given `SelectChain` doesn't provide any block we fallback to using
			// the given base block provided by the GRANDPA voter.
			//
			// For example, `LongestChain` will error if the given block to use as base isn't part
			// of the best chain (as defined by `LongestChain`), which could happen if there was a
			// re-org.
			base_header.clone()
		},
	};

	// NOTE: this is purposefully done after `finality_target` to prevent a case
	// where in-between these two requests there is a block import and
	// `finality_target` returns something higher than `best_chain`.
	let mut best_header = match select_chain.best_chain().await {
		Ok(best_header) => best_header,
		Err(err) => {
			warn!(
				target: LOG_TARGET,
				"Encountered error finding best chain containing {:?}: couldn't find best block: {}",
				block,
				err,
			);

			return Ok(None)
		},
	};

	let is_descendent_of = is_descendent_of(&*client, None);

	// Sanity check: the target is inconsistent if it is above the best block,
	// is a different block at the same height, or is not an ancestor of the
	// best block. In any of these cases we fall back to voting on the target.
	if target_header.number() > best_header.number() ||
		target_header.number() == best_header.number() &&
			target_header.hash() != best_header.hash() ||
		!is_descendent_of(&target_header.hash(), &best_header.hash())?
	{
		debug!(
			target: LOG_TARGET,
			"SelectChain returned a finality target inconsistent with its best block. Restricting best block to target block"
		);

		best_header = target_header.clone();
	}

	debug!(
		target: LOG_TARGET,
		"SelectChain: finality target: #{} ({}), best block: #{} ({})",
		target_header.number(),
		target_header.hash(),
		best_header.number(),
		best_header.hash(),
	);

	// check if our vote is currently being limited due to a pending change,
	// in which case we will restrict our target header to the given limit
	if let Some(target_number) = limit.filter(|limit| limit < target_header.number()) {
		// walk backwards until we find the target block
		loop {
			if *target_header.number() < target_number {
				unreachable!(
					"we are traversing backwards from a known block; \
					 blocks are stored contiguously; \
					 qed"
				);
			}

			if *target_header.number() == target_number {
				break
			}

			// Step to the parent; every ancestor of a known header is known.
			target_header = client
				.header(*target_header.parent_hash())?
				.expect("Header known to exist after `finality_target` call; qed");
		}

		debug!(
			target: LOG_TARGET,
			"Finality target restricted to #{} ({}) due to pending authority set change",
			target_header.number(),
			target_header.hash()
		)
	}

	// restrict vote according to the given voting rule, if the voting rule
	// doesn't restrict the vote then we keep the previous target.
	//
	// we also make sure that the restricted vote is higher than the round base
	// (i.e. last finalized), otherwise the value returned by the given voting
	// rule is ignored and the original target is used instead.
	Ok(voting_rule
		.restrict_vote(client.clone(), &base_header, &best_header, &target_header)
		.await
		.filter(|(_, restricted_number)| {
			// we can only restrict votes within the interval [base, target]
			restricted_number >= base_header.number() && restricted_number < target_header.number()
		})
		.or_else(|| Some((target_header.hash(), *target_header.number()))))
}
1320
1321/// Whether we should process a justification for the given block.
1322///
1323/// This can be used to decide whether to import a justification (when
1324/// importing a block), or whether to generate a justification from a
1325/// commit (when validating). Justifications for blocks that change the
1326/// authority set will always be processed, otherwise we'll only process
1327/// justifications if the last one was `justification_period` blocks ago.
1328pub(crate) fn should_process_justification<BE, Block, Client>(
1329	client: &Client,
1330	justification_period: u32,
1331	number: NumberFor<Block>,
1332	enacts_change: bool,
1333) -> bool
1334where
1335	Block: BlockT,
1336	BE: BackendT<Block>,
1337	Client: ClientForGrandpa<Block, BE>,
1338{
1339	if enacts_change {
1340		return true
1341	}
1342
1343	let last_finalized_number = client.info().finalized_number;
1344
1345	// keep the first justification before reaching the justification period
1346	if last_finalized_number.is_zero() {
1347		return true
1348	}
1349
1350	last_finalized_number / justification_period.into() != number / justification_period.into()
1351}
1352
/// Finalize the given block and apply any authority set changes. If an
/// authority set change is enacted then a justification is created (if not
/// given) and stored with the block when finalizing it.
/// This method assumes that the block being finalized has already been imported.
///
/// On success, if the finalized block enacted a new authority set, the
/// function returns `Err(CommandOrError::VoterCommand(ChangeAuthorities))`
/// to signal the voter to restart with the new set; a plain `Ok(())` means
/// finalization happened with no set change. Genuine failures are returned
/// as `CommandOrError::Error`, after rolling back the in-memory authority
/// set to its pre-call value.
pub(crate) fn finalize_block<BE, Block, Client>(
	client: Arc<Client>,
	authority_set: &SharedAuthoritySet<Block::Hash, NumberFor<Block>>,
	justification_generation_period: Option<u32>,
	hash: Block::Hash,
	number: NumberFor<Block>,
	justification_or_commit: JustificationOrCommit<Block>,
	initial_sync: bool,
	justification_sender: Option<&GrandpaJustificationSender<Block>>,
	telemetry: Option<TelemetryHandle>,
) -> Result<(), CommandOrError<Block::Hash, NumberFor<Block>>>
where
	Block: BlockT,
	BE: BackendT<Block>,
	Client: ClientForGrandpa<Block, BE>,
{
	// NOTE: lock must be held through writing to DB to avoid race. this lock
	//       also implicitly synchronizes the check for last finalized number
	//       below.
	let mut authority_set = authority_set.inner();

	let status = client.info();

	// Ignore requests to re-finalize a block that is already finalized on the
	// canonical chain.
	if number <= status.finalized_number && client.hash(number)? == Some(hash) {
		// This can happen after a forced change (triggered manually from the runtime when
		// finality is stalled), since the voter will be restarted at the median last finalized
		// block, which can be lower than the local best finalized block.
		warn!(target: LOG_TARGET, "Re-finalized block #{:?} ({:?}) in the canonical chain, current best finalized is #{:?}",
				hash,
				number,
				status.finalized_number,
		);

		return Ok(())
	}

	// FIXME #1483: clone only when changed
	// Snapshot kept so the in-memory set can be rolled back if the database
	// transaction below fails.
	let old_authority_set = authority_set.clone();

	// All writes below happen inside a single import operation so finality,
	// justification and authority-set updates are applied atomically.
	let update_res: Result<_, Error> = client.lock_import_and_run(|import_op| {
		// Apply any standard (non-forced) authority set changes enacted by
		// finalizing up to `hash`.
		let status = authority_set
			.apply_standard_changes(
				hash,
				number,
				&is_descendent_of::<Block, _>(&*client, None),
				initial_sync,
				None,
			)
			.map_err(|e| Error::Safety(e.to_string()))?;

		// send a justification notification if a sender exists and in case of error log it.
		fn notify_justification<Block: BlockT>(
			justification_sender: Option<&GrandpaJustificationSender<Block>>,
			justification: impl FnOnce() -> Result<GrandpaJustification<Block>, Error>,
		) {
			if let Some(sender) = justification_sender {
				if let Err(err) = sender.notify(justification) {
					warn!(
						target: LOG_TARGET,
						"Error creating justification for subscriber: {}", err
					);
				}
			}
		}

		// NOTE: this code assumes that honest voters will never vote past a
		// transition block, thus we don't have to worry about the case where
		// we have a transition with `effective_block = N`, but we finalize
		// `N+1`. this assumption is required to make sure we store
		// justifications for transition blocks which will be requested by
		// syncing clients.
		let (justification_required, justification) = match justification_or_commit {
			// An explicit justification was supplied: always persist it.
			JustificationOrCommit::Justification(justification) => (true, justification),
			// Only a commit was supplied: build the justification from it and
			// decide whether to persist based on the generation period (or,
			// absent a period, only when a set change is enacted).
			JustificationOrCommit::Commit((round_number, commit)) => {
				let enacts_change = status.new_set_block.is_some();

				let justification_required = justification_generation_period
					.map(|period| {
						should_process_justification(&*client, period, number, enacts_change)
					})
					.unwrap_or(enacts_change);

				let justification =
					GrandpaJustification::from_commit(&client, round_number, commit)?;

				(justification_required, justification)
			},
		};

		// Subscribers are notified regardless of whether the justification is
		// persisted with the block.
		notify_justification(justification_sender, || Ok(justification.clone()));

		let persisted_justification = if justification_required {
			Some((GRANDPA_ENGINE_ID, justification.encode()))
		} else {
			None
		};

		// ideally some handle to a synchronization oracle would be used
		// to avoid unconditionally notifying.
		client
			.apply_finality(import_op, hash, persisted_justification, true)
			.map_err(|e| {
				warn!(
					target: LOG_TARGET,
					"Error applying finality to block {:?}: {}",
					(hash, number),
					e
				);
				e
			})?;

		debug!(target: LOG_TARGET, "Finalizing blocks up to ({:?}, {})", number, hash);

		telemetry!(
			telemetry;
			CONSENSUS_INFO;
			"afg.finalized_blocks_up_to";
			"number" => ?number, "hash" => ?hash,
		);

		// Record the latest justification in aux storage as part of the same
		// import operation.
		crate::aux_schema::update_best_justification(&justification, |insert| {
			apply_aux(import_op, insert, &[])
		})?;

		let new_authorities = if let Some((canon_hash, canon_number)) = status.new_set_block {
			// the authority set has changed.
			let (new_id, set_ref) = authority_set.current();

			// For large sets only log the count to keep the log line readable.
			if set_ref.len() > 16 {
				grandpa_log!(
					initial_sync,
					"👴 Applying GRANDPA set change to new set with {} authorities",
					set_ref.len(),
				);
			} else {
				grandpa_log!(
					initial_sync,
					"👴 Applying GRANDPA set change to new set {:?}",
					set_ref
				);
			}

			telemetry!(
				telemetry;
				CONSENSUS_INFO;
				"afg.generating_new_authority_set";
				"number" => ?canon_number, "hash" => ?canon_hash,
				"authorities" => ?set_ref.to_vec(),
				"set_id" => ?new_id,
			);
			Some(NewAuthoritySet {
				canon_hash,
				canon_number,
				set_id: new_id,
				authorities: set_ref.to_vec(),
			})
		} else {
			None
		};

		// Persist the updated authority set (and any new set) if it changed.
		if status.changed {
			let write_result = crate::aux_schema::update_authority_set::<Block, _, _>(
				&authority_set,
				new_authorities.as_ref(),
				|insert| apply_aux(import_op, insert, &[]),
			);

			if let Err(e) = write_result {
				warn!(
					target: LOG_TARGET,
					"Failed to write updated authority set to disk. Bailing."
				);
				warn!(target: LOG_TARGET, "Node is in a potentially inconsistent state.");

				return Err(e.into())
			}
		}

		Ok(new_authorities.map(VoterCommand::ChangeAuthorities))
	});

	match update_res {
		// A set change was enacted: surface it as a voter command.
		Ok(Some(command)) => Err(CommandOrError::VoterCommand(command)),
		Ok(None) => Ok(()),
		Err(e) => {
			// The DB transaction failed: restore the in-memory authority set
			// snapshot so memory stays consistent with disk.
			*authority_set = old_authority_set;

			Err(CommandOrError::Error(e))
		},
	}
}