#![forbid(unsafe_code)]
#![warn(missing_docs)]
use std::{
collections::HashSet,
future::Future,
ops::{Deref, DerefMut},
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Duration,
};
use codec::{Decode, Encode};
use futures::{
channel::{
mpsc::{channel, Receiver, Sender},
oneshot,
},
prelude::*,
};
use log::{debug, info, log, trace, warn};
use parking_lot::Mutex;
use prometheus_endpoint::Registry;
use sc_client_api::{
backend::AuxStore, AuxDataOperations, Backend as BackendT, FinalityNotification,
PreCommitActions, UsageProvider,
};
use sc_consensus::{
block_import::{
BlockCheckParams, BlockImport, BlockImportParams, ForkChoiceStrategy, ImportResult,
StateAction,
},
import_queue::{BasicQueue, BoxJustificationImport, DefaultImportQueue, Verifier},
};
use sc_consensus_epochs::{
descendent_query, Epoch as EpochT, EpochChangesFor, SharedEpochChanges, ViableEpochDescriptor,
};
use sc_consensus_slots::{
check_equivocation, BackoffAuthoringBlocksStrategy, CheckedHeader, InherentDataProviderExt,
SlotInfo, StorageChanges,
};
use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_TRACE};
use sc_transaction_pool_api::OffchainTransactionPoolFactory;
use sp_api::{ApiExt, ProvideRuntimeApi};
use sp_application_crypto::AppCrypto;
use sp_block_builder::BlockBuilder as BlockBuilderApi;
use sp_blockchain::{
Backend as _, BlockStatus, Error as ClientError, ForkBackend, HeaderBackend, HeaderMetadata,
Result as ClientResult,
};
use sp_consensus::{BlockOrigin, Environment, Error as ConsensusError, Proposer, SelectChain};
use sp_consensus_babe::inherents::BabeInherentData;
use sp_consensus_slots::Slot;
use sp_core::traits::SpawnEssentialNamed;
use sp_inherents::{CreateInherentDataProviders, InherentData, InherentDataProvider};
use sp_keystore::KeystorePtr;
use sp_runtime::{
generic::OpaqueDigestItemId,
traits::{Block as BlockT, Header, NumberFor, SaturatedConversion, Zero},
DigestItem,
};
pub use sc_consensus_slots::SlotProportion;
pub use sp_consensus::SyncOracle;
pub use sp_consensus_babe::{
digests::{
CompatibleDigestItem, NextConfigDescriptor, NextEpochDescriptor, PreDigest,
PrimaryPreDigest, SecondaryPlainPreDigest,
},
AuthorityId, AuthorityPair, AuthoritySignature, BabeApi, BabeAuthorityWeight, BabeBlockWeight,
BabeConfiguration, BabeEpochConfiguration, ConsensusLog, Randomness, BABE_ENGINE_ID,
};
pub use aux_schema::load_block_weight as block_weight;
mod migration;
mod verification;
pub mod authorship;
pub mod aux_schema;
#[cfg(test)]
mod tests;
/// Log target passed to the logging macros in this file.
const LOG_TARGET: &str = "babe";
// NOTE(review): not referenced in the visible part of this file. Going by its name and value
// this is a VRF context label — confirm against the code that consumes it.
const AUTHORING_SCORE_VRF_CONTEXT: &[u8] = b"substrate-babe-vrf";
// NOTE(review): not referenced in the visible part of this file. Presumably a length in bytes
// associated with the constant above — confirm against the code that consumes it.
const AUTHORING_SCORE_LENGTH: usize = 16;
/// BABE epoch information: a newtype around [`sp_consensus_babe::Epoch`].
///
/// The wrapper dereferences to the inner epoch and implements [`EpochT`], which is what
/// lets it be used as the epoch type of [`SharedEpochChanges`] in this file.
#[derive(Clone, Debug, PartialEq, Eq, Encode, Decode)]
pub struct Epoch(sp_consensus_babe::Epoch);
impl Deref for Epoch {
type Target = sp_consensus_babe::Epoch;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for Epoch {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl From<sp_consensus_babe::Epoch> for Epoch {
fn from(epoch: sp_consensus_babe::Epoch) -> Self {
Epoch(epoch)
}
}
impl EpochT for Epoch {
type NextEpochDescriptor = (NextEpochDescriptor, BabeEpochConfiguration);
type Slot = Slot;
fn increment(
&self,
(descriptor, config): (NextEpochDescriptor, BabeEpochConfiguration),
) -> Epoch {
sp_consensus_babe::Epoch {
epoch_index: self.epoch_index + 1,
start_slot: self.start_slot + self.duration,
duration: self.duration,
authorities: descriptor.authorities,
randomness: descriptor.randomness,
config,
}
.into()
}
fn start_slot(&self) -> Slot {
self.start_slot
}
fn end_slot(&self) -> Slot {
self.start_slot + self.duration
}
}
impl Epoch {
pub fn genesis(genesis_config: &BabeConfiguration, slot: Slot) -> Epoch {
sp_consensus_babe::Epoch {
epoch_index: 0,
start_slot: slot,
duration: genesis_config.epoch_length,
authorities: genesis_config.authorities.clone(),
randomness: genesis_config.randomness,
config: BabeEpochConfiguration {
c: genesis_config.c,
allowed_slots: genesis_config.allowed_slots,
},
}
.into()
}
pub fn clone_for_slot(&self, slot: Slot) -> Epoch {
let mut epoch = self.clone();
let skipped_epochs = *slot.saturating_sub(self.start_slot) / self.duration;
let epoch_index = epoch.epoch_index.checked_add(skipped_epochs).expect(
"epoch number is u64; it should be strictly smaller than number of slots; \
slots relate in some way to wall clock time; \
if u64 is not enough we should crash for safety; qed.",
);
let start_slot = skipped_epochs
.checked_mul(epoch.duration)
.and_then(|skipped_slots| epoch.start_slot.checked_add(skipped_slots))
.expect(
"slot number is u64; it should relate in some way to wall clock time; \
if u64 is not enough we should crash for safety; qed.",
);
epoch.epoch_index = epoch_index;
epoch.start_slot = Slot::from(start_slot);
epoch
}
}
/// Errors produced by the BABE client code in this file.
#[derive(Debug, thiserror::Error)]
pub enum Error<B: BlockT> {
/// More than one BABE pre-runtime digest was found in a header.
#[error("Multiple BABE pre-runtime digests, rejecting!")]
MultiplePreRuntimeDigests,
/// No BABE pre-runtime digest was found in a header.
#[error("No BABE pre-runtime digest found")]
NoPreRuntimeDigest,
/// More than one BABE epoch change digest was found in a header.
#[error("Multiple BABE epoch change digests, rejecting!")]
MultipleEpochChangeDigests,
/// More than one BABE config change digest was found in a header.
#[error("Multiple BABE config change digests, rejecting!")]
MultipleConfigChangeDigests,
/// Timestamp and slot could not be extracted.
#[error("Could not extract timestamp and slot: {0}")]
Extraction(ConsensusError),
/// The epoch for the block with the given hash could not be fetched.
#[error("Could not fetch epoch at {0:?}")]
FetchEpoch(B::Hash),
/// The header with the given hash is too far in the future.
#[error("Header {0:?} rejected: too far in the future")]
TooFarInFuture(B::Hash),
/// The parent (first hash) of the block (second hash) is unavailable.
#[error("Parent ({0}) of {1} unavailable. Cannot import")]
ParentUnavailable(B::Hash, B::Hash),
/// The block's slot (second value) is not greater than its parent's slot (first value).
#[error("Slot number must increase: parent slot: {0}, this slot: {1}")]
SlotMustIncrease(Slot, Slot),
/// The header with the given hash carries a bad seal.
#[error("Header {0:?} has a bad seal")]
HeaderBadSeal(B::Hash),
/// The header with the given hash carries no seal.
#[error("Header {0:?} is unsealed")]
HeaderUnsealed(B::Hash),
/// The author of the slot could not be found.
#[error("Slot author not found")]
SlotAuthorNotFound,
/// Secondary slot assignments are disabled for the current epoch.
#[error("Secondary slot assignments are disabled for the current epoch.")]
SecondarySlotAssignmentsDisabled,
/// The signature on the header with the given hash is bad.
#[error("Bad signature on {0:?}")]
BadSignature(B::Hash),
/// The block author (second value) is not the expected secondary author (first value).
#[error("Invalid author: Expected secondary author: {0:?}, got: {1:?}.")]
InvalidAuthor(AuthorityId, AuthorityId),
/// No secondary author was expected.
#[error("No secondary author expected.")]
NoSecondaryAuthorExpected,
/// VRF verification failed.
#[error("VRF verification failed")]
VrfVerificationFailed,
/// The VRF output exceeded the given threshold.
#[error("VRF output rejected, threshold {0} exceeded")]
VrfThresholdExceeded(u128),
/// The parent header could not be fetched.
#[error("Could not fetch parent header: {0}")]
FetchParentHeader(sp_blockchain::Error),
/// An epoch change was expected at the given block hash and slot.
#[error("Expected epoch change to happen at {0:?}, s{1}")]
ExpectedEpochChange(B::Hash, Slot),
/// A config change was found where none was expected.
#[error("Unexpected config change")]
UnexpectedConfigChange,
/// An epoch change was found where none was expected.
#[error("Unexpected epoch change")]
UnexpectedEpochChange,
/// The parent of the block with the given hash has no associated weight.
#[error("Parent block of {0} has no associated weight")]
ParentBlockNoAssociatedWeight(B::Hash),
/// Checking the inherents failed.
#[error("Checking inherents failed: {0}")]
CheckInherents(sp_inherents::Error),
/// Checking the inherents raised an error for the given identifier that no provider handled.
#[error("Checking inherents unhandled error: {}", String::from_utf8_lossy(.0))]
CheckInherentsUnhandled(sp_inherents::InherentIdentifier),
/// Creating the inherent data failed.
#[error("Creating inherents failed: {0}")]
CreateInherents(sp_inherents::Error),
/// The background worker is not running.
#[error("Background worker is not running")]
BackgroundWorkerTerminated,
/// A client (blockchain) error, displayed transparently.
#[error(transparent)]
Client(sp_blockchain::Error),
/// A runtime API error, displayed transparently.
#[error(transparent)]
RuntimeApi(sp_api::ApiError),
/// A fork tree error, displayed transparently.
#[error(transparent)]
ForkTree(Box<fork_tree::Error<sp_blockchain::Error>>),
}
impl<B: BlockT> From<Error<B>> for String {
fn from(error: Error<B>) -> String {
error.to_string()
}
}
fn babe_err<B: BlockT>(error: Error<B>) -> Error<B> {
debug!(target: LOG_TARGET, "{}", error);
error
}
/// Intermediate value attached to [`BlockImportParams`] under [`INTERMEDIATE_KEY`].
///
/// Both block authoring and block verification in this file insert it before handing the
/// block on for import.
pub struct BabeIntermediate<B: BlockT> {
/// Descriptor of the epoch the block belongs to.
pub epoch_descriptor: ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>,
}
/// Key under which [`BabeIntermediate`] is stored in the intermediates of
/// [`BlockImportParams`].
pub static INTERMEDIATE_KEY: &[u8] = b"babe1";
/// Reads the BABE configuration from the runtime.
///
/// The runtime is queried at the best block when the client reports a finalized state,
/// and at the genesis block otherwise. Versions 1 and 2 of `BabeApi` are supported;
/// version 1 goes through the deprecated `configuration_before_version_2` call and
/// converts the result.
///
/// # Errors
///
/// Returns an error if a runtime API call fails, or
/// `sp_blockchain::Error::VersionInvalid` if the runtime exposes no `BabeApi` or a
/// version other than 1 or 2.
pub fn configuration<B: BlockT, C>(client: &C) -> ClientResult<BabeConfiguration>
where
    C: AuxStore + ProvideRuntimeApi<B> + UsageProvider<B>,
    C::Api: BabeApi<B>,
{
    // Take a single snapshot so the finalized-state check and the hash we end up using
    // come from the same view of the chain, and the usage info is only computed once.
    let usage = client.usage_info();
    let chain = &usage.chain;
    let at_hash = if chain.finalized_state.is_some() {
        chain.best_hash
    } else {
        debug!(target: LOG_TARGET, "No finalized state is available. Reading config from genesis");
        chain.genesis_hash
    };

    let runtime_api = client.runtime_api();
    let version = runtime_api.api_version::<dyn BabeApi<B>>(at_hash)?;

    let config = match version {
        Some(1) => {
            #[allow(deprecated)]
            {
                runtime_api.configuration_before_version_2(at_hash)?.into()
            }
        },
        Some(2) => runtime_api.configuration(at_hash)?,
        _ =>
            return Err(sp_blockchain::Error::VersionInvalid(
                "Unsupported or invalid BabeApi version".to_string(),
            )),
    };
    Ok(config)
}
/// Parameters consumed by [`start_babe`].
pub struct BabeParams<B: BlockT, C, SC, E, I, SO, L, CIDP, BS> {
/// Keystore handed to the slot worker; it is used when claiming slots and to sign
/// authored headers.
pub keystore: KeystorePtr,
/// Client handle shared with the slot worker.
pub client: Arc<C>,
/// Chain selection passed to the generic slot worker.
pub select_chain: SC,
/// Environment from which block proposers are initialised.
pub env: E,
/// Block import the slot worker exposes for authored blocks.
pub block_import: I,
/// Sync oracle, given both to the BABE slot worker and to the generic slot worker.
pub sync_oracle: SO,
/// Justification sync link exposed by the slot worker.
pub justification_sync_link: L,
/// Factory for inherent data providers, passed to the generic slot worker.
pub create_inherent_data_providers: CIDP,
/// Value the slot worker reports from `force_authoring()`.
pub force_authoring: bool,
/// Optional strategy consulted when deciding whether to back off from authoring.
pub backoff_authoring_blocks: Option<BS>,
/// Link supplying the shared epoch changes and the BABE configuration.
pub babe_link: BabeLink<B>,
/// Slot portion passed on when computing the remaining proposing duration.
pub block_proposal_slot_portion: SlotProportion,
/// Optional maximum slot portion passed on when computing the remaining proposing
/// duration.
pub max_block_proposal_slot_portion: Option<SlotProportion>,
/// Optional telemetry handle.
pub telemetry: Option<TelemetryHandle>,
}
/// Sets up the BABE authorship worker.
///
/// Builds a `BabeSlotWorker` from the given parameters, wraps it in the generic slot
/// worker driven by the slot duration from the BABE configuration, and returns it as a
/// [`BabeWorker`] future. Nothing runs until the returned future is polled.
///
/// The epoch changes and configuration are taken from `babe_link`. The returned worker
/// shares its list of slot notification sinks with the slot worker, so streams created
/// through [`BabeWorker::slot_notification_stream`] receive slot notifications.
pub fn start_babe<B, C, SC, E, I, SO, CIDP, BS, L, Error>(
    BabeParams {
        keystore,
        client,
        select_chain,
        env,
        block_import,
        sync_oracle,
        justification_sync_link,
        create_inherent_data_providers,
        force_authoring,
        backoff_authoring_blocks,
        babe_link,
        block_proposal_slot_portion,
        max_block_proposal_slot_portion,
        telemetry,
    }: BabeParams<B, C, SC, E, I, SO, L, CIDP, BS>,
) -> Result<BabeWorker<B>, ConsensusError>
where
    B: BlockT,
    C: ProvideRuntimeApi<B>
        + HeaderBackend<B>
        + HeaderMetadata<B, Error = ClientError>
        + Send
        + Sync
        + 'static,
    C::Api: BabeApi<B>,
    SC: SelectChain<B> + 'static,
    E: Environment<B, Error = Error> + Send + Sync + 'static,
    E::Proposer: Proposer<B, Error = Error>,
    I: BlockImport<B, Error = ConsensusError> + Send + Sync + 'static,
    SO: SyncOracle + Send + Sync + Clone + 'static,
    L: sc_consensus::JustificationSyncLink<B> + 'static,
    CIDP: CreateInherentDataProviders<B, ()> + Send + Sync + 'static,
    CIDP::InherentDataProviders: InherentDataProviderExt + Send,
    BS: BackoffAuthoringBlocksStrategy<NumberFor<B>> + Send + Sync + 'static,
    Error: std::error::Error + Send + From<ConsensusError> + From<I::Error> + 'static,
{
    // Shared between the slot worker (which sends) and the returned `BabeWorker`
    // (which registers new sinks).
    let slot_notification_sinks = Arc::new(Mutex::new(Vec::new()));

    let worker = BabeSlotWorker {
        client: client.clone(),
        block_import,
        env,
        sync_oracle: sync_oracle.clone(),
        justification_sync_link,
        force_authoring,
        backoff_authoring_blocks,
        keystore,
        epoch_changes: babe_link.epoch_changes.clone(),
        slot_notification_sinks: slot_notification_sinks.clone(),
        config: babe_link.config.clone(),
        block_proposal_slot_portion,
        max_block_proposal_slot_portion,
        telemetry,
    };

    info!(target: LOG_TARGET, "👶 Starting BABE Authorship worker");

    let slot_worker = sc_consensus_slots::start_slot_worker(
        babe_link.config.slot_duration(),
        select_chain,
        sc_consensus_slots::SimpleSlotWorkerToSlotWorker(worker),
        sync_oracle,
        create_inherent_data_providers,
    );

    Ok(BabeWorker { inner: Box::pin(slot_worker), slot_notification_sinks })
}
/// Builds the auxiliary-storage operations that delete block weights made obsolete by a
/// finality notification.
///
/// Weights are scheduled for removal for:
/// - the parent of the first block in `notification.tree_route`, or the parent of the
///   finalized block itself when the route is empty;
/// - every block in `notification.tree_route` other than the finalized block;
/// - every block returned by expanding `notification.stale_heads`.
///
/// Lookup failures are logged as warnings and the affected hashes are left out; the
/// function itself never fails.
fn aux_storage_cleanup<C: HeaderMetadata<Block> + HeaderBackend<Block>, Block: BlockT>(
    client: &C,
    notification: &FinalityNotification<Block>,
) -> AuxDataOperations {
    let mut hashes = HashSet::new();

    let first = notification.tree_route.first().unwrap_or(&notification.hash);
    match client.header_metadata(*first) {
        Ok(meta) => {
            hashes.insert(meta.parent);
        },
        Err(err) => {
            warn!(target: LOG_TARGET, "Failed to lookup metadata for block `{:?}`: {}", first, err,)
        },
    }

    // The weight of the finalized block itself is kept.
    hashes.extend(
        notification
            .tree_route
            .iter()
            .filter(|h| **h != notification.hash),
    );

    let stale_forks = match client.expand_forks(&notification.stale_heads) {
        Ok(stale_forks) => stale_forks,
        Err(e) => {
            warn!(target: LOG_TARGET, "{:?}", e);
            Default::default()
        },
    };
    hashes.extend(stale_forks.iter());

    // A `None` value marks the key for deletion.
    hashes
        .into_iter()
        .map(|val| (aux_schema::block_weight_key(val), None))
        .collect()
}
async fn answer_requests<B: BlockT, C>(
mut request_rx: Receiver<BabeRequest<B>>,
config: BabeConfiguration,
client: Arc<C>,
epoch_changes: SharedEpochChanges<B, Epoch>,
) where
C: HeaderBackend<B> + HeaderMetadata<B, Error = ClientError>,
{
while let Some(request) = request_rx.next().await {
match request {
BabeRequest::EpochData(response) => {
let _ = response.send(epoch_changes.shared_data().clone());
},
BabeRequest::EpochDataForChildOf(parent_hash, parent_number, slot, response) => {
let lookup = || {
let epoch_changes = epoch_changes.shared_data();
epoch_changes
.epoch_data_for_child_of(
descendent_query(&*client),
&parent_hash,
parent_number,
slot,
|slot| Epoch::genesis(&config, slot),
)
.map_err(|e| Error::<B>::ForkTree(Box::new(e)))?
.ok_or(Error::<B>::FetchEpoch(parent_hash))
};
let _ = response.send(lookup());
},
}
}
}
/// Requests sent through a [`BabeWorkerHandle`] and served by `answer_requests`.
enum BabeRequest<B: BlockT> {
/// Ask for a clone of the current epoch changes; the answer goes to the given sender.
EpochData(oneshot::Sender<EpochChangesFor<B, Epoch>>),
/// Ask for the epoch data of a child of the block with the given hash and number at the
/// given slot; the answer goes to the given sender.
EpochDataForChildOf(B::Hash, NumberFor<B>, Slot, oneshot::Sender<Result<Epoch, Error<B>>>),
}
/// Cloneable handle for sending [`BabeRequest`]s over a channel and awaiting the answers.
#[derive(Clone)]
pub struct BabeWorkerHandle<B: BlockT>(Sender<BabeRequest<B>>);
impl<B: BlockT> BabeWorkerHandle<B> {
async fn send_request(&self, request: BabeRequest<B>) -> Result<(), Error<B>> {
match self.0.clone().send(request).await {
Err(err) if err.is_disconnected() => return Err(Error::BackgroundWorkerTerminated),
Err(err) => warn!(
target: LOG_TARGET,
"Unhandled error when sending request to worker: {:?}", err
),
_ => {},
}
Ok(())
}
pub async fn epoch_data(&self) -> Result<EpochChangesFor<B, Epoch>, Error<B>> {
let (tx, rx) = oneshot::channel();
self.send_request(BabeRequest::EpochData(tx)).await?;
rx.await.or(Err(Error::BackgroundWorkerTerminated))
}
pub async fn epoch_data_for_child_of(
&self,
parent_hash: B::Hash,
parent_number: NumberFor<B>,
slot: Slot,
) -> Result<Epoch, Error<B>> {
let (tx, rx) = oneshot::channel();
self.send_request(BabeRequest::EpochDataForChildOf(parent_hash, parent_number, slot, tx))
.await?;
rx.await.or(Err(Error::BackgroundWorkerTerminated))?
}
}
/// Future returned by [`start_babe`]; polling it polls the wrapped slot worker future.
#[must_use]
pub struct BabeWorker<B: BlockT> {
// The boxed, pinned future this worker delegates to.
inner: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
// Senders registered through `slot_notification_stream`.
slot_notification_sinks: SlotNotificationSinks<B>,
}
impl<B: BlockT> BabeWorker<B> {
    /// Registers a new slot notification sink and returns its receiving end.
    ///
    /// The stream yields `(slot, epoch descriptor)` pairs and is backed by a bounded
    /// channel with a buffer size of 1024.
    pub fn slot_notification_stream(
        &self,
    ) -> Receiver<(Slot, ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>)> {
        const CHANNEL_BUFFER_SIZE: usize = 1024;

        let (sender, receiver) = channel(CHANNEL_BUFFER_SIZE);
        let mut sinks = self.slot_notification_sinks.lock();
        sinks.push(sender);
        receiver
    }
}
impl<B: BlockT> Future for BabeWorker<B> {
    type Output = ();

    /// Delegates to the wrapped future.
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        let this = self.get_mut();
        this.inner.as_mut().poll(cx)
    }
}
/// Shared, mutex-protected list of senders that receive `(slot, epoch descriptor)`
/// notifications.
type SlotNotificationSinks<B> = Arc<
Mutex<Vec<Sender<(Slot, ViableEpochDescriptor<<B as BlockT>::Hash, NumberFor<B>, Epoch>)>>>,
>;
/// State of the BABE slot worker; it implements `sc_consensus_slots::SimpleSlotWorker`.
struct BabeSlotWorker<B: BlockT, C, E, I, SO, L, BS> {
// Client used for epoch lookups and chain info.
client: Arc<C>,
// Block import handed out through `block_import()`.
block_import: I,
// Environment used to initialise proposers.
env: E,
// Sync oracle handed out through `sync_oracle()`.
sync_oracle: SO,
// Justification sync link handed out through `justification_sync_link()`.
justification_sync_link: L,
// Value reported by `force_authoring()`.
force_authoring: bool,
// Optional backoff strategy consulted by `should_backoff()`.
backoff_authoring_blocks: Option<BS>,
// Keystore used for slot claiming and header signing.
keystore: KeystorePtr,
// Shared epoch changes.
epoch_changes: SharedEpochChanges<B, Epoch>,
// Sinks notified on every `notify_slot()` call.
slot_notification_sinks: SlotNotificationSinks<B>,
// BABE configuration, used to construct the genesis epoch on demand.
config: BabeConfiguration,
// Slot portions forwarded to `proposing_remaining_duration`.
block_proposal_slot_portion: SlotProportion,
max_block_proposal_slot_portion: Option<SlotProportion>,
// Optional telemetry handle.
telemetry: Option<TelemetryHandle>,
}
/// BABE implementation of the generic slot worker hooks.
#[async_trait::async_trait]
impl<B, C, E, I, Error, SO, L, BS> sc_consensus_slots::SimpleSlotWorker<B>
for BabeSlotWorker<B, C, E, I, SO, L, BS>
where
B: BlockT,
C: ProvideRuntimeApi<B> + HeaderBackend<B> + HeaderMetadata<B, Error = ClientError>,
C::Api: BabeApi<B>,
E: Environment<B, Error = Error> + Send + Sync,
E::Proposer: Proposer<B, Error = Error>,
I: BlockImport<B> + Send + Sync + 'static,
SO: SyncOracle + Send + Clone + Sync,
L: sc_consensus::JustificationSyncLink<B>,
BS: BackoffAuthoringBlocksStrategy<NumberFor<B>> + Send + Sync,
Error: std::error::Error + Send + From<ConsensusError> + From<I::Error> + 'static,
{
// A claim is the pre-digest for the slot together with the public key of the claiming
// authority.
type Claim = (PreDigest, AuthorityId);
type SyncOracle = SO;
type JustificationSyncLink = L;
type CreateProposer =
Pin<Box<dyn Future<Output = Result<E::Proposer, ConsensusError>> + Send + 'static>>;
type Proposer = E::Proposer;
type BlockImport = I;
// The auxiliary per-slot data is the descriptor of the epoch the slot falls into.
type AuxData = ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>;
/// Log target used by the generic slot worker for this engine.
fn logging_target(&self) -> &'static str {
LOG_TARGET
}
/// Mutable access to the wrapped block import.
fn block_import(&mut self) -> &mut Self::BlockImport {
&mut self.block_import
}
/// Looks up the epoch descriptor for a child of `parent` at `slot`.
///
/// A failing lookup becomes `ConsensusError::ChainLookup`; a lookup that finds no
/// descriptor becomes `ConsensusError::InvalidAuthoritiesSet`.
fn aux_data(&self, parent: &B::Header, slot: Slot) -> Result<Self::AuxData, ConsensusError> {
self.epoch_changes
.shared_data()
.epoch_descriptor_for_child_of(
descendent_query(&*self.client),
&parent.hash(),
*parent.number(),
slot,
)
.map_err(|e| ConsensusError::ChainLookup(e.to_string()))?
.ok_or(ConsensusError::InvalidAuthoritiesSet)
}
/// Number of authorities in the epoch described by `epoch_descriptor`, or `None` if no
/// viable epoch can be resolved for it.
fn authorities_len(&self, epoch_descriptor: &Self::AuxData) -> Option<usize> {
self.epoch_changes
.shared_data()
.viable_epoch(epoch_descriptor, |slot| Epoch::genesis(&self.config, slot))
.map(|epoch| epoch.as_ref().authorities.len())
}
/// Tries to claim `slot` for one of the keys in the keystore.
///
/// Returns `None` if the epoch cannot be resolved or if `authorship::claim_slot` yields
/// no claim.
async fn claim_slot(
&mut self,
_parent_header: &B::Header,
slot: Slot,
epoch_descriptor: &ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>,
) -> Option<Self::Claim> {
debug!(target: LOG_TARGET, "Attempting to claim slot {}", slot);
let s = authorship::claim_slot(
slot,
self.epoch_changes
.shared_data()
.viable_epoch(epoch_descriptor, |slot| Epoch::genesis(&self.config, slot))?
.as_ref(),
&self.keystore,
);
if s.is_some() {
debug!(target: LOG_TARGET, "Claimed slot {}", slot);
}
s
}
/// Sends `(slot, epoch_descriptor)` to every registered notification sink.
///
/// A sink whose channel is full is kept (with a warning); a sink failing for any other
/// reason is removed from the list.
fn notify_slot(
&self,
_parent_header: &B::Header,
slot: Slot,
epoch_descriptor: &ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>,
) {
let sinks = &mut self.slot_notification_sinks.lock();
sinks.retain_mut(|sink| match sink.try_send((slot, epoch_descriptor.clone())) {
Ok(()) => true,
Err(e) =>
if e.is_full() {
warn!(target: LOG_TARGET, "Trying to notify a slot but the channel is full");
true
} else {
false
},
});
}
/// Produces the single BABE pre-digest item built from the claim's pre-digest.
fn pre_digest_data(&self, _slot: Slot, claim: &Self::Claim) -> Vec<sp_runtime::DigestItem> {
vec![<DigestItem as CompatibleDigestItem>::babe_pre_digest(claim.0.clone())]
}
/// Assembles the import parameters for a block authored by this node.
///
/// The header hash is signed with the claim's public key through the keystore and the
/// signature is attached as the BABE seal post-digest. The block is marked as
/// `BlockOrigin::Own`, carries its body and storage changes, and gets the epoch
/// descriptor attached as a [`BabeIntermediate`].
///
/// Fails with `ConsensusError::CannotSign` if signing fails or the key is not in the
/// keystore.
async fn block_import_params(
&self,
header: B::Header,
header_hash: &B::Hash,
body: Vec<B::Extrinsic>,
storage_changes: StorageChanges<B>,
(_, public): Self::Claim,
epoch_descriptor: Self::AuxData,
) -> Result<BlockImportParams<B>, ConsensusError> {
let signature = self
.keystore
.sr25519_sign(<AuthorityId as AppCrypto>::ID, public.as_ref(), header_hash.as_ref())
.map_err(|e| ConsensusError::CannotSign(format!("{}. Key: {:?}", e, public)))?
.ok_or_else(|| {
ConsensusError::CannotSign(format!(
"Could not find key in keystore. Key: {:?}",
public
))
})?;
let digest_item = <DigestItem as CompatibleDigestItem>::babe_seal(signature.into());
let mut import_block = BlockImportParams::new(BlockOrigin::Own, header);
import_block.post_digests.push(digest_item);
import_block.body = Some(body);
import_block.state_action =
StateAction::ApplyChanges(sc_consensus::StorageChanges::Changes(storage_changes));
import_block
.insert_intermediate(INTERMEDIATE_KEY, BabeIntermediate::<B> { epoch_descriptor });
Ok(import_block)
}
/// Whether authoring is forced, as configured at construction.
fn force_authoring(&self) -> bool {
self.force_authoring
}
/// Asks the configured backoff strategy whether authoring at `slot` should be skipped.
///
/// Returns `false` when no strategy is configured or when the chain head has no usable
/// pre-digest.
fn should_backoff(&self, slot: Slot, chain_head: &B::Header) -> bool {
if let Some(ref strategy) = self.backoff_authoring_blocks {
if let Ok(chain_head_slot) =
find_pre_digest::<B>(chain_head).map(|digest| digest.slot())
{
return strategy.should_backoff(
*chain_head.number(),
chain_head_slot,
self.client.info().finalized_number,
slot,
self.logging_target(),
)
}
}
false
}
/// Mutable access to the sync oracle.
fn sync_oracle(&mut self) -> &mut Self::SyncOracle {
&mut self.sync_oracle
}
/// Mutable access to the justification sync link.
fn justification_sync_link(&mut self) -> &mut Self::JustificationSyncLink {
&mut self.justification_sync_link
}
/// Initialises a proposer on top of `block`; environment errors are mapped to
/// `ConsensusError::ClientImport`.
fn proposer(&mut self, block: &B::Header) -> Self::CreateProposer {
Box::pin(self.env.init(block).map_err(|e| ConsensusError::ClientImport(e.to_string())))
}
/// A clone of the telemetry handle, if any.
fn telemetry(&self) -> Option<TelemetryHandle> {
self.telemetry.clone()
}
/// Remaining time available for proposing in the given slot.
///
/// Delegates to `sc_consensus_slots::proposing_remaining_duration` with exponential
/// slot lenience. The parent slot is taken from the chain head's pre-digest and is
/// `None` if that cannot be found.
fn proposing_remaining_duration(&self, slot_info: &SlotInfo<B>) -> Duration {
let parent_slot = find_pre_digest::<B>(&slot_info.chain_head).ok().map(|d| d.slot());
sc_consensus_slots::proposing_remaining_duration(
parent_slot,
slot_info,
&self.block_proposal_slot_portion,
self.max_block_proposal_slot_portion.as_ref(),
sc_consensus_slots::SlotLenienceType::Exponential,
self.logging_target(),
)
}
}
pub fn find_pre_digest<B: BlockT>(header: &B::Header) -> Result<PreDigest, Error<B>> {
if header.number().is_zero() {
return Ok(PreDigest::SecondaryPlain(SecondaryPlainPreDigest {
slot: 0.into(),
authority_index: 0,
}))
}
let mut pre_digest: Option<_> = None;
for log in header.digest().logs() {
trace!(target: LOG_TARGET, "Checking log {:?}, looking for pre runtime digest", log);
match (log.as_babe_pre_digest(), pre_digest.is_some()) {
(Some(_), true) => return Err(babe_err(Error::MultiplePreRuntimeDigests)),
(None, _) => trace!(target: LOG_TARGET, "Ignoring digest not meant for us"),
(s, false) => pre_digest = s,
}
}
pre_digest.ok_or_else(|| babe_err(Error::NoPreRuntimeDigest))
}
fn find_next_epoch_digest<B: BlockT>(
header: &B::Header,
) -> Result<Option<NextEpochDescriptor>, Error<B>> {
let mut epoch_digest: Option<_> = None;
for log in header.digest().logs() {
trace!(target: LOG_TARGET, "Checking log {:?}, looking for epoch change digest.", log);
let log = log.try_to::<ConsensusLog>(OpaqueDigestItemId::Consensus(&BABE_ENGINE_ID));
match (log, epoch_digest.is_some()) {
(Some(ConsensusLog::NextEpochData(_)), true) =>
return Err(babe_err(Error::MultipleEpochChangeDigests)),
(Some(ConsensusLog::NextEpochData(epoch)), false) => epoch_digest = Some(epoch),
_ => trace!(target: LOG_TARGET, "Ignoring digest not meant for us"),
}
}
Ok(epoch_digest)
}
/// Extracts the BABE config change digest from `header`, if there is one.
///
/// # Errors
///
/// Returns [`Error::MultipleConfigChangeDigests`] if the header contains more than one
/// `NextConfigData` consensus log.
fn find_next_config_digest<B: BlockT>(
    header: &B::Header,
) -> Result<Option<NextConfigDescriptor>, Error<B>> {
    let mut config_digest: Option<_> = None;
    for log in header.digest().logs() {
        // This function looks for config changes, so the trace message says so (it
        // previously repeated the epoch change wording of the sibling function).
        trace!(target: LOG_TARGET, "Checking log {:?}, looking for config change digest.", log);
        let log = log.try_to::<ConsensusLog>(OpaqueDigestItemId::Consensus(&BABE_ENGINE_ID));
        match (log, config_digest.is_some()) {
            (Some(ConsensusLog::NextConfigData(_)), true) =>
                return Err(babe_err(Error::MultipleConfigChangeDigests)),
            (Some(ConsensusLog::NextConfigData(config)), false) => config_digest = Some(config),
            _ => trace!(target: LOG_TARGET, "Ignoring digest not meant for us"),
        }
    }

    Ok(config_digest)
}
/// Cloneable bundle of the shared epoch changes and the BABE configuration.
///
/// [`start_babe`] reads both from the link it is given.
#[derive(Clone)]
pub struct BabeLink<Block: BlockT> {
// Shared epoch changes.
epoch_changes: SharedEpochChanges<Block, Epoch>,
// BABE configuration.
config: BabeConfiguration,
}
impl<Block: BlockT> BabeLink<Block> {
    /// Borrows the shared epoch changes.
    pub fn epoch_changes(&self) -> &SharedEpochChanges<Block, Epoch> {
        let Self { epoch_changes, .. } = self;
        epoch_changes
    }

    /// Borrows the BABE configuration.
    pub fn config(&self) -> &BabeConfiguration {
        let Self { config, .. } = self;
        config
    }
}
/// Verifier for BABE blocks; see its `Verifier` implementation for the checks performed.
pub struct BabeVerifier<Block: BlockT, Client, SelectChain, CIDP> {
// Client used for header lookups, runtime API calls and equivocation checks.
client: Arc<Client>,
// Used to find the best chain when reporting equivocations.
select_chain: SelectChain,
// Factory for the inherent data providers used during verification.
create_inherent_data_providers: CIDP,
// BABE configuration, used to construct the genesis epoch on demand.
config: BabeConfiguration,
// Shared epoch changes.
epoch_changes: SharedEpochChanges<Block, Epoch>,
// Optional telemetry handle.
telemetry: Option<TelemetryHandle>,
// Registered as a runtime API extension before submitting equivocation reports.
offchain_tx_pool_factory: OffchainTransactionPoolFactory<Block>,
}
impl<Block, Client, SelectChain, CIDP> BabeVerifier<Block, Client, SelectChain, CIDP>
where
Block: BlockT,
Client: AuxStore + HeaderBackend<Block> + HeaderMetadata<Block> + ProvideRuntimeApi<Block>,
Client::Api: BlockBuilderApi<Block> + BabeApi<Block>,
SelectChain: sp_consensus::SelectChain<Block>,
CIDP: CreateInherentDataProviders<Block, ()>,
{
async fn check_inherents(
&self,
block: Block,
at_hash: Block::Hash,
inherent_data: InherentData,
create_inherent_data_providers: CIDP::InherentDataProviders,
) -> Result<(), Error<Block>> {
let inherent_res = self
.client
.runtime_api()
.check_inherents(at_hash, block, inherent_data)
.map_err(Error::RuntimeApi)?;
if !inherent_res.ok() {
for (i, e) in inherent_res.into_errors() {
match create_inherent_data_providers.try_handle_error(&i, &e).await {
Some(res) => res.map_err(|e| Error::CheckInherents(e))?,
None => return Err(Error::CheckInherentsUnhandled(i)),
}
}
}
Ok(())
}
async fn check_and_report_equivocation(
&self,
slot_now: Slot,
slot: Slot,
header: &Block::Header,
author: &AuthorityId,
origin: &BlockOrigin,
) -> Result<(), Error<Block>> {
if *origin == BlockOrigin::NetworkInitialSync {
return Ok(())
}
let equivocation_proof =
match check_equivocation(&*self.client, slot_now, slot, header, author)
.map_err(Error::Client)?
{
Some(proof) => proof,
None => return Ok(()),
};
info!(
"Slot author {:?} is equivocating at slot {} with headers {:?} and {:?}",
author,
slot,
equivocation_proof.first_header.hash(),
equivocation_proof.second_header.hash(),
);
let best_hash = self
.select_chain
.best_chain()
.await
.map(|h| h.hash())
.map_err(|e| Error::Client(e.into()))?;
let generate_key_owner_proof = |at_hash: Block::Hash| {
self.client
.runtime_api()
.generate_key_ownership_proof(at_hash, slot, equivocation_proof.offender.clone())
.map_err(Error::RuntimeApi)
};
let parent_hash = *header.parent_hash();
let key_owner_proof = match generate_key_owner_proof(parent_hash)? {
Some(proof) => proof,
None => match generate_key_owner_proof(best_hash)? {
Some(proof) => proof,
None => {
debug!(
target: LOG_TARGET,
"Equivocation offender is not part of the authority set."
);
return Ok(())
},
},
};
let mut runtime_api = self.client.runtime_api();
runtime_api
.register_extension(self.offchain_tx_pool_factory.offchain_transaction_pool(best_hash));
runtime_api
.submit_report_equivocation_unsigned_extrinsic(
best_hash,
equivocation_proof,
key_owner_proof,
)
.map_err(Error::RuntimeApi)?;
info!(target: LOG_TARGET, "Submitted equivocation report for author {:?}", author);
Ok(())
}
}
/// Block verification for BABE.
#[async_trait::async_trait]
impl<Block, Client, SelectChain, CIDP> Verifier<Block>
for BabeVerifier<Block, Client, SelectChain, CIDP>
where
Block: BlockT,
Client: HeaderMetadata<Block, Error = sp_blockchain::Error>
+ HeaderBackend<Block>
+ ProvideRuntimeApi<Block>
+ Send
+ Sync
+ AuxStore,
Client::Api: BlockBuilderApi<Block> + BabeApi<Block>,
SelectChain: sp_consensus::SelectChain<Block>,
CIDP: CreateInherentDataProviders<Block, ()> + Send + Sync,
CIDP::InherentDataProviders: InherentDataProviderExt + Send + Sync,
{
/// Verifies `block` and prepares it for import.
///
/// Blocks whose number lies inside the client's block gap, and blocks imported with
/// state, are returned unchanged without any checks.
///
/// Otherwise the header is checked against the epoch that applies to it. On success
/// the author is checked for equivocation (failures there are only logged), the
/// inherents are checked if a body is present and execution checks are not skipped,
/// and the block is returned with the pre-header as its header, the seal moved to the
/// post-digests, a [`BabeIntermediate`] attached and the post hash set.
///
/// Errors are returned as strings; a header the check defers is rejected with the
/// text of [`Error::TooFarInFuture`].
async fn verify(
&self,
mut block: BlockImportParams<Block>,
) -> Result<BlockImportParams<Block>, String> {
trace!(
target: LOG_TARGET,
"Verifying origin: {:?} header: {:?} justification(s): {:?} body: {:?}",
block.origin,
block.header,
block.justifications,
block.body,
);
let hash = block.header.hash();
let parent_hash = *block.header.parent_hash();
let info = self.client.info();
let number = *block.header.number();
// Blocks inside the block gap and blocks carrying state skip verification entirely.
if info.block_gap.map_or(false, |gap| gap.start <= number && number <= gap.end) ||
block.with_state()
{
return Ok(block)
}
debug!(
target: LOG_TARGET,
"We have {:?} logs in this header",
block.header.digest().logs().len()
);
let create_inherent_data_providers = self
.create_inherent_data_providers
.create_inherent_data_providers(parent_hash, ())
.await
.map_err(|e| Error::<Block>::Client(ConsensusError::from(e).into()))?;
// The current slot as reported by the inherent data providers.
let slot_now = create_inherent_data_providers.slot();
let parent_header_metadata = self
.client
.header_metadata(parent_hash)
.map_err(Error::<Block>::FetchParentHeader)?;
let pre_digest = find_pre_digest::<Block>(&block.header)?;
// The shared epoch data guard lives only inside this block.
let (check_header, epoch_descriptor) = {
let epoch_changes = self.epoch_changes.shared_data();
let epoch_descriptor = epoch_changes
.epoch_descriptor_for_child_of(
descendent_query(&*self.client),
&parent_hash,
parent_header_metadata.number,
pre_digest.slot(),
)
.map_err(|e| Error::<Block>::ForkTree(Box::new(e)))?
.ok_or(Error::<Block>::FetchEpoch(parent_hash))?;
let viable_epoch = epoch_changes
.viable_epoch(&epoch_descriptor, |slot| Epoch::genesis(&self.config, slot))
.ok_or(Error::<Block>::FetchEpoch(parent_hash))?;
// NOTE(review): the header check is given `slot_now + 1`, presumably to tolerate
// a header that is one slot ahead of the local clock — confirm against
// `verification::check_header`.
let v_params = verification::VerificationParams {
header: block.header.clone(),
pre_digest: Some(pre_digest),
slot_now: slot_now + 1,
epoch: viable_epoch.as_ref(),
};
(verification::check_header::<Block>(v_params)?, epoch_descriptor)
};
match check_header {
CheckedHeader::Checked(pre_header, verified_info) => {
let babe_pre_digest = verified_info
.pre_digest
.as_babe_pre_digest()
.expect("check_header always returns a pre-digest digest item; qed");
let slot = babe_pre_digest.slot();
// Equivocation handling is best effort: a failure is logged and does not reject
// the block.
if let Err(err) = self
.check_and_report_equivocation(
slot_now,
slot,
&block.header,
&verified_info.author,
&block.origin,
)
.await
{
warn!(
target: LOG_TARGET,
"Error checking/reporting BABE equivocation: {}", err
);
}
if let Some(inner_body) = block.body {
let new_block = Block::new(pre_header.clone(), inner_body);
if !block.state_action.skip_execution_checks() {
let mut inherent_data = create_inherent_data_providers
.create_inherent_data()
.await
.map_err(Error::<Block>::CreateInherents)?;
// Use the slot claimed by the block rather than the locally computed one.
inherent_data.babe_replace_inherent_data(slot);
self.check_inherents(
new_block.clone(),
parent_hash,
inherent_data,
create_inherent_data_providers,
)
.await?;
}
// Take the body back out of the temporary block.
let (_, inner_body) = new_block.deconstruct();
block.body = Some(inner_body);
}
trace!(target: LOG_TARGET, "Checked {:?}; importing.", pre_header);
telemetry!(
self.telemetry;
CONSENSUS_TRACE;
"babe.checked_and_importing";
"pre_header" => ?pre_header,
);
block.header = pre_header;
block.post_digests.push(verified_info.seal);
block.insert_intermediate(
INTERMEDIATE_KEY,
BabeIntermediate::<Block> { epoch_descriptor },
);
// `hash` was computed from the original header, before it was replaced above.
block.post_hash = Some(hash);
Ok(block)
},
CheckedHeader::Deferred(a, b) => {
debug!(target: LOG_TARGET, "Checking {:?} failed; {:?}, {:?}.", hash, a, b);
telemetry!(
self.telemetry;
CONSENSUS_DEBUG;
"babe.header_too_far_in_future";
"hash" => ?hash, "a" => ?a, "b" => ?b
);
Err(Error::<Block>::TooFarInFuture(hash).into())
},
}
}
}
/// Block import wrapper for BABE around an inner block import.
///
/// It holds the client, the shared epoch changes and the BABE configuration next to the
/// wrapped import.
pub struct BabeBlockImport<Block: BlockT, Client, I> {
// The wrapped block import that blocks are forwarded to.
inner: I,
// Client used for status, header and runtime API queries and for aux storage writes.
client: Arc<Client>,
// Shared epoch changes.
epoch_changes: SharedEpochChanges<Block, Epoch>,
// BABE configuration.
config: BabeConfiguration,
}
impl<Block: BlockT, I: Clone, Client> Clone for BabeBlockImport<Block, Client, I> {
fn clone(&self) -> Self {
BabeBlockImport {
inner: self.inner.clone(),
client: self.client.clone(),
epoch_changes: self.epoch_changes.clone(),
config: self.config.clone(),
}
}
}
impl<Block: BlockT, Client, I> BabeBlockImport<Block, Client, I> {
    /// Wraps `block_import` together with the client, the shared epoch changes and the
    /// BABE configuration.
    fn new(
        client: Arc<Client>,
        epoch_changes: SharedEpochChanges<Block, Epoch>,
        block_import: I,
        config: BabeConfiguration,
    ) -> Self {
        Self { inner: block_import, client, epoch_changes, config }
    }
}
impl<Block, Client, Inner> BabeBlockImport<Block, Client, Inner>
where
Block: BlockT,
Inner: BlockImport<Block> + Send + Sync,
Inner::Error: Into<ConsensusError>,
Client: HeaderBackend<Block>
+ HeaderMetadata<Block, Error = sp_blockchain::Error>
+ AuxStore
+ ProvideRuntimeApi<Block>
+ Send
+ Sync,
Client::Api: BabeApi<Block> + ApiExt<Block>,
{
async fn import_state(
&self,
mut block: BlockImportParams<Block>,
) -> Result<ImportResult, ConsensusError> {
let hash = block.post_hash();
let parent_hash = *block.header.parent_hash();
let number = *block.header.number();
block.fork_choice = Some(ForkChoiceStrategy::Custom(true));
aux_schema::write_block_weight(hash, 0, |values| {
block
.auxiliary
.extend(values.iter().map(|(k, v)| (k.to_vec(), Some(v.to_vec()))))
});
let import_result = self.inner.import_block(block).await;
let aux = match import_result {
Ok(ImportResult::Imported(aux)) => aux,
Ok(r) =>
return Err(ConsensusError::ClientImport(format!(
"Unexpected import result: {:?}",
r
))),
Err(r) => return Err(r.into()),
};
let current_epoch = self.client.runtime_api().current_epoch(hash).map_err(|e| {
ConsensusError::ClientImport(babe_err::<Block>(Error::RuntimeApi(e)).into())
})?;
let next_epoch = self.client.runtime_api().next_epoch(hash).map_err(|e| {
ConsensusError::ClientImport(babe_err::<Block>(Error::RuntimeApi(e)).into())
})?;
let mut epoch_changes = self.epoch_changes.shared_data_locked();
epoch_changes.reset(parent_hash, hash, number, current_epoch.into(), next_epoch.into());
aux_schema::write_epoch_changes::<Block, _, _>(&*epoch_changes, |insert| {
self.client.insert_aux(insert, [])
})
.map_err(|e| ConsensusError::ClientImport(e.to_string()))?;
Ok(ImportResult::Imported(aux))
}
}
#[async_trait::async_trait]
impl<Block, Client, Inner> BlockImport<Block> for BabeBlockImport<Block, Client, Inner>
where
Block: BlockT,
Inner: BlockImport<Block> + Send + Sync,
Inner::Error: Into<ConsensusError>,
Client: HeaderBackend<Block>
+ HeaderMetadata<Block, Error = sp_blockchain::Error>
+ AuxStore
+ ProvideRuntimeApi<Block>
+ Send
+ Sync,
Client::Api: BabeApi<Block> + ApiExt<Block>,
{
type Error = ConsensusError;
async fn import_block(
&self,
mut block: BlockImportParams<Block>,
) -> Result<ImportResult, Self::Error> {
let hash = block.post_hash();
let number = *block.header.number();
let info = self.client.info();
let block_status = self
.client
.status(hash)
.map_err(|e| ConsensusError::ClientImport(e.to_string()))?;
if info.block_gap.map_or(false, |gap| gap.start <= number && number <= gap.end) ||
block_status == BlockStatus::InChain
{
let _ = block.remove_intermediate::<BabeIntermediate<Block>>(INTERMEDIATE_KEY);
block.fork_choice = Some(ForkChoiceStrategy::Custom(false));
return self.inner.import_block(block).await.map_err(Into::into)
}
if block.with_state() {
return self.import_state(block).await
}
let pre_digest = find_pre_digest::<Block>(&block.header).expect(
"valid babe headers must contain a predigest; header has been already verified; qed",
);
let slot = pre_digest.slot();
let parent_hash = *block.header.parent_hash();
let parent_header = self
.client
.header(parent_hash)
.map_err(|e| ConsensusError::ChainLookup(e.to_string()))?
.ok_or_else(|| {
ConsensusError::ChainLookup(
babe_err(Error::<Block>::ParentUnavailable(parent_hash, hash)).into(),
)
})?;
let parent_slot = find_pre_digest::<Block>(&parent_header).map(|d| d.slot()).expect(
"parent is non-genesis; valid BABE headers contain a pre-digest; header has already \
been verified; qed",
);
if slot <= parent_slot {
return Err(ConsensusError::ClientImport(
babe_err(Error::<Block>::SlotMustIncrease(parent_slot, slot)).into(),
))
}
let mut old_epoch_changes = None;
let mut epoch_changes = {
let mut epoch_changes = self.epoch_changes.shared_data_locked();
let (epoch_descriptor, first_in_epoch, parent_weight) = {
let parent_weight = if *parent_header.number() == Zero::zero() {
0
} else {
aux_schema::load_block_weight(&*self.client, parent_hash)
.map_err(|e| ConsensusError::ClientImport(e.to_string()))?
.ok_or_else(|| {
ConsensusError::ClientImport(
babe_err(Error::<Block>::ParentBlockNoAssociatedWeight(hash))
.into(),
)
})?
};
let intermediate =
block.remove_intermediate::<BabeIntermediate<Block>>(INTERMEDIATE_KEY)?;
let epoch_descriptor = intermediate.epoch_descriptor;
let first_in_epoch = parent_slot < epoch_descriptor.start_slot();
(epoch_descriptor, first_in_epoch, parent_weight)
};
let total_weight = parent_weight + pre_digest.added_weight();
let next_epoch_digest = find_next_epoch_digest::<Block>(&block.header)
.map_err(|e| ConsensusError::ClientImport(e.to_string()))?;
let next_config_digest = find_next_config_digest::<Block>(&block.header)
.map_err(|e| ConsensusError::ClientImport(e.to_string()))?;
match (first_in_epoch, next_epoch_digest.is_some(), next_config_digest.is_some()) {
(true, true, _) => {},
(false, false, false) => {},
(false, false, true) =>
return Err(ConsensusError::ClientImport(
babe_err(Error::<Block>::UnexpectedConfigChange).into(),
)),
(true, false, _) =>
return Err(ConsensusError::ClientImport(
babe_err(Error::<Block>::ExpectedEpochChange(hash, slot)).into(),
)),
(false, true, _) =>
return Err(ConsensusError::ClientImport(
babe_err(Error::<Block>::UnexpectedEpochChange).into(),
)),
}
if let Some(next_epoch_descriptor) = next_epoch_digest {
old_epoch_changes = Some((*epoch_changes).clone());
let mut viable_epoch = epoch_changes
.viable_epoch(&epoch_descriptor, |slot| Epoch::genesis(&self.config, slot))
.ok_or_else(|| {
ConsensusError::ClientImport(Error::<Block>::FetchEpoch(parent_hash).into())
})?
.into_cloned();
let epoch_config = next_config_digest
.map(Into::into)
.unwrap_or_else(|| viable_epoch.as_ref().config.clone());
let log_level = if block.origin == BlockOrigin::NetworkInitialSync {
log::Level::Debug
} else {
log::Level::Info
};
if viable_epoch.as_ref().end_slot() <= slot {
let epoch = viable_epoch.as_mut();
let prev_index = epoch.epoch_index;
*epoch = epoch.clone_for_slot(slot);
warn!(
target: LOG_TARGET,
"๐ถ Epoch(s) skipped: from {} to {}", prev_index, epoch.epoch_index,
);
}
log!(
target: LOG_TARGET,
log_level,
"๐ถ New epoch {} launching at block {} (block slot {} >= start slot {}).",
viable_epoch.as_ref().epoch_index,
hash,
slot,
viable_epoch.as_ref().start_slot,
);
let next_epoch = viable_epoch.increment((next_epoch_descriptor, epoch_config));
log!(
target: LOG_TARGET,
log_level,
"๐ถ Next epoch starts at slot {}",
next_epoch.as_ref().start_slot,
);
let prune_and_import = || {
prune_finalized(self.client.clone(), &mut epoch_changes)?;
epoch_changes
.import(
descendent_query(&*self.client),
hash,
number,
*block.header.parent_hash(),
next_epoch,
)
.map_err(|e| {
ConsensusError::ClientImport(format!(
"Error importing epoch changes: {}",
e
))
})?;
Ok(())
};
if let Err(e) = prune_and_import() {
debug!(target: LOG_TARGET, "Failed to launch next epoch: {}", e);
*epoch_changes =
old_epoch_changes.expect("set `Some` above and not taken; qed");
return Err(e)
}
crate::aux_schema::write_epoch_changes::<Block, _, _>(&*epoch_changes, |insert| {
block
.auxiliary
.extend(insert.iter().map(|(k, v)| (k.to_vec(), Some(v.to_vec()))))
});
}
aux_schema::write_block_weight(hash, total_weight, |values| {
block
.auxiliary
.extend(values.iter().map(|(k, v)| (k.to_vec(), Some(v.to_vec()))))
});
block.fork_choice = {
let (last_best, last_best_number) = (info.best_hash, info.best_number);
let last_best_weight = if &last_best == block.header.parent_hash() {
parent_weight
} else {
aux_schema::load_block_weight(&*self.client, last_best)
.map_err(|e| ConsensusError::ChainLookup(e.to_string()))?
.ok_or_else(|| {
ConsensusError::ChainLookup(
"No block weight for parent header.".to_string(),
)
})?
};
Some(ForkChoiceStrategy::Custom(if total_weight > last_best_weight {
true
} else if total_weight == last_best_weight {
number > last_best_number
} else {
false
}))
};
epoch_changes.release_mutex()
};
let import_result = self.inner.import_block(block).await;
if import_result.is_err() {
if let Some(old_epoch_changes) = old_epoch_changes {
*epoch_changes.upgrade() = old_epoch_changes;
}
}
import_result.map_err(Into::into)
}
async fn check_block(
&self,
block: BlockCheckParams<Block>,
) -> Result<ImportResult, Self::Error> {
self.inner.check_block(block).await.map_err(Into::into)
}
}
/// Prunes `epoch_changes` against the client's current finalized block.
///
/// Looks up the header of the finalized block reported by `client.info()`, extracts its slot
/// from the BABE pre-digest and forwards hash, number and slot to
/// `EpochChanges::prune_finalized`.
///
/// # Errors
///
/// Returns `ConsensusError::ClientImport` if the header lookup or the pruning itself fails.
///
/// # Panics
///
/// Panics if the finalized header is missing from the database or carries no pre-digest.
fn prune_finalized<Block, Client>(
    client: Arc<Client>,
    epoch_changes: &mut EpochChangesFor<Block, Epoch>,
) -> Result<(), ConsensusError>
where
    Block: BlockT,
    Client: HeaderBackend<Block> + HeaderMetadata<Block, Error = sp_blockchain::Error>,
{
    let chain_info = client.info();

    let finalized_header = match client.header(chain_info.finalized_hash) {
        Ok(maybe_header) => maybe_header.expect(
            "best finalized hash was given by client; finalized headers must exist in db; qed",
        ),
        Err(err) => return Err(ConsensusError::ClientImport(err.to_string())),
    };

    let finalized_pre_digest = find_pre_digest::<Block>(&finalized_header)
        .expect("finalized header must be valid; valid blocks have a pre-digest; qed");
    let finalized_slot = finalized_pre_digest.slot();

    let pruned = epoch_changes.prune_finalized(
        descendent_query(&*client),
        &chain_info.finalized_hash,
        chain_info.finalized_number,
        finalized_slot,
    );

    match pruned {
        Ok(_) => Ok(()),
        Err(err) => Err(ConsensusError::ClientImport(err.to_string())),
    }
}
/// Builds the BABE block import wrapping `wrapped_block_import`, together with the
/// [`BabeLink`] that shares its epoch changes and configuration.
///
/// The epoch changes are loaded from the client's aux storage and pruned against the
/// currently finalized block. A finality action is registered on the client that runs
/// `aux_storage_cleanup` for every finality notification for as long as the client is alive.
///
/// # Errors
///
/// Fails if the epoch changes cannot be loaded or if the initial pruning fails.
pub fn block_import<Client, Block: BlockT, I>(
    config: BabeConfiguration,
    wrapped_block_import: I,
    client: Arc<Client>,
) -> ClientResult<(BabeBlockImport<Block, Client, I>, BabeLink<Block>)>
where
    Client: AuxStore
        + HeaderBackend<Block>
        + HeaderMetadata<Block, Error = sp_blockchain::Error>
        + PreCommitActions<Block>
        + 'static,
{
    let shared_epochs = aux_schema::load_epoch_changes::<Block, _>(&*client, &config)?;
    let link = BabeLink { epoch_changes: shared_epochs.clone(), config: config.clone() };

    // Drop everything that is made obsolete by the already finalized part of the chain. The
    // guard only lives for the duration of this scope.
    {
        let client_for_prune = client.clone();
        let mut epochs = shared_epochs.shared_data();
        prune_finalized(client_for_prune, &mut epochs)?;
    }

    // Only a weak handle is captured so the registered action does not keep the client alive.
    let weak_client = Arc::downgrade(&client);
    let on_finality = move |summary: &FinalityNotification<Block>| match weak_client.upgrade() {
        Some(client) => aux_storage_cleanup(client.as_ref(), summary),
        None => Default::default(),
    };
    client.register_finality_action(Box::new(on_finality));

    Ok((BabeBlockImport::new(client, shared_epochs, wrapped_block_import, config), link))
}
/// Parameters consumed by [`import_queue`].
pub struct ImportQueueParams<'a, Block: BlockT, BI, Client, CIDP, SelectChain, Spawn> {
/// The BABE link; its configuration and shared epoch changes are used by both the verifier
/// and the background worker.
pub link: BabeLink<Block>,
/// The block import the queue hands verified blocks to.
pub block_import: BI,
/// Optional justification import passed on to the queue.
pub justification_import: Option<BoxJustificationImport<Block>>,
/// The client, shared between the verifier and the background worker.
pub client: Arc<Client>,
/// Chain selection implementation handed to the verifier.
pub select_chain: SelectChain,
/// Factory for the inherent data providers, handed to the verifier.
pub create_inherent_data_providers: CIDP,
/// Spawner used to start the essential `babe-worker` task and passed on to the queue.
pub spawner: &'a Spawn,
/// Optional Prometheus registry passed on to the queue.
pub registry: Option<&'a Registry>,
/// Optional telemetry handle handed to the verifier.
pub telemetry: Option<TelemetryHandle>,
/// Offchain transaction pool factory handed to the verifier.
pub offchain_tx_pool_factory: OffchainTransactionPoolFactory<Block>,
}
pub fn import_queue<Block: BlockT, Client, SelectChain, BI, CIDP, Spawn>(
ImportQueueParams {
link: babe_link,
block_import,
justification_import,
client,
select_chain,
create_inherent_data_providers,
spawner,
registry,
telemetry,
offchain_tx_pool_factory,
}: ImportQueueParams<'_, Block, BI, Client, CIDP, SelectChain, Spawn>,
) -> ClientResult<(DefaultImportQueue<Block>, BabeWorkerHandle<Block>)>
where
BI: BlockImport<Block, Error = ConsensusError> + Send + Sync + 'static,
Client: ProvideRuntimeApi<Block>
+ HeaderBackend<Block>
+ HeaderMetadata<Block, Error = sp_blockchain::Error>
+ AuxStore
+ Send
+ Sync
+ 'static,
Client::Api: BlockBuilderApi<Block> + BabeApi<Block> + ApiExt<Block>,
SelectChain: sp_consensus::SelectChain<Block> + 'static,
CIDP: CreateInherentDataProviders<Block, ()> + Send + Sync + 'static,
CIDP::InherentDataProviders: InherentDataProviderExt + Send + Sync,
Spawn: SpawnEssentialNamed,
{
const HANDLE_BUFFER_SIZE: usize = 1024;
let verifier = BabeVerifier {
select_chain,
create_inherent_data_providers,
config: babe_link.config.clone(),
epoch_changes: babe_link.epoch_changes.clone(),
telemetry,
client: client.clone(),
offchain_tx_pool_factory,
};
let (worker_tx, worker_rx) = channel(HANDLE_BUFFER_SIZE);
let answer_requests =
answer_requests(worker_rx, babe_link.config, client, babe_link.epoch_changes);
spawner.spawn_essential("babe-worker", Some("babe"), answer_requests.boxed());
Ok((
BasicQueue::new(verifier, Box::new(block_import), justification_import, spawner, registry),
BabeWorkerHandle(worker_tx),
))
}
/// Reverts the BABE aux data by up to `blocks` blocks, never going below the finalized block.
///
/// The epoch changes are reverted to the block at `best - min(blocks, best - finalized)`
/// (or reset to their default when that block is number zero), and the stored block weights
/// of all blocks above that point on leaves descending from it are deleted. Both changes are
/// written to aux storage in a single `insert_aux` call.
///
/// Does nothing when there is nothing to revert.
///
/// # Errors
///
/// Fails if the hash of the revert target cannot be resolved, or if loading the
/// configuration, the epoch changes, the leaves or header metadata fails, or if the final
/// aux write fails.
pub fn revert<Block, Client, Backend>(
    client: Arc<Client>,
    backend: Arc<Backend>,
    blocks: NumberFor<Block>,
) -> ClientResult<()>
where
    Block: BlockT,
    Client: AuxStore
        + HeaderMetadata<Block, Error = sp_blockchain::Error>
        + HeaderBackend<Block>
        + ProvideRuntimeApi<Block>
        + UsageProvider<Block>,
    Client::Api: BabeApi<Block>,
    Backend: BackendT<Block>,
{
    let best_number = client.info().best_number;
    let finalized = client.info().finalized_number;

    // Never revert past the finalized block.
    let revertible = blocks.min(best_number - finalized);
    if revertible.is_zero() {
        return Ok(())
    }

    let target_number = best_number - revertible;
    let target_hash = match client.hash(target_number)? {
        Some(found) => found,
        None =>
            return Err(ClientError::Backend(format!(
                "Unexpected hash lookup failure for block number: {}",
                target_number
            ))),
    };

    // Revert the epoch changes tree up to the target block.
    let config = configuration(&*client)?;
    let shared_epochs = aux_schema::load_epoch_changes::<Block, Client>(&*client, &config)?;
    let mut epochs = shared_epochs.shared_data();

    if target_number.is_zero() {
        // Special case: no epoch changes are left once everything down to block zero is
        // reverted.
        *epochs = EpochChangesFor::<Block, Epoch>::default();
    } else {
        epochs.revert(descendent_query(&*client), target_hash, target_number);
    }

    // Collect the weight keys of every block above the target on leaves that descend from it.
    let mut weight_keys = HashSet::with_capacity(revertible.saturated_into());

    for leaf in backend.blockchain().leaves()? {
        // Skip leaves whose route from the target retracts blocks, as well as leaves for
        // which no route can be computed.
        let descends_from_target = matches!(
            sp_blockchain::tree_route(&*client, target_hash, leaf),
            Ok(route) if route.retracted().is_empty()
        );
        if !descends_from_target {
            continue
        }

        // Walk towards the target; stop early once a block was already collected via
        // another leaf.
        let mut cursor = leaf;
        loop {
            let meta = client.header_metadata(cursor)?;
            if meta.number <= target_number {
                break
            }
            if !weight_keys.insert(aux_schema::block_weight_key(cursor)) {
                break
            }
            cursor = meta.parent;
        }
    }

    let stale_keys: Vec<_> = weight_keys.iter().map(|key| key.as_slice()).collect();

    // Persist the reverted epoch changes and delete the collected weights in one aux write.
    aux_schema::write_epoch_changes::<Block, _, _>(&epochs, |values| {
        client.insert_aux(values, stale_keys.iter())
    })
}