1#![warn(missing_docs)]
58
59use codec::Decode;
60use futures::{prelude::*, StreamExt};
61use log::{debug, error, info};
62use parking_lot::RwLock;
63use prometheus_endpoint::{PrometheusError, Registry};
64use sc_client_api::{
65 backend::{AuxStore, Backend},
66 utils::is_descendent_of,
67 BlockchainEvents, CallExecutor, ExecutorProvider, Finalizer, LockImportRun, StorageProvider,
68};
69use sc_consensus::BlockImport;
70use sc_network::{types::ProtocolName, NetworkBackend, NotificationService};
71use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_INFO};
72use sc_transaction_pool_api::OffchainTransactionPoolFactory;
73use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver};
74use sp_api::ProvideRuntimeApi;
75use sp_application_crypto::AppCrypto;
76use sp_blockchain::{Error as ClientError, HeaderBackend, HeaderMetadata, Result as ClientResult};
77use sp_consensus::SelectChain;
78use sp_consensus_grandpa::{
79 AuthorityList, AuthoritySignature, SetId, CLIENT_LOG_TARGET as LOG_TARGET,
80};
81use sp_core::{crypto::ByteArray, traits::CallContext};
82use sp_keystore::KeystorePtr;
83use sp_runtime::{
84 generic::BlockId,
85 traits::{Block as BlockT, NumberFor, Zero},
86};
87
88pub use finality_grandpa::BlockNumberOps;
89use finality_grandpa::{voter, voter_set::VoterSet, Error as GrandpaError};
90
91use std::{
92 fmt, io,
93 pin::Pin,
94 sync::Arc,
95 task::{Context, Poll},
96 time::Duration,
97};
98
99macro_rules! grandpa_log {
103 ($condition:expr, $($msg: expr),+ $(,)?) => {
104 {
105 let log_level = if $condition {
106 log::Level::Debug
107 } else {
108 log::Level::Info
109 };
110
111 log::log!(target: LOG_TARGET, log_level, $($msg),+);
112 }
113 };
114}
115
116mod authorities;
117mod aux_schema;
118mod communication;
119mod environment;
120mod finality_proof;
121mod import;
122mod justification;
123mod notification;
124mod observer;
125mod until_imported;
126mod voting_rule;
127pub mod warp_proof;
128
129pub use authorities::{AuthoritySet, AuthoritySetChanges, SharedAuthoritySet};
130pub use aux_schema::best_justification;
131pub use communication::grandpa_protocol_name::standard_name as protocol_standard_name;
132pub use finality_grandpa::voter::report;
133pub use finality_proof::{FinalityProof, FinalityProofError, FinalityProofProvider};
134pub use import::{find_forced_change, find_scheduled_change, GrandpaBlockImport};
135pub use justification::GrandpaJustification;
136pub use notification::{GrandpaJustificationSender, GrandpaJustificationStream};
137pub use observer::run_grandpa_observer;
138pub use voting_rule::{
139 BeforeBestBlockBy, ThreeQuartersOfTheUnfinalizedChain, VotingRule, VotingRuleResult,
140 VotingRulesBuilder,
141};
142
143use aux_schema::PersistentData;
144use communication::{Network as NetworkT, NetworkBridge, Syncing as SyncingT};
145use environment::{Environment, VoterSetState};
146use until_imported::UntilGlobalMessageBlocksImported;
147
148pub use sp_consensus_grandpa::{
150 AuthorityId, AuthorityPair, CatchUp, Commit, CompactCommit, GrandpaApi, Message, Precommit,
151 Prevote, PrimaryPropose, ScheduledChange, SignedMessage, GRANDPA_ENGINE_ID,
152};
153use std::marker::PhantomData;
154
155#[derive(Debug, Clone)]
161pub struct GrandpaPruningFilter;
162
163impl sc_client_db::PruningFilter for GrandpaPruningFilter {
164 fn should_retain(&self, justifications: &sp_runtime::Justifications) -> bool {
165 justifications.get(GRANDPA_ENGINE_ID).is_some()
166 }
167}
168
169#[cfg(test)]
170mod tests;
171
172type CommunicationIn<Block> = voter::CommunicationIn<
176 <Block as BlockT>::Hash,
177 NumberFor<Block>,
178 AuthoritySignature,
179 AuthorityId,
180>;
181type CommunicationInH<Block, H> =
185 voter::CommunicationIn<H, NumberFor<Block>, AuthoritySignature, AuthorityId>;
186
187type CommunicationOutH<Block, H> =
191 voter::CommunicationOut<H, NumberFor<Block>, AuthoritySignature, AuthorityId>;
192
193pub struct SharedVoterState {
195 inner: Arc<RwLock<Option<Box<dyn voter::VoterState<AuthorityId> + Sync + Send>>>>,
196}
197
198impl SharedVoterState {
199 pub fn empty() -> Self {
201 Self { inner: Arc::new(RwLock::new(None)) }
202 }
203
204 fn reset(
205 &self,
206 voter_state: Box<dyn voter::VoterState<AuthorityId> + Sync + Send>,
207 ) -> Option<()> {
208 let mut shared_voter_state = self.inner.try_write_for(Duration::from_secs(1))?;
209
210 *shared_voter_state = Some(voter_state);
211 Some(())
212 }
213
214 pub fn voter_state(&self) -> Option<report::VoterState<AuthorityId>> {
216 self.inner.read().as_ref().map(|vs| vs.get())
217 }
218}
219
220impl Clone for SharedVoterState {
221 fn clone(&self) -> Self {
222 SharedVoterState { inner: self.inner.clone() }
223 }
224}
225
226#[derive(Clone)]
228pub struct Config {
229 pub gossip_duration: Duration,
231 pub justification_generation_period: u32,
235 pub observer_enabled: bool,
240 pub local_role: sc_network::config::Role,
242 pub name: Option<String>,
244 pub keystore: Option<KeystorePtr>,
246 pub telemetry: Option<TelemetryHandle>,
248 pub protocol_name: ProtocolName,
250}
251
252impl Config {
253 fn name(&self) -> &str {
254 self.name.as_deref().unwrap_or("<unknown>")
255 }
256}
257
258#[derive(Debug, thiserror::Error)]
260pub enum Error {
261 #[error("grandpa error: {0}")]
263 Grandpa(#[from] GrandpaError),
264
265 #[error("network error: {0}")]
267 Network(String),
268
269 #[error("blockchain error: {0}")]
271 Blockchain(String),
272
273 #[error("could not complete a round on disk: {0}")]
275 Client(#[from] ClientError),
276
277 #[error("could not sign outgoing message: {0}")]
279 Signing(String),
280
281 #[error("safety invariant has been violated: {0}")]
283 Safety(String),
284
285 #[error("a timer failed to fire: {0}")]
287 Timer(io::Error),
288
289 #[error("runtime API request failed: {0}")]
291 RuntimeApi(sp_api::ApiError),
292}
293
294pub(crate) trait BlockStatus<Block: BlockT> {
296 fn block_number(&self, hash: Block::Hash) -> Result<Option<NumberFor<Block>>, Error>;
300}
301
302impl<Block: BlockT, Client> BlockStatus<Block> for Arc<Client>
303where
304 Client: HeaderBackend<Block>,
305 NumberFor<Block>: BlockNumberOps,
306{
307 fn block_number(&self, hash: Block::Hash) -> Result<Option<NumberFor<Block>>, Error> {
308 self.block_number_from_id(&BlockId::Hash(hash))
309 .map_err(|e| Error::Blockchain(e.to_string()))
310 }
311}
312
313pub trait ClientForGrandpa<Block, BE>:
317 LockImportRun<Block, BE>
318 + Finalizer<Block, BE>
319 + AuxStore
320 + HeaderMetadata<Block, Error = sp_blockchain::Error>
321 + HeaderBackend<Block>
322 + BlockchainEvents<Block>
323 + ProvideRuntimeApi<Block>
324 + ExecutorProvider<Block>
325 + BlockImport<Block, Error = sp_consensus::Error>
326 + StorageProvider<Block, BE>
327where
328 BE: Backend<Block>,
329 Block: BlockT,
330{
331}
332
333impl<Block, BE, T> ClientForGrandpa<Block, BE> for T
334where
335 BE: Backend<Block>,
336 Block: BlockT,
337 T: LockImportRun<Block, BE>
338 + Finalizer<Block, BE>
339 + AuxStore
340 + HeaderMetadata<Block, Error = sp_blockchain::Error>
341 + HeaderBackend<Block>
342 + BlockchainEvents<Block>
343 + ProvideRuntimeApi<Block>
344 + ExecutorProvider<Block>
345 + BlockImport<Block, Error = sp_consensus::Error>
346 + StorageProvider<Block, BE>,
347{
348}
349
350pub(crate) trait BlockSyncRequester<Block: BlockT> {
352 fn set_sync_fork_request(
359 &self,
360 peers: Vec<sc_network_types::PeerId>,
361 hash: Block::Hash,
362 number: NumberFor<Block>,
363 );
364}
365
366impl<Block, Network, Syncing> BlockSyncRequester<Block> for NetworkBridge<Block, Network, Syncing>
367where
368 Block: BlockT,
369 Network: NetworkT<Block>,
370 Syncing: SyncingT<Block>,
371{
372 fn set_sync_fork_request(
373 &self,
374 peers: Vec<sc_network_types::PeerId>,
375 hash: Block::Hash,
376 number: NumberFor<Block>,
377 ) {
378 NetworkBridge::set_sync_fork_request(self, peers, hash, number)
379 }
380}
381
382#[derive(Debug)]
384pub(crate) struct NewAuthoritySet<H, N> {
385 pub(crate) canon_number: N,
386 pub(crate) canon_hash: H,
387 pub(crate) set_id: SetId,
388 pub(crate) authorities: AuthorityList,
389}
390
391#[derive(Debug)]
393pub(crate) enum VoterCommand<H, N> {
394 Pause(String),
396 ChangeAuthorities(NewAuthoritySet<H, N>),
398}
399
400impl<H, N> fmt::Display for VoterCommand<H, N> {
401 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
402 match *self {
403 VoterCommand::Pause(ref reason) => write!(f, "Pausing voter: {}", reason),
404 VoterCommand::ChangeAuthorities(_) => write!(f, "Changing authorities"),
405 }
406 }
407}
408
409#[derive(Debug)]
411pub(crate) enum CommandOrError<H, N> {
412 Error(Error),
414 VoterCommand(VoterCommand<H, N>),
416}
417
418impl<H, N> From<Error> for CommandOrError<H, N> {
419 fn from(e: Error) -> Self {
420 CommandOrError::Error(e)
421 }
422}
423
424impl<H, N> From<ClientError> for CommandOrError<H, N> {
425 fn from(e: ClientError) -> Self {
426 CommandOrError::Error(Error::Client(e))
427 }
428}
429
430impl<H, N> From<finality_grandpa::Error> for CommandOrError<H, N> {
431 fn from(e: finality_grandpa::Error) -> Self {
432 CommandOrError::Error(Error::from(e))
433 }
434}
435
436impl<H, N> From<VoterCommand<H, N>> for CommandOrError<H, N> {
437 fn from(e: VoterCommand<H, N>) -> Self {
438 CommandOrError::VoterCommand(e)
439 }
440}
441
442impl<H: fmt::Debug, N: fmt::Debug> ::std::error::Error for CommandOrError<H, N> {}
443
444impl<H, N> fmt::Display for CommandOrError<H, N> {
445 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
446 match *self {
447 CommandOrError::Error(ref e) => write!(f, "{}", e),
448 CommandOrError::VoterCommand(ref cmd) => write!(f, "{}", cmd),
449 }
450 }
451}
452
453pub struct LinkHalf<Block: BlockT, C, SC> {
455 client: Arc<C>,
456 select_chain: SC,
457 persistent_data: PersistentData<Block>,
458 voter_commands_rx: TracingUnboundedReceiver<VoterCommand<Block::Hash, NumberFor<Block>>>,
459 justification_sender: GrandpaJustificationSender<Block>,
460 justification_stream: GrandpaJustificationStream<Block>,
461 telemetry: Option<TelemetryHandle>,
462}
463
464impl<Block: BlockT, C, SC> LinkHalf<Block, C, SC> {
465 pub fn shared_authority_set(&self) -> &SharedAuthoritySet<Block::Hash, NumberFor<Block>> {
467 &self.persistent_data.authority_set
468 }
469
470 pub fn justification_stream(&self) -> GrandpaJustificationStream<Block> {
472 self.justification_stream.clone()
473 }
474}
475
476pub trait GenesisAuthoritySetProvider<Block: BlockT> {
478 fn get(&self) -> Result<AuthorityList, ClientError>;
480}
481
482impl<Block: BlockT, E, Client> GenesisAuthoritySetProvider<Block> for Arc<Client>
483where
484 E: CallExecutor<Block>,
485 Client: ExecutorProvider<Block, Executor = E> + HeaderBackend<Block>,
486{
487 fn get(&self) -> Result<AuthorityList, ClientError> {
488 self.executor()
489 .call(
490 self.expect_block_hash_from_id(&BlockId::Number(Zero::zero()))?,
491 "GrandpaApi_grandpa_authorities",
492 &[],
493 CallContext::Offchain,
494 )
495 .and_then(|call_result| {
496 Decode::decode(&mut &call_result[..]).map_err(|err| {
497 ClientError::CallResultDecode(
498 "failed to decode GRANDPA authorities set proof",
499 err,
500 )
501 })
502 })
503 }
504}
505
506pub fn block_import<BE, Block: BlockT, Client, SC>(
516 client: Arc<Client>,
517 justification_import_period: u32,
518 genesis_authorities_provider: &dyn GenesisAuthoritySetProvider<Block>,
519 select_chain: SC,
520 telemetry: Option<TelemetryHandle>,
521) -> Result<(GrandpaBlockImport<BE, Block, Client, SC>, LinkHalf<Block, Client, SC>), ClientError>
522where
523 SC: SelectChain<Block>,
524 BE: Backend<Block> + 'static,
525 Client: ClientForGrandpa<Block, BE> + 'static,
526{
527 block_import_with_authority_set_hard_forks(
528 client,
529 justification_import_period,
530 genesis_authorities_provider,
531 select_chain,
532 Default::default(),
533 telemetry,
534 )
535}
536
537pub struct AuthoritySetHardFork<Block: BlockT> {
541 pub set_id: SetId,
543 pub block: (Block::Hash, NumberFor<Block>),
545 pub authorities: AuthorityList,
547 pub last_finalized: Option<NumberFor<Block>>,
553}
554
555pub fn block_import_with_authority_set_hard_forks<BE, Block: BlockT, Client, SC>(
561 client: Arc<Client>,
562 justification_import_period: u32,
563 genesis_authorities_provider: &dyn GenesisAuthoritySetProvider<Block>,
564 select_chain: SC,
565 authority_set_hard_forks: Vec<AuthoritySetHardFork<Block>>,
566 telemetry: Option<TelemetryHandle>,
567) -> Result<(GrandpaBlockImport<BE, Block, Client, SC>, LinkHalf<Block, Client, SC>), ClientError>
568where
569 SC: SelectChain<Block>,
570 BE: Backend<Block> + 'static,
571 Client: ClientForGrandpa<Block, BE> + 'static,
572{
573 let chain_info = client.info();
574 let genesis_hash = chain_info.genesis_hash;
575
576 let persistent_data =
577 aux_schema::load_persistent(&*client, genesis_hash, <NumberFor<Block>>::zero(), {
578 let telemetry = telemetry.clone();
579 move || {
580 let authorities = genesis_authorities_provider.get()?;
581 telemetry!(
582 telemetry;
583 CONSENSUS_DEBUG;
584 "afg.loading_authorities";
585 "authorities_len" => ?authorities.len()
586 );
587 Ok(authorities)
588 }
589 })?;
590
591 let (voter_commands_tx, voter_commands_rx) =
592 tracing_unbounded("mpsc_grandpa_voter_command", 100_000);
593
594 let (justification_sender, justification_stream) = GrandpaJustificationStream::channel();
595
596 let authority_set_hard_forks = authority_set_hard_forks
598 .into_iter()
599 .map(|fork| {
600 let delay_kind = if let Some(last_finalized) = fork.last_finalized {
601 authorities::DelayKind::Best { median_last_finalized: last_finalized }
602 } else {
603 authorities::DelayKind::Finalized
604 };
605
606 (
607 fork.set_id,
608 authorities::PendingChange {
609 next_authorities: fork.authorities,
610 delay: Zero::zero(),
611 canon_hash: fork.block.0,
612 canon_height: fork.block.1,
613 delay_kind,
614 },
615 )
616 })
617 .collect();
618
619 Ok((
620 GrandpaBlockImport::new(
621 client.clone(),
622 justification_import_period,
623 select_chain.clone(),
624 persistent_data.authority_set.clone(),
625 voter_commands_tx,
626 authority_set_hard_forks,
627 justification_sender.clone(),
628 telemetry.clone(),
629 ),
630 LinkHalf {
631 client,
632 select_chain,
633 persistent_data,
634 voter_commands_rx,
635 justification_sender,
636 justification_stream,
637 telemetry,
638 },
639 ))
640}
641
642fn global_communication<BE, Block: BlockT, C, N, S>(
643 set_id: SetId,
644 voters: &Arc<VoterSet<AuthorityId>>,
645 client: Arc<C>,
646 network: &NetworkBridge<Block, N, S>,
647 keystore: Option<&KeystorePtr>,
648 metrics: Option<until_imported::Metrics>,
649) -> (
650 impl Stream<
651 Item = Result<
652 CommunicationInH<Block, Block::Hash>,
653 CommandOrError<Block::Hash, NumberFor<Block>>,
654 >,
655 >,
656 impl Sink<
657 CommunicationOutH<Block, Block::Hash>,
658 Error = CommandOrError<Block::Hash, NumberFor<Block>>,
659 >,
660)
661where
662 BE: Backend<Block> + 'static,
663 C: ClientForGrandpa<Block, BE> + 'static,
664 N: NetworkT<Block>,
665 S: SyncingT<Block>,
666 NumberFor<Block>: BlockNumberOps,
667{
668 let is_voter = local_authority_id(voters, keystore).is_some();
669
670 let (global_in, global_out) =
672 network.global_communication(communication::SetId(set_id), voters.clone(), is_voter);
673
674 let global_in = UntilGlobalMessageBlocksImported::new(
676 client.import_notification_stream(),
677 network.clone(),
678 client.clone(),
679 global_in,
680 "global",
681 metrics,
682 );
683
684 let global_in = global_in.map_err(CommandOrError::from);
685 let global_out = global_out.sink_map_err(CommandOrError::from);
686
687 (global_in, global_out)
688}
689
690pub struct GrandpaParams<Block: BlockT, C, N, S, SC, VR> {
692 pub config: Config,
694 pub link: LinkHalf<Block, C, SC>,
696 pub network: N,
702 pub sync: S,
704 pub notification_service: Box<dyn NotificationService>,
706 pub voting_rule: VR,
708 pub prometheus_registry: Option<prometheus_endpoint::Registry>,
710 pub shared_voter_state: SharedVoterState,
712 pub telemetry: Option<TelemetryHandle>,
714 pub offchain_tx_pool_factory: OffchainTransactionPoolFactory<Block>,
719}
720
721pub fn grandpa_peers_set_config<B: BlockT, N: NetworkBackend<B, <B as BlockT>::Hash>>(
725 protocol_name: ProtocolName,
726 metrics: sc_network::service::NotificationMetrics,
727 peer_store_handle: Arc<dyn sc_network::peer_store::PeerStoreProvider>,
728) -> (N::NotificationProtocolConfig, Box<dyn NotificationService>) {
729 use communication::grandpa_protocol_name;
730 N::notification_config(
731 protocol_name,
732 grandpa_protocol_name::LEGACY_NAMES.iter().map(|&n| n.into()).collect(),
733 1024 * 1024,
735 None,
736 sc_network::config::SetConfig {
737 in_peers: 0,
738 out_peers: 0,
739 reserved_nodes: Vec::new(),
740 non_reserved_mode: sc_network::config::NonReservedPeerMode::Deny,
741 },
742 metrics,
743 peer_store_handle,
744 )
745}
746
747pub fn run_grandpa_voter<Block: BlockT, BE: 'static, C, N, S, SC, VR>(
750 grandpa_params: GrandpaParams<Block, C, N, S, SC, VR>,
751) -> sp_blockchain::Result<impl Future<Output = ()> + Send>
752where
753 BE: Backend<Block> + 'static,
754 N: NetworkT<Block> + Sync + 'static,
755 S: SyncingT<Block> + Sync + 'static,
756 SC: SelectChain<Block> + 'static,
757 VR: VotingRule<Block, C> + Clone + 'static,
758 NumberFor<Block>: BlockNumberOps,
759 C: ClientForGrandpa<Block, BE> + 'static,
760 C::Api: GrandpaApi<Block>,
761{
762 let GrandpaParams {
763 mut config,
764 link,
765 network,
766 sync,
767 notification_service,
768 voting_rule,
769 prometheus_registry,
770 shared_voter_state,
771 telemetry,
772 offchain_tx_pool_factory,
773 } = grandpa_params;
774
775 config.observer_enabled = false;
780
781 let LinkHalf {
782 client,
783 select_chain,
784 persistent_data,
785 voter_commands_rx,
786 justification_sender,
787 justification_stream: _,
788 telemetry: _,
789 } = link;
790
791 let network = NetworkBridge::new(
792 network,
793 sync,
794 notification_service,
795 config.clone(),
796 persistent_data.set_state.clone(),
797 prometheus_registry.as_ref(),
798 telemetry.clone(),
799 );
800
801 let conf = config.clone();
802 let telemetry_task =
803 if let Some(telemetry_on_connect) = telemetry.as_ref().map(|x| x.on_connect_stream()) {
804 let authorities = persistent_data.authority_set.clone();
805 let telemetry = telemetry.clone();
806 let events = telemetry_on_connect.for_each(move |_| {
807 let current_authorities = authorities.current_authorities();
808 let set_id = authorities.set_id();
809 let maybe_authority_id =
810 local_authority_id(¤t_authorities, conf.keystore.as_ref());
811
812 let authorities =
813 current_authorities.iter().map(|(id, _)| id.to_string()).collect::<Vec<_>>();
814
815 let authorities = serde_json::to_string(&authorities).expect(
816 "authorities is always at least an empty vector; \
817 elements are always of type string",
818 );
819
820 telemetry!(
821 telemetry;
822 CONSENSUS_INFO;
823 "afg.authority_set";
824 "authority_id" => maybe_authority_id.map_or("".into(), |s| s.to_string()),
825 "authority_set_id" => ?set_id,
826 "authorities" => authorities,
827 );
828
829 future::ready(())
830 });
831 future::Either::Left(events)
832 } else {
833 future::Either::Right(future::pending())
834 };
835
836 let voter_work = VoterWork::new(
837 client,
838 config,
839 network,
840 select_chain,
841 voting_rule,
842 persistent_data,
843 voter_commands_rx,
844 prometheus_registry,
845 shared_voter_state,
846 justification_sender,
847 telemetry,
848 offchain_tx_pool_factory,
849 );
850
851 let voter_work = voter_work.map(|res| match res {
852 Ok(()) => error!(
853 target: LOG_TARGET,
854 "GRANDPA voter future has concluded naturally, this should be unreachable."
855 ),
856 Err(e) => error!(target: LOG_TARGET, "GRANDPA voter error: {}", e),
857 });
858
859 let telemetry_task = telemetry_task.then(|_| future::pending::<()>());
861
862 Ok(future::select(voter_work, telemetry_task).map(drop))
863}
864
865struct Metrics {
866 environment: environment::Metrics,
867 until_imported: until_imported::Metrics,
868}
869
870impl Metrics {
871 fn register(registry: &Registry) -> Result<Self, PrometheusError> {
872 Ok(Metrics {
873 environment: environment::Metrics::register(registry)?,
874 until_imported: until_imported::Metrics::register(registry)?,
875 })
876 }
877}
878
879#[must_use]
881struct VoterWork<B, Block: BlockT, C, N: NetworkT<Block>, S: SyncingT<Block>, SC, VR> {
882 voter: Pin<
883 Box<dyn Future<Output = Result<(), CommandOrError<Block::Hash, NumberFor<Block>>>> + Send>,
884 >,
885 shared_voter_state: SharedVoterState,
886 env: Arc<Environment<B, Block, C, N, S, SC, VR>>,
887 voter_commands_rx: TracingUnboundedReceiver<VoterCommand<Block::Hash, NumberFor<Block>>>,
888 network: NetworkBridge<Block, N, S>,
889 telemetry: Option<TelemetryHandle>,
890 metrics: Option<Metrics>,
892}
893
894impl<B, Block, C, N, S, SC, VR> VoterWork<B, Block, C, N, S, SC, VR>
895where
896 Block: BlockT,
897 B: Backend<Block> + 'static,
898 C: ClientForGrandpa<Block, B> + 'static,
899 C::Api: GrandpaApi<Block>,
900 N: NetworkT<Block> + Sync,
901 S: SyncingT<Block> + Sync,
902 NumberFor<Block>: BlockNumberOps,
903 SC: SelectChain<Block> + 'static,
904 VR: VotingRule<Block, C> + Clone + 'static,
905{
906 fn new(
907 client: Arc<C>,
908 config: Config,
909 network: NetworkBridge<Block, N, S>,
910 select_chain: SC,
911 voting_rule: VR,
912 persistent_data: PersistentData<Block>,
913 voter_commands_rx: TracingUnboundedReceiver<VoterCommand<Block::Hash, NumberFor<Block>>>,
914 prometheus_registry: Option<prometheus_endpoint::Registry>,
915 shared_voter_state: SharedVoterState,
916 justification_sender: GrandpaJustificationSender<Block>,
917 telemetry: Option<TelemetryHandle>,
918 offchain_tx_pool_factory: OffchainTransactionPoolFactory<Block>,
919 ) -> Self {
920 let metrics = match prometheus_registry.as_ref().map(Metrics::register) {
921 Some(Ok(metrics)) => Some(metrics),
922 Some(Err(e)) => {
923 debug!(target: LOG_TARGET, "Failed to register metrics: {:?}", e);
924 None
925 },
926 None => None,
927 };
928
929 let voters = persistent_data.authority_set.current_authorities();
930 let env = Arc::new(Environment {
931 client,
932 select_chain,
933 voting_rule,
934 voters: Arc::new(voters),
935 config,
936 network: network.clone(),
937 set_id: persistent_data.authority_set.set_id(),
938 authority_set: persistent_data.authority_set.clone(),
939 voter_set_state: persistent_data.set_state,
940 metrics: metrics.as_ref().map(|m| m.environment.clone()),
941 justification_sender: Some(justification_sender),
942 telemetry: telemetry.clone(),
943 offchain_tx_pool_factory,
944 _phantom: PhantomData,
945 });
946
947 let mut work = VoterWork {
948 voter: Box::pin(future::pending()),
951 shared_voter_state,
952 env,
953 voter_commands_rx,
954 network,
955 telemetry,
956 metrics,
957 };
958 work.rebuild_voter();
959 work
960 }
961
962 fn rebuild_voter(&mut self) {
966 debug!(
967 target: LOG_TARGET,
968 "{}: Starting new voter with set ID {}",
969 self.env.config.name(),
970 self.env.set_id
971 );
972
973 let maybe_authority_id =
974 local_authority_id(&self.env.voters, self.env.config.keystore.as_ref());
975 let authority_id = maybe_authority_id.map_or("<unknown>".into(), |s| s.to_string());
976
977 telemetry!(
978 self.telemetry;
979 CONSENSUS_DEBUG;
980 "afg.starting_new_voter";
981 "name" => ?self.env.config.name(),
982 "set_id" => ?self.env.set_id,
983 "authority_id" => authority_id,
984 );
985
986 let chain_info = self.env.client.info();
987
988 let authorities = self.env.voters.iter().map(|(id, _)| id.to_string()).collect::<Vec<_>>();
989
990 let authorities = serde_json::to_string(&authorities).expect(
991 "authorities is always at least an empty vector; elements are always of type string; qed.",
992 );
993
994 telemetry!(
995 self.telemetry;
996 CONSENSUS_INFO;
997 "afg.authority_set";
998 "number" => ?chain_info.finalized_number,
999 "hash" => ?chain_info.finalized_hash,
1000 "authority_id" => authority_id,
1001 "authority_set_id" => ?self.env.set_id,
1002 "authorities" => authorities,
1003 );
1004
1005 match &*self.env.voter_set_state.read() {
1006 VoterSetState::Live { completed_rounds, .. } => {
1007 let last_finalized = (chain_info.finalized_hash, chain_info.finalized_number);
1008
1009 let global_comms = global_communication(
1010 self.env.set_id,
1011 &self.env.voters,
1012 self.env.client.clone(),
1013 &self.env.network,
1014 self.env.config.keystore.as_ref(),
1015 self.metrics.as_ref().map(|m| m.until_imported.clone()),
1016 );
1017
1018 let last_completed_round = completed_rounds.last();
1019
1020 let voter = voter::Voter::new(
1021 self.env.clone(),
1022 (*self.env.voters).clone(),
1023 global_comms,
1024 last_completed_round.number,
1025 last_completed_round.votes.clone(),
1026 last_completed_round.base,
1027 last_finalized,
1028 );
1029
1030 if self.shared_voter_state.reset(voter.voter_state()).is_none() {
1032 info!(
1033 target: LOG_TARGET,
1034 "Timed out trying to update shared GRANDPA voter state. \
1035 RPC endpoints may return stale data."
1036 );
1037 }
1038
1039 self.voter = Box::pin(voter);
1040 },
1041 VoterSetState::Paused { .. } => self.voter = Box::pin(future::pending()),
1042 };
1043 }
1044
1045 fn handle_voter_command(
1046 &mut self,
1047 command: VoterCommand<Block::Hash, NumberFor<Block>>,
1048 ) -> Result<(), Error> {
1049 match command {
1050 VoterCommand::ChangeAuthorities(new) => {
1051 let voters: Vec<String> =
1052 new.authorities.iter().map(move |(a, _)| format!("{}", a)).collect();
1053 telemetry!(
1054 self.telemetry;
1055 CONSENSUS_INFO;
1056 "afg.voter_command_change_authorities";
1057 "number" => ?new.canon_number,
1058 "hash" => ?new.canon_hash,
1059 "voters" => ?voters,
1060 "set_id" => ?new.set_id,
1061 );
1062
1063 self.env.update_voter_set_state(|_| {
1064 let set_state = VoterSetState::live(
1067 new.set_id,
1068 &*self.env.authority_set.inner(),
1069 (new.canon_hash, new.canon_number),
1070 );
1071
1072 aux_schema::write_voter_set_state(&*self.env.client, &set_state)?;
1073 Ok(Some(set_state))
1074 })?;
1075
1076 let voters = Arc::new(VoterSet::new(new.authorities.into_iter()).expect(
1077 "new authorities come from pending change; pending change comes from \
1078 `AuthoritySet`; `AuthoritySet` validates authorities is non-empty and \
1079 weights are non-zero; qed.",
1080 ));
1081
1082 self.env = Arc::new(Environment {
1083 voters,
1084 set_id: new.set_id,
1085 voter_set_state: self.env.voter_set_state.clone(),
1086 client: self.env.client.clone(),
1087 select_chain: self.env.select_chain.clone(),
1088 config: self.env.config.clone(),
1089 authority_set: self.env.authority_set.clone(),
1090 network: self.env.network.clone(),
1091 voting_rule: self.env.voting_rule.clone(),
1092 metrics: self.env.metrics.clone(),
1093 justification_sender: self.env.justification_sender.clone(),
1094 telemetry: self.telemetry.clone(),
1095 offchain_tx_pool_factory: self.env.offchain_tx_pool_factory.clone(),
1096 _phantom: PhantomData,
1097 });
1098
1099 self.rebuild_voter();
1100 Ok(())
1101 },
1102 VoterCommand::Pause(reason) => {
1103 info!(target: LOG_TARGET, "Pausing old validator set: {}", reason);
1104
1105 self.env.update_voter_set_state(|voter_set_state| {
1107 let completed_rounds = voter_set_state.completed_rounds();
1108 let set_state = VoterSetState::Paused { completed_rounds };
1109
1110 aux_schema::write_voter_set_state(&*self.env.client, &set_state)?;
1111 Ok(Some(set_state))
1112 })?;
1113
1114 self.rebuild_voter();
1115 Ok(())
1116 },
1117 }
1118 }
1119}
1120
1121impl<B, Block, C, N, S, SC, VR> Future for VoterWork<B, Block, C, N, S, SC, VR>
1122where
1123 Block: BlockT,
1124 B: Backend<Block> + 'static,
1125 N: NetworkT<Block> + Sync,
1126 S: SyncingT<Block> + Sync,
1127 NumberFor<Block>: BlockNumberOps,
1128 SC: SelectChain<Block> + 'static,
1129 C: ClientForGrandpa<Block, B> + 'static,
1130 C::Api: GrandpaApi<Block>,
1131 VR: VotingRule<Block, C> + Clone + 'static,
1132{
1133 type Output = Result<(), Error>;
1134
1135 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
1136 match Future::poll(Pin::new(&mut self.voter), cx) {
1137 Poll::Pending => {},
1138 Poll::Ready(Ok(())) => {
1139 return Poll::Ready(Err(Error::Safety(
1141 "consensus-grandpa inner voter has concluded.".into(),
1142 )));
1143 },
1144 Poll::Ready(Err(CommandOrError::Error(e))) => {
1145 return Poll::Ready(Err(e));
1147 },
1148 Poll::Ready(Err(CommandOrError::VoterCommand(command))) => {
1149 self.handle_voter_command(command)?;
1151 cx.waker().wake_by_ref();
1152 },
1153 }
1154
1155 match Stream::poll_next(Pin::new(&mut self.voter_commands_rx), cx) {
1156 Poll::Pending => {},
1157 Poll::Ready(None) => {
1158 return Poll::Ready(Err(Error::Safety("`voter_commands_rx` was closed.".into())));
1160 },
1161 Poll::Ready(Some(command)) => {
1162 self.handle_voter_command(command)?;
1164 cx.waker().wake_by_ref();
1165 },
1166 }
1167
1168 Future::poll(Pin::new(&mut self.network), cx)
1169 }
1170}
1171
1172fn local_authority_id(
1176 voters: &VoterSet<AuthorityId>,
1177 keystore: Option<&KeystorePtr>,
1178) -> Option<AuthorityId> {
1179 keystore.and_then(|keystore| {
1180 voters
1181 .iter()
1182 .find(|(p, _)| keystore.has_keys(&[(p.to_raw_vec(), AuthorityId::ID)]))
1183 .map(|(p, _)| p.clone())
1184 })
1185}
1186
1187pub fn revert<Block, Client>(client: Arc<Client>, blocks: NumberFor<Block>) -> ClientResult<()>
1191where
1192 Block: BlockT,
1193 Client: AuxStore + HeaderMetadata<Block, Error = ClientError> + HeaderBackend<Block>,
1194{
1195 let best_number = client.info().best_number;
1196 let finalized = client.info().finalized_number;
1197
1198 let revertible = blocks.min(best_number - finalized);
1199 if revertible == Zero::zero() {
1200 return Ok(());
1201 }
1202
1203 let number = best_number - revertible;
1204 let hash = client
1205 .block_hash_from_id(&BlockId::Number(number))?
1206 .ok_or(ClientError::Backend(format!(
1207 "Unexpected hash lookup failure for block number: {}",
1208 number
1209 )))?;
1210
1211 let info = client.info();
1212
1213 let persistent_data: PersistentData<Block> =
1214 aux_schema::load_persistent(&*client, info.genesis_hash, Zero::zero(), || {
1215 const MSG: &str = "Unexpected missing grandpa data during revert";
1216 Err(ClientError::Application(Box::from(MSG)))
1217 })?;
1218
1219 let shared_authority_set = persistent_data.authority_set;
1220 let mut authority_set = shared_authority_set.inner();
1221
1222 let is_descendent_of = is_descendent_of(&*client, None);
1223 authority_set.revert(hash, number, &is_descendent_of);
1224
1225 let (set_id, set_ref) = authority_set.current();
1227 let new_set = Some(NewAuthoritySet {
1228 canon_hash: info.finalized_hash,
1229 canon_number: info.finalized_number,
1230 set_id,
1231 authorities: set_ref.to_vec(),
1232 });
1233 aux_schema::update_authority_set::<Block, _, _>(&authority_set, new_set.as_ref(), |values| {
1234 client.insert_aux(values, None)
1235 })
1236}