referrerpolicy=no-referrer-when-downgrade

sc_consensus_grandpa/
lib.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Integration of the GRANDPA finality gadget into substrate.
20//!
21//! This crate is unstable and the API and usage may change.
22//!
23//! This crate provides a long-running future that produces finality notifications.
24//!
25//! # Usage
26//!
27//! First, create a block-import wrapper with the `block_import` function. The
28//! GRANDPA worker needs to be linked together with this block import object, so
29//! a `LinkHalf` is returned as well. All blocks imported (from network or
30//! consensus or otherwise) must pass through this wrapper, otherwise consensus
31//! is likely to break in unexpected ways.
32//!
33//! Next, use the `LinkHalf` and a local configuration to `run_grandpa_voter`.
34//! This requires a `Network` implementation. The returned future should be
35//! driven to completion and will finalize blocks in the background.
36//!
37//! # Changing authority sets
38//!
39//! The rough idea behind changing authority sets in GRANDPA is that at some point,
40//! we obtain agreement for some maximum block height that the current set can
41//! finalize, and once a block with that height is finalized the next set will
42//! pick up finalization from there.
43//!
44//! Technically speaking, this would be implemented as a voting rule which says,
45//! "if there is a signal for a change in N blocks in block B, only vote on
46//! chains with length NUM(B) + N if they contain B". This conditional-inclusion
47//! logic is complex to compute because it requires looking arbitrarily far
48//! back in the chain.
49//!
50//! Instead, we keep track of a list of all signals we've seen so far (across
51//! all forks), sorted ascending by the block number they would be applied at.
52//! We never vote on chains with number higher than the earliest handoff block
53//! number (this is num(signal) + N). When finalizing a block, we either apply
54//! or prune any signaled changes based on whether the signaling block is
55//! included in the newly-finalized chain.
56
57#![warn(missing_docs)]
58
59use codec::Decode;
60use futures::{prelude::*, StreamExt};
61use log::{debug, error, info};
62use parking_lot::RwLock;
63use prometheus_endpoint::{PrometheusError, Registry};
64use sc_client_api::{
65	backend::{AuxStore, Backend},
66	utils::is_descendent_of,
67	BlockchainEvents, CallExecutor, ExecutorProvider, Finalizer, LockImportRun, StorageProvider,
68};
69use sc_consensus::BlockImport;
70use sc_network::{types::ProtocolName, NetworkBackend, NotificationService};
71use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_INFO};
72use sc_transaction_pool_api::OffchainTransactionPoolFactory;
73use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver};
74use sp_api::ProvideRuntimeApi;
75use sp_application_crypto::AppCrypto;
76use sp_blockchain::{Error as ClientError, HeaderBackend, HeaderMetadata, Result as ClientResult};
77use sp_consensus::SelectChain;
78use sp_consensus_grandpa::{
79	AuthorityList, AuthoritySignature, SetId, CLIENT_LOG_TARGET as LOG_TARGET,
80};
81use sp_core::{crypto::ByteArray, traits::CallContext};
82use sp_keystore::KeystorePtr;
83use sp_runtime::{
84	generic::BlockId,
85	traits::{Block as BlockT, NumberFor, Zero},
86};
87
88pub use finality_grandpa::BlockNumberOps;
89use finality_grandpa::{voter, voter_set::VoterSet, Error as GrandpaError};
90
91use std::{
92	fmt, io,
93	pin::Pin,
94	sync::Arc,
95	task::{Context, Poll},
96	time::Duration,
97};
98
// Logging helper: the first argument is a boolean expression that picks the
// log level (`true` => debug, `false` => info); the remaining arguments are
// forwarded to `log::log!`. Useful for keeping the logs quiet while some
// condition (e.g. initial sync) holds.
macro_rules! grandpa_log {
	($condition:expr, $($msg: expr),+ $(,)?) => {
		{
			let log_level = match $condition {
				true => log::Level::Debug,
				false => log::Level::Info,
			};

			log::log!(target: LOG_TARGET, log_level, $($msg),+);
		}
	};
}
115
116mod authorities;
117mod aux_schema;
118mod communication;
119mod environment;
120mod finality_proof;
121mod import;
122mod justification;
123mod notification;
124mod observer;
125mod until_imported;
126mod voting_rule;
127pub mod warp_proof;
128
129pub use authorities::{AuthoritySet, AuthoritySetChanges, SharedAuthoritySet};
130pub use aux_schema::best_justification;
131pub use communication::grandpa_protocol_name::standard_name as protocol_standard_name;
132pub use finality_grandpa::voter::report;
133pub use finality_proof::{FinalityProof, FinalityProofError, FinalityProofProvider};
134pub use import::{find_forced_change, find_scheduled_change, GrandpaBlockImport};
135pub use justification::GrandpaJustification;
136pub use notification::{GrandpaJustificationSender, GrandpaJustificationStream};
137pub use observer::run_grandpa_observer;
138pub use voting_rule::{
139	BeforeBestBlockBy, ThreeQuartersOfTheUnfinalizedChain, VotingRule, VotingRuleResult,
140	VotingRulesBuilder,
141};
142
143use aux_schema::PersistentData;
144use communication::{Network as NetworkT, NetworkBridge, Syncing as SyncingT};
145use environment::{Environment, VoterSetState};
146use until_imported::UntilGlobalMessageBlocksImported;
147
// Re-export these commonly used GRANDPA primitives because it's just so convenient.
149pub use sp_consensus_grandpa::{
150	AuthorityId, AuthorityPair, CatchUp, Commit, CompactCommit, GrandpaApi, Message, Precommit,
151	Prevote, PrimaryPropose, ScheduledChange, SignedMessage,
152};
153use std::marker::PhantomData;
154
155#[cfg(test)]
156mod tests;
157
158/// A global communication input stream for commits and catch up messages. Not
159/// exposed publicly, used internally to simplify types in the communication
160/// layer.
161type CommunicationIn<Block> = voter::CommunicationIn<
162	<Block as BlockT>::Hash,
163	NumberFor<Block>,
164	AuthoritySignature,
165	AuthorityId,
166>;
167/// Global communication input stream for commits and catch up messages, with
168/// the hash type not being derived from the block, useful for forcing the hash
169/// to some type (e.g. `H256`) when the compiler can't do the inference.
170type CommunicationInH<Block, H> =
171	voter::CommunicationIn<H, NumberFor<Block>, AuthoritySignature, AuthorityId>;
172
173/// Global communication sink for commits with the hash type not being derived
174/// from the block, useful for forcing the hash to some type (e.g. `H256`) when
175/// the compiler can't do the inference.
176type CommunicationOutH<Block, H> =
177	voter::CommunicationOut<H, NumberFor<Block>, AuthoritySignature, AuthorityId>;
178
179/// Shared voter state for querying.
180pub struct SharedVoterState {
181	inner: Arc<RwLock<Option<Box<dyn voter::VoterState<AuthorityId> + Sync + Send>>>>,
182}
183
184impl SharedVoterState {
185	/// Create a new empty `SharedVoterState` instance.
186	pub fn empty() -> Self {
187		Self { inner: Arc::new(RwLock::new(None)) }
188	}
189
190	fn reset(
191		&self,
192		voter_state: Box<dyn voter::VoterState<AuthorityId> + Sync + Send>,
193	) -> Option<()> {
194		let mut shared_voter_state = self.inner.try_write_for(Duration::from_secs(1))?;
195
196		*shared_voter_state = Some(voter_state);
197		Some(())
198	}
199
200	/// Get the inner `VoterState` instance.
201	pub fn voter_state(&self) -> Option<report::VoterState<AuthorityId>> {
202		self.inner.read().as_ref().map(|vs| vs.get())
203	}
204}
205
206impl Clone for SharedVoterState {
207	fn clone(&self) -> Self {
208		SharedVoterState { inner: self.inner.clone() }
209	}
210}
211
212/// Configuration for the GRANDPA service
213#[derive(Clone)]
214pub struct Config {
215	/// The expected duration for a message to be gossiped across the network.
216	pub gossip_duration: Duration,
217	/// Justification generation period (in blocks). GRANDPA will try to generate
218	/// justifications at least every justification_generation_period blocks. There
219	/// are some other events which might cause justification generation.
220	pub justification_generation_period: u32,
221	/// Whether the GRANDPA observer protocol is live on the network and thereby
222	/// a full-node not running as a validator is running the GRANDPA observer
223	/// protocol (we will only issue catch-up requests to authorities when the
224	/// observer protocol is enabled).
225	pub observer_enabled: bool,
226	/// The role of the local node (i.e. authority, full-node or light).
227	pub local_role: sc_network::config::Role,
228	/// Some local identifier of the voter.
229	pub name: Option<String>,
230	/// The keystore that manages the keys of this node.
231	pub keystore: Option<KeystorePtr>,
232	/// TelemetryHandle instance.
233	pub telemetry: Option<TelemetryHandle>,
234	/// Chain specific GRANDPA protocol name. See [`crate::protocol_standard_name`].
235	pub protocol_name: ProtocolName,
236}
237
238impl Config {
239	fn name(&self) -> &str {
240		self.name.as_deref().unwrap_or("<unknown>")
241	}
242}
243
244/// Errors that can occur while voting in GRANDPA.
245#[derive(Debug, thiserror::Error)]
246pub enum Error {
247	/// An error within grandpa.
248	#[error("grandpa error: {0}")]
249	Grandpa(#[from] GrandpaError),
250
251	/// A network error.
252	#[error("network error: {0}")]
253	Network(String),
254
255	/// A blockchain error.
256	#[error("blockchain error: {0}")]
257	Blockchain(String),
258
259	/// Could not complete a round on disk.
260	#[error("could not complete a round on disk: {0}")]
261	Client(#[from] ClientError),
262
263	/// Could not sign outgoing message
264	#[error("could not sign outgoing message: {0}")]
265	Signing(String),
266
267	/// An invariant has been violated (e.g. not finalizing pending change blocks in-order)
268	#[error("safety invariant has been violated: {0}")]
269	Safety(String),
270
271	/// A timer failed to fire.
272	#[error("a timer failed to fire: {0}")]
273	Timer(io::Error),
274
275	/// A runtime api request failed.
276	#[error("runtime API request failed: {0}")]
277	RuntimeApi(sp_api::ApiError),
278}
279
280/// Something which can determine if a block is known.
281pub(crate) trait BlockStatus<Block: BlockT> {
282	/// Return `Ok(Some(number))` or `Ok(None)` depending on whether the block
283	/// is definitely known and has been imported.
284	/// If an unexpected error occurs, return that.
285	fn block_number(&self, hash: Block::Hash) -> Result<Option<NumberFor<Block>>, Error>;
286}
287
288impl<Block: BlockT, Client> BlockStatus<Block> for Arc<Client>
289where
290	Client: HeaderBackend<Block>,
291	NumberFor<Block>: BlockNumberOps,
292{
293	fn block_number(&self, hash: Block::Hash) -> Result<Option<NumberFor<Block>>, Error> {
294		self.block_number_from_id(&BlockId::Hash(hash))
295			.map_err(|e| Error::Blockchain(e.to_string()))
296	}
297}
298
299/// A trait that includes all the client functionalities grandpa requires.
300/// Ideally this would be a trait alias, we're not there yet.
301/// tracking issue <https://github.com/rust-lang/rust/issues/41517>
302pub trait ClientForGrandpa<Block, BE>:
303	LockImportRun<Block, BE>
304	+ Finalizer<Block, BE>
305	+ AuxStore
306	+ HeaderMetadata<Block, Error = sp_blockchain::Error>
307	+ HeaderBackend<Block>
308	+ BlockchainEvents<Block>
309	+ ProvideRuntimeApi<Block>
310	+ ExecutorProvider<Block>
311	+ BlockImport<Block, Error = sp_consensus::Error>
312	+ StorageProvider<Block, BE>
313where
314	BE: Backend<Block>,
315	Block: BlockT,
316{
317}
318
319impl<Block, BE, T> ClientForGrandpa<Block, BE> for T
320where
321	BE: Backend<Block>,
322	Block: BlockT,
323	T: LockImportRun<Block, BE>
324		+ Finalizer<Block, BE>
325		+ AuxStore
326		+ HeaderMetadata<Block, Error = sp_blockchain::Error>
327		+ HeaderBackend<Block>
328		+ BlockchainEvents<Block>
329		+ ProvideRuntimeApi<Block>
330		+ ExecutorProvider<Block>
331		+ BlockImport<Block, Error = sp_consensus::Error>
332		+ StorageProvider<Block, BE>,
333{
334}
335
336/// Something that one can ask to do a block sync request.
337pub(crate) trait BlockSyncRequester<Block: BlockT> {
338	/// Notifies the sync service to try and sync the given block from the given
339	/// peers.
340	///
341	/// If the given vector of peers is empty then the underlying implementation
342	/// should make a best effort to fetch the block from any peers it is
343	/// connected to (NOTE: this assumption will change in the future #3629).
344	fn set_sync_fork_request(
345		&self,
346		peers: Vec<sc_network_types::PeerId>,
347		hash: Block::Hash,
348		number: NumberFor<Block>,
349	);
350}
351
352impl<Block, Network, Syncing> BlockSyncRequester<Block> for NetworkBridge<Block, Network, Syncing>
353where
354	Block: BlockT,
355	Network: NetworkT<Block>,
356	Syncing: SyncingT<Block>,
357{
358	fn set_sync_fork_request(
359		&self,
360		peers: Vec<sc_network_types::PeerId>,
361		hash: Block::Hash,
362		number: NumberFor<Block>,
363	) {
364		NetworkBridge::set_sync_fork_request(self, peers, hash, number)
365	}
366}
367
368/// A new authority set along with the canonical block it changed at.
369#[derive(Debug)]
370pub(crate) struct NewAuthoritySet<H, N> {
371	pub(crate) canon_number: N,
372	pub(crate) canon_hash: H,
373	pub(crate) set_id: SetId,
374	pub(crate) authorities: AuthorityList,
375}
376
377/// Commands issued to the voter.
378#[derive(Debug)]
379pub(crate) enum VoterCommand<H, N> {
380	/// Pause the voter for given reason.
381	Pause(String),
382	/// New authorities.
383	ChangeAuthorities(NewAuthoritySet<H, N>),
384}
385
386impl<H, N> fmt::Display for VoterCommand<H, N> {
387	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
388		match *self {
389			VoterCommand::Pause(ref reason) => write!(f, "Pausing voter: {}", reason),
390			VoterCommand::ChangeAuthorities(_) => write!(f, "Changing authorities"),
391		}
392	}
393}
394
395/// Signals either an early exit of a voter or an error.
396#[derive(Debug)]
397pub(crate) enum CommandOrError<H, N> {
398	/// An error occurred.
399	Error(Error),
400	/// A command to the voter.
401	VoterCommand(VoterCommand<H, N>),
402}
403
404impl<H, N> From<Error> for CommandOrError<H, N> {
405	fn from(e: Error) -> Self {
406		CommandOrError::Error(e)
407	}
408}
409
410impl<H, N> From<ClientError> for CommandOrError<H, N> {
411	fn from(e: ClientError) -> Self {
412		CommandOrError::Error(Error::Client(e))
413	}
414}
415
416impl<H, N> From<finality_grandpa::Error> for CommandOrError<H, N> {
417	fn from(e: finality_grandpa::Error) -> Self {
418		CommandOrError::Error(Error::from(e))
419	}
420}
421
422impl<H, N> From<VoterCommand<H, N>> for CommandOrError<H, N> {
423	fn from(e: VoterCommand<H, N>) -> Self {
424		CommandOrError::VoterCommand(e)
425	}
426}
427
428impl<H: fmt::Debug, N: fmt::Debug> ::std::error::Error for CommandOrError<H, N> {}
429
430impl<H, N> fmt::Display for CommandOrError<H, N> {
431	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
432		match *self {
433			CommandOrError::Error(ref e) => write!(f, "{}", e),
434			CommandOrError::VoterCommand(ref cmd) => write!(f, "{}", cmd),
435		}
436	}
437}
438
439/// Link between the block importer and the background voter.
440pub struct LinkHalf<Block: BlockT, C, SC> {
441	client: Arc<C>,
442	select_chain: SC,
443	persistent_data: PersistentData<Block>,
444	voter_commands_rx: TracingUnboundedReceiver<VoterCommand<Block::Hash, NumberFor<Block>>>,
445	justification_sender: GrandpaJustificationSender<Block>,
446	justification_stream: GrandpaJustificationStream<Block>,
447	telemetry: Option<TelemetryHandle>,
448}
449
450impl<Block: BlockT, C, SC> LinkHalf<Block, C, SC> {
451	/// Get the shared authority set.
452	pub fn shared_authority_set(&self) -> &SharedAuthoritySet<Block::Hash, NumberFor<Block>> {
453		&self.persistent_data.authority_set
454	}
455
456	/// Get the receiving end of justification notifications.
457	pub fn justification_stream(&self) -> GrandpaJustificationStream<Block> {
458		self.justification_stream.clone()
459	}
460}
461
462/// Provider for the Grandpa authority set configured on the genesis block.
463pub trait GenesisAuthoritySetProvider<Block: BlockT> {
464	/// Get the authority set at the genesis block.
465	fn get(&self) -> Result<AuthorityList, ClientError>;
466}
467
468impl<Block: BlockT, E, Client> GenesisAuthoritySetProvider<Block> for Arc<Client>
469where
470	E: CallExecutor<Block>,
471	Client: ExecutorProvider<Block, Executor = E> + HeaderBackend<Block>,
472{
473	fn get(&self) -> Result<AuthorityList, ClientError> {
474		self.executor()
475			.call(
476				self.expect_block_hash_from_id(&BlockId::Number(Zero::zero()))?,
477				"GrandpaApi_grandpa_authorities",
478				&[],
479				CallContext::Offchain,
480			)
481			.and_then(|call_result| {
482				Decode::decode(&mut &call_result[..]).map_err(|err| {
483					ClientError::CallResultDecode(
484						"failed to decode GRANDPA authorities set proof",
485						err,
486					)
487				})
488			})
489	}
490}
491
492/// Make block importer and link half necessary to tie the background voter
493/// to it.
494///
495/// The `justification_import_period` sets the minimum period on which
496/// justifications will be imported.  When importing a block, if it includes a
497/// justification it will only be processed if it fits within this period,
498/// otherwise it will be ignored (and won't be validated). This is to avoid
499/// slowing down sync by a peer serving us unnecessary justifications which
500/// aren't trivial to validate.
501pub fn block_import<BE, Block: BlockT, Client, SC>(
502	client: Arc<Client>,
503	justification_import_period: u32,
504	genesis_authorities_provider: &dyn GenesisAuthoritySetProvider<Block>,
505	select_chain: SC,
506	telemetry: Option<TelemetryHandle>,
507) -> Result<(GrandpaBlockImport<BE, Block, Client, SC>, LinkHalf<Block, Client, SC>), ClientError>
508where
509	SC: SelectChain<Block>,
510	BE: Backend<Block> + 'static,
511	Client: ClientForGrandpa<Block, BE> + 'static,
512{
513	block_import_with_authority_set_hard_forks(
514		client,
515		justification_import_period,
516		genesis_authorities_provider,
517		select_chain,
518		Default::default(),
519		telemetry,
520	)
521}
522
523/// A descriptor for an authority set hard fork. These are authority set changes
524/// that are not signalled by the runtime and instead are defined off-chain
525/// (hence the hard fork).
526pub struct AuthoritySetHardFork<Block: BlockT> {
527	/// The new authority set id.
528	pub set_id: SetId,
529	/// The block hash and number at which the hard fork should be applied.
530	pub block: (Block::Hash, NumberFor<Block>),
531	/// The authorities in the new set.
532	pub authorities: AuthorityList,
533	/// The latest block number that was finalized before this authority set
534	/// hard fork. When defined, the authority set change will be forced, i.e.
535	/// the node won't wait for the block above to be finalized before enacting
536	/// the change, and the given finalized number will be used as a base for
537	/// voting.
538	pub last_finalized: Option<NumberFor<Block>>,
539}
540
541/// Make block importer and link half necessary to tie the background voter to
542/// it. A vector of authority set hard forks can be passed, any authority set
543/// change signaled at the given block (either already signalled or in a further
544/// block when importing it) will be replaced by a standard change with the
545/// given static authorities.
546pub fn block_import_with_authority_set_hard_forks<BE, Block: BlockT, Client, SC>(
547	client: Arc<Client>,
548	justification_import_period: u32,
549	genesis_authorities_provider: &dyn GenesisAuthoritySetProvider<Block>,
550	select_chain: SC,
551	authority_set_hard_forks: Vec<AuthoritySetHardFork<Block>>,
552	telemetry: Option<TelemetryHandle>,
553) -> Result<(GrandpaBlockImport<BE, Block, Client, SC>, LinkHalf<Block, Client, SC>), ClientError>
554where
555	SC: SelectChain<Block>,
556	BE: Backend<Block> + 'static,
557	Client: ClientForGrandpa<Block, BE> + 'static,
558{
559	let chain_info = client.info();
560	let genesis_hash = chain_info.genesis_hash;
561
562	let persistent_data =
563		aux_schema::load_persistent(&*client, genesis_hash, <NumberFor<Block>>::zero(), {
564			let telemetry = telemetry.clone();
565			move || {
566				let authorities = genesis_authorities_provider.get()?;
567				telemetry!(
568					telemetry;
569					CONSENSUS_DEBUG;
570					"afg.loading_authorities";
571					"authorities_len" => ?authorities.len()
572				);
573				Ok(authorities)
574			}
575		})?;
576
577	let (voter_commands_tx, voter_commands_rx) =
578		tracing_unbounded("mpsc_grandpa_voter_command", 100_000);
579
580	let (justification_sender, justification_stream) = GrandpaJustificationStream::channel();
581
582	// create pending change objects with 0 delay for each authority set hard fork.
583	let authority_set_hard_forks = authority_set_hard_forks
584		.into_iter()
585		.map(|fork| {
586			let delay_kind = if let Some(last_finalized) = fork.last_finalized {
587				authorities::DelayKind::Best { median_last_finalized: last_finalized }
588			} else {
589				authorities::DelayKind::Finalized
590			};
591
592			(
593				fork.set_id,
594				authorities::PendingChange {
595					next_authorities: fork.authorities,
596					delay: Zero::zero(),
597					canon_hash: fork.block.0,
598					canon_height: fork.block.1,
599					delay_kind,
600				},
601			)
602		})
603		.collect();
604
605	Ok((
606		GrandpaBlockImport::new(
607			client.clone(),
608			justification_import_period,
609			select_chain.clone(),
610			persistent_data.authority_set.clone(),
611			voter_commands_tx,
612			authority_set_hard_forks,
613			justification_sender.clone(),
614			telemetry.clone(),
615		),
616		LinkHalf {
617			client,
618			select_chain,
619			persistent_data,
620			voter_commands_rx,
621			justification_sender,
622			justification_stream,
623			telemetry,
624		},
625	))
626}
627
628fn global_communication<BE, Block: BlockT, C, N, S>(
629	set_id: SetId,
630	voters: &Arc<VoterSet<AuthorityId>>,
631	client: Arc<C>,
632	network: &NetworkBridge<Block, N, S>,
633	keystore: Option<&KeystorePtr>,
634	metrics: Option<until_imported::Metrics>,
635) -> (
636	impl Stream<
637		Item = Result<
638			CommunicationInH<Block, Block::Hash>,
639			CommandOrError<Block::Hash, NumberFor<Block>>,
640		>,
641	>,
642	impl Sink<
643		CommunicationOutH<Block, Block::Hash>,
644		Error = CommandOrError<Block::Hash, NumberFor<Block>>,
645	>,
646)
647where
648	BE: Backend<Block> + 'static,
649	C: ClientForGrandpa<Block, BE> + 'static,
650	N: NetworkT<Block>,
651	S: SyncingT<Block>,
652	NumberFor<Block>: BlockNumberOps,
653{
654	let is_voter = local_authority_id(voters, keystore).is_some();
655
656	// verification stream
657	let (global_in, global_out) =
658		network.global_communication(communication::SetId(set_id), voters.clone(), is_voter);
659
660	// block commit and catch up messages until relevant blocks are imported.
661	let global_in = UntilGlobalMessageBlocksImported::new(
662		client.import_notification_stream(),
663		network.clone(),
664		client.clone(),
665		global_in,
666		"global",
667		metrics,
668	);
669
670	let global_in = global_in.map_err(CommandOrError::from);
671	let global_out = global_out.sink_map_err(CommandOrError::from);
672
673	(global_in, global_out)
674}
675
676/// Parameters used to run Grandpa.
677pub struct GrandpaParams<Block: BlockT, C, N, S, SC, VR> {
678	/// Configuration for the GRANDPA service.
679	pub config: Config,
680	/// A link to the block import worker.
681	pub link: LinkHalf<Block, C, SC>,
682	/// The Network instance.
683	///
684	/// It is assumed that this network will feed us Grandpa notifications. When using the
685	/// `sc_network` crate, it is assumed that the Grandpa notifications protocol has been passed
686	/// to the configuration of the networking. See [`grandpa_peers_set_config`].
687	pub network: N,
688	/// Event stream for syncing-related events.
689	pub sync: S,
690	/// Handle for interacting with `Notifications`.
691	pub notification_service: Box<dyn NotificationService>,
692	/// A voting rule used to potentially restrict target votes.
693	pub voting_rule: VR,
694	/// The prometheus metrics registry.
695	pub prometheus_registry: Option<prometheus_endpoint::Registry>,
696	/// The voter state is exposed at an RPC endpoint.
697	pub shared_voter_state: SharedVoterState,
698	/// TelemetryHandle instance.
699	pub telemetry: Option<TelemetryHandle>,
700	/// Offchain transaction pool factory.
701	///
702	/// This will be used to create an offchain transaction pool instance for sending an
703	/// equivocation report from the runtime.
704	pub offchain_tx_pool_factory: OffchainTransactionPoolFactory<Block>,
705}
706
707/// Returns the configuration value to put in
708/// [`sc_network::config::FullNetworkConfiguration`].
709/// For standard protocol name see [`crate::protocol_standard_name`].
710pub fn grandpa_peers_set_config<B: BlockT, N: NetworkBackend<B, <B as BlockT>::Hash>>(
711	protocol_name: ProtocolName,
712	metrics: sc_network::service::NotificationMetrics,
713	peer_store_handle: Arc<dyn sc_network::peer_store::PeerStoreProvider>,
714) -> (N::NotificationProtocolConfig, Box<dyn NotificationService>) {
715	use communication::grandpa_protocol_name;
716	N::notification_config(
717		protocol_name,
718		grandpa_protocol_name::LEGACY_NAMES.iter().map(|&n| n.into()).collect(),
719		// Notifications reach ~256kiB in size at the time of writing on Kusama and Polkadot.
720		1024 * 1024,
721		None,
722		sc_network::config::SetConfig {
723			in_peers: 0,
724			out_peers: 0,
725			reserved_nodes: Vec::new(),
726			non_reserved_mode: sc_network::config::NonReservedPeerMode::Deny,
727		},
728		metrics,
729		peer_store_handle,
730	)
731}
732
733/// Run a GRANDPA voter as a task. Provide configuration and a link to a
734/// block import worker that has already been instantiated with `block_import`.
735pub fn run_grandpa_voter<Block: BlockT, BE: 'static, C, N, S, SC, VR>(
736	grandpa_params: GrandpaParams<Block, C, N, S, SC, VR>,
737) -> sp_blockchain::Result<impl Future<Output = ()> + Send>
738where
739	BE: Backend<Block> + 'static,
740	N: NetworkT<Block> + Sync + 'static,
741	S: SyncingT<Block> + Sync + 'static,
742	SC: SelectChain<Block> + 'static,
743	VR: VotingRule<Block, C> + Clone + 'static,
744	NumberFor<Block>: BlockNumberOps,
745	C: ClientForGrandpa<Block, BE> + 'static,
746	C::Api: GrandpaApi<Block>,
747{
748	let GrandpaParams {
749		mut config,
750		link,
751		network,
752		sync,
753		notification_service,
754		voting_rule,
755		prometheus_registry,
756		shared_voter_state,
757		telemetry,
758		offchain_tx_pool_factory,
759	} = grandpa_params;
760
761	// NOTE: we have recently removed `run_grandpa_observer` from the public
762	// API, I felt it is easier to just ignore this field rather than removing
763	// it from the config temporarily. This should be removed after #5013 is
764	// fixed and we re-add the observer to the public API.
765	config.observer_enabled = false;
766
767	let LinkHalf {
768		client,
769		select_chain,
770		persistent_data,
771		voter_commands_rx,
772		justification_sender,
773		justification_stream: _,
774		telemetry: _,
775	} = link;
776
777	let network = NetworkBridge::new(
778		network,
779		sync,
780		notification_service,
781		config.clone(),
782		persistent_data.set_state.clone(),
783		prometheus_registry.as_ref(),
784		telemetry.clone(),
785	);
786
787	let conf = config.clone();
788	let telemetry_task =
789		if let Some(telemetry_on_connect) = telemetry.as_ref().map(|x| x.on_connect_stream()) {
790			let authorities = persistent_data.authority_set.clone();
791			let telemetry = telemetry.clone();
792			let events = telemetry_on_connect.for_each(move |_| {
793				let current_authorities = authorities.current_authorities();
794				let set_id = authorities.set_id();
795				let maybe_authority_id =
796					local_authority_id(&current_authorities, conf.keystore.as_ref());
797
798				let authorities =
799					current_authorities.iter().map(|(id, _)| id.to_string()).collect::<Vec<_>>();
800
801				let authorities = serde_json::to_string(&authorities).expect(
802					"authorities is always at least an empty vector; \
803					 elements are always of type string",
804				);
805
806				telemetry!(
807					telemetry;
808					CONSENSUS_INFO;
809					"afg.authority_set";
810					"authority_id" => maybe_authority_id.map_or("".into(), |s| s.to_string()),
811					"authority_set_id" => ?set_id,
812					"authorities" => authorities,
813				);
814
815				future::ready(())
816			});
817			future::Either::Left(events)
818		} else {
819			future::Either::Right(future::pending())
820		};
821
822	let voter_work = VoterWork::new(
823		client,
824		config,
825		network,
826		select_chain,
827		voting_rule,
828		persistent_data,
829		voter_commands_rx,
830		prometheus_registry,
831		shared_voter_state,
832		justification_sender,
833		telemetry,
834		offchain_tx_pool_factory,
835	);
836
837	let voter_work = voter_work.map(|res| match res {
838		Ok(()) => error!(
839			target: LOG_TARGET,
840			"GRANDPA voter future has concluded naturally, this should be unreachable."
841		),
842		Err(e) => error!(target: LOG_TARGET, "GRANDPA voter error: {}", e),
843	});
844
845	// Make sure that `telemetry_task` doesn't accidentally finish and kill grandpa.
846	let telemetry_task = telemetry_task.then(|_| future::pending::<()>());
847
848	Ok(future::select(voter_work, telemetry_task).map(drop))
849}
850
851struct Metrics {
852	environment: environment::Metrics,
853	until_imported: until_imported::Metrics,
854}
855
856impl Metrics {
857	fn register(registry: &Registry) -> Result<Self, PrometheusError> {
858		Ok(Metrics {
859			environment: environment::Metrics::register(registry)?,
860			until_imported: until_imported::Metrics::register(registry)?,
861		})
862	}
863}
864
865/// Future that powers the voter.
866#[must_use]
867struct VoterWork<B, Block: BlockT, C, N: NetworkT<Block>, S: SyncingT<Block>, SC, VR> {
868	voter: Pin<
869		Box<dyn Future<Output = Result<(), CommandOrError<Block::Hash, NumberFor<Block>>>> + Send>,
870	>,
871	shared_voter_state: SharedVoterState,
872	env: Arc<Environment<B, Block, C, N, S, SC, VR>>,
873	voter_commands_rx: TracingUnboundedReceiver<VoterCommand<Block::Hash, NumberFor<Block>>>,
874	network: NetworkBridge<Block, N, S>,
875	telemetry: Option<TelemetryHandle>,
876	/// Prometheus metrics.
877	metrics: Option<Metrics>,
878}
879
880impl<B, Block, C, N, S, SC, VR> VoterWork<B, Block, C, N, S, SC, VR>
881where
882	Block: BlockT,
883	B: Backend<Block> + 'static,
884	C: ClientForGrandpa<Block, B> + 'static,
885	C::Api: GrandpaApi<Block>,
886	N: NetworkT<Block> + Sync,
887	S: SyncingT<Block> + Sync,
888	NumberFor<Block>: BlockNumberOps,
889	SC: SelectChain<Block> + 'static,
890	VR: VotingRule<Block, C> + Clone + 'static,
891{
892	fn new(
893		client: Arc<C>,
894		config: Config,
895		network: NetworkBridge<Block, N, S>,
896		select_chain: SC,
897		voting_rule: VR,
898		persistent_data: PersistentData<Block>,
899		voter_commands_rx: TracingUnboundedReceiver<VoterCommand<Block::Hash, NumberFor<Block>>>,
900		prometheus_registry: Option<prometheus_endpoint::Registry>,
901		shared_voter_state: SharedVoterState,
902		justification_sender: GrandpaJustificationSender<Block>,
903		telemetry: Option<TelemetryHandle>,
904		offchain_tx_pool_factory: OffchainTransactionPoolFactory<Block>,
905	) -> Self {
906		let metrics = match prometheus_registry.as_ref().map(Metrics::register) {
907			Some(Ok(metrics)) => Some(metrics),
908			Some(Err(e)) => {
909				debug!(target: LOG_TARGET, "Failed to register metrics: {:?}", e);
910				None
911			},
912			None => None,
913		};
914
915		let voters = persistent_data.authority_set.current_authorities();
916		let env = Arc::new(Environment {
917			client,
918			select_chain,
919			voting_rule,
920			voters: Arc::new(voters),
921			config,
922			network: network.clone(),
923			set_id: persistent_data.authority_set.set_id(),
924			authority_set: persistent_data.authority_set.clone(),
925			voter_set_state: persistent_data.set_state,
926			metrics: metrics.as_ref().map(|m| m.environment.clone()),
927			justification_sender: Some(justification_sender),
928			telemetry: telemetry.clone(),
929			offchain_tx_pool_factory,
930			_phantom: PhantomData,
931		});
932
933		let mut work = VoterWork {
934			// `voter` is set to a temporary value and replaced below when
935			// calling `rebuild_voter`.
936			voter: Box::pin(future::pending()),
937			shared_voter_state,
938			env,
939			voter_commands_rx,
940			network,
941			telemetry,
942			metrics,
943		};
944		work.rebuild_voter();
945		work
946	}
947
948	/// Rebuilds the `self.voter` field using the current authority set
949	/// state. This method should be called when we know that the authority set
950	/// has changed (e.g. as signalled by a voter command).
951	fn rebuild_voter(&mut self) {
952		debug!(
953			target: LOG_TARGET,
954			"{}: Starting new voter with set ID {}",
955			self.env.config.name(),
956			self.env.set_id
957		);
958
959		let maybe_authority_id =
960			local_authority_id(&self.env.voters, self.env.config.keystore.as_ref());
961		let authority_id = maybe_authority_id.map_or("<unknown>".into(), |s| s.to_string());
962
963		telemetry!(
964			self.telemetry;
965			CONSENSUS_DEBUG;
966			"afg.starting_new_voter";
967			"name" => ?self.env.config.name(),
968			"set_id" => ?self.env.set_id,
969			"authority_id" => authority_id,
970		);
971
972		let chain_info = self.env.client.info();
973
974		let authorities = self.env.voters.iter().map(|(id, _)| id.to_string()).collect::<Vec<_>>();
975
976		let authorities = serde_json::to_string(&authorities).expect(
977			"authorities is always at least an empty vector; elements are always of type string; qed.",
978		);
979
980		telemetry!(
981			self.telemetry;
982			CONSENSUS_INFO;
983			"afg.authority_set";
984			"number" => ?chain_info.finalized_number,
985			"hash" => ?chain_info.finalized_hash,
986			"authority_id" => authority_id,
987			"authority_set_id" => ?self.env.set_id,
988			"authorities" => authorities,
989		);
990
991		match &*self.env.voter_set_state.read() {
992			VoterSetState::Live { completed_rounds, .. } => {
993				let last_finalized = (chain_info.finalized_hash, chain_info.finalized_number);
994
995				let global_comms = global_communication(
996					self.env.set_id,
997					&self.env.voters,
998					self.env.client.clone(),
999					&self.env.network,
1000					self.env.config.keystore.as_ref(),
1001					self.metrics.as_ref().map(|m| m.until_imported.clone()),
1002				);
1003
1004				let last_completed_round = completed_rounds.last();
1005
1006				let voter = voter::Voter::new(
1007					self.env.clone(),
1008					(*self.env.voters).clone(),
1009					global_comms,
1010					last_completed_round.number,
1011					last_completed_round.votes.clone(),
1012					last_completed_round.base,
1013					last_finalized,
1014				);
1015
1016				// Repoint shared_voter_state so that the RPC endpoint can query the state
1017				if self.shared_voter_state.reset(voter.voter_state()).is_none() {
1018					info!(
1019						target: LOG_TARGET,
1020						"Timed out trying to update shared GRANDPA voter state. \
1021						RPC endpoints may return stale data."
1022					);
1023				}
1024
1025				self.voter = Box::pin(voter);
1026			},
1027			VoterSetState::Paused { .. } => self.voter = Box::pin(future::pending()),
1028		};
1029	}
1030
1031	fn handle_voter_command(
1032		&mut self,
1033		command: VoterCommand<Block::Hash, NumberFor<Block>>,
1034	) -> Result<(), Error> {
1035		match command {
1036			VoterCommand::ChangeAuthorities(new) => {
1037				let voters: Vec<String> =
1038					new.authorities.iter().map(move |(a, _)| format!("{}", a)).collect();
1039				telemetry!(
1040					self.telemetry;
1041					CONSENSUS_INFO;
1042					"afg.voter_command_change_authorities";
1043					"number" => ?new.canon_number,
1044					"hash" => ?new.canon_hash,
1045					"voters" => ?voters,
1046					"set_id" => ?new.set_id,
1047				);
1048
1049				self.env.update_voter_set_state(|_| {
1050					// start the new authority set using the block where the
1051					// set changed (not where the signal happened!) as the base.
1052					let set_state = VoterSetState::live(
1053						new.set_id,
1054						&*self.env.authority_set.inner(),
1055						(new.canon_hash, new.canon_number),
1056					);
1057
1058					aux_schema::write_voter_set_state(&*self.env.client, &set_state)?;
1059					Ok(Some(set_state))
1060				})?;
1061
1062				let voters = Arc::new(VoterSet::new(new.authorities.into_iter()).expect(
1063					"new authorities come from pending change; pending change comes from \
1064					 `AuthoritySet`; `AuthoritySet` validates authorities is non-empty and \
1065					 weights are non-zero; qed.",
1066				));
1067
1068				self.env = Arc::new(Environment {
1069					voters,
1070					set_id: new.set_id,
1071					voter_set_state: self.env.voter_set_state.clone(),
1072					client: self.env.client.clone(),
1073					select_chain: self.env.select_chain.clone(),
1074					config: self.env.config.clone(),
1075					authority_set: self.env.authority_set.clone(),
1076					network: self.env.network.clone(),
1077					voting_rule: self.env.voting_rule.clone(),
1078					metrics: self.env.metrics.clone(),
1079					justification_sender: self.env.justification_sender.clone(),
1080					telemetry: self.telemetry.clone(),
1081					offchain_tx_pool_factory: self.env.offchain_tx_pool_factory.clone(),
1082					_phantom: PhantomData,
1083				});
1084
1085				self.rebuild_voter();
1086				Ok(())
1087			},
1088			VoterCommand::Pause(reason) => {
1089				info!(target: LOG_TARGET, "Pausing old validator set: {}", reason);
1090
1091				// not racing because old voter is shut down.
1092				self.env.update_voter_set_state(|voter_set_state| {
1093					let completed_rounds = voter_set_state.completed_rounds();
1094					let set_state = VoterSetState::Paused { completed_rounds };
1095
1096					aux_schema::write_voter_set_state(&*self.env.client, &set_state)?;
1097					Ok(Some(set_state))
1098				})?;
1099
1100				self.rebuild_voter();
1101				Ok(())
1102			},
1103		}
1104	}
1105}
1106
1107impl<B, Block, C, N, S, SC, VR> Future for VoterWork<B, Block, C, N, S, SC, VR>
1108where
1109	Block: BlockT,
1110	B: Backend<Block> + 'static,
1111	N: NetworkT<Block> + Sync,
1112	S: SyncingT<Block> + Sync,
1113	NumberFor<Block>: BlockNumberOps,
1114	SC: SelectChain<Block> + 'static,
1115	C: ClientForGrandpa<Block, B> + 'static,
1116	C::Api: GrandpaApi<Block>,
1117	VR: VotingRule<Block, C> + Clone + 'static,
1118{
1119	type Output = Result<(), Error>;
1120
1121	fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
1122		match Future::poll(Pin::new(&mut self.voter), cx) {
1123			Poll::Pending => {},
1124			Poll::Ready(Ok(())) => {
1125				// voters don't conclude naturally
1126				return Poll::Ready(Err(Error::Safety(
1127					"consensus-grandpa inner voter has concluded.".into(),
1128				)))
1129			},
1130			Poll::Ready(Err(CommandOrError::Error(e))) => {
1131				// return inner observer error
1132				return Poll::Ready(Err(e))
1133			},
1134			Poll::Ready(Err(CommandOrError::VoterCommand(command))) => {
1135				// some command issued internally
1136				self.handle_voter_command(command)?;
1137				cx.waker().wake_by_ref();
1138			},
1139		}
1140
1141		match Stream::poll_next(Pin::new(&mut self.voter_commands_rx), cx) {
1142			Poll::Pending => {},
1143			Poll::Ready(None) => {
1144				// the `voter_commands_rx` stream should never conclude since it's never closed.
1145				return Poll::Ready(Err(Error::Safety("`voter_commands_rx` was closed.".into())))
1146			},
1147			Poll::Ready(Some(command)) => {
1148				// some command issued externally
1149				self.handle_voter_command(command)?;
1150				cx.waker().wake_by_ref();
1151			},
1152		}
1153
1154		Future::poll(Pin::new(&mut self.network), cx)
1155	}
1156}
1157
1158/// Checks if this node has any available keys in the keystore for any authority id in the given
1159/// voter set.  Returns the authority id for which keys are available, or `None` if no keys are
1160/// available.
1161fn local_authority_id(
1162	voters: &VoterSet<AuthorityId>,
1163	keystore: Option<&KeystorePtr>,
1164) -> Option<AuthorityId> {
1165	keystore.and_then(|keystore| {
1166		voters
1167			.iter()
1168			.find(|(p, _)| keystore.has_keys(&[(p.to_raw_vec(), AuthorityId::ID)]))
1169			.map(|(p, _)| p.clone())
1170	})
1171}
1172
1173/// Reverts protocol aux data to at most the last finalized block.
1174/// In particular, standard and forced authority set changes announced after the
1175/// revert point are removed.
1176pub fn revert<Block, Client>(client: Arc<Client>, blocks: NumberFor<Block>) -> ClientResult<()>
1177where
1178	Block: BlockT,
1179	Client: AuxStore + HeaderMetadata<Block, Error = ClientError> + HeaderBackend<Block>,
1180{
1181	let best_number = client.info().best_number;
1182	let finalized = client.info().finalized_number;
1183
1184	let revertible = blocks.min(best_number - finalized);
1185	if revertible == Zero::zero() {
1186		return Ok(())
1187	}
1188
1189	let number = best_number - revertible;
1190	let hash = client
1191		.block_hash_from_id(&BlockId::Number(number))?
1192		.ok_or(ClientError::Backend(format!(
1193			"Unexpected hash lookup failure for block number: {}",
1194			number
1195		)))?;
1196
1197	let info = client.info();
1198
1199	let persistent_data: PersistentData<Block> =
1200		aux_schema::load_persistent(&*client, info.genesis_hash, Zero::zero(), || {
1201			const MSG: &str = "Unexpected missing grandpa data during revert";
1202			Err(ClientError::Application(Box::from(MSG)))
1203		})?;
1204
1205	let shared_authority_set = persistent_data.authority_set;
1206	let mut authority_set = shared_authority_set.inner();
1207
1208	let is_descendent_of = is_descendent_of(&*client, None);
1209	authority_set.revert(hash, number, &is_descendent_of);
1210
1211	// The following has the side effect to properly reset the current voter state.
1212	let (set_id, set_ref) = authority_set.current();
1213	let new_set = Some(NewAuthoritySet {
1214		canon_hash: info.finalized_hash,
1215		canon_number: info.finalized_number,
1216		set_id,
1217		authorities: set_ref.to_vec(),
1218	});
1219	aux_schema::update_authority_set::<Block, _, _>(&authority_set, new_set.as_ref(), |values| {
1220		client.insert_aux(values, None)
1221	})
1222}