sc_consensus_babe/lib.rs

// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
//! # BABE (Blind Assignment for Blockchain Extension)
//!
//! BABE is a slot-based block production mechanism which uses a VRF PRNG to
//! randomly perform the slot allocation. On every slot, all the authorities
//! generate a new random number with the VRF function, and if it is lower than
//! a given threshold (which is proportional to their weight/stake) they have
//! the right to produce a block. The proof of the VRF function execution will
//! be used by other peers to validate the legitimacy of the slot claim.
//!
//! The engine is also responsible for collecting entropy on-chain, which will
//! be used to seed the given VRF PRNG. An epoch is a contiguous number of slots
//! under which we will be using the same authority set. During an epoch all VRF
//! outputs produced as a result of block production will be collected into an
//! on-chain randomness pool. Epoch changes are announced one epoch in advance,
//! i.e. when ending epoch N, we announce the parameters (randomness,
//! authorities, etc.) for epoch N+2.
//!
//! Since the slot assignment is randomized, it is possible that a slot is
//! assigned to multiple validators, in which case we will have a temporary
//! fork, or that a slot is assigned to no validator, in which case no block is
//! produced. This means that block times are not deterministic.
//!
//! The protocol has a parameter `c` in [0, 1] for which `1 - c` is the
//! probability of a slot being empty. The choice of this parameter affects the
//! security of the protocol with respect to maximum tolerable network delays.
//!
//! In addition to the VRF-based slot assignment described above, which we will
//! call primary slots, the engine also supports a deterministic secondary slot
//! assignment. Primary slots take precedence over secondary slots: when
//! authoring, the node starts by trying to claim a primary slot and falls back
//! to a secondary slot claim attempt. The secondary slot assignment is done
//! by picking the authority at index:
//!
//! `blake2_256(epoch_randomness ++ slot_number) % authorities_len`.
//!
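//! As a minimal illustrative sketch of that index computation (the helper name
//! and the raw byte concatenation below are assumptions of this example, not
//! the exact encoding used by this crate):
//!
//! ```ignore
//! use sp_core::{hashing::blake2_256, U256};
//!
//! /// Hypothetical helper: pick the secondary author index for a slot.
//! fn secondary_author_index(randomness: &[u8; 32], slot: u64, authorities_len: u64) -> u64 {
//!     // Hash the epoch randomness concatenated with the slot number...
//!     let hash = blake2_256(&[&randomness[..], &slot.to_le_bytes()[..]].concat());
//!     // ...and reduce the digest modulo the number of authorities.
//!     (U256::from_big_endian(&hash) % U256::from(authorities_len)).low_u64()
//! }
//! ```
//!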
//! Secondary slots support either a `SecondaryPlain` or a `SecondaryVRF`
//! variant. Compared with the `SecondaryPlain` variant, the `SecondaryVRF`
//! variant generates an additional VRF output. The output is not included in
//! beacon randomness, but can be consumed by parachains.
//!
//! The fork choice rule is weight-based, where weight equals the number of
//! primary blocks in the chain. We will pick the heaviest chain (the one with
//! more primary blocks) and will go with the longest one in case of a tie.
//!
//! An in-depth description and analysis of the protocol can be found here:
//! <https://research.web3.foundation/Polkadot/protocols/block-production/Babe>
#![forbid(unsafe_code)]
#![warn(missing_docs)]

use std::{
	collections::HashSet,
	future::Future,
	ops::{Deref, DerefMut},
	pin::Pin,
	sync::Arc,
	task::{Context, Poll},
	time::Duration,
};

use codec::{Decode, Encode};
use futures::{
	channel::{
		mpsc::{channel, Receiver, Sender},
		oneshot,
	},
	prelude::*,
};
use log::{debug, info, log, trace, warn};
use parking_lot::Mutex;
use prometheus_endpoint::Registry;

use sc_client_api::{
	backend::AuxStore, AuxDataOperations, Backend as BackendT, FinalityNotification,
	PreCommitActions, UsageProvider,
};
use sc_consensus::{
	block_import::{
		BlockCheckParams, BlockImport, BlockImportParams, ForkChoiceStrategy, ImportResult,
		StateAction,
	},
	import_queue::{BasicQueue, BoxJustificationImport, DefaultImportQueue, Verifier},
};
use sc_consensus_epochs::{
	descendent_query, Epoch as EpochT, EpochChangesFor, SharedEpochChanges, ViableEpoch,
	ViableEpochDescriptor,
};
use sc_consensus_slots::{
	check_equivocation, BackoffAuthoringBlocksStrategy, CheckedHeader, InherentDataProviderExt,
	SlotInfo, StorageChanges,
};
use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_TRACE};
use sc_transaction_pool_api::OffchainTransactionPoolFactory;
use sp_api::{ApiExt, ProvideRuntimeApi};
use sp_application_crypto::AppCrypto;
use sp_block_builder::BlockBuilder as BlockBuilderApi;
use sp_blockchain::{
	Backend as _, BlockStatus, Error as ClientError, ForkBackend, HeaderBackend, HeaderMetadata,
	Result as ClientResult,
};
use sp_consensus::{BlockOrigin, Environment, Error as ConsensusError, Proposer, SelectChain};
use sp_consensus_babe::{inherents::BabeInherentData, SlotDuration};
use sp_consensus_slots::Slot;
use sp_core::traits::SpawnEssentialNamed;
use sp_inherents::{CreateInherentDataProviders, InherentDataProvider};
use sp_keystore::KeystorePtr;
use sp_runtime::{
	generic::OpaqueDigestItemId,
	traits::{Block as BlockT, Header, NumberFor, SaturatedConversion, Zero},
	DigestItem,
};

pub use sc_consensus_slots::SlotProportion;
pub use sp_consensus::SyncOracle;
pub use sp_consensus_babe::{
	digests::{
		CompatibleDigestItem, NextConfigDescriptor, NextEpochDescriptor, PreDigest,
		PrimaryPreDigest, SecondaryPlainPreDigest,
	},
	AuthorityId, AuthorityPair, AuthoritySignature, BabeApi, BabeAuthorityWeight, BabeBlockWeight,
	BabeConfiguration, BabeEpochConfiguration, ConsensusLog, Randomness, BABE_ENGINE_ID,
};

pub use aux_schema::load_block_weight as block_weight;
use sp_timestamp::Timestamp;

mod migration;
mod verification;

pub mod authorship;
pub mod aux_schema;
#[cfg(test)]
mod tests;

const LOG_TARGET: &str = "babe";

/// VRF context used for the slot claiming lottery.
const AUTHORING_SCORE_VRF_CONTEXT: &[u8] = b"substrate-babe-vrf";

/// VRF output length for the slot claiming lottery.
const AUTHORING_SCORE_LENGTH: usize = 16;

/// BABE epoch information
#[derive(Clone, Debug, PartialEq, Eq, Encode, Decode)]
pub struct Epoch(sp_consensus_babe::Epoch);

impl Deref for Epoch {
	type Target = sp_consensus_babe::Epoch;

	fn deref(&self) -> &Self::Target {
		&self.0
	}
}

impl DerefMut for Epoch {
	fn deref_mut(&mut self) -> &mut Self::Target {
		&mut self.0
	}
}

impl From<sp_consensus_babe::Epoch> for Epoch {
	fn from(epoch: sp_consensus_babe::Epoch) -> Self {
		Epoch(epoch)
	}
}

impl EpochT for Epoch {
	type NextEpochDescriptor = (NextEpochDescriptor, BabeEpochConfiguration);
	type Slot = Slot;

	fn increment(
		&self,
		(descriptor, config): (NextEpochDescriptor, BabeEpochConfiguration),
	) -> Epoch {
		sp_consensus_babe::Epoch {
			epoch_index: self.epoch_index + 1,
			start_slot: self.start_slot + self.duration,
			duration: self.duration,
			authorities: descriptor.authorities,
			randomness: descriptor.randomness,
			config,
		}
		.into()
	}

	fn start_slot(&self) -> Slot {
		self.start_slot
	}

	fn end_slot(&self) -> Slot {
		self.start_slot + self.duration
	}
}

impl Epoch {
	/// Create the genesis epoch (epoch #0).
	///
	/// This is defined to start at the slot of the first block, so that has to be provided.
	pub fn genesis(genesis_config: &BabeConfiguration, slot: Slot) -> Epoch {
		sp_consensus_babe::Epoch {
			epoch_index: 0,
			start_slot: slot,
			duration: genesis_config.epoch_length,
			authorities: genesis_config.authorities.clone(),
			randomness: genesis_config.randomness,
			config: BabeEpochConfiguration {
				c: genesis_config.c,
				allowed_slots: genesis_config.allowed_slots,
			},
		}
		.into()
	}

	/// Clone and tweak epoch information to refer to the specified slot.
	///
	/// All the information which depends on the slot value is recomputed and assigned
	/// to the returned epoch instance.
	///
	/// The `slot` must be greater than or equal to the original epoch start slot;
	/// if it is less, this operation is equivalent to a simple clone.
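	///
	/// A minimal usage sketch (the concrete values are illustrative assumptions):
	///
	/// ```ignore
	/// // Given an epoch starting at slot 100 with a duration of 50 slots,
	/// // slot 220 lies two whole epochs past the start:
	/// // skipped_epochs = (220 - 100) / 50 = 2
	/// let shifted = epoch.clone_for_slot(Slot::from(220u64));
	/// assert_eq!(shifted.epoch_index, epoch.epoch_index + 2);
	/// assert_eq!(shifted.start_slot, Slot::from(200u64));
	/// ```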
	pub fn clone_for_slot(&self, slot: Slot) -> Epoch {
		let mut epoch = self.clone();

		let skipped_epochs = *slot.saturating_sub(self.start_slot) / self.duration;

		let epoch_index = epoch.epoch_index.checked_add(skipped_epochs).expect(
			"epoch number is u64; it should be strictly smaller than number of slots; \
				slots relate in some way to wall clock time; \
				if u64 is not enough we should crash for safety; qed.",
		);

		let start_slot = skipped_epochs
			.checked_mul(epoch.duration)
			.and_then(|skipped_slots| epoch.start_slot.checked_add(skipped_slots))
			.expect(
				"slot number is u64; it should relate in some way to wall clock time; \
				 if u64 is not enough we should crash for safety; qed.",
			);

		epoch.epoch_index = epoch_index;
		epoch.start_slot = Slot::from(start_slot);

		epoch
	}
}

/// Errors encountered by the babe authorship task.
#[derive(Debug, thiserror::Error)]
pub enum Error<B: BlockT> {
	/// Multiple BABE pre-runtime digests
	#[error("Multiple BABE pre-runtime digests, rejecting!")]
	MultiplePreRuntimeDigests,
	/// No BABE pre-runtime digest found
	#[error("No BABE pre-runtime digest found")]
	NoPreRuntimeDigest,
	/// Multiple BABE epoch change digests
	#[error("Multiple BABE epoch change digests, rejecting!")]
	MultipleEpochChangeDigests,
	/// Multiple BABE config change digests
	#[error("Multiple BABE config change digests, rejecting!")]
	MultipleConfigChangeDigests,
	/// Could not extract timestamp and slot
	#[error("Could not extract timestamp and slot: {0}")]
	Extraction(ConsensusError),
	/// Could not fetch epoch
	#[error("Could not fetch epoch at {0:?}")]
	FetchEpoch(B::Hash),
	/// Header rejected: too far in the future
	#[error("Header {0:?} rejected: too far in the future")]
	TooFarInFuture(B::Hash),
	/// Parent unavailable. Cannot import
	#[error("Parent ({0}) of {1} unavailable. Cannot import")]
	ParentUnavailable(B::Hash, B::Hash),
	/// Slot number must increase
	#[error("Slot number must increase: parent slot: {0}, this slot: {1}")]
	SlotMustIncrease(Slot, Slot),
	/// Header has a bad seal
	#[error("Header {0:?} has a bad seal")]
	HeaderBadSeal(B::Hash),
	/// Header is unsealed
	#[error("Header {0:?} is unsealed")]
	HeaderUnsealed(B::Hash),
	/// Slot author not found
	#[error("Slot author not found")]
	SlotAuthorNotFound,
	/// Secondary slot assignments are disabled for the current epoch.
	#[error("Secondary slot assignments are disabled for the current epoch.")]
	SecondarySlotAssignmentsDisabled,
	/// Bad signature
	#[error("Bad signature on {0:?}")]
	BadSignature(B::Hash),
	/// Invalid author: Expected secondary author
	#[error("Invalid author: Expected secondary author: {0:?}, got: {1:?}.")]
	InvalidAuthor(AuthorityId, AuthorityId),
	/// No secondary author expected.
	#[error("No secondary author expected.")]
	NoSecondaryAuthorExpected,
	/// VRF verification failed
	#[error("VRF verification failed")]
	VrfVerificationFailed,
	/// Primary slot threshold too low
	#[error("VRF output rejected, threshold {0} exceeded")]
	VrfThresholdExceeded(u128),
	/// Could not fetch parent header
	#[error("Could not fetch parent header: {0}")]
	FetchParentHeader(sp_blockchain::Error),
	/// Expected epoch change to happen.
	#[error("Expected epoch change to happen at {0:?}, slot {1}")]
	ExpectedEpochChange(B::Hash, Slot),
	/// Unexpected config change.
	#[error("Unexpected config change")]
	UnexpectedConfigChange,
	/// Unexpected epoch change
	#[error("Unexpected epoch change")]
	UnexpectedEpochChange,
	/// Parent block has no associated weight
	#[error("Parent block of {0} has no associated weight")]
	ParentBlockNoAssociatedWeight(B::Hash),
	/// Check inherents error
	#[error("Checking inherents failed: {0}")]
	CheckInherents(sp_inherents::Error),
	/// Unhandled check inherents error
	#[error("Checking inherents unhandled error: {}", String::from_utf8_lossy(.0))]
	CheckInherentsUnhandled(sp_inherents::InherentIdentifier),
	/// Create inherents error.
	#[error("Creating inherents failed: {0}")]
	CreateInherents(sp_inherents::Error),
	/// Background worker is not running and therefore requests cannot be answered.
	#[error("Background worker is not running")]
	BackgroundWorkerTerminated,
	/// Client error
	#[error(transparent)]
	Client(sp_blockchain::Error),
	/// Runtime API error.
	#[error(transparent)]
	RuntimeApi(sp_api::ApiError),
	/// Fork tree error
	#[error(transparent)]
	ForkTree(Box<fork_tree::Error<sp_blockchain::Error>>),
}

impl<B: BlockT> From<Error<B>> for String {
	fn from(error: Error<B>) -> String {
		error.to_string()
	}
}

fn babe_err<B: BlockT>(error: Error<B>) -> Error<B> {
	debug!(target: LOG_TARGET, "{}", error);
	error
}

/// Intermediate value passed to block importer.
pub struct BabeIntermediate<B: BlockT> {
	/// The epoch descriptor.
	pub epoch_descriptor: ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>,
}

/// Intermediate key for Babe engine.
pub static INTERMEDIATE_KEY: &[u8] = b"babe1";

/// Read configuration from the runtime state at the current best block.
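///
/// A minimal usage sketch (assumes a `client` binding that satisfies the bounds below):
///
/// ```ignore
/// let config = sc_consensus_babe::configuration(&*client)?;
/// // The returned `BabeConfiguration` exposes e.g. the slot duration.
/// let slot_duration = config.slot_duration();
/// ```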
pub fn configuration<B: BlockT, C>(client: &C) -> ClientResult<BabeConfiguration>
where
	C: AuxStore + ProvideRuntimeApi<B> + UsageProvider<B>,
	C::Api: BabeApi<B>,
{
	let at_hash = if client.usage_info().chain.finalized_state.is_some() {
		client.usage_info().chain.best_hash
	} else {
		debug!(target: LOG_TARGET, "No finalized state is available. Reading config from genesis");
		client.usage_info().chain.genesis_hash
	};

	let runtime_api = client.runtime_api();
	let version = runtime_api.api_version::<dyn BabeApi<B>>(at_hash)?;

	let config = match version {
		Some(1) => {
			#[allow(deprecated)]
			{
				runtime_api.configuration_before_version_2(at_hash)?.into()
			}
		},
		Some(2) => runtime_api.configuration(at_hash)?,
		_ =>
			return Err(sp_blockchain::Error::VersionInvalid(
				"Unsupported or invalid BabeApi version".to_string(),
			)),
	};
	Ok(config)
}

/// Parameters for BABE.
pub struct BabeParams<B: BlockT, C, SC, E, I, SO, L, CIDP, BS> {
	/// The keystore that manages the keys of the node.
	pub keystore: KeystorePtr,

	/// The client to use.
	pub client: Arc<C>,

	/// The `SelectChain` strategy.
	pub select_chain: SC,

	/// The environment we are producing blocks for.
	pub env: E,

	/// The underlying block-import object to supply our produced blocks to.
	/// This must be a `BabeBlockImport` or a wrapper of it, otherwise
	/// critical consensus logic will be omitted.
	pub block_import: I,

	/// A sync oracle.
	pub sync_oracle: SO,

	/// Hook into the sync module to control the justification sync process.
	pub justification_sync_link: L,

	/// Something that can create the inherent data providers.
	pub create_inherent_data_providers: CIDP,

	/// Force authoring of blocks even if we are offline.
	pub force_authoring: bool,

	/// Strategy and parameters for backing off block production.
	pub backoff_authoring_blocks: Option<BS>,

	/// The source of timestamps for relative slots.
	pub babe_link: BabeLink<B>,

	/// The proportion of the slot dedicated to proposing.
	///
	/// The block proposing will be limited to this proportion of the slot from the start of the
	/// slot. However, the proposing can still take longer when there is some lenience factor
	/// applied, because there were no blocks produced for some slots.
	pub block_proposal_slot_portion: SlotProportion,

	/// The maximum proportion of the slot dedicated to proposing with any lenience factor applied
	/// due to no blocks being produced.
	pub max_block_proposal_slot_portion: Option<SlotProportion>,

	/// Handle used to report telemetry.
	pub telemetry: Option<TelemetryHandle>,
}

/// Start the babe worker.
pub fn start_babe<B, C, SC, E, I, SO, CIDP, BS, L, Error>(
	BabeParams {
		keystore,
		client,
		select_chain,
		env,
		block_import,
		sync_oracle,
		justification_sync_link,
		create_inherent_data_providers,
		force_authoring,
		backoff_authoring_blocks,
		babe_link,
		block_proposal_slot_portion,
		max_block_proposal_slot_portion,
		telemetry,
	}: BabeParams<B, C, SC, E, I, SO, L, CIDP, BS>,
) -> Result<BabeWorker<B>, ConsensusError>
where
	B: BlockT,
	C: ProvideRuntimeApi<B>
		+ HeaderBackend<B>
		+ HeaderMetadata<B, Error = ClientError>
		+ Send
		+ Sync
		+ 'static,
	C::Api: BabeApi<B>,
	SC: SelectChain<B> + 'static,
	E: Environment<B, Error = Error> + Send + Sync + 'static,
	E::Proposer: Proposer<B, Error = Error>,
	I: BlockImport<B, Error = ConsensusError> + Send + Sync + 'static,
	SO: SyncOracle + Send + Sync + Clone + 'static,
	L: sc_consensus::JustificationSyncLink<B> + 'static,
	CIDP: CreateInherentDataProviders<B, ()> + Send + Sync + 'static,
	CIDP::InherentDataProviders: InherentDataProviderExt + Send,
	BS: BackoffAuthoringBlocksStrategy<NumberFor<B>> + Send + Sync + 'static,
	Error: std::error::Error + Send + From<ConsensusError> + From<I::Error> + 'static,
{
	let slot_notification_sinks = Arc::new(Mutex::new(Vec::new()));

	let worker = BabeSlotWorker {
		client: client.clone(),
		block_import,
		env,
		sync_oracle: sync_oracle.clone(),
		justification_sync_link,
		force_authoring,
		backoff_authoring_blocks,
		keystore,
		epoch_changes: babe_link.epoch_changes.clone(),
		slot_notification_sinks: slot_notification_sinks.clone(),
		config: babe_link.config.clone(),
		block_proposal_slot_portion,
		max_block_proposal_slot_portion,
		telemetry,
	};

	info!(target: LOG_TARGET, "👶 Starting BABE Authorship worker");

	let slot_worker = sc_consensus_slots::start_slot_worker(
		babe_link.config.slot_duration(),
		select_chain,
		sc_consensus_slots::SimpleSlotWorkerToSlotWorker(worker),
		sync_oracle,
		create_inherent_data_providers,
	);

	Ok(BabeWorker { inner: Box::pin(slot_worker), slot_notification_sinks })
}

// Remove obsolete blocks' weight data by leveraging finality notifications.
// This includes data for all finalized blocks (excluding the most recent one)
// and all stale branches.
fn aux_storage_cleanup<C: HeaderMetadata<Block> + HeaderBackend<Block>, Block: BlockT>(
	client: &C,
	notification: &FinalityNotification<Block>,
) -> AuxDataOperations {
	let mut hashes = HashSet::new();

	let first = notification.tree_route.first().unwrap_or(&notification.hash);
	match client.header_metadata(*first) {
		Ok(meta) => {
			hashes.insert(meta.parent);
		},
		Err(err) => {
			warn!(target: LOG_TARGET, "Failed to lookup metadata for block `{:?}`: {}", first, err)
		},
	}

	// Cleans data for the finalized block's ancestors.
	hashes.extend(
		notification
			.tree_route
			.iter()
			// Ensure we don't prune the latest finalized block.
			// This should not happen, but better be safe than sorry!
			.filter(|h| **h != notification.hash),
	);

	// Cleans data for stale forks.
	let stale_forks = match client.expand_forks(&notification.stale_heads) {
		Ok(stale_forks) => stale_forks,
		Err(e) => {
			warn!(target: LOG_TARGET, "{:?}", e);

			Default::default()
		},
	};
	hashes.extend(stale_forks.iter());

	hashes
		.into_iter()
		.map(|val| (aux_schema::block_weight_key(val), None))
		.collect()
}

async fn answer_requests<B: BlockT, C>(
	mut request_rx: Receiver<BabeRequest<B>>,
	config: BabeConfiguration,
	client: Arc<C>,
	epoch_changes: SharedEpochChanges<B, Epoch>,
) where
	C: HeaderBackend<B> + HeaderMetadata<B, Error = ClientError>,
{
	while let Some(request) = request_rx.next().await {
		match request {
			BabeRequest::EpochData(response) => {
				let _ = response.send(epoch_changes.shared_data().clone());
			},
			BabeRequest::EpochDataForChildOf(parent_hash, parent_number, slot, response) => {
				let lookup = || {
					let epoch_changes = epoch_changes.shared_data();
					epoch_changes
						.epoch_data_for_child_of(
							descendent_query(&*client),
							&parent_hash,
							parent_number,
							slot,
							|slot| Epoch::genesis(&config, slot),
						)
						.map_err(|e| Error::<B>::ForkTree(Box::new(e)))?
						.ok_or(Error::<B>::FetchEpoch(parent_hash))
				};

				let _ = response.send(lookup());
			},
		}
	}
}

/// Requests to the BABE service.
enum BabeRequest<B: BlockT> {
	/// Request all available epoch data.
	EpochData(oneshot::Sender<EpochChangesFor<B, Epoch>>),
	/// Request the epoch that a child of the given block, with the given slot number, would have.
	///
	/// The parent block is identified by its hash and number.
	EpochDataForChildOf(B::Hash, NumberFor<B>, Slot, oneshot::Sender<Result<Epoch, Error<B>>>),
}

/// A handle to the BABE worker for issuing requests.
#[derive(Clone)]
pub struct BabeWorkerHandle<B: BlockT>(Sender<BabeRequest<B>>);

impl<B: BlockT> BabeWorkerHandle<B> {
	async fn send_request(&self, request: BabeRequest<B>) -> Result<(), Error<B>> {
		match self.0.clone().send(request).await {
			Err(err) if err.is_disconnected() => return Err(Error::BackgroundWorkerTerminated),
			Err(err) => warn!(
				target: LOG_TARGET,
				"Unhandled error when sending request to worker: {:?}", err
			),
			_ => {},
		}

		Ok(())
	}

	/// Fetch all available epoch data.
	pub async fn epoch_data(&self) -> Result<EpochChangesFor<B, Epoch>, Error<B>> {
		let (tx, rx) = oneshot::channel();
		self.send_request(BabeRequest::EpochData(tx)).await?;

		rx.await.or(Err(Error::BackgroundWorkerTerminated))
	}

	/// Fetch the epoch that a child of the given block, with the given slot number, would have.
	///
	/// The parent block is identified by its hash and number.
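	///
	/// A minimal usage sketch (`handle`, `parent_hash`, `parent_number` and
	/// `slot` are assumed to be in scope):
	///
	/// ```ignore
	/// let epoch = handle.epoch_data_for_child_of(parent_hash, parent_number, slot).await?;
	/// println!("authorities in epoch {}: {}", epoch.epoch_index, epoch.authorities.len());
	/// ```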
	pub async fn epoch_data_for_child_of(
		&self,
		parent_hash: B::Hash,
		parent_number: NumberFor<B>,
		slot: Slot,
	) -> Result<Epoch, Error<B>> {
		let (tx, rx) = oneshot::channel();
		self.send_request(BabeRequest::EpochDataForChildOf(parent_hash, parent_number, slot, tx))
			.await?;

		rx.await.or(Err(Error::BackgroundWorkerTerminated))?
	}
}

/// Worker for Babe which implements `Future<Output = ()>`. This must be polled.
#[must_use]
pub struct BabeWorker<B: BlockT> {
	inner: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
	slot_notification_sinks: SlotNotificationSinks<B>,
}

impl<B: BlockT> BabeWorker<B> {
	/// Return an event stream of notifications for when a new slot happens, and the
	/// corresponding epoch descriptor.
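	///
	/// A minimal usage sketch (the `worker` binding and the surrounding async
	/// context are assumptions of this example):
	///
	/// ```ignore
	/// use futures::StreamExt;
	///
	/// let mut notifications = worker.slot_notification_stream();
	/// while let Some((slot, epoch_descriptor)) = notifications.next().await {
	///     log::info!("slot {slot} uses epoch descriptor {epoch_descriptor:?}");
	/// }
	/// ```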
	pub fn slot_notification_stream(
		&self,
	) -> Receiver<(Slot, ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>)> {
		const CHANNEL_BUFFER_SIZE: usize = 1024;

		let (sink, stream) = channel(CHANNEL_BUFFER_SIZE);
		self.slot_notification_sinks.lock().push(sink);
		stream
	}
}

impl<B: BlockT> Future for BabeWorker<B> {
	type Output = ();

	fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
		self.inner.as_mut().poll(cx)
	}
}

/// Slot notification sinks.
type SlotNotificationSinks<B> = Arc<
	Mutex<Vec<Sender<(Slot, ViableEpochDescriptor<<B as BlockT>::Hash, NumberFor<B>, Epoch>)>>>,
>;

struct BabeSlotWorker<B: BlockT, C, E, I, SO, L, BS> {
	client: Arc<C>,
	block_import: I,
	env: E,
	sync_oracle: SO,
	justification_sync_link: L,
	force_authoring: bool,
	backoff_authoring_blocks: Option<BS>,
	keystore: KeystorePtr,
	epoch_changes: SharedEpochChanges<B, Epoch>,
	slot_notification_sinks: SlotNotificationSinks<B>,
	config: BabeConfiguration,
	block_proposal_slot_portion: SlotProportion,
	max_block_proposal_slot_portion: Option<SlotProportion>,
	telemetry: Option<TelemetryHandle>,
}

#[async_trait::async_trait]
impl<B, C, E, I, Error, SO, L, BS> sc_consensus_slots::SimpleSlotWorker<B>
	for BabeSlotWorker<B, C, E, I, SO, L, BS>
where
	B: BlockT,
	C: ProvideRuntimeApi<B> + HeaderBackend<B> + HeaderMetadata<B, Error = ClientError>,
	C::Api: BabeApi<B>,
	E: Environment<B, Error = Error> + Send + Sync,
	E::Proposer: Proposer<B, Error = Error>,
	I: BlockImport<B> + Send + Sync + 'static,
	SO: SyncOracle + Send + Clone + Sync,
	L: sc_consensus::JustificationSyncLink<B>,
	BS: BackoffAuthoringBlocksStrategy<NumberFor<B>> + Send + Sync,
	Error: std::error::Error + Send + From<ConsensusError> + From<I::Error> + 'static,
{
	type Claim = (PreDigest, AuthorityId);
	type SyncOracle = SO;
	type JustificationSyncLink = L;
	type CreateProposer =
		Pin<Box<dyn Future<Output = Result<E::Proposer, ConsensusError>> + Send + 'static>>;
	type Proposer = E::Proposer;
	type BlockImport = I;
	type AuxData = ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>;

	fn logging_target(&self) -> &'static str {
		LOG_TARGET
	}

	fn block_import(&mut self) -> &mut Self::BlockImport {
		&mut self.block_import
	}

	fn aux_data(&self, parent: &B::Header, slot: Slot) -> Result<Self::AuxData, ConsensusError> {
		self.epoch_changes
			.shared_data()
			.epoch_descriptor_for_child_of(
				descendent_query(&*self.client),
				&parent.hash(),
				*parent.number(),
				slot,
			)
			.map_err(|e| ConsensusError::ChainLookup(e.to_string()))?
			.ok_or(ConsensusError::InvalidAuthoritiesSet)
	}

	fn authorities_len(&self, epoch_descriptor: &Self::AuxData) -> Option<usize> {
		self.epoch_changes
			.shared_data()
			.viable_epoch(epoch_descriptor, |slot| Epoch::genesis(&self.config, slot))
			.map(|epoch| epoch.as_ref().authorities.len())
	}

	async fn claim_slot(
		&mut self,
		_parent_header: &B::Header,
		slot: Slot,
		epoch_descriptor: &ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>,
	) -> Option<Self::Claim> {
		debug!(target: LOG_TARGET, "Attempting to claim slot {}", slot);
		let s = authorship::claim_slot(
			slot,
			self.epoch_changes
				.shared_data()
				.viable_epoch(epoch_descriptor, |slot| Epoch::genesis(&self.config, slot))?
				.as_ref(),
			&self.keystore,
		);

		if s.is_some() {
			debug!(target: LOG_TARGET, "Claimed slot {}", slot);
		}

		s
	}

	fn notify_slot(
		&self,
		_parent_header: &B::Header,
		slot: Slot,
		epoch_descriptor: &ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>,
	) {
		let sinks = &mut self.slot_notification_sinks.lock();
		sinks.retain_mut(|sink| match sink.try_send((slot, epoch_descriptor.clone())) {
			Ok(()) => true,
			Err(e) =>
				if e.is_full() {
					warn!(target: LOG_TARGET, "Trying to notify a slot but the channel is full");
					true
				} else {
					false
				},
		});
	}

	fn pre_digest_data(&self, _slot: Slot, claim: &Self::Claim) -> Vec<sp_runtime::DigestItem> {
		vec![<DigestItem as CompatibleDigestItem>::babe_pre_digest(claim.0.clone())]
	}

	async fn block_import_params(
		&self,
		header: B::Header,
		header_hash: &B::Hash,
		body: Vec<B::Extrinsic>,
		storage_changes: StorageChanges<B>,
		(_, public): Self::Claim,
		epoch_descriptor: Self::AuxData,
	) -> Result<BlockImportParams<B>, ConsensusError> {
		let signature = self
			.keystore
			.sr25519_sign(<AuthorityId as AppCrypto>::ID, public.as_ref(), header_hash.as_ref())
			.map_err(|e| ConsensusError::CannotSign(format!("{}. Key: {:?}", e, public)))?
			.ok_or_else(|| {
				ConsensusError::CannotSign(format!(
					"Could not find key in keystore. Key: {:?}",
					public
				))
			})?;

		let digest_item = <DigestItem as CompatibleDigestItem>::babe_seal(signature.into());

		let mut import_block = BlockImportParams::new(BlockOrigin::Own, header);
		import_block.post_digests.push(digest_item);
		import_block.body = Some(body);
		import_block.state_action =
			StateAction::ApplyChanges(sc_consensus::StorageChanges::Changes(storage_changes));
		import_block
			.insert_intermediate(INTERMEDIATE_KEY, BabeIntermediate::<B> { epoch_descriptor });

		Ok(import_block)
	}

	fn force_authoring(&self) -> bool {
		self.force_authoring
	}

	fn should_backoff(&self, slot: Slot, chain_head: &B::Header) -> bool {
		if let Some(ref strategy) = self.backoff_authoring_blocks {
			if let Ok(chain_head_slot) =
				find_pre_digest::<B>(chain_head).map(|digest| digest.slot())
			{
				return strategy.should_backoff(
					*chain_head.number(),
					chain_head_slot,
					self.client.info().finalized_number,
					slot,
					self.logging_target(),
				)
			}
		}
		false
	}

	fn sync_oracle(&mut self) -> &mut Self::SyncOracle {
		&mut self.sync_oracle
	}

	fn justification_sync_link(&mut self) -> &mut Self::JustificationSyncLink {
		&mut self.justification_sync_link
	}

	fn proposer(&mut self, block: &B::Header) -> Self::CreateProposer {
		Box::pin(self.env.init(block).map_err(|e| ConsensusError::ClientImport(e.to_string())))
	}

	fn telemetry(&self) -> Option<TelemetryHandle> {
		self.telemetry.clone()
	}

	fn proposing_remaining_duration(&self, slot_info: &SlotInfo<B>) -> Duration {
		let parent_slot = find_pre_digest::<B>(&slot_info.chain_head).ok().map(|d| d.slot());

		sc_consensus_slots::proposing_remaining_duration(
			parent_slot,
			slot_info,
			&self.block_proposal_slot_portion,
			self.max_block_proposal_slot_portion.as_ref(),
			sc_consensus_slots::SlotLenienceType::Exponential,
			self.logging_target(),
		)
	}
}

/// Extract the BABE pre digest from the given header. Pre-runtime digests are
/// mandatory; the function will return `Err` if none is found.
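///
/// A minimal usage sketch (`header` is assumed to be a verified BABE header):
///
/// ```ignore
/// let pre_digest = find_pre_digest::<Block>(&header)?;
/// let slot = pre_digest.slot();
/// ```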
pub fn find_pre_digest<B: BlockT>(header: &B::Header) -> Result<PreDigest, Error<B>> {
	// genesis block doesn't contain a pre digest so let's generate a
	// dummy one to not break any invariants in the rest of the code
	if header.number().is_zero() {
		return Ok(PreDigest::SecondaryPlain(SecondaryPlainPreDigest {
			slot: 0.into(),
			authority_index: 0,
		}))
	}

	let mut pre_digest: Option<_> = None;
	for log in header.digest().logs() {
		trace!(target: LOG_TARGET, "Checking log {:?}, looking for pre runtime digest", log);
		match (log.as_babe_pre_digest(), pre_digest.is_some()) {
			(Some(_), true) => return Err(babe_err(Error::MultiplePreRuntimeDigests)),
			(None, _) => trace!(target: LOG_TARGET, "Ignoring digest not meant for us"),
			(s, false) => pre_digest = s,
		}
	}
	pre_digest.ok_or_else(|| babe_err(Error::NoPreRuntimeDigest))
}

/// Extract the BABE epoch change digest from the given header, if it exists.
pub fn find_next_epoch_digest<B: BlockT>(
	header: &B::Header,
) -> Result<Option<NextEpochDescriptor>, Error<B>> {
	let mut epoch_digest: Option<_> = None;
	for log in header.digest().logs() {
		trace!(target: LOG_TARGET, "Checking log {:?}, looking for epoch change digest.", log);
		let log = log.try_to::<ConsensusLog>(OpaqueDigestItemId::Consensus(&BABE_ENGINE_ID));
		match (log, epoch_digest.is_some()) {
			(Some(ConsensusLog::NextEpochData(_)), true) =>
				return Err(babe_err(Error::MultipleEpochChangeDigests)),
			(Some(ConsensusLog::NextEpochData(epoch)), false) => epoch_digest = Some(epoch),
			_ => trace!(target: LOG_TARGET, "Ignoring digest not meant for us"),
		}
	}

	Ok(epoch_digest)
}

/// Extract the BABE config change digest from the given header, if it exists.
fn find_next_config_digest<B: BlockT>(
	header: &B::Header,
) -> Result<Option<NextConfigDescriptor>, Error<B>> {
	let mut config_digest: Option<_> = None;
	for log in header.digest().logs() {
		trace!(target: LOG_TARGET, "Checking log {:?}, looking for config change digest.", log);
		let log = log.try_to::<ConsensusLog>(OpaqueDigestItemId::Consensus(&BABE_ENGINE_ID));
		match (log, config_digest.is_some()) {
			(Some(ConsensusLog::NextConfigData(_)), true) =>
				return Err(babe_err(Error::MultipleConfigChangeDigests)),
			(Some(ConsensusLog::NextConfigData(config)), false) => config_digest = Some(config),
			_ => trace!(target: LOG_TARGET, "Ignoring digest not meant for us"),
		}
	}

	Ok(config_digest)
}

/// State that must be shared between the import queue and the authoring logic.
#[derive(Clone)]
pub struct BabeLink<Block: BlockT> {
	epoch_changes: SharedEpochChanges<Block, Epoch>,
	config: BabeConfiguration,
}

impl<Block: BlockT> BabeLink<Block> {
	/// Get the epoch changes of this link.
	pub fn epoch_changes(&self) -> &SharedEpochChanges<Block, Epoch> {
		&self.epoch_changes
	}

	/// Get the config of this link.
	pub fn config(&self) -> &BabeConfiguration {
		&self.config
	}
}

/// A verifier for Babe blocks.
pub struct BabeVerifier<Block: BlockT, Client> {
	client: Arc<Client>,
	slot_duration: SlotDuration,
	config: BabeConfiguration,
	epoch_changes: SharedEpochChanges<Block, Epoch>,
	telemetry: Option<TelemetryHandle>,
}

#[async_trait::async_trait]
impl<Block, Client> Verifier<Block> for BabeVerifier<Block, Client>
where
	Block: BlockT,
	Client: HeaderMetadata<Block, Error = sp_blockchain::Error>
		+ HeaderBackend<Block>
		+ ProvideRuntimeApi<Block>
		+ Send
		+ Sync
		+ AuxStore,
	Client::Api: BlockBuilderApi<Block> + BabeApi<Block>,
{
	async fn verify(
		&self,
		mut block: BlockImportParams<Block>,
	) -> Result<BlockImportParams<Block>, String> {
		trace!(
			target: LOG_TARGET,
			"Verifying origin: {:?} header: {:?} justification(s): {:?} body: {:?}",
			block.origin,
			block.header,
			block.justifications,
			block.body,
		);

		let hash = block.header.hash();
		let parent_hash = *block.header.parent_hash();

		let number = block.header.number();

		if is_state_sync_or_gap_sync_import(&*self.client, &block) {
			return Ok(block)
		}

		debug!(
			target: LOG_TARGET,
			"We have {:?} logs in this header",
			block.header.digest().logs().len()
		);

		let slot_now = Slot::from_timestamp(Timestamp::current(), self.slot_duration);

		let pre_digest = find_pre_digest::<Block>(&block.header)?;
		let (check_header, epoch_descriptor) = {
			let (epoch_descriptor, viable_epoch) = query_epoch_changes(
				&self.epoch_changes,
				self.client.as_ref(),
				&self.config,
				*number,
				pre_digest.slot(),
				parent_hash,
			)?;

			// We add one to the current slot to allow for some small drift.
			// FIXME #1019 in the future, alter this queue to allow deferring of headers
			let v_params = verification::VerificationParams {
				header: block.header.clone(),
				pre_digest: Some(pre_digest),
				slot_now: slot_now + 1,
				epoch: viable_epoch.as_ref(),
			};

			(verification::check_header::<Block>(v_params)?, epoch_descriptor)
		};

		match check_header {
			CheckedHeader::Checked(pre_header, verified_info) => {
				trace!(target: LOG_TARGET, "Checked {:?}; importing.", pre_header);
				telemetry!(
					self.telemetry;
					CONSENSUS_TRACE;
					"babe.checked_and_importing";
					"pre_header" => ?pre_header,
				);

				block.header = pre_header;
				block.post_digests.push(verified_info.seal);
				block.insert_intermediate(
					INTERMEDIATE_KEY,
					BabeIntermediate::<Block> { epoch_descriptor },
				);
				block.post_hash = Some(hash);

				Ok(block)
			},
			CheckedHeader::Deferred(a, b) => {
				debug!(target: LOG_TARGET, "Checking {:?} failed; {:?}, {:?}.", hash, a, b);
				telemetry!(
					self.telemetry;
					CONSENSUS_DEBUG;
					"babe.header_too_far_in_future";
					"hash" => ?hash, "a" => ?a, "b" => ?b
				);
				Err(Error::<Block>::TooFarInFuture(hash).into())
			},
		}
	}
}

/// Verification for imported blocks is skipped in two cases:
/// 1. When importing blocks below the last finalized block during network initial synchronization.
/// 2. When importing whole state we don't calculate epoch descriptor, but rather read it from the
///    state after import. We also skip all verifications because there's no parent state and we
///    trust the sync module to verify that the state is correct and finalized.
fn is_state_sync_or_gap_sync_import<B: BlockT>(
	client: &impl HeaderBackend<B>,
	block: &BlockImportParams<B>,
) -> bool {
	let number = *block.header.number();
	let info = client.info();
	info.block_gap.map_or(false, |gap| gap.start <= number && number <= gap.end) ||
		block.with_state()
}

/// A block-import handler for BABE.
///
/// This scans each imported block for epoch change signals. The signals are
/// tracked in a tree (of all forks), and the import logic validates all epoch
/// change transitions, i.e. whether a given epoch change is expected or whether
/// it is missing.
///
/// The epoch change tree should be pruned as blocks are finalized.
pub struct BabeBlockImport<Block: BlockT, Client, I, CIDP, SC> {
	inner: I,
	client: Arc<Client>,
	epoch_changes: SharedEpochChanges<Block, Epoch>,
	create_inherent_data_providers: CIDP,
	config: BabeConfiguration,
	// A [`SelectChain`] implementation.
	//
	// Used to determine the best block that should be used as basis when sending an equivocation
	// report.
	select_chain: SC,
	// The offchain transaction pool factory.
	//
	// Will be used when sending equivocation reports.
	offchain_tx_pool_factory: OffchainTransactionPoolFactory<Block>,
}

impl<Block: BlockT, I: Clone, Client, CIDP: Clone, SC: Clone> Clone
	for BabeBlockImport<Block, Client, I, CIDP, SC>
{
	fn clone(&self) -> Self {
		BabeBlockImport {
			inner: self.inner.clone(),
			client: self.client.clone(),
			epoch_changes: self.epoch_changes.clone(),
			config: self.config.clone(),
			create_inherent_data_providers: self.create_inherent_data_providers.clone(),
			select_chain: self.select_chain.clone(),
			offchain_tx_pool_factory: self.offchain_tx_pool_factory.clone(),
		}
	}
}

impl<Block: BlockT, Client, I, CIDP, SC> BabeBlockImport<Block, Client, I, CIDP, SC> {
	fn new(
		client: Arc<Client>,
		epoch_changes: SharedEpochChanges<Block, Epoch>,
		block_import: I,
		config: BabeConfiguration,
		create_inherent_data_providers: CIDP,
		select_chain: SC,
		offchain_tx_pool_factory: OffchainTransactionPoolFactory<Block>,
	) -> Self {
		BabeBlockImport {
			client,
			inner: block_import,
			epoch_changes,
			config,
			create_inherent_data_providers,
			select_chain,
			offchain_tx_pool_factory,
		}
	}
}

impl<Block, Client, Inner, CIDP, SC> BabeBlockImport<Block, Client, Inner, CIDP, SC>
where
	Block: BlockT,
	Inner: BlockImport<Block> + Send + Sync,
	Inner::Error: Into<ConsensusError>,
	Client: HeaderBackend<Block>
		+ HeaderMetadata<Block, Error = sp_blockchain::Error>
		+ AuxStore
		+ ProvideRuntimeApi<Block>
		+ Send
		+ Sync,
	Client::Api: BlockBuilderApi<Block> + BabeApi<Block> + ApiExt<Block>,
	CIDP: CreateInherentDataProviders<Block, ()>,
	CIDP::InherentDataProviders: InherentDataProviderExt + Send,
	SC: sp_consensus::SelectChain<Block> + 'static,
{
	/// Import whole state after warp sync.
	// This function makes multiple transactions to the DB. If one of them fails we may
	// end up in an inconsistent state and have to resync.
	async fn import_state(
		&self,
		mut block: BlockImportParams<Block>,
	) -> Result<ImportResult, ConsensusError> {
		let hash = block.post_hash();
		let parent_hash = *block.header.parent_hash();
		let number = *block.header.number();

		block.fork_choice = Some(ForkChoiceStrategy::Custom(true));
		// Reset block weight.
		aux_schema::write_block_weight(hash, 0, |values| {
			block
				.auxiliary
				.extend(values.iter().map(|(k, v)| (k.to_vec(), Some(v.to_vec()))))
		});

		// First make the client import the state.
		let import_result = self.inner.import_block(block).await;
		let aux = match import_result {
			Ok(ImportResult::Imported(aux)) => aux,
			Ok(r) =>
				return Err(ConsensusError::ClientImport(format!(
					"Unexpected import result: {:?}",
					r
				))),
			Err(r) => return Err(r.into()),
		};

		// Read epoch info from the imported state.
		let current_epoch = self.client.runtime_api().current_epoch(hash).map_err(|e| {
			ConsensusError::ClientImport(babe_err::<Block>(Error::RuntimeApi(e)).into())
		})?;
		let next_epoch = self.client.runtime_api().next_epoch(hash).map_err(|e| {
			ConsensusError::ClientImport(babe_err::<Block>(Error::RuntimeApi(e)).into())
		})?;

		let mut epoch_changes = self.epoch_changes.shared_data_locked();
		epoch_changes.reset(parent_hash, hash, number, current_epoch.into(), next_epoch.into());
		aux_schema::write_epoch_changes::<Block, _, _>(&*epoch_changes, |insert| {
			self.client.insert_aux(insert, [])
		})
		.map_err(|e| ConsensusError::ClientImport(e.to_string()))?;

		Ok(ImportResult::Imported(aux))
	}

	/// Check the inherents and equivocations.
	async fn check_inherents_and_equivocations(
		&self,
		block: &mut BlockImportParams<Block>,
	) -> Result<(), ConsensusError> {
		if is_state_sync_or_gap_sync_import(&*self.client, block) {
			return Ok(())
		}

		let parent_hash = *block.header.parent_hash();
		let number = *block.header.number();

		let create_inherent_data_providers = self
			.create_inherent_data_providers
			.create_inherent_data_providers(parent_hash, ())
			.await?;

		let slot_now = create_inherent_data_providers.slot();

		let babe_pre_digest = find_pre_digest::<Block>(&block.header)
			.map_err(|e| ConsensusError::Other(Box::new(e)))?;
		let slot = babe_pre_digest.slot();

		// Check inherents.
		self.check_inherents(block, parent_hash, slot, create_inherent_data_providers)
			.await?;

		// Check for equivocation and report it to the runtime if needed.
		let author = {
			let viable_epoch = query_epoch_changes(
				&self.epoch_changes,
				self.client.as_ref(),
				&self.config,
				number,
				slot,
				parent_hash,
			)
			.map_err(|e| ConsensusError::Other(babe_err(e).into()))?
			.1;
			match viable_epoch
				.as_ref()
				.authorities
				.get(babe_pre_digest.authority_index() as usize)
			{
				Some(author) => author.0.clone(),
				None =>
					return Err(ConsensusError::Other(Error::<Block>::SlotAuthorNotFound.into())),
			}
		};
		if let Err(err) = self
			.check_and_report_equivocation(slot_now, slot, &block.header, &author, &block.origin)
			.await
		{
			warn!(
				target: LOG_TARGET,
				"Error checking/reporting BABE equivocation: {}", err
			);
		}
		Ok(())
	}

	async fn check_inherents(
		&self,
		block: &mut BlockImportParams<Block>,
		at_hash: Block::Hash,
		slot: Slot,
		create_inherent_data_providers: CIDP::InherentDataProviders,
	) -> Result<(), ConsensusError> {
		if block.state_action.skip_execution_checks() {
			return Ok(())
		}

		if let Some(inner_body) = block.body.take() {
			let new_block = Block::new(block.header.clone(), inner_body);
			// if the body is passed through and the block was executed,
			// we need to use the runtime to check that the internally-set
			// timestamp in the inherents actually matches the slot set in the seal.
			let mut inherent_data = create_inherent_data_providers
				.create_inherent_data()
				.await
				.map_err(|e| ConsensusError::Other(Box::new(e)))?;
			inherent_data.babe_replace_inherent_data(slot);

			use sp_block_builder::CheckInherentsError;

			sp_block_builder::check_inherents_with_data(
				self.client.clone(),
				at_hash,
				new_block.clone(),
				&create_inherent_data_providers,
				inherent_data,
			)
			.await
			.map_err(|e| {
				ConsensusError::Other(Box::new(match e {
					CheckInherentsError::CreateInherentData(e) =>
						Error::<Block>::CreateInherents(e),
					CheckInherentsError::Client(e) => Error::RuntimeApi(e),
					CheckInherentsError::CheckInherents(e) => Error::CheckInherents(e),
					CheckInherentsError::CheckInherentsUnknownError(id) =>
						Error::CheckInherentsUnhandled(id),
				}))
			})?;
			let (_, inner_body) = new_block.deconstruct();
			block.body = Some(inner_body);
		}

		Ok(())
	}

	async fn check_and_report_equivocation(
		&self,
		slot_now: Slot,
		slot: Slot,
		header: &Block::Header,
		author: &AuthorityId,
		origin: &BlockOrigin,
	) -> Result<(), Error<Block>> {
		// don't report any equivocations during initial sync
		// as they are most likely stale.
		if *origin == BlockOrigin::NetworkInitialSync {
			return Ok(())
		}

		// check if authorship of this header is an equivocation and return a proof if so.
		let Some(equivocation_proof) =
			check_equivocation(&*self.client, slot_now, slot, header, author)
				.map_err(Error::Client)?
		else {
			return Ok(())
		};

		info!(
			"Slot author {:?} is equivocating at slot {} with headers {:?} and {:?}",
			author,
			slot,
			equivocation_proof.first_header.hash(),
			equivocation_proof.second_header.hash(),
		);

		// get the best block on which we will build and send the equivocation report.
		let best_hash = self
			.select_chain
			.best_chain()
			.await
			.map(|h| h.hash())
			.map_err(|e| Error::Client(e.into()))?;

		// generate a key ownership proof. we start by trying to generate the
		// key ownership proof at the parent of the equivocating header, this
		// will make sure that proof generation is successful since it happens
		// during the on-going session (i.e. session keys are available in the
		// state to be able to generate the proof). this might fail if the
		// equivocation happens on the first block of the session, in which case
		// its parent would be on the previous session. if generation on the
		// parent header fails we try with best block as well.
		let generate_key_owner_proof = |at_hash: Block::Hash| {
			self.client
				.runtime_api()
				.generate_key_ownership_proof(at_hash, slot, equivocation_proof.offender.clone())
				.map_err(Error::RuntimeApi)
		};

		let parent_hash = *header.parent_hash();
		let key_owner_proof = match generate_key_owner_proof(parent_hash)? {
			Some(proof) => proof,
			None => match generate_key_owner_proof(best_hash)? {
				Some(proof) => proof,
				None => {
					debug!(
						target: LOG_TARGET,
						"Equivocation offender is not part of the authority set."
					);
					return Ok(())
				},
			},
		};

		// submit equivocation report at best block.
		let mut runtime_api = self.client.runtime_api();

		// Register the offchain tx pool to be able to use it from the runtime.
		runtime_api
			.register_extension(self.offchain_tx_pool_factory.offchain_transaction_pool(best_hash));

		runtime_api
			.submit_report_equivocation_unsigned_extrinsic(
				best_hash,
				equivocation_proof,
				key_owner_proof,
			)
			.map_err(Error::RuntimeApi)?;

		info!(target: LOG_TARGET, "Submitted equivocation report for author {:?}", author);

		Ok(())
	}
}

#[async_trait::async_trait]
impl<Block, Client, Inner, CIDP, SC> BlockImport<Block>
	for BabeBlockImport<Block, Client, Inner, CIDP, SC>
where
	Block: BlockT,
	Inner: BlockImport<Block> + Send + Sync,
	Inner::Error: Into<ConsensusError>,
	Client: HeaderBackend<Block>
		+ HeaderMetadata<Block, Error = sp_blockchain::Error>
		+ AuxStore
		+ ProvideRuntimeApi<Block>
		+ Send
		+ Sync,
	Client::Api: BlockBuilderApi<Block> + BabeApi<Block> + ApiExt<Block>,
	CIDP: CreateInherentDataProviders<Block, ()>,
	CIDP::InherentDataProviders: InherentDataProviderExt + Send + Sync,
	SC: SelectChain<Block> + 'static,
{
	type Error = ConsensusError;

	async fn import_block(
		&self,
		mut block: BlockImportParams<Block>,
	) -> Result<ImportResult, Self::Error> {
		let hash = block.post_hash();
		let number = *block.header.number();
		let info = self.client.info();

		self.check_inherents_and_equivocations(&mut block).await?;

		let block_status = self
			.client
			.status(hash)
			.map_err(|e| ConsensusError::ClientImport(e.to_string()))?;

		// Skip babe logic if block already in chain or importing blocks during initial sync,
		// otherwise the check for epoch changes will error because trying to re-import an
		// epoch change or because of missing epoch data in the tree, respectively.
		if info.block_gap.map_or(false, |gap| gap.start <= number && number <= gap.end) ||
			block_status == BlockStatus::InChain
		{
			// When re-importing an existing block, strip away intermediates.
			// In case of initial sync intermediates should not be present...
			let _ = block.remove_intermediate::<BabeIntermediate<Block>>(INTERMEDIATE_KEY);
			block.fork_choice = Some(ForkChoiceStrategy::Custom(false));
			return self.inner.import_block(block).await.map_err(Into::into)
		}

		if block.with_state() {
			return self.import_state(block).await
		}

		let pre_digest = find_pre_digest::<Block>(&block.header).expect(
			"valid babe headers must contain a predigest; header has been already verified; qed",
		);
		let slot = pre_digest.slot();

		let parent_hash = *block.header.parent_hash();
		let parent_header = self
			.client
			.header(parent_hash)
			.map_err(|e| ConsensusError::ChainLookup(e.to_string()))?
			.ok_or_else(|| {
				ConsensusError::ChainLookup(
					babe_err(Error::<Block>::ParentUnavailable(parent_hash, hash)).into(),
				)
			})?;

		let parent_slot = find_pre_digest::<Block>(&parent_header).map(|d| d.slot()).expect(
			"parent is non-genesis; valid BABE headers contain a pre-digest; header has already \
			 been verified; qed",
		);

		// make sure that slot number is strictly increasing
		if slot <= parent_slot {
			return Err(ConsensusError::ClientImport(
				babe_err(Error::<Block>::SlotMustIncrease(parent_slot, slot)).into(),
			))
		}

		// if there's a pending epoch we'll save the previous epoch changes here
		// this way we can revert it if there's any error
		let mut old_epoch_changes = None;

		// Use an extra scope to make the compiler happy, because otherwise it complains about the
		// mutex, even if we dropped it...
		let mut epoch_changes = {
			let mut epoch_changes = self.epoch_changes.shared_data_locked();

			// check if there's any epoch change expected to happen at this slot.
			// `epoch` is the epoch to verify the block under, and `first_in_epoch` is true
			// if this is the first block in its chain for that epoch.
			//
			// also provides the total weight of the chain, including the imported block.
			let (epoch_descriptor, first_in_epoch, parent_weight) = {
				let parent_weight = if *parent_header.number() == Zero::zero() {
					0
				} else {
					aux_schema::load_block_weight(&*self.client, parent_hash)
						.map_err(|e| ConsensusError::ClientImport(e.to_string()))?
						.ok_or_else(|| {
							ConsensusError::ClientImport(
								babe_err(Error::<Block>::ParentBlockNoAssociatedWeight(hash))
									.into(),
							)
						})?
				};

				let intermediate =
					block.remove_intermediate::<BabeIntermediate<Block>>(INTERMEDIATE_KEY)?;

				let epoch_descriptor = intermediate.epoch_descriptor;
				let first_in_epoch = parent_slot < epoch_descriptor.start_slot();
				(epoch_descriptor, first_in_epoch, parent_weight)
			};

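			// The fork-choice weight counts primary blocks: `added_weight()` is one
			// for a primary pre-digest and zero for either secondary variant.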
1548			let total_weight = parent_weight + pre_digest.added_weight();
1549
1550			// search for this all the time so we can reject unexpected announcements.
1551			let next_epoch_digest = find_next_epoch_digest::<Block>(&block.header)
1552				.map_err(|e| ConsensusError::ClientImport(e.to_string()))?;
1553			let next_config_digest = find_next_config_digest::<Block>(&block.header)
1554				.map_err(|e| ConsensusError::ClientImport(e.to_string()))?;
1555
1556			match (first_in_epoch, next_epoch_digest.is_some(), next_config_digest.is_some()) {
1557				(true, true, _) => {},
1558				(false, false, false) => {},
1559				(false, false, true) =>
1560					return Err(ConsensusError::ClientImport(
1561						babe_err(Error::<Block>::UnexpectedConfigChange).into(),
1562					)),
1563				(true, false, _) =>
1564					return Err(ConsensusError::ClientImport(
1565						babe_err(Error::<Block>::ExpectedEpochChange(hash, slot)).into(),
1566					)),
1567				(false, true, _) =>
1568					return Err(ConsensusError::ClientImport(
1569						babe_err(Error::<Block>::UnexpectedEpochChange).into(),
1570					)),
1571			}
1572
			if let Some(next_epoch_descriptor) = next_epoch_digest {
				old_epoch_changes = Some((*epoch_changes).clone());

				let mut viable_epoch = epoch_changes
					.viable_epoch(&epoch_descriptor, |slot| Epoch::genesis(&self.config, slot))
					.ok_or_else(|| {
						ConsensusError::ClientImport(Error::<Block>::FetchEpoch(parent_hash).into())
					})?
					.into_cloned();

				let epoch_config = next_config_digest
					.map(Into::into)
					.unwrap_or_else(|| viable_epoch.as_ref().config.clone());

				// restrict info logging during initial sync to avoid spam
				let log_level = if block.origin == BlockOrigin::NetworkInitialSync {
					log::Level::Debug
				} else {
					log::Level::Info
				};

				if viable_epoch.as_ref().end_slot() <= slot {
					// Some epochs must have been skipped as our current slot fits outside the
					// current epoch. We will figure out which epoch it belongs to and we will
					// re-use the same data for that epoch.
					// Notice that we are only updating a local copy of the `Epoch`: this
					// makes it so that when we insert the next epoch into `EpochChanges` below
					// (after incrementing it), it will use the correct epoch index and start slot.
					// We do not update the original epoch that will be re-used because there might
					// be other forks (that we haven't imported) where the epoch isn't skipped, and
					// to import those forks we want to keep the original epoch data. Not updating
					// the original epoch works because when we search the tree for which epoch to
					// use for a given slot, we will search in-depth with the predicate
					// `epoch.start_slot <= slot`, which will still match correctly without
					// updating `start_slot` to the correct value as below.
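					//
					// For illustration only (hypothetical numbers): with an epoch duration
					// of 200 slots, if the current epoch has index 10 and starts at slot
					// 1_000, then a block at slot 1_450 falls two whole epochs later, so
					// the local copy is re-based to index 12 with start slot 1_400.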
					let epoch = viable_epoch.as_mut();
					let prev_index = epoch.epoch_index;
					*epoch = epoch.clone_for_slot(slot);

					warn!(
						target: LOG_TARGET,
						"👶 Epoch(s) skipped: from {} to {}", prev_index, epoch.epoch_index,
					);
				}

				log!(
					target: LOG_TARGET,
					log_level,
					"👶 New epoch {} launching at block {} (block slot {} >= start slot {}).",
					viable_epoch.as_ref().epoch_index,
					hash,
					slot,
					viable_epoch.as_ref().start_slot,
				);

				let next_epoch = viable_epoch.increment((next_epoch_descriptor, epoch_config));

				log!(
					target: LOG_TARGET,
					log_level,
					"👶 Next epoch starts at slot {}",
					next_epoch.as_ref().start_slot,
				);

				// prune the tree of epochs not part of the finalized chain or
				// that are not live anymore, and then track the given epoch change
				// in the tree.
				// NOTE: it is important that these operations are done in this
				// order; otherwise, if we pruned after importing, the
				// `is_descendent_of` used by pruning might not know about the block
				// that is being imported.
				let prune_and_import = || {
					prune_finalized(self.client.clone(), &mut epoch_changes)?;

					epoch_changes
						.import(
							descendent_query(&*self.client),
							hash,
							number,
							*block.header.parent_hash(),
							next_epoch,
						)
						.map_err(|e| {
							ConsensusError::ClientImport(format!(
								"Error importing epoch changes: {}",
								e
							))
						})?;
					Ok(())
				};

				if let Err(e) = prune_and_import() {
					debug!(target: LOG_TARGET, "Failed to launch next epoch: {}", e);
					*epoch_changes =
						old_epoch_changes.expect("set `Some` above and not taken; qed");
					return Err(e)
				}

				crate::aux_schema::write_epoch_changes::<Block, _, _>(&*epoch_changes, |insert| {
					block
						.auxiliary
						.extend(insert.iter().map(|(k, v)| (k.to_vec(), Some(v.to_vec()))))
				});
			}

			aux_schema::write_block_weight(hash, total_weight, |values| {
				block
					.auxiliary
					.extend(values.iter().map(|(k, v)| (k.to_vec(), Some(v.to_vec()))))
			});

			// The fork choice rule is that we pick the heaviest chain (i.e.
			// more primary blocks), if there's a tie we go with the longest
			// chain.
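			//
			// For example (hypothetical weights): a 10-block chain with 8 primary
			// blocks (weight 8) beats a 12-block chain with 7 primary blocks
			// (weight 7); if both chains had weight 8, the 12-block chain would
			// win on length.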
			block.fork_choice = {
				let (last_best, last_best_number) = (info.best_hash, info.best_number);

				let last_best_weight = if &last_best == block.header.parent_hash() {
					// the parent=genesis case is already covered when loading the
					// parent weight, so we don't need to cover it again here.
					parent_weight
				} else {
					aux_schema::load_block_weight(&*self.client, last_best)
						.map_err(|e| ConsensusError::ChainLookup(e.to_string()))?
						.ok_or_else(|| {
							ConsensusError::ChainLookup(
								"No block weight for last best header.".to_string(),
							)
						})?
				};

				Some(ForkChoiceStrategy::Custom(if total_weight > last_best_weight {
					true
				} else if total_weight == last_best_weight {
					number > last_best_number
				} else {
					false
				}))
			};

			// Release the mutex guard; the shared data itself stays locked, and the
			// returned handle can be upgraded below to revert our changes if the
			// inner import fails.
			epoch_changes.release_mutex()
		};

		let import_result = self.inner.import_block(block).await;

		// revert to the original epoch changes in case there's an error
		// importing the block
		if import_result.is_err() {
			if let Some(old_epoch_changes) = old_epoch_changes {
				*epoch_changes.upgrade() = old_epoch_changes;
			}
		}

		import_result.map_err(Into::into)
	}

	async fn check_block(
		&self,
		block: BlockCheckParams<Block>,
	) -> Result<ImportResult, Self::Error> {
		self.inner.check_block(block).await.map_err(Into::into)
	}
}

/// Gets the best finalized block and its slot, and prunes the given epoch tree.
fn prune_finalized<Block, Client>(
	client: Arc<Client>,
	epoch_changes: &mut EpochChangesFor<Block, Epoch>,
) -> Result<(), ConsensusError>
where
	Block: BlockT,
	Client: HeaderBackend<Block> + HeaderMetadata<Block, Error = sp_blockchain::Error>,
{
	let info = client.info();

	let finalized_slot = {
		let finalized_header = client
			.header(info.finalized_hash)
			.map_err(|e| ConsensusError::ClientImport(e.to_string()))?
			.expect(
				"best finalized hash was given by client; finalized headers must exist in db; qed",
			);

		find_pre_digest::<Block>(&finalized_header)
			.expect("finalized header must be valid; valid blocks have a pre-digest; qed")
			.slot()
	};

	epoch_changes
		.prune_finalized(
			descendent_query(&*client),
			&info.finalized_hash,
			info.finalized_number,
			finalized_slot,
		)
		.map_err(|e| ConsensusError::ClientImport(e.to_string()))?;

	Ok(())
}

/// Produce a BABE block-import object to be used later on in the construction of
/// an import-queue.
///
/// Also returns a link object used to correctly instantiate the import queue
/// and background worker.
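///
/// A minimal wiring sketch; `inner_block_import`, `client`,
/// `create_inherent_data_providers`, `select_chain` and
/// `offchain_tx_pool_factory` are placeholders assumed to come from the
/// surrounding service setup:
///
/// ```ignore
/// let (babe_block_import, babe_link) = sc_consensus_babe::block_import(
/// 	sc_consensus_babe::configuration(&*client)?,
/// 	inner_block_import,
/// 	client.clone(),
/// 	create_inherent_data_providers,
/// 	select_chain,
/// 	offchain_tx_pool_factory,
/// )?;
/// ```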
pub fn block_import<Client, Block: BlockT, I, CIDP, SC>(
	config: BabeConfiguration,
	wrapped_block_import: I,
	client: Arc<Client>,
	create_inherent_data_providers: CIDP,
	select_chain: SC,
	offchain_tx_pool_factory: OffchainTransactionPoolFactory<Block>,
) -> ClientResult<(BabeBlockImport<Block, Client, I, CIDP, SC>, BabeLink<Block>)>
where
	Client: AuxStore
		+ HeaderBackend<Block>
		+ HeaderMetadata<Block, Error = sp_blockchain::Error>
		+ PreCommitActions<Block>
		+ 'static,
{
	let epoch_changes = aux_schema::load_epoch_changes::<Block, _>(&*client, &config)?;
	let link = BabeLink { epoch_changes: epoch_changes.clone(), config: config.clone() };

	// NOTE: this isn't strictly necessary, but since we previously did not prune
	// the epoch tree it is useful as a migration, so that nodes prune long trees
	// on startup rather than waiting until importing the next epoch change block.
	prune_finalized(client.clone(), &mut epoch_changes.shared_data())?;

	let client_weak = Arc::downgrade(&client);
	let on_finality = move |summary: &FinalityNotification<Block>| {
		if let Some(client) = client_weak.upgrade() {
			aux_storage_cleanup(client.as_ref(), summary)
		} else {
			Default::default()
		}
	};
	client.register_finality_action(Box::new(on_finality));

	let import = BabeBlockImport::new(
		client,
		epoch_changes,
		wrapped_block_import,
		config,
		create_inherent_data_providers,
		select_chain,
		offchain_tx_pool_factory,
	);

	Ok((import, link))
}

/// Parameters passed to [`import_queue`].
pub struct ImportQueueParams<'a, Block: BlockT, BI, Client, Spawn> {
	/// The BABE link that is created by [`block_import`].
	pub link: BabeLink<Block>,
	/// The block import that should be wrapped.
	pub block_import: BI,
	/// Optional justification import.
	pub justification_import: Option<BoxJustificationImport<Block>>,
	/// The client to interact with the internals of the node.
	pub client: Arc<Client>,
	/// Slot duration.
	pub slot_duration: SlotDuration,
	/// Spawner for spawning futures.
	pub spawner: &'a Spawn,
	/// Registry for Prometheus metrics.
	pub registry: Option<&'a Registry>,
	/// Optional telemetry handle to report telemetry events.
	pub telemetry: Option<TelemetryHandle>,
}

/// Start an import queue for the BABE consensus algorithm.
///
/// This method returns the import queue and a [`BabeWorkerHandle`] for
/// communicating with the background worker spawned here, which answers
/// epoch-data requests sent through the handle. The [`BabeLink`] needed for
/// block authoring is created by [`block_import`] and passed in via
/// [`ImportQueueParams`].
///
/// The block import object provided must be the `BabeBlockImport` or a wrapper
/// of it, otherwise crucial import logic will be omitted.
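///
/// A hedged usage sketch; `babe_block_import`, `babe_link`, `justification_import`,
/// `task_manager`, `prometheus_registry` and `telemetry` are placeholders assumed
/// to come from the surrounding service setup:
///
/// ```ignore
/// let (import_queue, babe_worker_handle) =
/// 	sc_consensus_babe::import_queue(ImportQueueParams {
/// 		link: babe_link.clone(),
/// 		block_import: babe_block_import,
/// 		justification_import: Some(Box::new(justification_import)),
/// 		client: client.clone(),
/// 		slot_duration: babe_link.config().slot_duration(),
/// 		spawner: &task_manager.spawn_essential_handle(),
/// 		registry: prometheus_registry.as_ref(),
/// 		telemetry,
/// 	})?;
/// ```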
pub fn import_queue<Block: BlockT, Client, BI, Spawn>(
	ImportQueueParams {
		link: babe_link,
		block_import,
		justification_import,
		client,
		slot_duration,
		spawner,
		registry,
		telemetry,
	}: ImportQueueParams<'_, Block, BI, Client, Spawn>,
) -> ClientResult<(DefaultImportQueue<Block>, BabeWorkerHandle<Block>)>
where
	BI: BlockImport<Block, Error = ConsensusError> + Send + Sync + 'static,
	Client: ProvideRuntimeApi<Block>
		+ HeaderBackend<Block>
		+ HeaderMetadata<Block, Error = sp_blockchain::Error>
		+ AuxStore
		+ Send
		+ Sync
		+ 'static,
	Client::Api: BlockBuilderApi<Block> + BabeApi<Block> + ApiExt<Block>,
	Spawn: SpawnEssentialNamed,
{
	const HANDLE_BUFFER_SIZE: usize = 1024;

	let verifier = BabeVerifier {
		slot_duration,
		config: babe_link.config.clone(),
		epoch_changes: babe_link.epoch_changes.clone(),
		telemetry,
		client: client.clone(),
	};

	let (worker_tx, worker_rx) = channel(HANDLE_BUFFER_SIZE);

	let answer_requests =
		answer_requests(worker_rx, babe_link.config, client, babe_link.epoch_changes);

	spawner.spawn_essential("babe-worker", Some("babe"), answer_requests.boxed());

	Ok((
		BasicQueue::new(verifier, Box::new(block_import), justification_import, spawner, registry),
		BabeWorkerHandle(worker_tx),
	))
}

/// Reverts protocol aux data to at most the last finalized block.
/// In particular, epoch-changes and block weights announced after the revert
/// point are removed.
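///
/// A usage sketch; `client`, `backend` and the block count `n` are assumed to
/// come from the caller:
///
/// ```ignore
/// // Revert BABE aux data for at most `n` unfinalized blocks.
/// sc_consensus_babe::revert(client.clone(), backend.clone(), n)?;
/// ```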
pub fn revert<Block, Client, Backend>(
	client: Arc<Client>,
	backend: Arc<Backend>,
	blocks: NumberFor<Block>,
) -> ClientResult<()>
where
	Block: BlockT,
	Client: AuxStore
		+ HeaderMetadata<Block, Error = sp_blockchain::Error>
		+ HeaderBackend<Block>
		+ ProvideRuntimeApi<Block>
		+ UsageProvider<Block>,
	Client::Api: BabeApi<Block>,
	Backend: BackendT<Block>,
{
	let best_number = client.info().best_number;
	let finalized = client.info().finalized_number;

	let revertible = blocks.min(best_number - finalized);
	if revertible == Zero::zero() {
		return Ok(())
	}

	let revert_up_to_number = best_number - revertible;
	let revert_up_to_hash = client.hash(revert_up_to_number)?.ok_or(ClientError::Backend(
		format!("Unexpected hash lookup failure for block number: {}", revert_up_to_number),
	))?;

	// Revert epoch changes tree.

	// This config is only used when starting from genesis (i.e. when there is
	// no epoch changes data to load yet).
	let config = configuration(&*client)?;
	let epoch_changes = aux_schema::load_epoch_changes::<Block, Client>(&*client, &config)?;
	let mut epoch_changes = epoch_changes.shared_data();

	if revert_up_to_number == Zero::zero() {
		// Special case: no epoch changes data was present at genesis.
		*epoch_changes = EpochChangesFor::<Block, Epoch>::default();
	} else {
		epoch_changes.revert(descendent_query(&*client), revert_up_to_hash, revert_up_to_number);
	}

	// Remove block weights added after the revert point.

	let mut weight_keys = HashSet::with_capacity(revertible.saturated_into());

	let leaves = backend.blockchain().leaves()?.into_iter().filter(|&leaf| {
		sp_blockchain::tree_route(&*client, revert_up_to_hash, leaf)
			.map(|route| route.retracted().is_empty())
			.unwrap_or_default()
	});

	for leaf in leaves {
		let mut hash = leaf;
		loop {
			let meta = client.header_metadata(hash)?;
			if meta.number <= revert_up_to_number ||
				!weight_keys.insert(aux_schema::block_weight_key(hash))
			{
				// We've reached the revert point or an already processed branch, stop here.
				break
			}
			hash = meta.parent;
		}
	}

	let weight_keys: Vec<_> = weight_keys.iter().map(|val| val.as_slice()).collect();

	// Write epoch changes and remove weights in one shot.
	aux_schema::write_epoch_changes::<Block, _, _>(&epoch_changes, |values| {
		client.insert_aux(values, weight_keys.iter())
	})
}
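
/// Fetches the epoch descriptor and a cloned viable epoch for a child of
/// `parent_hash` authored at `slot`, falling back to the genesis epoch when the
/// epoch data has not been imported yet.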
fn query_epoch_changes<Block, Client>(
	epoch_changes: &SharedEpochChanges<Block, Epoch>,
	client: &Client,
	config: &BabeConfiguration,
	block_number: NumberFor<Block>,
	slot: Slot,
	parent_hash: Block::Hash,
) -> Result<
	(ViableEpochDescriptor<Block::Hash, NumberFor<Block>, Epoch>, ViableEpoch<Epoch>),
	Error<Block>,
>
where
	Block: BlockT,
	Client: HeaderBackend<Block> + HeaderMetadata<Block, Error = sp_blockchain::Error>,
{
	let epoch_changes = epoch_changes.shared_data();
	let epoch_descriptor = epoch_changes
		.epoch_descriptor_for_child_of(
			descendent_query(client),
			&parent_hash,
			block_number - 1u32.into(),
			slot,
		)
		.map_err(|e| Error::<Block>::ForkTree(Box::new(e)))?
		.ok_or(Error::<Block>::FetchEpoch(parent_hash))?;
	let viable_epoch = epoch_changes
		.viable_epoch(&epoch_descriptor, |slot| Epoch::genesis(&config, slot))
		.ok_or(Error::<Block>::FetchEpoch(parent_hash))?;
	Ok((epoch_descriptor, viable_epoch.into_cloned()))
}