referrerpolicy=no-referrer-when-downgrade

sc_consensus_grandpa/
lib.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Integration of the GRANDPA finality gadget into substrate.
20//!
21//! This crate is unstable and the API and usage may change.
22//!
23//! This crate provides a long-running future that produces finality notifications.
24//!
25//! # Usage
26//!
27//! First, create a block-import wrapper with the `block_import` function. The
28//! GRANDPA worker needs to be linked together with this block import object, so
29//! a `LinkHalf` is returned as well. All blocks imported (from network or
30//! consensus or otherwise) must pass through this wrapper, otherwise consensus
31//! is likely to break in unexpected ways.
32//!
33//! Next, use the `LinkHalf` and a local configuration to `run_grandpa_voter`.
34//! This requires a `Network` implementation. The returned future should be
35//! driven to completion and will finalize blocks in the background.
36//!
37//! # Changing authority sets
38//!
39//! The rough idea behind changing authority sets in GRANDPA is that at some point,
40//! we obtain agreement for some maximum block height that the current set can
41//! finalize, and once a block with that height is finalized the next set will
42//! pick up finalization from there.
43//!
44//! Technically speaking, this would be implemented as a voting rule which says,
45//! "if there is a signal for a change in N blocks in block B, only vote on
46//! chains with length NUM(B) + N if they contain B". This conditional-inclusion
47//! logic is complex to compute because it requires looking arbitrarily far
48//! back in the chain.
49//!
50//! Instead, we keep track of a list of all signals we've seen so far (across
51//! all forks), sorted ascending by the block number they would be applied at.
52//! We never vote on chains with number higher than the earliest handoff block
53//! number (this is num(signal) + N). When finalizing a block, we either apply
54//! or prune any signaled changes based on whether the signaling block is
55//! included in the newly-finalized chain.
56
57#![warn(missing_docs)]
58
59use codec::Decode;
60use futures::{prelude::*, StreamExt};
61use log::{debug, error, info};
62use parking_lot::RwLock;
63use prometheus_endpoint::{PrometheusError, Registry};
64use sc_client_api::{
65	backend::{AuxStore, Backend},
66	utils::is_descendent_of,
67	BlockchainEvents, CallExecutor, ExecutorProvider, Finalizer, LockImportRun, StorageProvider,
68};
69use sc_consensus::BlockImport;
70use sc_network::{types::ProtocolName, NetworkBackend, NotificationService};
71use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_INFO};
72use sc_transaction_pool_api::OffchainTransactionPoolFactory;
73use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver};
74use sp_api::ProvideRuntimeApi;
75use sp_application_crypto::AppCrypto;
76use sp_blockchain::{Error as ClientError, HeaderBackend, HeaderMetadata, Result as ClientResult};
77use sp_consensus::SelectChain;
78use sp_consensus_grandpa::{
79	AuthorityList, AuthoritySignature, SetId, CLIENT_LOG_TARGET as LOG_TARGET,
80};
81use sp_core::{crypto::ByteArray, traits::CallContext};
82use sp_keystore::KeystorePtr;
83use sp_runtime::{
84	generic::BlockId,
85	traits::{Block as BlockT, NumberFor, Zero},
86};
87
88pub use finality_grandpa::BlockNumberOps;
89use finality_grandpa::{voter, voter_set::VoterSet, Error as GrandpaError};
90
91use std::{
92	fmt, io,
93	pin::Pin,
94	sync::Arc,
95	task::{Context, Poll},
96	time::Duration,
97};
98
// Logging helper whose first argument is a boolean selecting the level: the
// message is emitted at `Debug` when the condition holds and at `Info`
// otherwise (useful to restrict logging under initial sync). The remaining
// arguments are forwarded verbatim to `log::log!` under `LOG_TARGET`.
macro_rules! grandpa_log {
	($condition:expr, $($msg: expr),+ $(,)?) => {
		{
			let log_level = match $condition {
				true => log::Level::Debug,
				false => log::Level::Info,
			};

			log::log!(target: LOG_TARGET, log_level, $($msg),+);
		}
	};
}
115
116mod authorities;
117mod aux_schema;
118mod communication;
119mod environment;
120mod finality_proof;
121mod import;
122mod justification;
123mod notification;
124mod observer;
125mod until_imported;
126mod voting_rule;
127pub mod warp_proof;
128
129pub use authorities::{AuthoritySet, AuthoritySetChanges, SharedAuthoritySet};
130pub use aux_schema::best_justification;
131pub use communication::grandpa_protocol_name::standard_name as protocol_standard_name;
132pub use finality_grandpa::voter::report;
133pub use finality_proof::{FinalityProof, FinalityProofError, FinalityProofProvider};
134pub use import::{find_forced_change, find_scheduled_change, GrandpaBlockImport};
135pub use justification::GrandpaJustification;
136pub use notification::{GrandpaJustificationSender, GrandpaJustificationStream};
137pub use observer::run_grandpa_observer;
138pub use voting_rule::{
139	BeforeBestBlockBy, ThreeQuartersOfTheUnfinalizedChain, VotingRule, VotingRuleResult,
140	VotingRulesBuilder,
141};
142
143use aux_schema::PersistentData;
144use communication::{Network as NetworkT, NetworkBridge, Syncing as SyncingT};
145use environment::{Environment, VoterSetState};
146use until_imported::UntilGlobalMessageBlocksImported;
147
// Re-export these types because it's just so convenient to have them at the crate root.
149pub use sp_consensus_grandpa::{
150	AuthorityId, AuthorityPair, CatchUp, Commit, CompactCommit, GrandpaApi, Message, Precommit,
151	Prevote, PrimaryPropose, ScheduledChange, SignedMessage, GRANDPA_ENGINE_ID,
152};
153use std::marker::PhantomData;
154
155/// Filter that preserves blocks with GRANDPA justifications during pruning.
156///
157/// Use this filter with `DatabaseSettings::pruning_filters` to ensure that blocks
158/// required for warp sync are not pruned. GRANDPA justifications at authority set change
159/// boundaries are needed to construct warp sync proofs.
160#[derive(Debug, Clone)]
161pub struct GrandpaPruningFilter;
162
163impl sc_client_db::PruningFilter for GrandpaPruningFilter {
164	fn should_retain(&self, justifications: &sp_runtime::Justifications) -> bool {
165		justifications.get(GRANDPA_ENGINE_ID).is_some()
166	}
167}
168
169#[cfg(test)]
170mod tests;
171
172/// A global communication input stream for commits and catch up messages. Not
173/// exposed publicly, used internally to simplify types in the communication
174/// layer.
175type CommunicationIn<Block> = voter::CommunicationIn<
176	<Block as BlockT>::Hash,
177	NumberFor<Block>,
178	AuthoritySignature,
179	AuthorityId,
180>;
181/// Global communication input stream for commits and catch up messages, with
182/// the hash type not being derived from the block, useful for forcing the hash
183/// to some type (e.g. `H256`) when the compiler can't do the inference.
184type CommunicationInH<Block, H> =
185	voter::CommunicationIn<H, NumberFor<Block>, AuthoritySignature, AuthorityId>;
186
187/// Global communication sink for commits with the hash type not being derived
188/// from the block, useful for forcing the hash to some type (e.g. `H256`) when
189/// the compiler can't do the inference.
190type CommunicationOutH<Block, H> =
191	voter::CommunicationOut<H, NumberFor<Block>, AuthoritySignature, AuthorityId>;
192
193/// Shared voter state for querying.
194pub struct SharedVoterState {
195	inner: Arc<RwLock<Option<Box<dyn voter::VoterState<AuthorityId> + Sync + Send>>>>,
196}
197
198impl SharedVoterState {
199	/// Create a new empty `SharedVoterState` instance.
200	pub fn empty() -> Self {
201		Self { inner: Arc::new(RwLock::new(None)) }
202	}
203
204	fn reset(
205		&self,
206		voter_state: Box<dyn voter::VoterState<AuthorityId> + Sync + Send>,
207	) -> Option<()> {
208		let mut shared_voter_state = self.inner.try_write_for(Duration::from_secs(1))?;
209
210		*shared_voter_state = Some(voter_state);
211		Some(())
212	}
213
214	/// Get the inner `VoterState` instance.
215	pub fn voter_state(&self) -> Option<report::VoterState<AuthorityId>> {
216		self.inner.read().as_ref().map(|vs| vs.get())
217	}
218}
219
220impl Clone for SharedVoterState {
221	fn clone(&self) -> Self {
222		SharedVoterState { inner: self.inner.clone() }
223	}
224}
225
226/// Configuration for the GRANDPA service
227#[derive(Clone)]
228pub struct Config {
229	/// The expected duration for a message to be gossiped across the network.
230	pub gossip_duration: Duration,
231	/// Justification generation period (in blocks). GRANDPA will try to generate
232	/// justifications at least every justification_generation_period blocks. There
233	/// are some other events which might cause justification generation.
234	pub justification_generation_period: u32,
235	/// Whether the GRANDPA observer protocol is live on the network and thereby
236	/// a full-node not running as a validator is running the GRANDPA observer
237	/// protocol (we will only issue catch-up requests to authorities when the
238	/// observer protocol is enabled).
239	pub observer_enabled: bool,
240	/// The role of the local node (i.e. authority, full-node or light).
241	pub local_role: sc_network::config::Role,
242	/// Some local identifier of the voter.
243	pub name: Option<String>,
244	/// The keystore that manages the keys of this node.
245	pub keystore: Option<KeystorePtr>,
246	/// TelemetryHandle instance.
247	pub telemetry: Option<TelemetryHandle>,
248	/// Chain specific GRANDPA protocol name. See [`crate::protocol_standard_name`].
249	pub protocol_name: ProtocolName,
250}
251
252impl Config {
253	fn name(&self) -> &str {
254		self.name.as_deref().unwrap_or("<unknown>")
255	}
256}
257
/// Errors that can occur while voting in GRANDPA.
///
/// The `Display` output of every variant is a fixed prefix followed by the
/// wrapped value.
#[derive(Debug, thiserror::Error)]
pub enum Error {
	/// An error within grandpa (converted automatically from `GrandpaError`).
	#[error("grandpa error: {0}")]
	Grandpa(#[from] GrandpaError),

	/// A network error, described by the contained message.
	#[error("network error: {0}")]
	Network(String),

	/// A blockchain error, described by the contained message.
	#[error("blockchain error: {0}")]
	Blockchain(String),

	/// Could not complete a round on disk (converted automatically from
	/// `ClientError`).
	#[error("could not complete a round on disk: {0}")]
	Client(#[from] ClientError),

	/// Could not sign an outgoing message.
	#[error("could not sign outgoing message: {0}")]
	Signing(String),

	/// An invariant has been violated (e.g. not finalizing pending change blocks in-order).
	#[error("safety invariant has been violated: {0}")]
	Safety(String),

	/// A timer failed to fire.
	#[error("a timer failed to fire: {0}")]
	Timer(io::Error),

	/// A runtime API request failed.
	#[error("runtime API request failed: {0}")]
	RuntimeApi(sp_api::ApiError),
}
293
294/// Something which can determine if a block is known.
295pub(crate) trait BlockStatus<Block: BlockT> {
296	/// Return `Ok(Some(number))` or `Ok(None)` depending on whether the block
297	/// is definitely known and has been imported.
298	/// If an unexpected error occurs, return that.
299	fn block_number(&self, hash: Block::Hash) -> Result<Option<NumberFor<Block>>, Error>;
300}
301
302impl<Block: BlockT, Client> BlockStatus<Block> for Arc<Client>
303where
304	Client: HeaderBackend<Block>,
305	NumberFor<Block>: BlockNumberOps,
306{
307	fn block_number(&self, hash: Block::Hash) -> Result<Option<NumberFor<Block>>, Error> {
308		self.block_number_from_id(&BlockId::Hash(hash))
309			.map_err(|e| Error::Blockchain(e.to_string()))
310	}
311}
312
313/// A trait that includes all the client functionalities grandpa requires.
314/// Ideally this would be a trait alias, we're not there yet.
315/// tracking issue <https://github.com/rust-lang/rust/issues/41517>
316pub trait ClientForGrandpa<Block, BE>:
317	LockImportRun<Block, BE>
318	+ Finalizer<Block, BE>
319	+ AuxStore
320	+ HeaderMetadata<Block, Error = sp_blockchain::Error>
321	+ HeaderBackend<Block>
322	+ BlockchainEvents<Block>
323	+ ProvideRuntimeApi<Block>
324	+ ExecutorProvider<Block>
325	+ BlockImport<Block, Error = sp_consensus::Error>
326	+ StorageProvider<Block, BE>
327where
328	BE: Backend<Block>,
329	Block: BlockT,
330{
331}
332
333impl<Block, BE, T> ClientForGrandpa<Block, BE> for T
334where
335	BE: Backend<Block>,
336	Block: BlockT,
337	T: LockImportRun<Block, BE>
338		+ Finalizer<Block, BE>
339		+ AuxStore
340		+ HeaderMetadata<Block, Error = sp_blockchain::Error>
341		+ HeaderBackend<Block>
342		+ BlockchainEvents<Block>
343		+ ProvideRuntimeApi<Block>
344		+ ExecutorProvider<Block>
345		+ BlockImport<Block, Error = sp_consensus::Error>
346		+ StorageProvider<Block, BE>,
347{
348}
349
350/// Something that one can ask to do a block sync request.
351pub(crate) trait BlockSyncRequester<Block: BlockT> {
352	/// Notifies the sync service to try and sync the given block from the given
353	/// peers.
354	///
355	/// If the given vector of peers is empty then the underlying implementation
356	/// should make a best effort to fetch the block from any peers it is
357	/// connected to (NOTE: this assumption will change in the future #3629).
358	fn set_sync_fork_request(
359		&self,
360		peers: Vec<sc_network_types::PeerId>,
361		hash: Block::Hash,
362		number: NumberFor<Block>,
363	);
364}
365
366impl<Block, Network, Syncing> BlockSyncRequester<Block> for NetworkBridge<Block, Network, Syncing>
367where
368	Block: BlockT,
369	Network: NetworkT<Block>,
370	Syncing: SyncingT<Block>,
371{
372	fn set_sync_fork_request(
373		&self,
374		peers: Vec<sc_network_types::PeerId>,
375		hash: Block::Hash,
376		number: NumberFor<Block>,
377	) {
378		NetworkBridge::set_sync_fork_request(self, peers, hash, number)
379	}
380}
381
/// A new authority set along with the canonical block it changed at.
#[derive(Debug)]
pub(crate) struct NewAuthoritySet<H, N> {
	/// Number of the canonical block the set changed at.
	pub(crate) canon_number: N,
	/// Hash of the canonical block the set changed at.
	pub(crate) canon_hash: H,
	/// Identifier of the new authority set.
	pub(crate) set_id: SetId,
	/// The authorities making up the new set.
	pub(crate) authorities: AuthorityList,
}
390
391/// Commands issued to the voter.
392#[derive(Debug)]
393pub(crate) enum VoterCommand<H, N> {
394	/// Pause the voter for given reason.
395	Pause(String),
396	/// New authorities.
397	ChangeAuthorities(NewAuthoritySet<H, N>),
398}
399
400impl<H, N> fmt::Display for VoterCommand<H, N> {
401	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
402		match *self {
403			VoterCommand::Pause(ref reason) => write!(f, "Pausing voter: {}", reason),
404			VoterCommand::ChangeAuthorities(_) => write!(f, "Changing authorities"),
405		}
406	}
407}
408
409/// Signals either an early exit of a voter or an error.
410#[derive(Debug)]
411pub(crate) enum CommandOrError<H, N> {
412	/// An error occurred.
413	Error(Error),
414	/// A command to the voter.
415	VoterCommand(VoterCommand<H, N>),
416}
417
418impl<H, N> From<Error> for CommandOrError<H, N> {
419	fn from(e: Error) -> Self {
420		CommandOrError::Error(e)
421	}
422}
423
424impl<H, N> From<ClientError> for CommandOrError<H, N> {
425	fn from(e: ClientError) -> Self {
426		CommandOrError::Error(Error::Client(e))
427	}
428}
429
430impl<H, N> From<finality_grandpa::Error> for CommandOrError<H, N> {
431	fn from(e: finality_grandpa::Error) -> Self {
432		CommandOrError::Error(Error::from(e))
433	}
434}
435
436impl<H, N> From<VoterCommand<H, N>> for CommandOrError<H, N> {
437	fn from(e: VoterCommand<H, N>) -> Self {
438		CommandOrError::VoterCommand(e)
439	}
440}
441
442impl<H: fmt::Debug, N: fmt::Debug> ::std::error::Error for CommandOrError<H, N> {}
443
444impl<H, N> fmt::Display for CommandOrError<H, N> {
445	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
446		match *self {
447			CommandOrError::Error(ref e) => write!(f, "{}", e),
448			CommandOrError::VoterCommand(ref cmd) => write!(f, "{}", cmd),
449		}
450	}
451}
452
453/// Link between the block importer and the background voter.
454pub struct LinkHalf<Block: BlockT, C, SC> {
455	client: Arc<C>,
456	select_chain: SC,
457	persistent_data: PersistentData<Block>,
458	voter_commands_rx: TracingUnboundedReceiver<VoterCommand<Block::Hash, NumberFor<Block>>>,
459	justification_sender: GrandpaJustificationSender<Block>,
460	justification_stream: GrandpaJustificationStream<Block>,
461	telemetry: Option<TelemetryHandle>,
462}
463
464impl<Block: BlockT, C, SC> LinkHalf<Block, C, SC> {
465	/// Get the shared authority set.
466	pub fn shared_authority_set(&self) -> &SharedAuthoritySet<Block::Hash, NumberFor<Block>> {
467		&self.persistent_data.authority_set
468	}
469
470	/// Get the receiving end of justification notifications.
471	pub fn justification_stream(&self) -> GrandpaJustificationStream<Block> {
472		self.justification_stream.clone()
473	}
474}
475
476/// Provider for the Grandpa authority set configured on the genesis block.
477pub trait GenesisAuthoritySetProvider<Block: BlockT> {
478	/// Get the authority set at the genesis block.
479	fn get(&self) -> Result<AuthorityList, ClientError>;
480}
481
482impl<Block: BlockT, E, Client> GenesisAuthoritySetProvider<Block> for Arc<Client>
483where
484	E: CallExecutor<Block>,
485	Client: ExecutorProvider<Block, Executor = E> + HeaderBackend<Block>,
486{
487	fn get(&self) -> Result<AuthorityList, ClientError> {
488		self.executor()
489			.call(
490				self.expect_block_hash_from_id(&BlockId::Number(Zero::zero()))?,
491				"GrandpaApi_grandpa_authorities",
492				&[],
493				CallContext::Offchain,
494			)
495			.and_then(|call_result| {
496				Decode::decode(&mut &call_result[..]).map_err(|err| {
497					ClientError::CallResultDecode(
498						"failed to decode GRANDPA authorities set proof",
499						err,
500					)
501				})
502			})
503	}
504}
505
506/// Make block importer and link half necessary to tie the background voter
507/// to it.
508///
509/// The `justification_import_period` sets the minimum period on which
510/// justifications will be imported.  When importing a block, if it includes a
511/// justification it will only be processed if it fits within this period,
512/// otherwise it will be ignored (and won't be validated). This is to avoid
513/// slowing down sync by a peer serving us unnecessary justifications which
514/// aren't trivial to validate.
515pub fn block_import<BE, Block: BlockT, Client, SC>(
516	client: Arc<Client>,
517	justification_import_period: u32,
518	genesis_authorities_provider: &dyn GenesisAuthoritySetProvider<Block>,
519	select_chain: SC,
520	telemetry: Option<TelemetryHandle>,
521) -> Result<(GrandpaBlockImport<BE, Block, Client, SC>, LinkHalf<Block, Client, SC>), ClientError>
522where
523	SC: SelectChain<Block>,
524	BE: Backend<Block> + 'static,
525	Client: ClientForGrandpa<Block, BE> + 'static,
526{
527	block_import_with_authority_set_hard_forks(
528		client,
529		justification_import_period,
530		genesis_authorities_provider,
531		select_chain,
532		Default::default(),
533		telemetry,
534	)
535}
536
537/// A descriptor for an authority set hard fork. These are authority set changes
538/// that are not signalled by the runtime and instead are defined off-chain
539/// (hence the hard fork).
540pub struct AuthoritySetHardFork<Block: BlockT> {
541	/// The new authority set id.
542	pub set_id: SetId,
543	/// The block hash and number at which the hard fork should be applied.
544	pub block: (Block::Hash, NumberFor<Block>),
545	/// The authorities in the new set.
546	pub authorities: AuthorityList,
547	/// The latest block number that was finalized before this authority set
548	/// hard fork. When defined, the authority set change will be forced, i.e.
549	/// the node won't wait for the block above to be finalized before enacting
550	/// the change, and the given finalized number will be used as a base for
551	/// voting.
552	pub last_finalized: Option<NumberFor<Block>>,
553}
554
555/// Make block importer and link half necessary to tie the background voter to
556/// it. A vector of authority set hard forks can be passed, any authority set
557/// change signaled at the given block (either already signalled or in a further
558/// block when importing it) will be replaced by a standard change with the
559/// given static authorities.
560pub fn block_import_with_authority_set_hard_forks<BE, Block: BlockT, Client, SC>(
561	client: Arc<Client>,
562	justification_import_period: u32,
563	genesis_authorities_provider: &dyn GenesisAuthoritySetProvider<Block>,
564	select_chain: SC,
565	authority_set_hard_forks: Vec<AuthoritySetHardFork<Block>>,
566	telemetry: Option<TelemetryHandle>,
567) -> Result<(GrandpaBlockImport<BE, Block, Client, SC>, LinkHalf<Block, Client, SC>), ClientError>
568where
569	SC: SelectChain<Block>,
570	BE: Backend<Block> + 'static,
571	Client: ClientForGrandpa<Block, BE> + 'static,
572{
573	let chain_info = client.info();
574	let genesis_hash = chain_info.genesis_hash;
575
576	let persistent_data =
577		aux_schema::load_persistent(&*client, genesis_hash, <NumberFor<Block>>::zero(), {
578			let telemetry = telemetry.clone();
579			move || {
580				let authorities = genesis_authorities_provider.get()?;
581				telemetry!(
582					telemetry;
583					CONSENSUS_DEBUG;
584					"afg.loading_authorities";
585					"authorities_len" => ?authorities.len()
586				);
587				Ok(authorities)
588			}
589		})?;
590
591	let (voter_commands_tx, voter_commands_rx) =
592		tracing_unbounded("mpsc_grandpa_voter_command", 100_000);
593
594	let (justification_sender, justification_stream) = GrandpaJustificationStream::channel();
595
596	// create pending change objects with 0 delay for each authority set hard fork.
597	let authority_set_hard_forks = authority_set_hard_forks
598		.into_iter()
599		.map(|fork| {
600			let delay_kind = if let Some(last_finalized) = fork.last_finalized {
601				authorities::DelayKind::Best { median_last_finalized: last_finalized }
602			} else {
603				authorities::DelayKind::Finalized
604			};
605
606			(
607				fork.set_id,
608				authorities::PendingChange {
609					next_authorities: fork.authorities,
610					delay: Zero::zero(),
611					canon_hash: fork.block.0,
612					canon_height: fork.block.1,
613					delay_kind,
614				},
615			)
616		})
617		.collect();
618
619	Ok((
620		GrandpaBlockImport::new(
621			client.clone(),
622			justification_import_period,
623			select_chain.clone(),
624			persistent_data.authority_set.clone(),
625			voter_commands_tx,
626			authority_set_hard_forks,
627			justification_sender.clone(),
628			telemetry.clone(),
629		),
630		LinkHalf {
631			client,
632			select_chain,
633			persistent_data,
634			voter_commands_rx,
635			justification_sender,
636			justification_stream,
637			telemetry,
638		},
639	))
640}
641
642fn global_communication<BE, Block: BlockT, C, N, S>(
643	set_id: SetId,
644	voters: &Arc<VoterSet<AuthorityId>>,
645	client: Arc<C>,
646	network: &NetworkBridge<Block, N, S>,
647	keystore: Option<&KeystorePtr>,
648	metrics: Option<until_imported::Metrics>,
649) -> (
650	impl Stream<
651		Item = Result<
652			CommunicationInH<Block, Block::Hash>,
653			CommandOrError<Block::Hash, NumberFor<Block>>,
654		>,
655	>,
656	impl Sink<
657		CommunicationOutH<Block, Block::Hash>,
658		Error = CommandOrError<Block::Hash, NumberFor<Block>>,
659	>,
660)
661where
662	BE: Backend<Block> + 'static,
663	C: ClientForGrandpa<Block, BE> + 'static,
664	N: NetworkT<Block>,
665	S: SyncingT<Block>,
666	NumberFor<Block>: BlockNumberOps,
667{
668	let is_voter = local_authority_id(voters, keystore).is_some();
669
670	// verification stream
671	let (global_in, global_out) =
672		network.global_communication(communication::SetId(set_id), voters.clone(), is_voter);
673
674	// block commit and catch up messages until relevant blocks are imported.
675	let global_in = UntilGlobalMessageBlocksImported::new(
676		client.import_notification_stream(),
677		network.clone(),
678		client.clone(),
679		global_in,
680		"global",
681		metrics,
682	);
683
684	let global_in = global_in.map_err(CommandOrError::from);
685	let global_out = global_out.sink_map_err(CommandOrError::from);
686
687	(global_in, global_out)
688}
689
690/// Parameters used to run Grandpa.
691pub struct GrandpaParams<Block: BlockT, C, N, S, SC, VR> {
692	/// Configuration for the GRANDPA service.
693	pub config: Config,
694	/// A link to the block import worker.
695	pub link: LinkHalf<Block, C, SC>,
696	/// The Network instance.
697	///
698	/// It is assumed that this network will feed us Grandpa notifications. When using the
699	/// `sc_network` crate, it is assumed that the Grandpa notifications protocol has been passed
700	/// to the configuration of the networking. See [`grandpa_peers_set_config`].
701	pub network: N,
702	/// Event stream for syncing-related events.
703	pub sync: S,
704	/// Handle for interacting with `Notifications`.
705	pub notification_service: Box<dyn NotificationService>,
706	/// A voting rule used to potentially restrict target votes.
707	pub voting_rule: VR,
708	/// The prometheus metrics registry.
709	pub prometheus_registry: Option<prometheus_endpoint::Registry>,
710	/// The voter state is exposed at an RPC endpoint.
711	pub shared_voter_state: SharedVoterState,
712	/// TelemetryHandle instance.
713	pub telemetry: Option<TelemetryHandle>,
714	/// Offchain transaction pool factory.
715	///
716	/// This will be used to create an offchain transaction pool instance for sending an
717	/// equivocation report from the runtime.
718	pub offchain_tx_pool_factory: OffchainTransactionPoolFactory<Block>,
719}
720
721/// Returns the configuration value to put in
722/// [`sc_network::config::FullNetworkConfiguration`].
723/// For standard protocol name see [`crate::protocol_standard_name`].
724pub fn grandpa_peers_set_config<B: BlockT, N: NetworkBackend<B, <B as BlockT>::Hash>>(
725	protocol_name: ProtocolName,
726	metrics: sc_network::service::NotificationMetrics,
727	peer_store_handle: Arc<dyn sc_network::peer_store::PeerStoreProvider>,
728) -> (N::NotificationProtocolConfig, Box<dyn NotificationService>) {
729	use communication::grandpa_protocol_name;
730	N::notification_config(
731		protocol_name,
732		grandpa_protocol_name::LEGACY_NAMES.iter().map(|&n| n.into()).collect(),
733		// Notifications reach ~256kiB in size at the time of writing on Kusama and Polkadot.
734		1024 * 1024,
735		None,
736		sc_network::config::SetConfig {
737			in_peers: 0,
738			out_peers: 0,
739			reserved_nodes: Vec::new(),
740			non_reserved_mode: sc_network::config::NonReservedPeerMode::Deny,
741		},
742		metrics,
743		peer_store_handle,
744	)
745}
746
747/// Run a GRANDPA voter as a task. Provide configuration and a link to a
748/// block import worker that has already been instantiated with `block_import`.
749pub fn run_grandpa_voter<Block: BlockT, BE: 'static, C, N, S, SC, VR>(
750	grandpa_params: GrandpaParams<Block, C, N, S, SC, VR>,
751) -> sp_blockchain::Result<impl Future<Output = ()> + Send>
752where
753	BE: Backend<Block> + 'static,
754	N: NetworkT<Block> + Sync + 'static,
755	S: SyncingT<Block> + Sync + 'static,
756	SC: SelectChain<Block> + 'static,
757	VR: VotingRule<Block, C> + Clone + 'static,
758	NumberFor<Block>: BlockNumberOps,
759	C: ClientForGrandpa<Block, BE> + 'static,
760	C::Api: GrandpaApi<Block>,
761{
762	let GrandpaParams {
763		mut config,
764		link,
765		network,
766		sync,
767		notification_service,
768		voting_rule,
769		prometheus_registry,
770		shared_voter_state,
771		telemetry,
772		offchain_tx_pool_factory,
773	} = grandpa_params;
774
775	// NOTE: we have recently removed `run_grandpa_observer` from the public
776	// API, I felt it is easier to just ignore this field rather than removing
777	// it from the config temporarily. This should be removed after #5013 is
778	// fixed and we re-add the observer to the public API.
779	config.observer_enabled = false;
780
781	let LinkHalf {
782		client,
783		select_chain,
784		persistent_data,
785		voter_commands_rx,
786		justification_sender,
787		justification_stream: _,
788		telemetry: _,
789	} = link;
790
791	let network = NetworkBridge::new(
792		network,
793		sync,
794		notification_service,
795		config.clone(),
796		persistent_data.set_state.clone(),
797		prometheus_registry.as_ref(),
798		telemetry.clone(),
799	);
800
801	let conf = config.clone();
802	let telemetry_task =
803		if let Some(telemetry_on_connect) = telemetry.as_ref().map(|x| x.on_connect_stream()) {
804			let authorities = persistent_data.authority_set.clone();
805			let telemetry = telemetry.clone();
806			let events = telemetry_on_connect.for_each(move |_| {
807				let current_authorities = authorities.current_authorities();
808				let set_id = authorities.set_id();
809				let maybe_authority_id =
810					local_authority_id(&current_authorities, conf.keystore.as_ref());
811
812				let authorities =
813					current_authorities.iter().map(|(id, _)| id.to_string()).collect::<Vec<_>>();
814
815				let authorities = serde_json::to_string(&authorities).expect(
816					"authorities is always at least an empty vector; \
817					 elements are always of type string",
818				);
819
820				telemetry!(
821					telemetry;
822					CONSENSUS_INFO;
823					"afg.authority_set";
824					"authority_id" => maybe_authority_id.map_or("".into(), |s| s.to_string()),
825					"authority_set_id" => ?set_id,
826					"authorities" => authorities,
827				);
828
829				future::ready(())
830			});
831			future::Either::Left(events)
832		} else {
833			future::Either::Right(future::pending())
834		};
835
836	let voter_work = VoterWork::new(
837		client,
838		config,
839		network,
840		select_chain,
841		voting_rule,
842		persistent_data,
843		voter_commands_rx,
844		prometheus_registry,
845		shared_voter_state,
846		justification_sender,
847		telemetry,
848		offchain_tx_pool_factory,
849	);
850
851	let voter_work = voter_work.map(|res| match res {
852		Ok(()) => error!(
853			target: LOG_TARGET,
854			"GRANDPA voter future has concluded naturally, this should be unreachable."
855		),
856		Err(e) => error!(target: LOG_TARGET, "GRANDPA voter error: {}", e),
857	});
858
859	// Make sure that `telemetry_task` doesn't accidentally finish and kill grandpa.
860	let telemetry_task = telemetry_task.then(|_| future::pending::<()>());
861
862	Ok(future::select(voter_work, telemetry_task).map(drop))
863}
864
865struct Metrics {
866	environment: environment::Metrics,
867	until_imported: until_imported::Metrics,
868}
869
870impl Metrics {
871	fn register(registry: &Registry) -> Result<Self, PrometheusError> {
872		Ok(Metrics {
873			environment: environment::Metrics::register(registry)?,
874			until_imported: until_imported::Metrics::register(registry)?,
875		})
876	}
877}
878
879/// Future that powers the voter.
880#[must_use]
881struct VoterWork<B, Block: BlockT, C, N: NetworkT<Block>, S: SyncingT<Block>, SC, VR> {
882	voter: Pin<
883		Box<dyn Future<Output = Result<(), CommandOrError<Block::Hash, NumberFor<Block>>>> + Send>,
884	>,
885	shared_voter_state: SharedVoterState,
886	env: Arc<Environment<B, Block, C, N, S, SC, VR>>,
887	voter_commands_rx: TracingUnboundedReceiver<VoterCommand<Block::Hash, NumberFor<Block>>>,
888	network: NetworkBridge<Block, N, S>,
889	telemetry: Option<TelemetryHandle>,
890	/// Prometheus metrics.
891	metrics: Option<Metrics>,
892}
893
894impl<B, Block, C, N, S, SC, VR> VoterWork<B, Block, C, N, S, SC, VR>
895where
896	Block: BlockT,
897	B: Backend<Block> + 'static,
898	C: ClientForGrandpa<Block, B> + 'static,
899	C::Api: GrandpaApi<Block>,
900	N: NetworkT<Block> + Sync,
901	S: SyncingT<Block> + Sync,
902	NumberFor<Block>: BlockNumberOps,
903	SC: SelectChain<Block> + 'static,
904	VR: VotingRule<Block, C> + Clone + 'static,
905{
906	fn new(
907		client: Arc<C>,
908		config: Config,
909		network: NetworkBridge<Block, N, S>,
910		select_chain: SC,
911		voting_rule: VR,
912		persistent_data: PersistentData<Block>,
913		voter_commands_rx: TracingUnboundedReceiver<VoterCommand<Block::Hash, NumberFor<Block>>>,
914		prometheus_registry: Option<prometheus_endpoint::Registry>,
915		shared_voter_state: SharedVoterState,
916		justification_sender: GrandpaJustificationSender<Block>,
917		telemetry: Option<TelemetryHandle>,
918		offchain_tx_pool_factory: OffchainTransactionPoolFactory<Block>,
919	) -> Self {
920		let metrics = match prometheus_registry.as_ref().map(Metrics::register) {
921			Some(Ok(metrics)) => Some(metrics),
922			Some(Err(e)) => {
923				debug!(target: LOG_TARGET, "Failed to register metrics: {:?}", e);
924				None
925			},
926			None => None,
927		};
928
929		let voters = persistent_data.authority_set.current_authorities();
930		let env = Arc::new(Environment {
931			client,
932			select_chain,
933			voting_rule,
934			voters: Arc::new(voters),
935			config,
936			network: network.clone(),
937			set_id: persistent_data.authority_set.set_id(),
938			authority_set: persistent_data.authority_set.clone(),
939			voter_set_state: persistent_data.set_state,
940			metrics: metrics.as_ref().map(|m| m.environment.clone()),
941			justification_sender: Some(justification_sender),
942			telemetry: telemetry.clone(),
943			offchain_tx_pool_factory,
944			_phantom: PhantomData,
945		});
946
947		let mut work = VoterWork {
948			// `voter` is set to a temporary value and replaced below when
949			// calling `rebuild_voter`.
950			voter: Box::pin(future::pending()),
951			shared_voter_state,
952			env,
953			voter_commands_rx,
954			network,
955			telemetry,
956			metrics,
957		};
958		work.rebuild_voter();
959		work
960	}
961
962	/// Rebuilds the `self.voter` field using the current authority set
963	/// state. This method should be called when we know that the authority set
964	/// has changed (e.g. as signalled by a voter command).
965	fn rebuild_voter(&mut self) {
966		debug!(
967			target: LOG_TARGET,
968			"{}: Starting new voter with set ID {}",
969			self.env.config.name(),
970			self.env.set_id
971		);
972
973		let maybe_authority_id =
974			local_authority_id(&self.env.voters, self.env.config.keystore.as_ref());
975		let authority_id = maybe_authority_id.map_or("<unknown>".into(), |s| s.to_string());
976
977		telemetry!(
978			self.telemetry;
979			CONSENSUS_DEBUG;
980			"afg.starting_new_voter";
981			"name" => ?self.env.config.name(),
982			"set_id" => ?self.env.set_id,
983			"authority_id" => authority_id,
984		);
985
986		let chain_info = self.env.client.info();
987
988		let authorities = self.env.voters.iter().map(|(id, _)| id.to_string()).collect::<Vec<_>>();
989
990		let authorities = serde_json::to_string(&authorities).expect(
991			"authorities is always at least an empty vector; elements are always of type string; qed.",
992		);
993
994		telemetry!(
995			self.telemetry;
996			CONSENSUS_INFO;
997			"afg.authority_set";
998			"number" => ?chain_info.finalized_number,
999			"hash" => ?chain_info.finalized_hash,
1000			"authority_id" => authority_id,
1001			"authority_set_id" => ?self.env.set_id,
1002			"authorities" => authorities,
1003		);
1004
1005		match &*self.env.voter_set_state.read() {
1006			VoterSetState::Live { completed_rounds, .. } => {
1007				let last_finalized = (chain_info.finalized_hash, chain_info.finalized_number);
1008
1009				let global_comms = global_communication(
1010					self.env.set_id,
1011					&self.env.voters,
1012					self.env.client.clone(),
1013					&self.env.network,
1014					self.env.config.keystore.as_ref(),
1015					self.metrics.as_ref().map(|m| m.until_imported.clone()),
1016				);
1017
1018				let last_completed_round = completed_rounds.last();
1019
1020				let voter = voter::Voter::new(
1021					self.env.clone(),
1022					(*self.env.voters).clone(),
1023					global_comms,
1024					last_completed_round.number,
1025					last_completed_round.votes.clone(),
1026					last_completed_round.base,
1027					last_finalized,
1028				);
1029
1030				// Repoint shared_voter_state so that the RPC endpoint can query the state
1031				if self.shared_voter_state.reset(voter.voter_state()).is_none() {
1032					info!(
1033						target: LOG_TARGET,
1034						"Timed out trying to update shared GRANDPA voter state. \
1035						RPC endpoints may return stale data."
1036					);
1037				}
1038
1039				self.voter = Box::pin(voter);
1040			},
1041			VoterSetState::Paused { .. } => self.voter = Box::pin(future::pending()),
1042		};
1043	}
1044
1045	fn handle_voter_command(
1046		&mut self,
1047		command: VoterCommand<Block::Hash, NumberFor<Block>>,
1048	) -> Result<(), Error> {
1049		match command {
1050			VoterCommand::ChangeAuthorities(new) => {
1051				let voters: Vec<String> =
1052					new.authorities.iter().map(move |(a, _)| format!("{}", a)).collect();
1053				telemetry!(
1054					self.telemetry;
1055					CONSENSUS_INFO;
1056					"afg.voter_command_change_authorities";
1057					"number" => ?new.canon_number,
1058					"hash" => ?new.canon_hash,
1059					"voters" => ?voters,
1060					"set_id" => ?new.set_id,
1061				);
1062
1063				self.env.update_voter_set_state(|_| {
1064					// start the new authority set using the block where the
1065					// set changed (not where the signal happened!) as the base.
1066					let set_state = VoterSetState::live(
1067						new.set_id,
1068						&*self.env.authority_set.inner(),
1069						(new.canon_hash, new.canon_number),
1070					);
1071
1072					aux_schema::write_voter_set_state(&*self.env.client, &set_state)?;
1073					Ok(Some(set_state))
1074				})?;
1075
1076				let voters = Arc::new(VoterSet::new(new.authorities.into_iter()).expect(
1077					"new authorities come from pending change; pending change comes from \
1078					 `AuthoritySet`; `AuthoritySet` validates authorities is non-empty and \
1079					 weights are non-zero; qed.",
1080				));
1081
1082				self.env = Arc::new(Environment {
1083					voters,
1084					set_id: new.set_id,
1085					voter_set_state: self.env.voter_set_state.clone(),
1086					client: self.env.client.clone(),
1087					select_chain: self.env.select_chain.clone(),
1088					config: self.env.config.clone(),
1089					authority_set: self.env.authority_set.clone(),
1090					network: self.env.network.clone(),
1091					voting_rule: self.env.voting_rule.clone(),
1092					metrics: self.env.metrics.clone(),
1093					justification_sender: self.env.justification_sender.clone(),
1094					telemetry: self.telemetry.clone(),
1095					offchain_tx_pool_factory: self.env.offchain_tx_pool_factory.clone(),
1096					_phantom: PhantomData,
1097				});
1098
1099				self.rebuild_voter();
1100				Ok(())
1101			},
1102			VoterCommand::Pause(reason) => {
1103				info!(target: LOG_TARGET, "Pausing old validator set: {}", reason);
1104
1105				// not racing because old voter is shut down.
1106				self.env.update_voter_set_state(|voter_set_state| {
1107					let completed_rounds = voter_set_state.completed_rounds();
1108					let set_state = VoterSetState::Paused { completed_rounds };
1109
1110					aux_schema::write_voter_set_state(&*self.env.client, &set_state)?;
1111					Ok(Some(set_state))
1112				})?;
1113
1114				self.rebuild_voter();
1115				Ok(())
1116			},
1117		}
1118	}
1119}
1120
1121impl<B, Block, C, N, S, SC, VR> Future for VoterWork<B, Block, C, N, S, SC, VR>
1122where
1123	Block: BlockT,
1124	B: Backend<Block> + 'static,
1125	N: NetworkT<Block> + Sync,
1126	S: SyncingT<Block> + Sync,
1127	NumberFor<Block>: BlockNumberOps,
1128	SC: SelectChain<Block> + 'static,
1129	C: ClientForGrandpa<Block, B> + 'static,
1130	C::Api: GrandpaApi<Block>,
1131	VR: VotingRule<Block, C> + Clone + 'static,
1132{
1133	type Output = Result<(), Error>;
1134
1135	fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
1136		match Future::poll(Pin::new(&mut self.voter), cx) {
1137			Poll::Pending => {},
1138			Poll::Ready(Ok(())) => {
1139				// voters don't conclude naturally
1140				return Poll::Ready(Err(Error::Safety(
1141					"consensus-grandpa inner voter has concluded.".into(),
1142				)));
1143			},
1144			Poll::Ready(Err(CommandOrError::Error(e))) => {
1145				// return inner observer error
1146				return Poll::Ready(Err(e));
1147			},
1148			Poll::Ready(Err(CommandOrError::VoterCommand(command))) => {
1149				// some command issued internally
1150				self.handle_voter_command(command)?;
1151				cx.waker().wake_by_ref();
1152			},
1153		}
1154
1155		match Stream::poll_next(Pin::new(&mut self.voter_commands_rx), cx) {
1156			Poll::Pending => {},
1157			Poll::Ready(None) => {
1158				// the `voter_commands_rx` stream should never conclude since it's never closed.
1159				return Poll::Ready(Err(Error::Safety("`voter_commands_rx` was closed.".into())));
1160			},
1161			Poll::Ready(Some(command)) => {
1162				// some command issued externally
1163				self.handle_voter_command(command)?;
1164				cx.waker().wake_by_ref();
1165			},
1166		}
1167
1168		Future::poll(Pin::new(&mut self.network), cx)
1169	}
1170}
1171
1172/// Checks if this node has any available keys in the keystore for any authority id in the given
1173/// voter set.  Returns the authority id for which keys are available, or `None` if no keys are
1174/// available.
1175fn local_authority_id(
1176	voters: &VoterSet<AuthorityId>,
1177	keystore: Option<&KeystorePtr>,
1178) -> Option<AuthorityId> {
1179	keystore.and_then(|keystore| {
1180		voters
1181			.iter()
1182			.find(|(p, _)| keystore.has_keys(&[(p.to_raw_vec(), AuthorityId::ID)]))
1183			.map(|(p, _)| p.clone())
1184	})
1185}
1186
1187/// Reverts protocol aux data to at most the last finalized block.
1188/// In particular, standard and forced authority set changes announced after the
1189/// revert point are removed.
1190pub fn revert<Block, Client>(client: Arc<Client>, blocks: NumberFor<Block>) -> ClientResult<()>
1191where
1192	Block: BlockT,
1193	Client: AuxStore + HeaderMetadata<Block, Error = ClientError> + HeaderBackend<Block>,
1194{
1195	let best_number = client.info().best_number;
1196	let finalized = client.info().finalized_number;
1197
1198	let revertible = blocks.min(best_number - finalized);
1199	if revertible == Zero::zero() {
1200		return Ok(());
1201	}
1202
1203	let number = best_number - revertible;
1204	let hash = client
1205		.block_hash_from_id(&BlockId::Number(number))?
1206		.ok_or(ClientError::Backend(format!(
1207			"Unexpected hash lookup failure for block number: {}",
1208			number
1209		)))?;
1210
1211	let info = client.info();
1212
1213	let persistent_data: PersistentData<Block> =
1214		aux_schema::load_persistent(&*client, info.genesis_hash, Zero::zero(), || {
1215			const MSG: &str = "Unexpected missing grandpa data during revert";
1216			Err(ClientError::Application(Box::from(MSG)))
1217		})?;
1218
1219	let shared_authority_set = persistent_data.authority_set;
1220	let mut authority_set = shared_authority_set.inner();
1221
1222	let is_descendent_of = is_descendent_of(&*client, None);
1223	authority_set.revert(hash, number, &is_descendent_of);
1224
1225	// The following has the side effect to properly reset the current voter state.
1226	let (set_id, set_ref) = authority_set.current();
1227	let new_set = Some(NewAuthoritySet {
1228		canon_hash: info.finalized_hash,
1229		canon_number: info.finalized_number,
1230		set_id,
1231		authorities: set_ref.to_vec(),
1232	});
1233	aux_schema::update_authority_set::<Block, _, _>(&authority_set, new_set.as_ref(), |values| {
1234		client.insert_aux(values, None)
1235	})
1236}