1use crate::{
20 communication::{
21 notification::{
22 BeefyBestBlockSender, BeefyBestBlockStream, BeefyVersionedFinalityProofSender,
23 BeefyVersionedFinalityProofStream,
24 },
25 peers::KnownPeers,
26 request_response::{
27 outgoing_requests_engine::OnDemandJustificationsEngine, BeefyJustifsRequestHandler,
28 },
29 },
30 error::Error,
31 import::BeefyBlockImport,
32 metrics::register_metrics,
33};
34use futures::{stream::Fuse, FutureExt, StreamExt};
35use log::{debug, error, info, trace, warn};
36use parking_lot::Mutex;
37use prometheus_endpoint::Registry;
38use sc_client_api::{Backend, BlockBackend, BlockchainEvents, FinalityNotification, Finalizer};
39use sc_consensus::BlockImport;
40use sc_network::{NetworkRequest, NotificationService, ProtocolName};
41use sc_network_gossip::{GossipEngine, Network as GossipNetwork, Syncing as GossipSyncing};
42use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver};
43use sp_api::ProvideRuntimeApi;
44use sp_blockchain::{Backend as BlockchainBackend, HeaderBackend};
45use sp_consensus::{Error as ConsensusError, SyncOracle};
46use sp_consensus_beefy::{
47 AuthorityIdBound, BeefyApi, ConsensusLog, PayloadProvider, ValidatorSet, BEEFY_ENGINE_ID,
48};
49use sp_keystore::KeystorePtr;
50use sp_runtime::traits::{Block, Header as HeaderT, NumberFor, Zero};
51use std::{
52 collections::{BTreeMap, VecDeque},
53 future::Future,
54 marker::PhantomData,
55 pin::Pin,
56 sync::Arc,
57 time::Duration,
58};
59
60mod aux_schema;
61mod error;
62mod keystore;
63mod metrics;
64mod round;
65mod worker;
66
67pub mod communication;
68pub mod import;
69pub mod justification;
70
71use crate::{
72 communication::gossip::GossipValidator,
73 fisherman::Fisherman,
74 justification::BeefyVersionedFinalityProof,
75 keystore::BeefyKeystore,
76 metrics::VoterMetrics,
77 round::Rounds,
78 worker::{BeefyWorker, PersistedState},
79};
80pub use communication::beefy_protocol_name::{
81 gossip_protocol_name, justifications_protocol_name as justifs_protocol_name,
82};
83use sp_runtime::generic::OpaqueDigestItemId;
84
85mod fisherman;
86#[cfg(test)]
87mod tests;
88
89const LOG_TARGET: &str = "beefy";
90
91const HEADER_SYNC_DELAY: Duration = Duration::from_secs(60);
92
93type FinalityNotifications<Block> =
94 sc_utils::mpsc::TracingUnboundedReceiver<UnpinnedFinalityNotification<Block>>;
95pub trait Client<B, BE>:
100 BlockchainEvents<B> + HeaderBackend<B> + Finalizer<B, BE> + Send + Sync
101where
102 B: Block,
103 BE: Backend<B>,
104{
105 }
107
108impl<B, BE, T> Client<B, BE> for T
109where
110 B: Block,
111 BE: Backend<B>,
112 T: BlockchainEvents<B>
113 + HeaderBackend<B>
114 + Finalizer<B, BE>
115 + ProvideRuntimeApi<B>
116 + Send
117 + Sync,
118{
119 }
121
122#[derive(Clone)]
125pub struct BeefyVoterLinks<B: Block, AuthorityId: AuthorityIdBound> {
126 pub from_block_import_justif_stream: BeefyVersionedFinalityProofStream<B, AuthorityId>,
129
130 pub to_rpc_justif_sender: BeefyVersionedFinalityProofSender<B, AuthorityId>,
133 pub to_rpc_best_block_sender: BeefyBestBlockSender<B>,
135}
136
137#[derive(Clone)]
139pub struct BeefyRPCLinks<B: Block, AuthorityId: AuthorityIdBound> {
140 pub from_voter_justif_stream: BeefyVersionedFinalityProofStream<B, AuthorityId>,
142 pub from_voter_best_beefy_stream: BeefyBestBlockStream<B>,
144}
145
146pub fn beefy_block_import_and_links<B, BE, RuntimeApi, I, AuthorityId: AuthorityIdBound>(
148 wrapped_block_import: I,
149 backend: Arc<BE>,
150 runtime: Arc<RuntimeApi>,
151 prometheus_registry: Option<Registry>,
152) -> (
153 BeefyBlockImport<B, BE, RuntimeApi, I, AuthorityId>,
154 BeefyVoterLinks<B, AuthorityId>,
155 BeefyRPCLinks<B, AuthorityId>,
156)
157where
158 B: Block,
159 BE: Backend<B>,
160 I: BlockImport<B, Error = ConsensusError> + Send + Sync,
161 RuntimeApi: ProvideRuntimeApi<B> + Send + Sync,
162 RuntimeApi::Api: BeefyApi<B, AuthorityId>,
163 AuthorityId: AuthorityIdBound,
164{
165 let (to_rpc_justif_sender, from_voter_justif_stream) =
167 BeefyVersionedFinalityProofStream::<B, AuthorityId>::channel();
168 let (to_rpc_best_block_sender, from_voter_best_beefy_stream) =
169 BeefyBestBlockStream::<B>::channel();
170
171 let (to_voter_justif_sender, from_block_import_justif_stream) =
173 BeefyVersionedFinalityProofStream::<B, AuthorityId>::channel();
174 let metrics = register_metrics(prometheus_registry);
175
176 let import = BeefyBlockImport::new(
178 backend,
179 runtime,
180 wrapped_block_import,
181 to_voter_justif_sender,
182 metrics,
183 );
184 let voter_links = BeefyVoterLinks {
185 from_block_import_justif_stream,
186 to_rpc_justif_sender,
187 to_rpc_best_block_sender,
188 };
189 let rpc_links = BeefyRPCLinks { from_voter_best_beefy_stream, from_voter_justif_stream };
190
191 (import, voter_links, rpc_links)
192}
193
194pub struct BeefyNetworkParams<B: Block, N, S> {
196 pub network: Arc<N>,
198 pub sync: Arc<S>,
200 pub notification_service: Box<dyn NotificationService>,
202 pub gossip_protocol_name: ProtocolName,
205 pub justifications_protocol_name: ProtocolName,
208
209 pub _phantom: PhantomData<B>,
210}
211
212pub struct BeefyParams<B: Block, BE, C, N, P, R, S, AuthorityId: AuthorityIdBound> {
214 pub client: Arc<C>,
216 pub backend: Arc<BE>,
218 pub payload_provider: P,
220 pub runtime: Arc<R>,
222 pub key_store: Option<KeystorePtr>,
224 pub network_params: BeefyNetworkParams<B, N, S>,
226 pub min_block_delta: u32,
228 pub prometheus_registry: Option<Registry>,
230 pub links: BeefyVoterLinks<B, AuthorityId>,
232 pub on_demand_justifications_handler: BeefyJustifsRequestHandler<B, C>,
234 pub is_authority: bool,
236}
237pub(crate) struct BeefyComms<B: Block, N, AuthorityId: AuthorityIdBound> {
241 pub gossip_engine: GossipEngine<B>,
242 pub gossip_validator: Arc<GossipValidator<B, N, AuthorityId>>,
243 pub on_demand_justifications: OnDemandJustificationsEngine<B, AuthorityId>,
244}
245
246pub(crate) struct BeefyWorkerBuilder<B: Block, BE, RuntimeApi, AuthorityId: AuthorityIdBound> {
253 backend: Arc<BE>,
255 runtime: Arc<RuntimeApi>,
256 key_store: BeefyKeystore<AuthorityId>,
257 metrics: Option<VoterMetrics>,
259 persisted_state: PersistedState<B, AuthorityId>,
260}
261
262impl<B, BE, R, AuthorityId> BeefyWorkerBuilder<B, BE, R, AuthorityId>
263where
264 B: Block + codec::Codec,
265 BE: Backend<B>,
266 R: ProvideRuntimeApi<B>,
267 R::Api: BeefyApi<B, AuthorityId>,
268 AuthorityId: AuthorityIdBound,
269{
270 pub async fn async_initialize<N>(
277 backend: Arc<BE>,
278 runtime: Arc<R>,
279 key_store: BeefyKeystore<AuthorityId>,
280 metrics: Option<VoterMetrics>,
281 min_block_delta: u32,
282 gossip_validator: Arc<GossipValidator<B, N, AuthorityId>>,
283 finality_notifications: &mut Fuse<FinalityNotifications<B>>,
284 is_authority: bool,
285 ) -> Result<Self, Error> {
286 let (beefy_genesis, best_grandpa) =
288 wait_for_runtime_pallet(&*runtime, finality_notifications).await?;
289
290 let persisted_state = Self::load_or_init_state(
291 beefy_genesis,
292 best_grandpa,
293 min_block_delta,
294 backend.clone(),
295 runtime.clone(),
296 &key_store,
297 &metrics,
298 is_authority,
299 )
300 .await?;
301 persisted_state
303 .gossip_filter_config()
304 .map(|f| gossip_validator.update_filter(f))?;
305
306 Ok(BeefyWorkerBuilder { backend, runtime, key_store, metrics, persisted_state })
307 }
308
309 pub fn build<P, S, N>(
311 self,
312 payload_provider: P,
313 sync: Arc<S>,
314 comms: BeefyComms<B, N, AuthorityId>,
315 links: BeefyVoterLinks<B, AuthorityId>,
316 pending_justifications: BTreeMap<NumberFor<B>, BeefyVersionedFinalityProof<B, AuthorityId>>,
317 is_authority: bool,
318 ) -> BeefyWorker<B, BE, P, R, S, N, AuthorityId> {
319 let key_store = Arc::new(self.key_store);
320 BeefyWorker {
321 backend: self.backend.clone(),
322 runtime: self.runtime.clone(),
323 key_store: key_store.clone(),
324 payload_provider,
325 sync,
326 fisherman: Arc::new(Fisherman::new(self.backend, self.runtime, key_store)),
327 metrics: self.metrics,
328 persisted_state: self.persisted_state,
329 comms,
330 links,
331 pending_justifications,
332 is_authority,
333 }
334 }
335
336 async fn init_state(
342 beefy_genesis: NumberFor<B>,
343 best_grandpa: <B as Block>::Header,
344 min_block_delta: u32,
345 backend: Arc<BE>,
346 runtime: Arc<R>,
347 ) -> Result<PersistedState<B, AuthorityId>, Error> {
348 let blockchain = backend.blockchain();
349
350 let beefy_genesis = runtime
351 .runtime_api()
352 .beefy_genesis(best_grandpa.hash())
353 .ok()
354 .flatten()
355 .filter(|genesis| *genesis == beefy_genesis)
356 .ok_or_else(|| Error::Backend("BEEFY pallet expected to be active.".into()))?;
357 let mut sessions = VecDeque::new();
360 let mut header = best_grandpa.clone();
361 let state = loop {
362 if let Some(true) = blockchain
363 .justifications(header.hash())
364 .ok()
365 .flatten()
366 .map(|justifs| justifs.get(BEEFY_ENGINE_ID).is_some())
367 {
368 debug!(
369 target: LOG_TARGET,
370 "🥩 Initialize BEEFY voter at last BEEFY finalized block: {:?}.",
371 *header.number()
372 );
373 let best_beefy = *header.number();
374 if sessions.is_empty() {
376 let active_set =
377 expect_validator_set(runtime.as_ref(), backend.as_ref(), &header).await?;
378 let mut rounds = Rounds::new(best_beefy, active_set);
379 rounds.conclude(best_beefy);
381 sessions.push_front(rounds);
382 }
383 let state = PersistedState::checked_new(
384 best_grandpa,
385 best_beefy,
386 sessions,
387 min_block_delta,
388 beefy_genesis,
389 )
390 .ok_or_else(|| Error::Backend("Invalid BEEFY chain".into()))?;
391 break state;
392 }
393
394 if *header.number() == beefy_genesis {
395 let genesis_set =
397 expect_validator_set(runtime.as_ref(), backend.as_ref(), &header).await?;
398 info!(
399 target: LOG_TARGET,
400 "🥩 Loading BEEFY voter state from genesis on what appears to be first startup. \
401 Starting voting rounds at block {:?}, genesis validator set {:?}.",
402 beefy_genesis,
403 genesis_set,
404 );
405
406 sessions.push_front(Rounds::new(beefy_genesis, genesis_set));
407 break PersistedState::checked_new(
408 best_grandpa,
409 Zero::zero(),
410 sessions,
411 min_block_delta,
412 beefy_genesis,
413 )
414 .ok_or_else(|| Error::Backend("Invalid BEEFY chain".into()))?;
415 }
416
417 if let Some(active) = find_authorities_change::<B, AuthorityId>(&header) {
418 debug!(
419 target: LOG_TARGET,
420 "🥩 Marking block {:?} as BEEFY Mandatory.",
421 *header.number()
422 );
423 sessions.push_front(Rounds::new(*header.number(), active));
424 }
425
426 header = wait_for_parent_header(blockchain, header, HEADER_SYNC_DELAY).await?;
428 };
429
430 aux_schema::write_current_version(backend.as_ref())?;
431 aux_schema::write_voter_state(backend.as_ref(), &state)?;
432 Ok(state)
433 }
434
435 async fn load_or_init_state(
436 beefy_genesis: NumberFor<B>,
437 best_grandpa: <B as Block>::Header,
438 min_block_delta: u32,
439 backend: Arc<BE>,
440 runtime: Arc<R>,
441 key_store: &BeefyKeystore<AuthorityId>,
442 metrics: &Option<VoterMetrics>,
443 is_authority: bool,
444 ) -> Result<PersistedState<B, AuthorityId>, Error> {
445 if let Some(mut state) = crate::aux_schema::load_persistent(backend.as_ref())?
447 .filter(|state| state.pallet_genesis() == beefy_genesis)
449 {
450 state.set_best_grandpa(best_grandpa.clone());
452 state.set_min_block_delta(min_block_delta);
454 debug!(target: LOG_TARGET, "🥩 Loading BEEFY voter state from db.");
455 trace!(target: LOG_TARGET, "🥩 Loaded state: {:?}.", state);
456
457 let mut new_sessions = vec![];
459 let mut header = best_grandpa.clone();
460 while *header.number() > state.best_beefy() {
461 if state.voting_oracle().can_add_session(*header.number()) {
462 if let Some(active) = find_authorities_change::<B, AuthorityId>(&header) {
463 new_sessions.push((active, *header.number()));
464 }
465 }
466 header =
467 wait_for_parent_header(backend.blockchain(), header, HEADER_SYNC_DELAY).await?;
468 }
469
470 for (validator_set, new_session_start) in new_sessions.drain(..).rev() {
472 debug!(
473 target: LOG_TARGET,
474 "🥩 Handling missed BEEFY session after node restart: {:?}.",
475 new_session_start
476 );
477 state.init_session_at(
478 new_session_start,
479 validator_set,
480 key_store,
481 metrics,
482 is_authority,
483 );
484 }
485 return Ok(state);
486 }
487
488 Self::init_state(beefy_genesis, best_grandpa, min_block_delta, backend, runtime).await
490 }
491}
492
493struct UnpinnedFinalityNotification<B: Block> {
497 pub hash: B::Hash,
499 pub header: B::Header,
501 pub tree_route: Arc<[B::Hash]>,
505}
506
507impl<B: Block> From<FinalityNotification<B>> for UnpinnedFinalityNotification<B> {
508 fn from(value: FinalityNotification<B>) -> Self {
509 UnpinnedFinalityNotification {
510 hash: value.hash,
511 header: value.header,
512 tree_route: value.tree_route,
513 }
514 }
515}
516
517pub async fn start_beefy_gadget<B, BE, C, N, P, R, S, AuthorityId>(
521 beefy_params: BeefyParams<B, BE, C, N, P, R, S, AuthorityId>,
522) where
523 B: Block,
524 BE: Backend<B>,
525 C: Client<B, BE> + BlockBackend<B>,
526 P: PayloadProvider<B> + Clone,
527 R: ProvideRuntimeApi<B>,
528 R::Api: BeefyApi<B, AuthorityId>,
529 N: GossipNetwork<B> + NetworkRequest + Send + Sync + 'static,
530 S: GossipSyncing<B> + SyncOracle + 'static,
531 AuthorityId: AuthorityIdBound,
532{
533 let BeefyParams {
534 client,
535 backend,
536 payload_provider,
537 runtime,
538 key_store,
539 network_params,
540 min_block_delta,
541 prometheus_registry,
542 links,
543 mut on_demand_justifications_handler,
544 is_authority,
545 } = beefy_params;
546
547 let BeefyNetworkParams {
548 network,
549 sync,
550 notification_service,
551 gossip_protocol_name,
552 justifications_protocol_name,
553 ..
554 } = network_params;
555
556 let metrics = register_metrics(prometheus_registry.clone());
557
558 let mut block_import_justif = links.from_block_import_justif_stream.subscribe(100_000).fuse();
559
560 let finality_notifications = client.finality_notification_stream();
563 let (mut transformer, mut finality_notifications) =
564 finality_notification_transformer_future(finality_notifications);
565
566 let known_peers = Arc::new(Mutex::new(KnownPeers::new()));
567 let gossip_validator =
570 communication::gossip::GossipValidator::new(known_peers.clone(), network.clone());
571 let gossip_validator = Arc::new(gossip_validator);
572 let gossip_engine = GossipEngine::new(
573 network.clone(),
574 sync.clone(),
575 notification_service,
576 gossip_protocol_name.clone(),
577 gossip_validator.clone(),
578 None,
579 );
580
581 let on_demand_justifications = OnDemandJustificationsEngine::new(
584 network.clone(),
585 justifications_protocol_name.clone(),
586 known_peers,
587 prometheus_registry.clone(),
588 );
589 let mut beefy_comms = BeefyComms { gossip_engine, gossip_validator, on_demand_justifications };
590
591 loop {
594 let worker_builder = futures::select! {
596 builder_init_result = BeefyWorkerBuilder::async_initialize(
597 backend.clone(),
598 runtime.clone(),
599 key_store.clone().into(),
600 metrics.clone(),
601 min_block_delta,
602 beefy_comms.gossip_validator.clone(),
603 &mut finality_notifications,
604 is_authority,
605 ).fuse() => {
606 match builder_init_result {
607 Ok(builder) => builder,
608 Err(e) => {
609 error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", e);
610 return
611 },
612 }
613 },
614 _ = &mut beefy_comms.gossip_engine => {
616 error!(target: LOG_TARGET, "🥩 Gossip engine has unexpectedly terminated.");
617 return
618 },
619 _ = &mut transformer => {
620 error!(target: LOG_TARGET, "🥩 Finality notification transformer task has unexpectedly terminated.");
621 return
622 },
623 };
624
625 let worker = worker_builder.build(
626 payload_provider.clone(),
627 sync.clone(),
628 beefy_comms,
629 links.clone(),
630 BTreeMap::new(),
631 is_authority,
632 );
633
634 futures::select! {
635 result = worker.run(&mut block_import_justif, &mut finality_notifications).fuse() => {
636 match result {
637 (error::Error::ConsensusReset, reuse_comms) => {
638 error!(target: LOG_TARGET, "🥩 Error: {:?}. Restarting voter.", error::Error::ConsensusReset);
639 beefy_comms = reuse_comms;
640 continue;
641 },
642 (err, _) => {
643 error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", err)
644 }
645 }
646 },
647 odj_handler_error = on_demand_justifications_handler.run().fuse() => {
648 error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", odj_handler_error)
649 },
650 _ = &mut transformer => {
651 error!(target: LOG_TARGET, "🥩 Finality notification transformer task has unexpectedly terminated.");
652 }
653 }
654 return;
655 }
656}
657
658fn finality_notification_transformer_future<B>(
661 mut finality_notifications: sc_client_api::FinalityNotifications<B>,
662) -> (
663 Pin<Box<futures::future::Fuse<impl Future<Output = ()> + Sized>>>,
664 Fuse<TracingUnboundedReceiver<UnpinnedFinalityNotification<B>>>,
665)
666where
667 B: Block,
668{
669 let (tx, rx) = tracing_unbounded("beefy-notification-transformer-channel", 10000);
670 let transformer_fut = async move {
671 while let Some(notification) = finality_notifications.next().await {
672 debug!(target: LOG_TARGET, "🥩 Transforming grandpa notification. #{}({:?})", notification.header.number(), notification.hash);
673 if let Err(err) = tx.unbounded_send(UnpinnedFinalityNotification::from(notification)) {
674 error!(target: LOG_TARGET, "🥩 Unable to send transformed notification. Shutting down. err = {}", err);
675 return
676 };
677 }
678 };
679 (Box::pin(transformer_fut.fuse()), rx.fuse())
680}
681
682async fn wait_for_parent_header<B, BC>(
689 blockchain: &BC,
690 current: <B as Block>::Header,
691 delay: Duration,
692) -> Result<<B as Block>::Header, Error>
693where
694 B: Block,
695 BC: BlockchainBackend<B>,
696{
697 if *current.number() == Zero::zero() {
698 let msg = format!("header {} is Genesis, there is no parent for it", current.hash());
699 warn!(target: LOG_TARGET, "{}", msg);
700 return Err(Error::Backend(msg));
701 }
702 loop {
703 match blockchain
704 .header(*current.parent_hash())
705 .map_err(|e| Error::Backend(e.to_string()))?
706 {
707 Some(parent) => return Ok(parent),
708 None => {
709 info!(
710 target: LOG_TARGET,
711 "🥩 Parent of header number {} not found. \
712 BEEFY gadget waiting for header sync to finish ...",
713 current.number()
714 );
715 tokio::time::sleep(delay).await;
716 },
717 }
718 }
719}
720
721async fn wait_for_runtime_pallet<B, R, AuthorityId: AuthorityIdBound>(
724 runtime: &R,
725 finality: &mut Fuse<FinalityNotifications<B>>,
726) -> Result<(NumberFor<B>, <B as Block>::Header), Error>
727where
728 B: Block,
729 R: ProvideRuntimeApi<B>,
730 R::Api: BeefyApi<B, AuthorityId>,
731{
732 info!(target: LOG_TARGET, "🥩 BEEFY gadget waiting for BEEFY pallet to become available...");
733 loop {
734 let notif = finality.next().await.ok_or_else(|| {
735 let err_msg = "🥩 Finality stream has unexpectedly terminated.".into();
736 error!(target: LOG_TARGET, "{}", err_msg);
737 Error::Backend(err_msg)
738 })?;
739 let at = notif.header.hash();
740 if let Some(start) = runtime.runtime_api().beefy_genesis(at).ok().flatten() {
741 if *notif.header.number() >= start {
742 info!(
744 target: LOG_TARGET,
745 "🥩 BEEFY pallet available: block {:?} beefy genesis {:?}",
746 notif.header.number(), start
747 );
748 return Ok((start, notif.header));
749 }
750 }
751 }
752}
753
754async fn expect_validator_set<B, BE, R, AuthorityId: AuthorityIdBound>(
760 runtime: &R,
761 backend: &BE,
762 at_header: &B::Header,
763) -> Result<ValidatorSet<AuthorityId>, Error>
764where
765 B: Block,
766 BE: Backend<B>,
767 R: ProvideRuntimeApi<B>,
768 R::Api: BeefyApi<B, AuthorityId>,
769{
770 let blockchain = backend.blockchain();
771 debug!(
774 target: LOG_TARGET,
775 "🥩 Trying to find validator set active at header(number {:?}, hash {:?})",
776 at_header.number(),
777 at_header.hash()
778 );
779 let mut header = at_header.clone();
780 loop {
781 debug!(target: LOG_TARGET, "🥩 Looking for auth set change at block number: {:?}", *header.number());
782 if let Ok(Some(active)) = runtime.runtime_api().validator_set(header.hash()) {
783 return Ok(active);
784 } else {
785 match find_authorities_change::<B, AuthorityId>(&header) {
786 Some(active) => return Ok(active),
787 None =>
790 header = wait_for_parent_header(blockchain, header, HEADER_SYNC_DELAY)
791 .await
792 .map_err(|e| Error::Backend(e.to_string()))?,
793 }
794 }
795 }
796}
797
798pub(crate) fn find_authorities_change<B, AuthorityId>(
801 header: &B::Header,
802) -> Option<ValidatorSet<AuthorityId>>
803where
804 B: Block,
805 AuthorityId: AuthorityIdBound,
806{
807 let id = OpaqueDigestItemId::Consensus(&BEEFY_ENGINE_ID);
808
809 let filter = |log: ConsensusLog<AuthorityId>| match log {
810 ConsensusLog::AuthoritiesChange(validator_set) => Some(validator_set),
811 _ => None,
812 };
813 header.digest().convert_first(|l| l.try_to(id).and_then(filter))
814}