1#![warn(missing_docs)]
58
59use codec::Decode;
60use futures::{prelude::*, StreamExt};
61use log::{debug, error, info};
62use parking_lot::RwLock;
63use prometheus_endpoint::{PrometheusError, Registry};
64use sc_client_api::{
65 backend::{AuxStore, Backend},
66 utils::is_descendent_of,
67 BlockchainEvents, CallExecutor, ExecutorProvider, Finalizer, LockImportRun, StorageProvider,
68};
69use sc_consensus::BlockImport;
70use sc_network::{types::ProtocolName, NetworkBackend, NotificationService};
71use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_INFO};
72use sc_transaction_pool_api::OffchainTransactionPoolFactory;
73use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver};
74use sp_api::ProvideRuntimeApi;
75use sp_application_crypto::AppCrypto;
76use sp_blockchain::{Error as ClientError, HeaderBackend, HeaderMetadata, Result as ClientResult};
77use sp_consensus::SelectChain;
78use sp_consensus_grandpa::{
79 AuthorityList, AuthoritySignature, SetId, CLIENT_LOG_TARGET as LOG_TARGET,
80};
81use sp_core::{crypto::ByteArray, traits::CallContext};
82use sp_keystore::KeystorePtr;
83use sp_runtime::{
84 generic::BlockId,
85 traits::{Block as BlockT, NumberFor, Zero},
86};
87
88pub use finality_grandpa::BlockNumberOps;
89use finality_grandpa::{voter, voter_set::VoterSet, Error as GrandpaError};
90
91use std::{
92 fmt, io,
93 pin::Pin,
94 sync::Arc,
95 task::{Context, Poll},
96 time::Duration,
97};
98
99macro_rules! grandpa_log {
103 ($condition:expr, $($msg: expr),+ $(,)?) => {
104 {
105 let log_level = if $condition {
106 log::Level::Debug
107 } else {
108 log::Level::Info
109 };
110
111 log::log!(target: LOG_TARGET, log_level, $($msg),+);
112 }
113 };
114}
115
116mod authorities;
117mod aux_schema;
118mod communication;
119mod environment;
120mod finality_proof;
121mod import;
122mod justification;
123mod notification;
124mod observer;
125mod until_imported;
126mod voting_rule;
127pub mod warp_proof;
128
129pub use authorities::{AuthoritySet, AuthoritySetChanges, SharedAuthoritySet};
130pub use aux_schema::best_justification;
131pub use communication::grandpa_protocol_name::standard_name as protocol_standard_name;
132pub use finality_grandpa::voter::report;
133pub use finality_proof::{FinalityProof, FinalityProofError, FinalityProofProvider};
134pub use import::{find_forced_change, find_scheduled_change, GrandpaBlockImport};
135pub use justification::GrandpaJustification;
136pub use notification::{GrandpaJustificationSender, GrandpaJustificationStream};
137pub use observer::run_grandpa_observer;
138pub use voting_rule::{
139 BeforeBestBlockBy, ThreeQuartersOfTheUnfinalizedChain, VotingRule, VotingRuleResult,
140 VotingRulesBuilder,
141};
142
143use aux_schema::PersistentData;
144use communication::{Network as NetworkT, NetworkBridge, Syncing as SyncingT};
145use environment::{Environment, VoterSetState};
146use until_imported::UntilGlobalMessageBlocksImported;
147
148pub use sp_consensus_grandpa::{
150 AuthorityId, AuthorityPair, CatchUp, Commit, CompactCommit, GrandpaApi, Message, Precommit,
151 Prevote, PrimaryPropose, ScheduledChange, SignedMessage,
152};
153use std::marker::PhantomData;
154
155#[cfg(test)]
156mod tests;
157
158type CommunicationIn<Block> = voter::CommunicationIn<
162 <Block as BlockT>::Hash,
163 NumberFor<Block>,
164 AuthoritySignature,
165 AuthorityId,
166>;
167type CommunicationInH<Block, H> =
171 voter::CommunicationIn<H, NumberFor<Block>, AuthoritySignature, AuthorityId>;
172
173type CommunicationOutH<Block, H> =
177 voter::CommunicationOut<H, NumberFor<Block>, AuthoritySignature, AuthorityId>;
178
179pub struct SharedVoterState {
181 inner: Arc<RwLock<Option<Box<dyn voter::VoterState<AuthorityId> + Sync + Send>>>>,
182}
183
184impl SharedVoterState {
185 pub fn empty() -> Self {
187 Self { inner: Arc::new(RwLock::new(None)) }
188 }
189
190 fn reset(
191 &self,
192 voter_state: Box<dyn voter::VoterState<AuthorityId> + Sync + Send>,
193 ) -> Option<()> {
194 let mut shared_voter_state = self.inner.try_write_for(Duration::from_secs(1))?;
195
196 *shared_voter_state = Some(voter_state);
197 Some(())
198 }
199
200 pub fn voter_state(&self) -> Option<report::VoterState<AuthorityId>> {
202 self.inner.read().as_ref().map(|vs| vs.get())
203 }
204}
205
206impl Clone for SharedVoterState {
207 fn clone(&self) -> Self {
208 SharedVoterState { inner: self.inner.clone() }
209 }
210}
211
212#[derive(Clone)]
214pub struct Config {
215 pub gossip_duration: Duration,
217 pub justification_generation_period: u32,
221 pub observer_enabled: bool,
226 pub local_role: sc_network::config::Role,
228 pub name: Option<String>,
230 pub keystore: Option<KeystorePtr>,
232 pub telemetry: Option<TelemetryHandle>,
234 pub protocol_name: ProtocolName,
236}
237
238impl Config {
239 fn name(&self) -> &str {
240 self.name.as_deref().unwrap_or("<unknown>")
241 }
242}
243
244#[derive(Debug, thiserror::Error)]
246pub enum Error {
247 #[error("grandpa error: {0}")]
249 Grandpa(#[from] GrandpaError),
250
251 #[error("network error: {0}")]
253 Network(String),
254
255 #[error("blockchain error: {0}")]
257 Blockchain(String),
258
259 #[error("could not complete a round on disk: {0}")]
261 Client(#[from] ClientError),
262
263 #[error("could not sign outgoing message: {0}")]
265 Signing(String),
266
267 #[error("safety invariant has been violated: {0}")]
269 Safety(String),
270
271 #[error("a timer failed to fire: {0}")]
273 Timer(io::Error),
274
275 #[error("runtime API request failed: {0}")]
277 RuntimeApi(sp_api::ApiError),
278}
279
280pub(crate) trait BlockStatus<Block: BlockT> {
282 fn block_number(&self, hash: Block::Hash) -> Result<Option<NumberFor<Block>>, Error>;
286}
287
288impl<Block: BlockT, Client> BlockStatus<Block> for Arc<Client>
289where
290 Client: HeaderBackend<Block>,
291 NumberFor<Block>: BlockNumberOps,
292{
293 fn block_number(&self, hash: Block::Hash) -> Result<Option<NumberFor<Block>>, Error> {
294 self.block_number_from_id(&BlockId::Hash(hash))
295 .map_err(|e| Error::Blockchain(e.to_string()))
296 }
297}
298
299pub trait ClientForGrandpa<Block, BE>:
303 LockImportRun<Block, BE>
304 + Finalizer<Block, BE>
305 + AuxStore
306 + HeaderMetadata<Block, Error = sp_blockchain::Error>
307 + HeaderBackend<Block>
308 + BlockchainEvents<Block>
309 + ProvideRuntimeApi<Block>
310 + ExecutorProvider<Block>
311 + BlockImport<Block, Error = sp_consensus::Error>
312 + StorageProvider<Block, BE>
313where
314 BE: Backend<Block>,
315 Block: BlockT,
316{
317}
318
319impl<Block, BE, T> ClientForGrandpa<Block, BE> for T
320where
321 BE: Backend<Block>,
322 Block: BlockT,
323 T: LockImportRun<Block, BE>
324 + Finalizer<Block, BE>
325 + AuxStore
326 + HeaderMetadata<Block, Error = sp_blockchain::Error>
327 + HeaderBackend<Block>
328 + BlockchainEvents<Block>
329 + ProvideRuntimeApi<Block>
330 + ExecutorProvider<Block>
331 + BlockImport<Block, Error = sp_consensus::Error>
332 + StorageProvider<Block, BE>,
333{
334}
335
336pub(crate) trait BlockSyncRequester<Block: BlockT> {
338 fn set_sync_fork_request(
345 &self,
346 peers: Vec<sc_network_types::PeerId>,
347 hash: Block::Hash,
348 number: NumberFor<Block>,
349 );
350}
351
352impl<Block, Network, Syncing> BlockSyncRequester<Block> for NetworkBridge<Block, Network, Syncing>
353where
354 Block: BlockT,
355 Network: NetworkT<Block>,
356 Syncing: SyncingT<Block>,
357{
358 fn set_sync_fork_request(
359 &self,
360 peers: Vec<sc_network_types::PeerId>,
361 hash: Block::Hash,
362 number: NumberFor<Block>,
363 ) {
364 NetworkBridge::set_sync_fork_request(self, peers, hash, number)
365 }
366}
367
368#[derive(Debug)]
370pub(crate) struct NewAuthoritySet<H, N> {
371 pub(crate) canon_number: N,
372 pub(crate) canon_hash: H,
373 pub(crate) set_id: SetId,
374 pub(crate) authorities: AuthorityList,
375}
376
377#[derive(Debug)]
379pub(crate) enum VoterCommand<H, N> {
380 Pause(String),
382 ChangeAuthorities(NewAuthoritySet<H, N>),
384}
385
386impl<H, N> fmt::Display for VoterCommand<H, N> {
387 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
388 match *self {
389 VoterCommand::Pause(ref reason) => write!(f, "Pausing voter: {}", reason),
390 VoterCommand::ChangeAuthorities(_) => write!(f, "Changing authorities"),
391 }
392 }
393}
394
395#[derive(Debug)]
397pub(crate) enum CommandOrError<H, N> {
398 Error(Error),
400 VoterCommand(VoterCommand<H, N>),
402}
403
404impl<H, N> From<Error> for CommandOrError<H, N> {
405 fn from(e: Error) -> Self {
406 CommandOrError::Error(e)
407 }
408}
409
410impl<H, N> From<ClientError> for CommandOrError<H, N> {
411 fn from(e: ClientError) -> Self {
412 CommandOrError::Error(Error::Client(e))
413 }
414}
415
416impl<H, N> From<finality_grandpa::Error> for CommandOrError<H, N> {
417 fn from(e: finality_grandpa::Error) -> Self {
418 CommandOrError::Error(Error::from(e))
419 }
420}
421
422impl<H, N> From<VoterCommand<H, N>> for CommandOrError<H, N> {
423 fn from(e: VoterCommand<H, N>) -> Self {
424 CommandOrError::VoterCommand(e)
425 }
426}
427
428impl<H: fmt::Debug, N: fmt::Debug> ::std::error::Error for CommandOrError<H, N> {}
429
430impl<H, N> fmt::Display for CommandOrError<H, N> {
431 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
432 match *self {
433 CommandOrError::Error(ref e) => write!(f, "{}", e),
434 CommandOrError::VoterCommand(ref cmd) => write!(f, "{}", cmd),
435 }
436 }
437}
438
439pub struct LinkHalf<Block: BlockT, C, SC> {
441 client: Arc<C>,
442 select_chain: SC,
443 persistent_data: PersistentData<Block>,
444 voter_commands_rx: TracingUnboundedReceiver<VoterCommand<Block::Hash, NumberFor<Block>>>,
445 justification_sender: GrandpaJustificationSender<Block>,
446 justification_stream: GrandpaJustificationStream<Block>,
447 telemetry: Option<TelemetryHandle>,
448}
449
450impl<Block: BlockT, C, SC> LinkHalf<Block, C, SC> {
451 pub fn shared_authority_set(&self) -> &SharedAuthoritySet<Block::Hash, NumberFor<Block>> {
453 &self.persistent_data.authority_set
454 }
455
456 pub fn justification_stream(&self) -> GrandpaJustificationStream<Block> {
458 self.justification_stream.clone()
459 }
460}
461
462pub trait GenesisAuthoritySetProvider<Block: BlockT> {
464 fn get(&self) -> Result<AuthorityList, ClientError>;
466}
467
468impl<Block: BlockT, E, Client> GenesisAuthoritySetProvider<Block> for Arc<Client>
469where
470 E: CallExecutor<Block>,
471 Client: ExecutorProvider<Block, Executor = E> + HeaderBackend<Block>,
472{
473 fn get(&self) -> Result<AuthorityList, ClientError> {
474 self.executor()
475 .call(
476 self.expect_block_hash_from_id(&BlockId::Number(Zero::zero()))?,
477 "GrandpaApi_grandpa_authorities",
478 &[],
479 CallContext::Offchain,
480 )
481 .and_then(|call_result| {
482 Decode::decode(&mut &call_result[..]).map_err(|err| {
483 ClientError::CallResultDecode(
484 "failed to decode GRANDPA authorities set proof",
485 err,
486 )
487 })
488 })
489 }
490}
491
492pub fn block_import<BE, Block: BlockT, Client, SC>(
502 client: Arc<Client>,
503 justification_import_period: u32,
504 genesis_authorities_provider: &dyn GenesisAuthoritySetProvider<Block>,
505 select_chain: SC,
506 telemetry: Option<TelemetryHandle>,
507) -> Result<(GrandpaBlockImport<BE, Block, Client, SC>, LinkHalf<Block, Client, SC>), ClientError>
508where
509 SC: SelectChain<Block>,
510 BE: Backend<Block> + 'static,
511 Client: ClientForGrandpa<Block, BE> + 'static,
512{
513 block_import_with_authority_set_hard_forks(
514 client,
515 justification_import_period,
516 genesis_authorities_provider,
517 select_chain,
518 Default::default(),
519 telemetry,
520 )
521}
522
523pub struct AuthoritySetHardFork<Block: BlockT> {
527 pub set_id: SetId,
529 pub block: (Block::Hash, NumberFor<Block>),
531 pub authorities: AuthorityList,
533 pub last_finalized: Option<NumberFor<Block>>,
539}
540
541pub fn block_import_with_authority_set_hard_forks<BE, Block: BlockT, Client, SC>(
547 client: Arc<Client>,
548 justification_import_period: u32,
549 genesis_authorities_provider: &dyn GenesisAuthoritySetProvider<Block>,
550 select_chain: SC,
551 authority_set_hard_forks: Vec<AuthoritySetHardFork<Block>>,
552 telemetry: Option<TelemetryHandle>,
553) -> Result<(GrandpaBlockImport<BE, Block, Client, SC>, LinkHalf<Block, Client, SC>), ClientError>
554where
555 SC: SelectChain<Block>,
556 BE: Backend<Block> + 'static,
557 Client: ClientForGrandpa<Block, BE> + 'static,
558{
559 let chain_info = client.info();
560 let genesis_hash = chain_info.genesis_hash;
561
562 let persistent_data =
563 aux_schema::load_persistent(&*client, genesis_hash, <NumberFor<Block>>::zero(), {
564 let telemetry = telemetry.clone();
565 move || {
566 let authorities = genesis_authorities_provider.get()?;
567 telemetry!(
568 telemetry;
569 CONSENSUS_DEBUG;
570 "afg.loading_authorities";
571 "authorities_len" => ?authorities.len()
572 );
573 Ok(authorities)
574 }
575 })?;
576
577 let (voter_commands_tx, voter_commands_rx) =
578 tracing_unbounded("mpsc_grandpa_voter_command", 100_000);
579
580 let (justification_sender, justification_stream) = GrandpaJustificationStream::channel();
581
582 let authority_set_hard_forks = authority_set_hard_forks
584 .into_iter()
585 .map(|fork| {
586 let delay_kind = if let Some(last_finalized) = fork.last_finalized {
587 authorities::DelayKind::Best { median_last_finalized: last_finalized }
588 } else {
589 authorities::DelayKind::Finalized
590 };
591
592 (
593 fork.set_id,
594 authorities::PendingChange {
595 next_authorities: fork.authorities,
596 delay: Zero::zero(),
597 canon_hash: fork.block.0,
598 canon_height: fork.block.1,
599 delay_kind,
600 },
601 )
602 })
603 .collect();
604
605 Ok((
606 GrandpaBlockImport::new(
607 client.clone(),
608 justification_import_period,
609 select_chain.clone(),
610 persistent_data.authority_set.clone(),
611 voter_commands_tx,
612 authority_set_hard_forks,
613 justification_sender.clone(),
614 telemetry.clone(),
615 ),
616 LinkHalf {
617 client,
618 select_chain,
619 persistent_data,
620 voter_commands_rx,
621 justification_sender,
622 justification_stream,
623 telemetry,
624 },
625 ))
626}
627
628fn global_communication<BE, Block: BlockT, C, N, S>(
629 set_id: SetId,
630 voters: &Arc<VoterSet<AuthorityId>>,
631 client: Arc<C>,
632 network: &NetworkBridge<Block, N, S>,
633 keystore: Option<&KeystorePtr>,
634 metrics: Option<until_imported::Metrics>,
635) -> (
636 impl Stream<
637 Item = Result<
638 CommunicationInH<Block, Block::Hash>,
639 CommandOrError<Block::Hash, NumberFor<Block>>,
640 >,
641 >,
642 impl Sink<
643 CommunicationOutH<Block, Block::Hash>,
644 Error = CommandOrError<Block::Hash, NumberFor<Block>>,
645 >,
646)
647where
648 BE: Backend<Block> + 'static,
649 C: ClientForGrandpa<Block, BE> + 'static,
650 N: NetworkT<Block>,
651 S: SyncingT<Block>,
652 NumberFor<Block>: BlockNumberOps,
653{
654 let is_voter = local_authority_id(voters, keystore).is_some();
655
656 let (global_in, global_out) =
658 network.global_communication(communication::SetId(set_id), voters.clone(), is_voter);
659
660 let global_in = UntilGlobalMessageBlocksImported::new(
662 client.import_notification_stream(),
663 network.clone(),
664 client.clone(),
665 global_in,
666 "global",
667 metrics,
668 );
669
670 let global_in = global_in.map_err(CommandOrError::from);
671 let global_out = global_out.sink_map_err(CommandOrError::from);
672
673 (global_in, global_out)
674}
675
676pub struct GrandpaParams<Block: BlockT, C, N, S, SC, VR> {
678 pub config: Config,
680 pub link: LinkHalf<Block, C, SC>,
682 pub network: N,
688 pub sync: S,
690 pub notification_service: Box<dyn NotificationService>,
692 pub voting_rule: VR,
694 pub prometheus_registry: Option<prometheus_endpoint::Registry>,
696 pub shared_voter_state: SharedVoterState,
698 pub telemetry: Option<TelemetryHandle>,
700 pub offchain_tx_pool_factory: OffchainTransactionPoolFactory<Block>,
705}
706
707pub fn grandpa_peers_set_config<B: BlockT, N: NetworkBackend<B, <B as BlockT>::Hash>>(
711 protocol_name: ProtocolName,
712 metrics: sc_network::service::NotificationMetrics,
713 peer_store_handle: Arc<dyn sc_network::peer_store::PeerStoreProvider>,
714) -> (N::NotificationProtocolConfig, Box<dyn NotificationService>) {
715 use communication::grandpa_protocol_name;
716 N::notification_config(
717 protocol_name,
718 grandpa_protocol_name::LEGACY_NAMES.iter().map(|&n| n.into()).collect(),
719 1024 * 1024,
721 None,
722 sc_network::config::SetConfig {
723 in_peers: 0,
724 out_peers: 0,
725 reserved_nodes: Vec::new(),
726 non_reserved_mode: sc_network::config::NonReservedPeerMode::Deny,
727 },
728 metrics,
729 peer_store_handle,
730 )
731}
732
733pub fn run_grandpa_voter<Block: BlockT, BE: 'static, C, N, S, SC, VR>(
736 grandpa_params: GrandpaParams<Block, C, N, S, SC, VR>,
737) -> sp_blockchain::Result<impl Future<Output = ()> + Send>
738where
739 BE: Backend<Block> + 'static,
740 N: NetworkT<Block> + Sync + 'static,
741 S: SyncingT<Block> + Sync + 'static,
742 SC: SelectChain<Block> + 'static,
743 VR: VotingRule<Block, C> + Clone + 'static,
744 NumberFor<Block>: BlockNumberOps,
745 C: ClientForGrandpa<Block, BE> + 'static,
746 C::Api: GrandpaApi<Block>,
747{
748 let GrandpaParams {
749 mut config,
750 link,
751 network,
752 sync,
753 notification_service,
754 voting_rule,
755 prometheus_registry,
756 shared_voter_state,
757 telemetry,
758 offchain_tx_pool_factory,
759 } = grandpa_params;
760
761 config.observer_enabled = false;
766
767 let LinkHalf {
768 client,
769 select_chain,
770 persistent_data,
771 voter_commands_rx,
772 justification_sender,
773 justification_stream: _,
774 telemetry: _,
775 } = link;
776
777 let network = NetworkBridge::new(
778 network,
779 sync,
780 notification_service,
781 config.clone(),
782 persistent_data.set_state.clone(),
783 prometheus_registry.as_ref(),
784 telemetry.clone(),
785 );
786
787 let conf = config.clone();
788 let telemetry_task =
789 if let Some(telemetry_on_connect) = telemetry.as_ref().map(|x| x.on_connect_stream()) {
790 let authorities = persistent_data.authority_set.clone();
791 let telemetry = telemetry.clone();
792 let events = telemetry_on_connect.for_each(move |_| {
793 let current_authorities = authorities.current_authorities();
794 let set_id = authorities.set_id();
795 let maybe_authority_id =
796 local_authority_id(¤t_authorities, conf.keystore.as_ref());
797
798 let authorities =
799 current_authorities.iter().map(|(id, _)| id.to_string()).collect::<Vec<_>>();
800
801 let authorities = serde_json::to_string(&authorities).expect(
802 "authorities is always at least an empty vector; \
803 elements are always of type string",
804 );
805
806 telemetry!(
807 telemetry;
808 CONSENSUS_INFO;
809 "afg.authority_set";
810 "authority_id" => maybe_authority_id.map_or("".into(), |s| s.to_string()),
811 "authority_set_id" => ?set_id,
812 "authorities" => authorities,
813 );
814
815 future::ready(())
816 });
817 future::Either::Left(events)
818 } else {
819 future::Either::Right(future::pending())
820 };
821
822 let voter_work = VoterWork::new(
823 client,
824 config,
825 network,
826 select_chain,
827 voting_rule,
828 persistent_data,
829 voter_commands_rx,
830 prometheus_registry,
831 shared_voter_state,
832 justification_sender,
833 telemetry,
834 offchain_tx_pool_factory,
835 );
836
837 let voter_work = voter_work.map(|res| match res {
838 Ok(()) => error!(
839 target: LOG_TARGET,
840 "GRANDPA voter future has concluded naturally, this should be unreachable."
841 ),
842 Err(e) => error!(target: LOG_TARGET, "GRANDPA voter error: {}", e),
843 });
844
845 let telemetry_task = telemetry_task.then(|_| future::pending::<()>());
847
848 Ok(future::select(voter_work, telemetry_task).map(drop))
849}
850
851struct Metrics {
852 environment: environment::Metrics,
853 until_imported: until_imported::Metrics,
854}
855
856impl Metrics {
857 fn register(registry: &Registry) -> Result<Self, PrometheusError> {
858 Ok(Metrics {
859 environment: environment::Metrics::register(registry)?,
860 until_imported: until_imported::Metrics::register(registry)?,
861 })
862 }
863}
864
865#[must_use]
867struct VoterWork<B, Block: BlockT, C, N: NetworkT<Block>, S: SyncingT<Block>, SC, VR> {
868 voter: Pin<
869 Box<dyn Future<Output = Result<(), CommandOrError<Block::Hash, NumberFor<Block>>>> + Send>,
870 >,
871 shared_voter_state: SharedVoterState,
872 env: Arc<Environment<B, Block, C, N, S, SC, VR>>,
873 voter_commands_rx: TracingUnboundedReceiver<VoterCommand<Block::Hash, NumberFor<Block>>>,
874 network: NetworkBridge<Block, N, S>,
875 telemetry: Option<TelemetryHandle>,
876 metrics: Option<Metrics>,
878}
879
880impl<B, Block, C, N, S, SC, VR> VoterWork<B, Block, C, N, S, SC, VR>
881where
882 Block: BlockT,
883 B: Backend<Block> + 'static,
884 C: ClientForGrandpa<Block, B> + 'static,
885 C::Api: GrandpaApi<Block>,
886 N: NetworkT<Block> + Sync,
887 S: SyncingT<Block> + Sync,
888 NumberFor<Block>: BlockNumberOps,
889 SC: SelectChain<Block> + 'static,
890 VR: VotingRule<Block, C> + Clone + 'static,
891{
892 fn new(
893 client: Arc<C>,
894 config: Config,
895 network: NetworkBridge<Block, N, S>,
896 select_chain: SC,
897 voting_rule: VR,
898 persistent_data: PersistentData<Block>,
899 voter_commands_rx: TracingUnboundedReceiver<VoterCommand<Block::Hash, NumberFor<Block>>>,
900 prometheus_registry: Option<prometheus_endpoint::Registry>,
901 shared_voter_state: SharedVoterState,
902 justification_sender: GrandpaJustificationSender<Block>,
903 telemetry: Option<TelemetryHandle>,
904 offchain_tx_pool_factory: OffchainTransactionPoolFactory<Block>,
905 ) -> Self {
906 let metrics = match prometheus_registry.as_ref().map(Metrics::register) {
907 Some(Ok(metrics)) => Some(metrics),
908 Some(Err(e)) => {
909 debug!(target: LOG_TARGET, "Failed to register metrics: {:?}", e);
910 None
911 },
912 None => None,
913 };
914
915 let voters = persistent_data.authority_set.current_authorities();
916 let env = Arc::new(Environment {
917 client,
918 select_chain,
919 voting_rule,
920 voters: Arc::new(voters),
921 config,
922 network: network.clone(),
923 set_id: persistent_data.authority_set.set_id(),
924 authority_set: persistent_data.authority_set.clone(),
925 voter_set_state: persistent_data.set_state,
926 metrics: metrics.as_ref().map(|m| m.environment.clone()),
927 justification_sender: Some(justification_sender),
928 telemetry: telemetry.clone(),
929 offchain_tx_pool_factory,
930 _phantom: PhantomData,
931 });
932
933 let mut work = VoterWork {
934 voter: Box::pin(future::pending()),
937 shared_voter_state,
938 env,
939 voter_commands_rx,
940 network,
941 telemetry,
942 metrics,
943 };
944 work.rebuild_voter();
945 work
946 }
947
948 fn rebuild_voter(&mut self) {
952 debug!(
953 target: LOG_TARGET,
954 "{}: Starting new voter with set ID {}",
955 self.env.config.name(),
956 self.env.set_id
957 );
958
959 let maybe_authority_id =
960 local_authority_id(&self.env.voters, self.env.config.keystore.as_ref());
961 let authority_id = maybe_authority_id.map_or("<unknown>".into(), |s| s.to_string());
962
963 telemetry!(
964 self.telemetry;
965 CONSENSUS_DEBUG;
966 "afg.starting_new_voter";
967 "name" => ?self.env.config.name(),
968 "set_id" => ?self.env.set_id,
969 "authority_id" => authority_id,
970 );
971
972 let chain_info = self.env.client.info();
973
974 let authorities = self.env.voters.iter().map(|(id, _)| id.to_string()).collect::<Vec<_>>();
975
976 let authorities = serde_json::to_string(&authorities).expect(
977 "authorities is always at least an empty vector; elements are always of type string; qed.",
978 );
979
980 telemetry!(
981 self.telemetry;
982 CONSENSUS_INFO;
983 "afg.authority_set";
984 "number" => ?chain_info.finalized_number,
985 "hash" => ?chain_info.finalized_hash,
986 "authority_id" => authority_id,
987 "authority_set_id" => ?self.env.set_id,
988 "authorities" => authorities,
989 );
990
991 match &*self.env.voter_set_state.read() {
992 VoterSetState::Live { completed_rounds, .. } => {
993 let last_finalized = (chain_info.finalized_hash, chain_info.finalized_number);
994
995 let global_comms = global_communication(
996 self.env.set_id,
997 &self.env.voters,
998 self.env.client.clone(),
999 &self.env.network,
1000 self.env.config.keystore.as_ref(),
1001 self.metrics.as_ref().map(|m| m.until_imported.clone()),
1002 );
1003
1004 let last_completed_round = completed_rounds.last();
1005
1006 let voter = voter::Voter::new(
1007 self.env.clone(),
1008 (*self.env.voters).clone(),
1009 global_comms,
1010 last_completed_round.number,
1011 last_completed_round.votes.clone(),
1012 last_completed_round.base,
1013 last_finalized,
1014 );
1015
1016 if self.shared_voter_state.reset(voter.voter_state()).is_none() {
1018 info!(
1019 target: LOG_TARGET,
1020 "Timed out trying to update shared GRANDPA voter state. \
1021 RPC endpoints may return stale data."
1022 );
1023 }
1024
1025 self.voter = Box::pin(voter);
1026 },
1027 VoterSetState::Paused { .. } => self.voter = Box::pin(future::pending()),
1028 };
1029 }
1030
1031 fn handle_voter_command(
1032 &mut self,
1033 command: VoterCommand<Block::Hash, NumberFor<Block>>,
1034 ) -> Result<(), Error> {
1035 match command {
1036 VoterCommand::ChangeAuthorities(new) => {
1037 let voters: Vec<String> =
1038 new.authorities.iter().map(move |(a, _)| format!("{}", a)).collect();
1039 telemetry!(
1040 self.telemetry;
1041 CONSENSUS_INFO;
1042 "afg.voter_command_change_authorities";
1043 "number" => ?new.canon_number,
1044 "hash" => ?new.canon_hash,
1045 "voters" => ?voters,
1046 "set_id" => ?new.set_id,
1047 );
1048
1049 self.env.update_voter_set_state(|_| {
1050 let set_state = VoterSetState::live(
1053 new.set_id,
1054 &*self.env.authority_set.inner(),
1055 (new.canon_hash, new.canon_number),
1056 );
1057
1058 aux_schema::write_voter_set_state(&*self.env.client, &set_state)?;
1059 Ok(Some(set_state))
1060 })?;
1061
1062 let voters = Arc::new(VoterSet::new(new.authorities.into_iter()).expect(
1063 "new authorities come from pending change; pending change comes from \
1064 `AuthoritySet`; `AuthoritySet` validates authorities is non-empty and \
1065 weights are non-zero; qed.",
1066 ));
1067
1068 self.env = Arc::new(Environment {
1069 voters,
1070 set_id: new.set_id,
1071 voter_set_state: self.env.voter_set_state.clone(),
1072 client: self.env.client.clone(),
1073 select_chain: self.env.select_chain.clone(),
1074 config: self.env.config.clone(),
1075 authority_set: self.env.authority_set.clone(),
1076 network: self.env.network.clone(),
1077 voting_rule: self.env.voting_rule.clone(),
1078 metrics: self.env.metrics.clone(),
1079 justification_sender: self.env.justification_sender.clone(),
1080 telemetry: self.telemetry.clone(),
1081 offchain_tx_pool_factory: self.env.offchain_tx_pool_factory.clone(),
1082 _phantom: PhantomData,
1083 });
1084
1085 self.rebuild_voter();
1086 Ok(())
1087 },
1088 VoterCommand::Pause(reason) => {
1089 info!(target: LOG_TARGET, "Pausing old validator set: {}", reason);
1090
1091 self.env.update_voter_set_state(|voter_set_state| {
1093 let completed_rounds = voter_set_state.completed_rounds();
1094 let set_state = VoterSetState::Paused { completed_rounds };
1095
1096 aux_schema::write_voter_set_state(&*self.env.client, &set_state)?;
1097 Ok(Some(set_state))
1098 })?;
1099
1100 self.rebuild_voter();
1101 Ok(())
1102 },
1103 }
1104 }
1105}
1106
1107impl<B, Block, C, N, S, SC, VR> Future for VoterWork<B, Block, C, N, S, SC, VR>
1108where
1109 Block: BlockT,
1110 B: Backend<Block> + 'static,
1111 N: NetworkT<Block> + Sync,
1112 S: SyncingT<Block> + Sync,
1113 NumberFor<Block>: BlockNumberOps,
1114 SC: SelectChain<Block> + 'static,
1115 C: ClientForGrandpa<Block, B> + 'static,
1116 C::Api: GrandpaApi<Block>,
1117 VR: VotingRule<Block, C> + Clone + 'static,
1118{
1119 type Output = Result<(), Error>;
1120
1121 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
1122 match Future::poll(Pin::new(&mut self.voter), cx) {
1123 Poll::Pending => {},
1124 Poll::Ready(Ok(())) => {
1125 return Poll::Ready(Err(Error::Safety(
1127 "consensus-grandpa inner voter has concluded.".into(),
1128 )))
1129 },
1130 Poll::Ready(Err(CommandOrError::Error(e))) => {
1131 return Poll::Ready(Err(e))
1133 },
1134 Poll::Ready(Err(CommandOrError::VoterCommand(command))) => {
1135 self.handle_voter_command(command)?;
1137 cx.waker().wake_by_ref();
1138 },
1139 }
1140
1141 match Stream::poll_next(Pin::new(&mut self.voter_commands_rx), cx) {
1142 Poll::Pending => {},
1143 Poll::Ready(None) => {
1144 return Poll::Ready(Err(Error::Safety("`voter_commands_rx` was closed.".into())))
1146 },
1147 Poll::Ready(Some(command)) => {
1148 self.handle_voter_command(command)?;
1150 cx.waker().wake_by_ref();
1151 },
1152 }
1153
1154 Future::poll(Pin::new(&mut self.network), cx)
1155 }
1156}
1157
1158fn local_authority_id(
1162 voters: &VoterSet<AuthorityId>,
1163 keystore: Option<&KeystorePtr>,
1164) -> Option<AuthorityId> {
1165 keystore.and_then(|keystore| {
1166 voters
1167 .iter()
1168 .find(|(p, _)| keystore.has_keys(&[(p.to_raw_vec(), AuthorityId::ID)]))
1169 .map(|(p, _)| p.clone())
1170 })
1171}
1172
1173pub fn revert<Block, Client>(client: Arc<Client>, blocks: NumberFor<Block>) -> ClientResult<()>
1177where
1178 Block: BlockT,
1179 Client: AuxStore + HeaderMetadata<Block, Error = ClientError> + HeaderBackend<Block>,
1180{
1181 let best_number = client.info().best_number;
1182 let finalized = client.info().finalized_number;
1183
1184 let revertible = blocks.min(best_number - finalized);
1185 if revertible == Zero::zero() {
1186 return Ok(())
1187 }
1188
1189 let number = best_number - revertible;
1190 let hash = client
1191 .block_hash_from_id(&BlockId::Number(number))?
1192 .ok_or(ClientError::Backend(format!(
1193 "Unexpected hash lookup failure for block number: {}",
1194 number
1195 )))?;
1196
1197 let info = client.info();
1198
1199 let persistent_data: PersistentData<Block> =
1200 aux_schema::load_persistent(&*client, info.genesis_hash, Zero::zero(), || {
1201 const MSG: &str = "Unexpected missing grandpa data during revert";
1202 Err(ClientError::Application(Box::from(MSG)))
1203 })?;
1204
1205 let shared_authority_set = persistent_data.authority_set;
1206 let mut authority_set = shared_authority_set.inner();
1207
1208 let is_descendent_of = is_descendent_of(&*client, None);
1209 authority_set.revert(hash, number, &is_descendent_of);
1210
1211 let (set_id, set_ref) = authority_set.current();
1213 let new_set = Some(NewAuthoritySet {
1214 canon_hash: info.finalized_hash,
1215 canon_number: info.finalized_number,
1216 set_id,
1217 authorities: set_ref.to_vec(),
1218 });
1219 aux_schema::update_authority_set::<Block, _, _>(&authority_set, new_set.as_ref(), |values| {
1220 client.insert_aux(values, None)
1221 })
1222}