referrerpolicy=no-referrer-when-downgrade

sc_consensus_beefy/
lib.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use crate::{
20	communication::{
21		notification::{
22			BeefyBestBlockSender, BeefyBestBlockStream, BeefyVersionedFinalityProofSender,
23			BeefyVersionedFinalityProofStream,
24		},
25		peers::KnownPeers,
26		request_response::{
27			outgoing_requests_engine::OnDemandJustificationsEngine, BeefyJustifsRequestHandler,
28		},
29	},
30	error::Error,
31	import::BeefyBlockImport,
32	metrics::register_metrics,
33};
34use futures::{stream::Fuse, FutureExt, StreamExt};
35use log::{debug, error, info, trace, warn};
36use parking_lot::Mutex;
37use prometheus_endpoint::Registry;
38use sc_client_api::{Backend, BlockBackend, BlockchainEvents, FinalityNotification, Finalizer};
39use sc_consensus::BlockImport;
40use sc_network::{NetworkRequest, NotificationService, ProtocolName};
41use sc_network_gossip::{GossipEngine, Network as GossipNetwork, Syncing as GossipSyncing};
42use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver};
43use sp_api::ProvideRuntimeApi;
44use sp_blockchain::{Backend as BlockchainBackend, HeaderBackend};
45use sp_consensus::{Error as ConsensusError, SyncOracle};
46use sp_consensus_beefy::{
47	AuthorityIdBound, BeefyApi, ConsensusLog, PayloadProvider, ValidatorSet, BEEFY_ENGINE_ID,
48};
49use sp_keystore::KeystorePtr;
50use sp_runtime::traits::{Block, Header as HeaderT, NumberFor, Zero};
51use std::{
52	collections::{BTreeMap, VecDeque},
53	future::Future,
54	marker::PhantomData,
55	pin::Pin,
56	sync::Arc,
57	time::Duration,
58};
59
60mod aux_schema;
61mod error;
62mod keystore;
63mod metrics;
64mod round;
65mod worker;
66
67pub mod communication;
68pub mod import;
69pub mod justification;
70
71use crate::{
72	communication::gossip::GossipValidator,
73	fisherman::Fisherman,
74	justification::BeefyVersionedFinalityProof,
75	keystore::BeefyKeystore,
76	metrics::VoterMetrics,
77	round::Rounds,
78	worker::{BeefyWorker, PersistedState},
79};
80pub use communication::beefy_protocol_name::{
81	gossip_protocol_name, justifications_protocol_name as justifs_protocol_name,
82};
83use sp_runtime::generic::OpaqueDigestItemId;
84
85mod fisherman;
86#[cfg(test)]
87mod tests;
88
89const LOG_TARGET: &str = "beefy";
90
91const HEADER_SYNC_DELAY: Duration = Duration::from_secs(60);
92
93type FinalityNotifications<Block> =
94	sc_utils::mpsc::TracingUnboundedReceiver<UnpinnedFinalityNotification<Block>>;
95/// A convenience BEEFY client trait that defines all the type bounds a BEEFY client
96/// has to satisfy. Ideally that should actually be a trait alias. Unfortunately as
97/// of today, Rust does not allow a type alias to be used as a trait bound. Tracking
98/// issue is <https://github.com/rust-lang/rust/issues/41517>.
99pub trait Client<B, BE>:
100	BlockchainEvents<B> + HeaderBackend<B> + Finalizer<B, BE> + Send + Sync
101where
102	B: Block,
103	BE: Backend<B>,
104{
105	// empty
106}
107
108impl<B, BE, T> Client<B, BE> for T
109where
110	B: Block,
111	BE: Backend<B>,
112	T: BlockchainEvents<B>
113		+ HeaderBackend<B>
114		+ Finalizer<B, BE>
115		+ ProvideRuntimeApi<B>
116		+ Send
117		+ Sync,
118{
119	// empty
120}
121
122/// Links between the block importer, the background voter and the RPC layer,
123/// to be used by the voter.
124#[derive(Clone)]
125pub struct BeefyVoterLinks<B: Block, AuthorityId: AuthorityIdBound> {
126	// BlockImport -> Voter links
127	/// Stream of BEEFY signed commitments from block import to voter.
128	pub from_block_import_justif_stream: BeefyVersionedFinalityProofStream<B, AuthorityId>,
129
130	// Voter -> RPC links
131	/// Sends BEEFY signed commitments from voter to RPC.
132	pub to_rpc_justif_sender: BeefyVersionedFinalityProofSender<B, AuthorityId>,
133	/// Sends BEEFY best block hashes from voter to RPC.
134	pub to_rpc_best_block_sender: BeefyBestBlockSender<B>,
135}
136
137/// Links used by the BEEFY RPC layer, from the BEEFY background voter.
138#[derive(Clone)]
139pub struct BeefyRPCLinks<B: Block, AuthorityId: AuthorityIdBound> {
140	/// Stream of signed commitments coming from the voter.
141	pub from_voter_justif_stream: BeefyVersionedFinalityProofStream<B, AuthorityId>,
142	/// Stream of BEEFY best block hashes coming from the voter.
143	pub from_voter_best_beefy_stream: BeefyBestBlockStream<B>,
144}
145
146/// Make block importer and link half necessary to tie the background voter to it.
147pub fn beefy_block_import_and_links<B, BE, RuntimeApi, I, AuthorityId: AuthorityIdBound>(
148	wrapped_block_import: I,
149	backend: Arc<BE>,
150	runtime: Arc<RuntimeApi>,
151	prometheus_registry: Option<Registry>,
152) -> (
153	BeefyBlockImport<B, BE, RuntimeApi, I, AuthorityId>,
154	BeefyVoterLinks<B, AuthorityId>,
155	BeefyRPCLinks<B, AuthorityId>,
156)
157where
158	B: Block,
159	BE: Backend<B>,
160	I: BlockImport<B, Error = ConsensusError> + Send + Sync,
161	RuntimeApi: ProvideRuntimeApi<B> + Send + Sync,
162	RuntimeApi::Api: BeefyApi<B, AuthorityId>,
163	AuthorityId: AuthorityIdBound,
164{
165	// Voter -> RPC links
166	let (to_rpc_justif_sender, from_voter_justif_stream) =
167		BeefyVersionedFinalityProofStream::<B, AuthorityId>::channel();
168	let (to_rpc_best_block_sender, from_voter_best_beefy_stream) =
169		BeefyBestBlockStream::<B>::channel();
170
171	// BlockImport -> Voter links
172	let (to_voter_justif_sender, from_block_import_justif_stream) =
173		BeefyVersionedFinalityProofStream::<B, AuthorityId>::channel();
174	let metrics = register_metrics(prometheus_registry);
175
176	// BlockImport
177	let import = BeefyBlockImport::new(
178		backend,
179		runtime,
180		wrapped_block_import,
181		to_voter_justif_sender,
182		metrics,
183	);
184	let voter_links = BeefyVoterLinks {
185		from_block_import_justif_stream,
186		to_rpc_justif_sender,
187		to_rpc_best_block_sender,
188	};
189	let rpc_links = BeefyRPCLinks { from_voter_best_beefy_stream, from_voter_justif_stream };
190
191	(import, voter_links, rpc_links)
192}
193
194/// BEEFY gadget network parameters.
195pub struct BeefyNetworkParams<B: Block, N, S> {
196	/// Network implementing gossip, requests and sync-oracle.
197	pub network: Arc<N>,
198	/// Syncing service implementing a sync oracle and an event stream for peers.
199	pub sync: Arc<S>,
200	/// Handle for receiving notification events.
201	pub notification_service: Box<dyn NotificationService>,
202	/// Chain specific BEEFY gossip protocol name. See
203	/// [`communication::beefy_protocol_name::gossip_protocol_name`].
204	pub gossip_protocol_name: ProtocolName,
205	/// Chain specific BEEFY on-demand justifications protocol name. See
206	/// [`communication::beefy_protocol_name::justifications_protocol_name`].
207	pub justifications_protocol_name: ProtocolName,
208
209	pub _phantom: PhantomData<B>,
210}
211
212/// BEEFY gadget initialization parameters.
213pub struct BeefyParams<B: Block, BE, C, N, P, R, S, AuthorityId: AuthorityIdBound> {
214	/// BEEFY client
215	pub client: Arc<C>,
216	/// Client Backend
217	pub backend: Arc<BE>,
218	/// BEEFY Payload provider
219	pub payload_provider: P,
220	/// Runtime Api Provider
221	pub runtime: Arc<R>,
222	/// Local key store
223	pub key_store: Option<KeystorePtr>,
224	/// BEEFY voter network params
225	pub network_params: BeefyNetworkParams<B, N, S>,
226	/// Minimal delta between blocks, BEEFY should vote for
227	pub min_block_delta: u32,
228	/// Prometheus metric registry
229	pub prometheus_registry: Option<Registry>,
230	/// Links between the block importer, the background voter and the RPC layer.
231	pub links: BeefyVoterLinks<B, AuthorityId>,
232	/// Handler for incoming BEEFY justifications requests from a remote peer.
233	pub on_demand_justifications_handler: BeefyJustifsRequestHandler<B, C>,
234	/// Whether running under "Authority" role.
235	pub is_authority: bool,
236}
237/// Helper object holding BEEFY worker communication/gossip components.
238///
239/// These are created once, but will be reused if worker is restarted/reinitialized.
240pub(crate) struct BeefyComms<B: Block, N, AuthorityId: AuthorityIdBound> {
241	pub gossip_engine: GossipEngine<B>,
242	pub gossip_validator: Arc<GossipValidator<B, N, AuthorityId>>,
243	pub on_demand_justifications: OnDemandJustificationsEngine<B, AuthorityId>,
244}
245
246/// Helper builder object for building [worker::BeefyWorker].
247///
248/// It has to do it in two steps: initialization and build, because the first step can sleep waiting
249/// for certain chain and backend conditions, and while sleeping we still need to pump the
250/// GossipEngine. Once initialization is done, the GossipEngine (and other pieces) are added to get
251/// the complete [worker::BeefyWorker] object.
252pub(crate) struct BeefyWorkerBuilder<B: Block, BE, RuntimeApi, AuthorityId: AuthorityIdBound> {
253	// utilities
254	backend: Arc<BE>,
255	runtime: Arc<RuntimeApi>,
256	key_store: BeefyKeystore<AuthorityId>,
257	// voter metrics
258	metrics: Option<VoterMetrics>,
259	persisted_state: PersistedState<B, AuthorityId>,
260}
261
262impl<B, BE, R, AuthorityId> BeefyWorkerBuilder<B, BE, R, AuthorityId>
263where
264	B: Block + codec::Codec,
265	BE: Backend<B>,
266	R: ProvideRuntimeApi<B>,
267	R::Api: BeefyApi<B, AuthorityId>,
268	AuthorityId: AuthorityIdBound,
269{
270	/// This will wait for the chain to enable BEEFY (if not yet enabled) and also wait for the
271	/// backend to sync all headers required by the voter to build a contiguous chain of mandatory
272	/// justifications. Then it builds the initial voter state using a combination of previously
273	/// persisted state in AUX DB and latest chain information/progress.
274	///
275	/// Returns a sane `BeefyWorkerBuilder` that can build the `BeefyWorker`.
276	pub async fn async_initialize<N>(
277		backend: Arc<BE>,
278		runtime: Arc<R>,
279		key_store: BeefyKeystore<AuthorityId>,
280		metrics: Option<VoterMetrics>,
281		min_block_delta: u32,
282		gossip_validator: Arc<GossipValidator<B, N, AuthorityId>>,
283		finality_notifications: &mut Fuse<FinalityNotifications<B>>,
284		is_authority: bool,
285	) -> Result<Self, Error> {
286		// Wait for BEEFY pallet to be active before starting voter.
287		let (beefy_genesis, best_grandpa) =
288			wait_for_runtime_pallet(&*runtime, finality_notifications).await?;
289
290		let persisted_state = Self::load_or_init_state(
291			beefy_genesis,
292			best_grandpa,
293			min_block_delta,
294			backend.clone(),
295			runtime.clone(),
296			&key_store,
297			&metrics,
298			is_authority,
299		)
300		.await?;
301		// Update the gossip validator with the right starting round and set id.
302		persisted_state
303			.gossip_filter_config()
304			.map(|f| gossip_validator.update_filter(f))?;
305
306		Ok(BeefyWorkerBuilder { backend, runtime, key_store, metrics, persisted_state })
307	}
308
309	/// Takes rest of missing pieces as params and builds the `BeefyWorker`.
310	pub fn build<P, S, N>(
311		self,
312		payload_provider: P,
313		sync: Arc<S>,
314		comms: BeefyComms<B, N, AuthorityId>,
315		links: BeefyVoterLinks<B, AuthorityId>,
316		pending_justifications: BTreeMap<NumberFor<B>, BeefyVersionedFinalityProof<B, AuthorityId>>,
317		is_authority: bool,
318	) -> BeefyWorker<B, BE, P, R, S, N, AuthorityId> {
319		let key_store = Arc::new(self.key_store);
320		BeefyWorker {
321			backend: self.backend.clone(),
322			runtime: self.runtime.clone(),
323			key_store: key_store.clone(),
324			payload_provider,
325			sync,
326			fisherman: Arc::new(Fisherman::new(self.backend, self.runtime, key_store)),
327			metrics: self.metrics,
328			persisted_state: self.persisted_state,
329			comms,
330			links,
331			pending_justifications,
332			is_authority,
333		}
334	}
335
336	// If no persisted state present, walk back the chain from first GRANDPA notification to either:
337	//  - latest BEEFY finalized block, or if none found on the way,
338	//  - BEEFY pallet genesis;
339	// Enqueue any BEEFY mandatory blocks (session boundaries) found on the way, for voter to
340	// finalize.
341	async fn init_state(
342		beefy_genesis: NumberFor<B>,
343		best_grandpa: <B as Block>::Header,
344		min_block_delta: u32,
345		backend: Arc<BE>,
346		runtime: Arc<R>,
347	) -> Result<PersistedState<B, AuthorityId>, Error> {
348		let blockchain = backend.blockchain();
349
350		let beefy_genesis = runtime
351			.runtime_api()
352			.beefy_genesis(best_grandpa.hash())
353			.ok()
354			.flatten()
355			.filter(|genesis| *genesis == beefy_genesis)
356			.ok_or_else(|| Error::Backend("BEEFY pallet expected to be active.".into()))?;
357		// Walk back the imported blocks and initialize voter either, at the last block with
358		// a BEEFY justification, or at pallet genesis block; voter will resume from there.
359		let mut sessions = VecDeque::new();
360		let mut header = best_grandpa.clone();
361		let state = loop {
362			if let Some(true) = blockchain
363				.justifications(header.hash())
364				.ok()
365				.flatten()
366				.map(|justifs| justifs.get(BEEFY_ENGINE_ID).is_some())
367			{
368				debug!(
369					target: LOG_TARGET,
370					"🥩 Initialize BEEFY voter at last BEEFY finalized block: {:?}.",
371					*header.number()
372				);
373				let best_beefy = *header.number();
374				// If no session boundaries detected so far, just initialize new rounds here.
375				if sessions.is_empty() {
376					let active_set =
377						expect_validator_set(runtime.as_ref(), backend.as_ref(), &header).await?;
378					let mut rounds = Rounds::new(best_beefy, active_set);
379					// Mark the round as already finalized.
380					rounds.conclude(best_beefy);
381					sessions.push_front(rounds);
382				}
383				let state = PersistedState::checked_new(
384					best_grandpa,
385					best_beefy,
386					sessions,
387					min_block_delta,
388					beefy_genesis,
389				)
390				.ok_or_else(|| Error::Backend("Invalid BEEFY chain".into()))?;
391				break state;
392			}
393
394			if *header.number() == beefy_genesis {
395				// We've reached BEEFY genesis, initialize voter here.
396				let genesis_set =
397					expect_validator_set(runtime.as_ref(), backend.as_ref(), &header).await?;
398				info!(
399					target: LOG_TARGET,
400					"🥩 Loading BEEFY voter state from genesis on what appears to be first startup. \
401					Starting voting rounds at block {:?}, genesis validator set {:?}.",
402					beefy_genesis,
403					genesis_set,
404				);
405
406				sessions.push_front(Rounds::new(beefy_genesis, genesis_set));
407				break PersistedState::checked_new(
408					best_grandpa,
409					Zero::zero(),
410					sessions,
411					min_block_delta,
412					beefy_genesis,
413				)
414				.ok_or_else(|| Error::Backend("Invalid BEEFY chain".into()))?;
415			}
416
417			if let Some(active) = find_authorities_change::<B, AuthorityId>(&header) {
418				debug!(
419					target: LOG_TARGET,
420					"🥩 Marking block {:?} as BEEFY Mandatory.",
421					*header.number()
422				);
423				sessions.push_front(Rounds::new(*header.number(), active));
424			}
425
426			// Move up the chain.
427			header = wait_for_parent_header(blockchain, header, HEADER_SYNC_DELAY).await?;
428		};
429
430		aux_schema::write_current_version(backend.as_ref())?;
431		aux_schema::write_voter_state(backend.as_ref(), &state)?;
432		Ok(state)
433	}
434
435	async fn load_or_init_state(
436		beefy_genesis: NumberFor<B>,
437		best_grandpa: <B as Block>::Header,
438		min_block_delta: u32,
439		backend: Arc<BE>,
440		runtime: Arc<R>,
441		key_store: &BeefyKeystore<AuthorityId>,
442		metrics: &Option<VoterMetrics>,
443		is_authority: bool,
444	) -> Result<PersistedState<B, AuthorityId>, Error> {
445		// Initialize voter state from AUX DB if compatible.
446		if let Some(mut state) = crate::aux_schema::load_persistent(backend.as_ref())?
447			// Verify state pallet genesis matches runtime.
448			.filter(|state| state.pallet_genesis() == beefy_genesis)
449		{
450			// Overwrite persisted state with current best GRANDPA block.
451			state.set_best_grandpa(best_grandpa.clone());
452			// Overwrite persisted data with newly provided `min_block_delta`.
453			state.set_min_block_delta(min_block_delta);
454			debug!(target: LOG_TARGET, "🥩 Loading BEEFY voter state from db.");
455			trace!(target: LOG_TARGET, "🥩 Loaded state: {:?}.", state);
456
457			// Make sure that all the headers that we need have been synced.
458			let mut new_sessions = vec![];
459			let mut header = best_grandpa.clone();
460			while *header.number() > state.best_beefy() {
461				if state.voting_oracle().can_add_session(*header.number()) {
462					if let Some(active) = find_authorities_change::<B, AuthorityId>(&header) {
463						new_sessions.push((active, *header.number()));
464					}
465				}
466				header =
467					wait_for_parent_header(backend.blockchain(), header, HEADER_SYNC_DELAY).await?;
468			}
469
470			// Make sure we didn't miss any sessions during node restart.
471			for (validator_set, new_session_start) in new_sessions.drain(..).rev() {
472				debug!(
473					target: LOG_TARGET,
474					"🥩 Handling missed BEEFY session after node restart: {:?}.",
475					new_session_start
476				);
477				state.init_session_at(
478					new_session_start,
479					validator_set,
480					key_store,
481					metrics,
482					is_authority,
483				);
484			}
485			return Ok(state);
486		}
487
488		// No valid voter-state persisted, re-initialize from pallet genesis.
489		Self::init_state(beefy_genesis, best_grandpa, min_block_delta, backend, runtime).await
490	}
491}
492
493/// Finality notification for consumption by BEEFY worker.
494/// This is a stripped down version of `sc_client_api::FinalityNotification` which does not keep
495/// blocks pinned.
496struct UnpinnedFinalityNotification<B: Block> {
497	/// Finalized block header hash.
498	pub hash: B::Hash,
499	/// Finalized block header.
500	pub header: B::Header,
501	/// Path from the old finalized to new finalized parent (implicitly finalized blocks).
502	///
503	/// This maps to the range `(old_finalized, new_finalized)`.
504	pub tree_route: Arc<[B::Hash]>,
505}
506
507impl<B: Block> From<FinalityNotification<B>> for UnpinnedFinalityNotification<B> {
508	fn from(value: FinalityNotification<B>) -> Self {
509		UnpinnedFinalityNotification {
510			hash: value.hash,
511			header: value.header,
512			tree_route: value.tree_route,
513		}
514	}
515}
516
517/// Start the BEEFY gadget.
518///
519/// This is a thin shim around running and awaiting a BEEFY worker.
520pub async fn start_beefy_gadget<B, BE, C, N, P, R, S, AuthorityId>(
521	beefy_params: BeefyParams<B, BE, C, N, P, R, S, AuthorityId>,
522) where
523	B: Block,
524	BE: Backend<B>,
525	C: Client<B, BE> + BlockBackend<B>,
526	P: PayloadProvider<B> + Clone,
527	R: ProvideRuntimeApi<B>,
528	R::Api: BeefyApi<B, AuthorityId>,
529	N: GossipNetwork<B> + NetworkRequest + Send + Sync + 'static,
530	S: GossipSyncing<B> + SyncOracle + 'static,
531	AuthorityId: AuthorityIdBound,
532{
533	let BeefyParams {
534		client,
535		backend,
536		payload_provider,
537		runtime,
538		key_store,
539		network_params,
540		min_block_delta,
541		prometheus_registry,
542		links,
543		mut on_demand_justifications_handler,
544		is_authority,
545	} = beefy_params;
546
547	let BeefyNetworkParams {
548		network,
549		sync,
550		notification_service,
551		gossip_protocol_name,
552		justifications_protocol_name,
553		..
554	} = network_params;
555
556	let metrics = register_metrics(prometheus_registry.clone());
557
558	let mut block_import_justif = links.from_block_import_justif_stream.subscribe(100_000).fuse();
559
560	// Subscribe to finality notifications and justifications before waiting for runtime pallet and
561	// reuse the streams, so we don't miss notifications while waiting for pallet to be available.
562	let finality_notifications = client.finality_notification_stream();
563	let (mut transformer, mut finality_notifications) =
564		finality_notification_transformer_future(finality_notifications);
565
566	let known_peers = Arc::new(Mutex::new(KnownPeers::new()));
567	// Default votes filter is to discard everything.
568	// Validator is updated later with correct starting round and set id.
569	let gossip_validator =
570		communication::gossip::GossipValidator::new(known_peers.clone(), network.clone());
571	let gossip_validator = Arc::new(gossip_validator);
572	let gossip_engine = GossipEngine::new(
573		network.clone(),
574		sync.clone(),
575		notification_service,
576		gossip_protocol_name.clone(),
577		gossip_validator.clone(),
578		None,
579	);
580
581	// The `GossipValidator` adds and removes known peers based on valid votes and network
582	// events.
583	let on_demand_justifications = OnDemandJustificationsEngine::new(
584		network.clone(),
585		justifications_protocol_name.clone(),
586		known_peers,
587		prometheus_registry.clone(),
588	);
589	let mut beefy_comms = BeefyComms { gossip_engine, gossip_validator, on_demand_justifications };
590
591	// We re-create and re-run the worker in this loop in order to quickly reinit and resume after
592	// select recoverable errors.
593	loop {
594		// Make sure to pump gossip engine while waiting for initialization conditions.
595		let worker_builder = futures::select! {
596			builder_init_result = BeefyWorkerBuilder::async_initialize(
597				backend.clone(),
598				runtime.clone(),
599				key_store.clone().into(),
600				metrics.clone(),
601				min_block_delta,
602				beefy_comms.gossip_validator.clone(),
603				&mut finality_notifications,
604				is_authority,
605			).fuse() => {
606				match builder_init_result {
607					Ok(builder) => builder,
608					Err(e) => {
609						error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", e);
610						return
611					},
612				}
613			},
614			// Pump gossip engine.
615			_ = &mut beefy_comms.gossip_engine => {
616				error!(target: LOG_TARGET, "🥩 Gossip engine has unexpectedly terminated.");
617				return
618			},
619			_ = &mut transformer => {
620				error!(target: LOG_TARGET, "🥩 Finality notification transformer task has unexpectedly terminated.");
621				return
622			},
623		};
624
625		let worker = worker_builder.build(
626			payload_provider.clone(),
627			sync.clone(),
628			beefy_comms,
629			links.clone(),
630			BTreeMap::new(),
631			is_authority,
632		);
633
634		futures::select! {
635			result = worker.run(&mut block_import_justif, &mut finality_notifications).fuse() => {
636				match result {
637					(error::Error::ConsensusReset, reuse_comms) => {
638						error!(target: LOG_TARGET, "🥩 Error: {:?}. Restarting voter.", error::Error::ConsensusReset);
639						beefy_comms = reuse_comms;
640						continue;
641					},
642					(err, _) => {
643						error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", err)
644					}
645				}
646			},
647			odj_handler_error = on_demand_justifications_handler.run().fuse() => {
648				error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", odj_handler_error)
649			},
650			_ = &mut transformer => {
651				error!(target: LOG_TARGET, "🥩 Finality notification transformer task has unexpectedly terminated.");
652			}
653		}
654		return;
655	}
656}
657
658/// Produce a future that transformes finality notifications into a struct that does not keep blocks
659/// pinned.
660fn finality_notification_transformer_future<B>(
661	mut finality_notifications: sc_client_api::FinalityNotifications<B>,
662) -> (
663	Pin<Box<futures::future::Fuse<impl Future<Output = ()> + Sized>>>,
664	Fuse<TracingUnboundedReceiver<UnpinnedFinalityNotification<B>>>,
665)
666where
667	B: Block,
668{
669	let (tx, rx) = tracing_unbounded("beefy-notification-transformer-channel", 10000);
670	let transformer_fut = async move {
671		while let Some(notification) = finality_notifications.next().await {
672			debug!(target: LOG_TARGET, "🥩 Transforming grandpa notification. #{}({:?})", notification.header.number(), notification.hash);
673			if let Err(err) = tx.unbounded_send(UnpinnedFinalityNotification::from(notification)) {
674				error!(target: LOG_TARGET, "🥩 Unable to send transformed notification. Shutting down. err = {}", err);
675				return
676			};
677		}
678	};
679	(Box::pin(transformer_fut.fuse()), rx.fuse())
680}
681
682/// Waits until the parent header of `current` is available and returns it.
683///
684/// When the node uses GRANDPA warp sync it initially downloads only the mandatory GRANDPA headers.
685/// The rest of the headers (gap sync) are lazily downloaded later. But the BEEFY voter also needs
686/// the headers in range `[beefy_genesis..=best_grandpa]` to be available. This helper method
687/// enables us to wait until these headers have been synced.
688async fn wait_for_parent_header<B, BC>(
689	blockchain: &BC,
690	current: <B as Block>::Header,
691	delay: Duration,
692) -> Result<<B as Block>::Header, Error>
693where
694	B: Block,
695	BC: BlockchainBackend<B>,
696{
697	if *current.number() == Zero::zero() {
698		let msg = format!("header {} is Genesis, there is no parent for it", current.hash());
699		warn!(target: LOG_TARGET, "{}", msg);
700		return Err(Error::Backend(msg));
701	}
702	loop {
703		match blockchain
704			.header(*current.parent_hash())
705			.map_err(|e| Error::Backend(e.to_string()))?
706		{
707			Some(parent) => return Ok(parent),
708			None => {
709				info!(
710					target: LOG_TARGET,
711					"🥩 Parent of header number {} not found. \
712					BEEFY gadget waiting for header sync to finish ...",
713					current.number()
714				);
715				tokio::time::sleep(delay).await;
716			},
717		}
718	}
719}
720
721/// Wait for BEEFY runtime pallet to be available, return active validator set.
722/// Should be called only once during worker initialization.
723async fn wait_for_runtime_pallet<B, R, AuthorityId: AuthorityIdBound>(
724	runtime: &R,
725	finality: &mut Fuse<FinalityNotifications<B>>,
726) -> Result<(NumberFor<B>, <B as Block>::Header), Error>
727where
728	B: Block,
729	R: ProvideRuntimeApi<B>,
730	R::Api: BeefyApi<B, AuthorityId>,
731{
732	info!(target: LOG_TARGET, "🥩 BEEFY gadget waiting for BEEFY pallet to become available...");
733	loop {
734		let notif = finality.next().await.ok_or_else(|| {
735			let err_msg = "🥩 Finality stream has unexpectedly terminated.".into();
736			error!(target: LOG_TARGET, "{}", err_msg);
737			Error::Backend(err_msg)
738		})?;
739		let at = notif.header.hash();
740		if let Some(start) = runtime.runtime_api().beefy_genesis(at).ok().flatten() {
741			if *notif.header.number() >= start {
742				// Beefy pallet available, return header for best grandpa at the time.
743				info!(
744					target: LOG_TARGET,
745					"🥩 BEEFY pallet available: block {:?} beefy genesis {:?}",
746					notif.header.number(), start
747				);
748				return Ok((start, notif.header));
749			}
750		}
751	}
752}
753
754/// Provides validator set active `at_header`. It tries to get it from state, otherwise falls
755/// back to walk up the chain looking the validator set enactment in header digests.
756///
757/// Note: function will `async::sleep()` when walking back the chain if some needed header hasn't
758/// been synced yet (as it happens when warp syncing when headers are synced in the background).
759async fn expect_validator_set<B, BE, R, AuthorityId: AuthorityIdBound>(
760	runtime: &R,
761	backend: &BE,
762	at_header: &B::Header,
763) -> Result<ValidatorSet<AuthorityId>, Error>
764where
765	B: Block,
766	BE: Backend<B>,
767	R: ProvideRuntimeApi<B>,
768	R::Api: BeefyApi<B, AuthorityId>,
769{
770	let blockchain = backend.blockchain();
771	// Walk up the chain looking for the validator set active at 'at_header'. Process both state and
772	// header digests.
773	debug!(
774		target: LOG_TARGET,
775		"🥩 Trying to find validator set active at header(number {:?}, hash {:?})",
776		at_header.number(),
777		at_header.hash()
778	);
779	let mut header = at_header.clone();
780	loop {
781		debug!(target: LOG_TARGET, "🥩 Looking for auth set change at block number: {:?}", *header.number());
782		if let Ok(Some(active)) = runtime.runtime_api().validator_set(header.hash()) {
783			return Ok(active);
784		} else {
785			match find_authorities_change::<B, AuthorityId>(&header) {
786				Some(active) => return Ok(active),
787				// Move up the chain. Ultimately we'll get it from chain genesis state, or error out
788				// there.
789				None =>
790					header = wait_for_parent_header(blockchain, header, HEADER_SYNC_DELAY)
791						.await
792						.map_err(|e| Error::Backend(e.to_string()))?,
793			}
794		}
795	}
796}
797
798/// Scan the `header` digest log for a BEEFY validator set change. Return either the new
799/// validator set or `None` in case no validator set change has been signaled.
800pub(crate) fn find_authorities_change<B, AuthorityId>(
801	header: &B::Header,
802) -> Option<ValidatorSet<AuthorityId>>
803where
804	B: Block,
805	AuthorityId: AuthorityIdBound,
806{
807	let id = OpaqueDigestItemId::Consensus(&BEEFY_ENGINE_ID);
808
809	let filter = |log: ConsensusLog<AuthorityId>| match log {
810		ConsensusLog::AuthoritiesChange(validator_set) => Some(validator_set),
811		_ => None,
812	};
813	header.digest().convert_first(|l| l.try_to(id).and_then(filter))
814}