1#![forbid(unsafe_code)]
67#![warn(missing_docs)]
68
69use std::{
70 collections::HashSet,
71 future::Future,
72 ops::{Deref, DerefMut},
73 pin::Pin,
74 sync::Arc,
75 task::{Context, Poll},
76 time::Duration,
77};
78
79use codec::{Decode, Encode};
80use futures::{
81 channel::{
82 mpsc::{channel, Receiver, Sender},
83 oneshot,
84 },
85 prelude::*,
86};
87use log::{debug, info, log, trace, warn};
88use parking_lot::Mutex;
89use prometheus_endpoint::Registry;
90
91use sc_client_api::{
92 backend::AuxStore, AuxDataOperations, Backend as BackendT, FinalityNotification,
93 PreCommitActions, UsageProvider,
94};
95use sc_consensus::{
96 block_import::{
97 BlockCheckParams, BlockImport, BlockImportParams, ForkChoiceStrategy, ImportResult,
98 StateAction,
99 },
100 import_queue::{BasicQueue, BoxJustificationImport, DefaultImportQueue, Verifier},
101};
102use sc_consensus_epochs::{
103 descendent_query, Epoch as EpochT, EpochChangesFor, SharedEpochChanges, ViableEpoch,
104 ViableEpochDescriptor,
105};
106use sc_consensus_slots::{
107 check_equivocation, BackoffAuthoringBlocksStrategy, CheckedHeader, InherentDataProviderExt,
108 SlotInfo, StorageChanges,
109};
110use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_TRACE};
111use sc_transaction_pool_api::OffchainTransactionPoolFactory;
112use sp_api::{ApiExt, ProvideRuntimeApi};
113use sp_application_crypto::AppCrypto;
114use sp_block_builder::BlockBuilder as BlockBuilderApi;
115use sp_blockchain::{
116 Backend as _, BlockStatus, Error as ClientError, ForkBackend, HeaderBackend, HeaderMetadata,
117 Result as ClientResult,
118};
119use sp_consensus::{BlockOrigin, Environment, Error as ConsensusError, Proposer, SelectChain};
120use sp_consensus_babe::{inherents::BabeInherentData, SlotDuration};
121use sp_consensus_slots::Slot;
122use sp_core::traits::SpawnEssentialNamed;
123use sp_inherents::{CreateInherentDataProviders, InherentDataProvider};
124use sp_keystore::KeystorePtr;
125use sp_runtime::{
126 generic::OpaqueDigestItemId,
127 traits::{Block as BlockT, Header, NumberFor, SaturatedConversion, Zero},
128 DigestItem,
129};
130
131pub use sc_consensus_slots::SlotProportion;
132pub use sp_consensus::SyncOracle;
133pub use sp_consensus_babe::{
134 digests::{
135 CompatibleDigestItem, NextConfigDescriptor, NextEpochDescriptor, PreDigest,
136 PrimaryPreDigest, SecondaryPlainPreDigest,
137 },
138 AuthorityId, AuthorityPair, AuthoritySignature, BabeApi, BabeAuthorityWeight, BabeBlockWeight,
139 BabeConfiguration, BabeEpochConfiguration, ConsensusLog, Randomness, BABE_ENGINE_ID,
140};
141
142pub use aux_schema::load_block_weight as block_weight;
143use sp_timestamp::Timestamp;
144
145mod migration;
146mod verification;
147
148pub mod authorship;
149pub mod aux_schema;
150#[cfg(test)]
151mod tests;
152
// Log target handed to the `log` macros used in this file.
const LOG_TARGET: &str = "babe";

// NOTE(review): presumably the context string for the VRF used when scoring slot claims; it is
// not referenced in the visible part of this file — confirm against its users.
const AUTHORING_SCORE_VRF_CONTEXT: &[u8] = b"substrate-babe-vrf";

// NOTE(review): presumably the length in bytes of the VRF output used for that score — confirm.
const AUTHORING_SCORE_LENGTH: usize = 16;
160
/// BABE epoch information.
///
/// Newtype around [`sp_consensus_babe::Epoch`]; it dereferences to the wrapped value and can be
/// created from it with `From`/`Into`.
#[derive(Clone, Debug, PartialEq, Eq, Encode, Decode)]
pub struct Epoch(sp_consensus_babe::Epoch);
164
165impl Deref for Epoch {
166 type Target = sp_consensus_babe::Epoch;
167
168 fn deref(&self) -> &Self::Target {
169 &self.0
170 }
171}
172
173impl DerefMut for Epoch {
174 fn deref_mut(&mut self) -> &mut Self::Target {
175 &mut self.0
176 }
177}
178
179impl From<sp_consensus_babe::Epoch> for Epoch {
180 fn from(epoch: sp_consensus_babe::Epoch) -> Self {
181 Epoch(epoch)
182 }
183}
184
185impl EpochT for Epoch {
186 type NextEpochDescriptor = (NextEpochDescriptor, BabeEpochConfiguration);
187 type Slot = Slot;
188
189 fn increment(
190 &self,
191 (descriptor, config): (NextEpochDescriptor, BabeEpochConfiguration),
192 ) -> Epoch {
193 sp_consensus_babe::Epoch {
194 epoch_index: self.epoch_index + 1,
195 start_slot: self.start_slot + self.duration,
196 duration: self.duration,
197 authorities: descriptor.authorities,
198 randomness: descriptor.randomness,
199 config,
200 }
201 .into()
202 }
203
204 fn start_slot(&self) -> Slot {
205 self.start_slot
206 }
207
208 fn end_slot(&self) -> Slot {
209 self.start_slot + self.duration
210 }
211}
212
213impl Epoch {
214 pub fn genesis(genesis_config: &BabeConfiguration, slot: Slot) -> Epoch {
218 sp_consensus_babe::Epoch {
219 epoch_index: 0,
220 start_slot: slot,
221 duration: genesis_config.epoch_length,
222 authorities: genesis_config.authorities.clone(),
223 randomness: genesis_config.randomness,
224 config: BabeEpochConfiguration {
225 c: genesis_config.c,
226 allowed_slots: genesis_config.allowed_slots,
227 },
228 }
229 .into()
230 }
231
232 pub fn clone_for_slot(&self, slot: Slot) -> Epoch {
240 let mut epoch = self.clone();
241
242 let skipped_epochs = *slot.saturating_sub(self.start_slot) / self.duration;
243
244 let epoch_index = epoch.epoch_index.checked_add(skipped_epochs).expect(
245 "epoch number is u64; it should be strictly smaller than number of slots; \
246 slots relate in some way to wall clock time; \
247 if u64 is not enough we should crash for safety; qed.",
248 );
249
250 let start_slot = skipped_epochs
251 .checked_mul(epoch.duration)
252 .and_then(|skipped_slots| epoch.start_slot.checked_add(skipped_slots))
253 .expect(
254 "slot number is u64; it should relate in some way to wall clock time; \
255 if u64 is not enough we should crash for safety; qed.",
256 );
257
258 epoch.epoch_index = epoch_index;
259 epoch.start_slot = Slot::from(start_slot);
260
261 epoch
262 }
263}
264
/// Errors produced by the BABE client code in this crate.
#[derive(Debug, thiserror::Error)]
pub enum Error<B: BlockT> {
	/// A header carried more than one BABE pre-runtime digest.
	#[error("Multiple BABE pre-runtime digests, rejecting!")]
	MultiplePreRuntimeDigests,
	/// A header carried no BABE pre-runtime digest.
	#[error("No BABE pre-runtime digest found")]
	NoPreRuntimeDigest,
	/// A header carried more than one BABE epoch change digest.
	#[error("Multiple BABE epoch change digests, rejecting!")]
	MultipleEpochChangeDigests,
	/// A header carried more than one BABE config change digest.
	#[error("Multiple BABE config change digests, rejecting!")]
	MultipleConfigChangeDigests,
	/// Timestamp and slot could not be extracted.
	#[error("Could not extract timestamp and slot: {0}")]
	Extraction(ConsensusError),
	/// The epoch for the given block hash could not be fetched.
	#[error("Could not fetch epoch at {0:?}")]
	FetchEpoch(B::Hash),
	/// The header was rejected because it is too far in the future.
	#[error("Header {0:?} rejected: too far in the future")]
	TooFarInFuture(B::Hash),
	/// The parent (first hash) of the block (second hash) is unavailable.
	#[error("Parent ({0}) of {1} unavailable. Cannot import")]
	ParentUnavailable(B::Hash, B::Hash),
	/// The slot did not increase relative to the parent (parent slot, this slot).
	#[error("Slot number must increase: parent slot: {0}, this slot: {1}")]
	SlotMustIncrease(Slot, Slot),
	/// The header has a bad seal.
	#[error("Header {0:?} has a bad seal")]
	HeaderBadSeal(B::Hash),
	/// The header is unsealed.
	#[error("Header {0:?} is unsealed")]
	HeaderUnsealed(B::Hash),
	/// The author of the slot could not be found.
	#[error("Slot author not found")]
	SlotAuthorNotFound,
	/// Secondary slot assignments are disabled for the current epoch.
	#[error("Secondary slot assignments are disabled for the current epoch.")]
	SecondarySlotAssignmentsDisabled,
	/// Bad signature on the given header.
	#[error("Bad signature on {0:?}")]
	BadSignature(B::Hash),
	/// The block author differs from the expected secondary author (expected, got).
	#[error("Invalid author: Expected secondary author: {0:?}, got: {1:?}.")]
	InvalidAuthor(AuthorityId, AuthorityId),
	/// No secondary author was expected.
	#[error("No secondary author expected.")]
	NoSecondaryAuthorExpected,
	/// VRF verification failed.
	#[error("VRF verification failed")]
	VrfVerificationFailed,
	/// The VRF output exceeded the given threshold.
	#[error("VRF output rejected, threshold {0} exceeded")]
	VrfThresholdExceeded(u128),
	/// The parent header could not be fetched.
	#[error("Could not fetch parent header: {0}")]
	FetchParentHeader(sp_blockchain::Error),
	/// An epoch change was expected at the given block hash and slot.
	#[error("Expected epoch change to happen at {0:?}, s{1}")]
	ExpectedEpochChange(B::Hash, Slot),
	/// A config change was found where none was expected.
	#[error("Unexpected config change")]
	UnexpectedConfigChange,
	/// An epoch change was found where none was expected.
	#[error("Unexpected epoch change")]
	UnexpectedEpochChange,
	/// The parent of the given block has no associated weight.
	#[error("Parent block of {0} has no associated weight")]
	ParentBlockNoAssociatedWeight(B::Hash),
	/// Checking inherents failed.
	#[error("Checking inherents failed: {0}")]
	CheckInherents(sp_inherents::Error),
	/// Checking inherents produced an error nobody handled; carries the inherent identifier,
	/// which is rendered as lossy UTF-8 in the message.
	#[error("Checking inherents unhandled error: {}", String::from_utf8_lossy(.0))]
	CheckInherentsUnhandled(sp_inherents::InherentIdentifier),
	/// Creating inherents failed.
	#[error("Creating inherents failed: {0}")]
	CreateInherents(sp_inherents::Error),
	/// The background worker is not running, so the request could not be served.
	#[error("Background worker is not running")]
	BackgroundWorkerTerminated,
	/// Client (blockchain) error, displayed transparently.
	#[error(transparent)]
	Client(sp_blockchain::Error),
	/// Runtime API error, displayed transparently.
	#[error(transparent)]
	RuntimeApi(sp_api::ApiError),
	/// Fork tree error, displayed transparently; boxed to keep the enum small.
	#[error(transparent)]
	ForkTree(Box<fork_tree::Error<sp_blockchain::Error>>),
}
359
360impl<B: BlockT> From<Error<B>> for String {
361 fn from(error: Error<B>) -> String {
362 error.to_string()
363 }
364}
365
366fn babe_err<B: BlockT>(error: Error<B>) -> Error<B> {
367 debug!(target: LOG_TARGET, "{}", error);
368 error
369}
370
/// Intermediate value attached to `BlockImportParams` under [`INTERMEDIATE_KEY`].
pub struct BabeIntermediate<B: BlockT> {
	/// Descriptor of the epoch the block belongs to.
	pub epoch_descriptor: ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>,
}
376
/// Key under which a [`BabeIntermediate`] is stored in a block's import intermediates.
pub static INTERMEDIATE_KEY: &[u8] = b"babe1";
379
380pub fn configuration<B: BlockT, C>(client: &C) -> ClientResult<BabeConfiguration>
382where
383 C: AuxStore + ProvideRuntimeApi<B> + UsageProvider<B>,
384 C::Api: BabeApi<B>,
385{
386 let at_hash = if client.usage_info().chain.finalized_state.is_some() {
387 client.usage_info().chain.best_hash
388 } else {
389 debug!(target: LOG_TARGET, "No finalized state is available. Reading config from genesis");
390 client.usage_info().chain.genesis_hash
391 };
392
393 let runtime_api = client.runtime_api();
394 let version = runtime_api.api_version::<dyn BabeApi<B>>(at_hash)?;
395
396 let config = match version {
397 Some(1) => {
398 #[allow(deprecated)]
399 {
400 runtime_api.configuration_before_version_2(at_hash)?.into()
401 }
402 },
403 Some(2) => runtime_api.configuration(at_hash)?,
404 _ =>
405 return Err(sp_blockchain::Error::VersionInvalid(
406 "Unsupported or invalid BabeApi version".to_string(),
407 )),
408 };
409 Ok(config)
410}
411
/// Parameters for [`start_babe`].
pub struct BabeParams<B: BlockT, C, SC, E, I, SO, L, CIDP, BS> {
	/// Keystore handed to the slot worker.
	pub keystore: KeystorePtr,

	/// Client handle shared with the slot worker.
	pub client: Arc<C>,

	/// Chain selection strategy passed to the slot worker loop.
	pub select_chain: SC,

	/// Environment handed to the slot worker.
	pub env: E,

	/// Block import handed to the slot worker.
	///
	/// NOTE(review): presumably this must be a `BabeBlockImport` (or a wrapper around one) —
	/// confirm.
	pub block_import: I,

	/// Sync oracle; a clone goes to the slot worker and the original to the slot worker loop.
	pub sync_oracle: SO,

	/// Justification sync link handed to the slot worker.
	pub justification_sync_link: L,

	/// Factory for the inherent data providers used by the slot worker loop.
	pub create_inherent_data_providers: CIDP,

	/// Flag stored on the slot worker and reported through its `force_authoring` method.
	pub force_authoring: bool,

	/// Optional strategy for backing off block production.
	pub backoff_authoring_blocks: Option<BS>,

	/// Link providing the shared epoch changes and the BABE configuration.
	pub babe_link: BabeLink<B>,

	/// Proportion of the slot dedicated to proposing.
	pub block_proposal_slot_portion: SlotProportion,

	/// Optional upper bound for the proposing proportion.
	///
	/// NOTE(review): presumably the cap applied after slot lenience — confirm against
	/// `sc_consensus_slots::proposing_remaining_duration`.
	pub max_block_proposal_slot_portion: Option<SlotProportion>,

	/// Optional telemetry handle.
	pub telemetry: Option<TelemetryHandle>,
}
463
464pub fn start_babe<B, C, SC, E, I, SO, CIDP, BS, L, Error>(
466 BabeParams {
467 keystore,
468 client,
469 select_chain,
470 env,
471 block_import,
472 sync_oracle,
473 justification_sync_link,
474 create_inherent_data_providers,
475 force_authoring,
476 backoff_authoring_blocks,
477 babe_link,
478 block_proposal_slot_portion,
479 max_block_proposal_slot_portion,
480 telemetry,
481 }: BabeParams<B, C, SC, E, I, SO, L, CIDP, BS>,
482) -> Result<BabeWorker<B>, ConsensusError>
483where
484 B: BlockT,
485 C: ProvideRuntimeApi<B>
486 + HeaderBackend<B>
487 + HeaderMetadata<B, Error = ClientError>
488 + Send
489 + Sync
490 + 'static,
491 C::Api: BabeApi<B>,
492 SC: SelectChain<B> + 'static,
493 E: Environment<B, Error = Error> + Send + Sync + 'static,
494 E::Proposer: Proposer<B, Error = Error>,
495 I: BlockImport<B, Error = ConsensusError> + Send + Sync + 'static,
496 SO: SyncOracle + Send + Sync + Clone + 'static,
497 L: sc_consensus::JustificationSyncLink<B> + 'static,
498 CIDP: CreateInherentDataProviders<B, ()> + Send + Sync + 'static,
499 CIDP::InherentDataProviders: InherentDataProviderExt + Send,
500 BS: BackoffAuthoringBlocksStrategy<NumberFor<B>> + Send + Sync + 'static,
501 Error: std::error::Error + Send + From<ConsensusError> + From<I::Error> + 'static,
502{
503 let slot_notification_sinks = Arc::new(Mutex::new(Vec::new()));
504
505 let worker = BabeSlotWorker {
506 client: client.clone(),
507 block_import,
508 env,
509 sync_oracle: sync_oracle.clone(),
510 justification_sync_link,
511 force_authoring,
512 backoff_authoring_blocks,
513 keystore,
514 epoch_changes: babe_link.epoch_changes.clone(),
515 slot_notification_sinks: slot_notification_sinks.clone(),
516 config: babe_link.config.clone(),
517 block_proposal_slot_portion,
518 max_block_proposal_slot_portion,
519 telemetry,
520 };
521
522 info!(target: LOG_TARGET, "๐ถ Starting BABE Authorship worker");
523
524 let slot_worker = sc_consensus_slots::start_slot_worker(
525 babe_link.config.slot_duration(),
526 select_chain,
527 sc_consensus_slots::SimpleSlotWorkerToSlotWorker(worker),
528 sync_oracle,
529 create_inherent_data_providers,
530 );
531
532 Ok(BabeWorker { inner: Box::pin(slot_worker), slot_notification_sinks })
533}
534
535fn aux_storage_cleanup<C: HeaderMetadata<Block> + HeaderBackend<Block>, Block: BlockT>(
539 client: &C,
540 notification: &FinalityNotification<Block>,
541) -> AuxDataOperations {
542 let mut hashes = HashSet::new();
543
544 let first = notification.tree_route.first().unwrap_or(¬ification.hash);
545 match client.header_metadata(*first) {
546 Ok(meta) => {
547 hashes.insert(meta.parent);
548 },
549 Err(err) => {
550 warn!(target: LOG_TARGET, "Failed to lookup metadata for block `{:?}`: {}", first, err,)
551 },
552 }
553
554 hashes.extend(
556 notification
557 .tree_route
558 .iter()
559 .filter(|h| **h != notification.hash),
562 );
563
564 let stale_forks = match client.expand_forks(¬ification.stale_heads) {
566 Ok(stale_forks) => stale_forks,
567 Err(e) => {
568 warn!(target: LOG_TARGET, "{:?}", e);
569
570 Default::default()
571 },
572 };
573 hashes.extend(stale_forks.iter());
574
575 hashes
576 .into_iter()
577 .map(|val| (aux_schema::block_weight_key(val), None))
578 .collect()
579}
580
581async fn answer_requests<B: BlockT, C>(
582 mut request_rx: Receiver<BabeRequest<B>>,
583 config: BabeConfiguration,
584 client: Arc<C>,
585 epoch_changes: SharedEpochChanges<B, Epoch>,
586) where
587 C: HeaderBackend<B> + HeaderMetadata<B, Error = ClientError>,
588{
589 while let Some(request) = request_rx.next().await {
590 match request {
591 BabeRequest::EpochData(response) => {
592 let _ = response.send(epoch_changes.shared_data().clone());
593 },
594 BabeRequest::EpochDataForChildOf(parent_hash, parent_number, slot, response) => {
595 let lookup = || {
596 let epoch_changes = epoch_changes.shared_data();
597 epoch_changes
598 .epoch_data_for_child_of(
599 descendent_query(&*client),
600 &parent_hash,
601 parent_number,
602 slot,
603 |slot| Epoch::genesis(&config, slot),
604 )
605 .map_err(|e| Error::<B>::ForkTree(Box::new(e)))?
606 .ok_or(Error::<B>::FetchEpoch(parent_hash))
607 };
608
609 let _ = response.send(lookup());
610 },
611 }
612 }
613}
614
/// Requests that can be sent to the task running `answer_requests`.
enum BabeRequest<B: BlockT> {
	/// Request a copy of all epoch changes; the answer is sent through the given channel.
	EpochData(oneshot::Sender<EpochChangesFor<B, Epoch>>),
	/// Request the epoch that a child of the given parent (hash and number) would have at the
	/// given slot; the answer is sent through the given channel.
	EpochDataForChildOf(B::Hash, NumberFor<B>, Slot, oneshot::Sender<Result<Epoch, Error<B>>>),
}
624
/// Cloneable handle for issuing requests to the BABE background worker.
///
/// Wraps the sending half of the request channel.
#[derive(Clone)]
pub struct BabeWorkerHandle<B: BlockT>(Sender<BabeRequest<B>>);
628
629impl<B: BlockT> BabeWorkerHandle<B> {
630 async fn send_request(&self, request: BabeRequest<B>) -> Result<(), Error<B>> {
631 match self.0.clone().send(request).await {
632 Err(err) if err.is_disconnected() => return Err(Error::BackgroundWorkerTerminated),
633 Err(err) => warn!(
634 target: LOG_TARGET,
635 "Unhandled error when sending request to worker: {:?}", err
636 ),
637 _ => {},
638 }
639
640 Ok(())
641 }
642
643 pub async fn epoch_data(&self) -> Result<EpochChangesFor<B, Epoch>, Error<B>> {
645 let (tx, rx) = oneshot::channel();
646 self.send_request(BabeRequest::EpochData(tx)).await?;
647
648 rx.await.or(Err(Error::BackgroundWorkerTerminated))
649 }
650
651 pub async fn epoch_data_for_child_of(
655 &self,
656 parent_hash: B::Hash,
657 parent_number: NumberFor<B>,
658 slot: Slot,
659 ) -> Result<Epoch, Error<B>> {
660 let (tx, rx) = oneshot::channel();
661 self.send_request(BabeRequest::EpochDataForChildOf(parent_hash, parent_number, slot, tx))
662 .await?;
663
664 rx.await.or(Err(Error::BackgroundWorkerTerminated))?
665 }
666}
667
/// BABE authorship worker; a `Future<Output = ()>` that has to be polled to make progress.
#[must_use]
pub struct BabeWorker<B: BlockT> {
	// The boxed slot worker loop that is driven when this future is polled.
	inner: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
	// Senders registered through `slot_notification_stream`.
	slot_notification_sinks: SlotNotificationSinks<B>,
}
674
675impl<B: BlockT> BabeWorker<B> {
676 pub fn slot_notification_stream(
679 &self,
680 ) -> Receiver<(Slot, ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>)> {
681 const CHANNEL_BUFFER_SIZE: usize = 1024;
682
683 let (sink, stream) = channel(CHANNEL_BUFFER_SIZE);
684 self.slot_notification_sinks.lock().push(sink);
685 stream
686 }
687}
688
689impl<B: BlockT> Future for BabeWorker<B> {
690 type Output = ();
691
692 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
693 self.inner.as_mut().poll(cx)
694 }
695}
696
/// Shared, mutex-protected list of senders that receive `(slot, epoch descriptor)` pairs.
type SlotNotificationSinks<B> = Arc<
	Mutex<Vec<Sender<(Slot, ViableEpochDescriptor<<B as BlockT>::Hash, NumberFor<B>, Epoch>)>>>,
>;
701
/// State of the BABE slot worker; it implements `sc_consensus_slots::SimpleSlotWorker`.
struct BabeSlotWorker<B: BlockT, C, E, I, SO, L, BS> {
	/// Client used for header lookups and finality information.
	client: Arc<C>,
	/// Block import exposed through the `block_import` trait method.
	block_import: I,
	/// Environment used to initialise proposers.
	env: E,
	/// Sync oracle exposed through the `sync_oracle` trait method.
	sync_oracle: SO,
	/// Justification sync link exposed through the `justification_sync_link` trait method.
	justification_sync_link: L,
	/// Value reported by the `force_authoring` trait method.
	force_authoring: bool,
	/// Optional backoff strategy consulted by `should_backoff`.
	backoff_authoring_blocks: Option<BS>,
	/// Keystore used for claiming slots and signing block hashes.
	keystore: KeystorePtr,
	/// Shared epoch changes.
	epoch_changes: SharedEpochChanges<B, Epoch>,
	/// Subscribers notified about each slot.
	slot_notification_sinks: SlotNotificationSinks<B>,
	/// BABE configuration, used to build the genesis epoch on demand.
	config: BabeConfiguration,
	/// Proportion of the slot dedicated to proposing.
	block_proposal_slot_portion: SlotProportion,
	/// Optional upper bound for the proposing proportion.
	max_block_proposal_slot_portion: Option<SlotProportion>,
	/// Optional telemetry handle.
	telemetry: Option<TelemetryHandle>,
}
718
/// Slot worker implementation for BABE: claims slots, builds the pre-runtime digest and seals
/// authored blocks.
#[async_trait::async_trait]
impl<B, C, E, I, Error, SO, L, BS> sc_consensus_slots::SimpleSlotWorker<B>
	for BabeSlotWorker<B, C, E, I, SO, L, BS>
where
	B: BlockT,
	C: ProvideRuntimeApi<B> + HeaderBackend<B> + HeaderMetadata<B, Error = ClientError>,
	C::Api: BabeApi<B>,
	E: Environment<B, Error = Error> + Send + Sync,
	E::Proposer: Proposer<B, Error = Error>,
	I: BlockImport<B> + Send + Sync + 'static,
	SO: SyncOracle + Send + Clone + Sync,
	L: sc_consensus::JustificationSyncLink<B>,
	BS: BackoffAuthoringBlocksStrategy<NumberFor<B>> + Send + Sync,
	Error: std::error::Error + Send + From<ConsensusError> + From<I::Error> + 'static,
{
	// A claim is the pre-digest for the slot plus the public key of the claiming authority.
	type Claim = (PreDigest, AuthorityId);
	type SyncOracle = SO;
	type JustificationSyncLink = L;
	type CreateProposer =
		Pin<Box<dyn Future<Output = Result<E::Proposer, ConsensusError>> + Send + 'static>>;
	type Proposer = E::Proposer;
	type BlockImport = I;
	// The auxiliary data carried through a slot is the descriptor of the epoch it belongs to.
	type AuxData = ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>;

	/// Returns the log target of this crate.
	fn logging_target(&self) -> &'static str {
		LOG_TARGET
	}

	/// Mutable access to the wrapped block import.
	fn block_import(&mut self) -> &mut Self::BlockImport {
		&mut self.block_import
	}

	/// Looks up the epoch descriptor for a child of `parent` authored at `slot`.
	///
	/// Lookup failures map to `ConsensusError::ChainLookup`; a missing descriptor maps to
	/// `ConsensusError::InvalidAuthoritiesSet`.
	fn aux_data(&self, parent: &B::Header, slot: Slot) -> Result<Self::AuxData, ConsensusError> {
		self.epoch_changes
			.shared_data()
			.epoch_descriptor_for_child_of(
				descendent_query(&*self.client),
				&parent.hash(),
				*parent.number(),
				slot,
			)
			.map_err(|e| ConsensusError::ChainLookup(e.to_string()))?
			.ok_or(ConsensusError::InvalidAuthoritiesSet)
	}

	/// Number of authorities in the epoch described by `epoch_descriptor`, if that epoch can be
	/// resolved.
	fn authorities_len(&self, epoch_descriptor: &Self::AuxData) -> Option<usize> {
		self.epoch_changes
			.shared_data()
			.viable_epoch(epoch_descriptor, |slot| Epoch::genesis(&self.config, slot))
			.map(|epoch| epoch.as_ref().authorities.len())
	}

	/// Tries to claim `slot` with the keys in the keystore.
	///
	/// Returns `None` when the epoch cannot be resolved or no claim could be made.
	async fn claim_slot(
		&mut self,
		_parent_header: &B::Header,
		slot: Slot,
		epoch_descriptor: &ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>,
	) -> Option<Self::Claim> {
		debug!(target: LOG_TARGET, "Attempting to claim slot {}", slot);
		let s = authorship::claim_slot(
			slot,
			self.epoch_changes
				.shared_data()
				.viable_epoch(epoch_descriptor, |slot| Epoch::genesis(&self.config, slot))?
				.as_ref(),
			&self.keystore,
		);

		if s.is_some() {
			debug!(target: LOG_TARGET, "Claimed slot {}", slot);
		}

		s
	}

	/// Sends `(slot, epoch_descriptor)` to every registered subscriber.
	///
	/// Subscribers whose channel is full are kept (with a warning); subscribers failing for any
	/// other reason are removed from the list.
	fn notify_slot(
		&self,
		_parent_header: &B::Header,
		slot: Slot,
		epoch_descriptor: &ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>,
	) {
		let sinks = &mut self.slot_notification_sinks.lock();
		sinks.retain_mut(|sink| match sink.try_send((slot, epoch_descriptor.clone())) {
			Ok(()) => true,
			Err(e) =>
				if e.is_full() {
					warn!(target: LOG_TARGET, "Trying to notify a slot but the channel is full");
					true
				} else {
					false
				},
		});
	}

	/// Wraps the claim's pre-digest into the single digest item placed in the header.
	fn pre_digest_data(&self, _slot: Slot, claim: &Self::Claim) -> Vec<sp_runtime::DigestItem> {
		vec![<DigestItem as CompatibleDigestItem>::babe_pre_digest(claim.0.clone())]
	}

	/// Signs `header_hash` with the claiming key and assembles the import parameters for the
	/// freshly authored block.
	///
	/// Fails with `ConsensusError::CannotSign` when the keystore reports an error or does not
	/// hold the key.
	async fn block_import_params(
		&self,
		header: B::Header,
		header_hash: &B::Hash,
		body: Vec<B::Extrinsic>,
		storage_changes: StorageChanges<B>,
		(_, public): Self::Claim,
		epoch_descriptor: Self::AuxData,
	) -> Result<BlockImportParams<B>, ConsensusError> {
		let signature = self
			.keystore
			.sr25519_sign(<AuthorityId as AppCrypto>::ID, public.as_ref(), header_hash.as_ref())
			.map_err(|e| ConsensusError::CannotSign(format!("{}. Key: {:?}", e, public)))?
			.ok_or_else(|| {
				ConsensusError::CannotSign(format!(
					"Could not find key in keystore. Key: {:?}",
					public
				))
			})?;

		let digest_item = <DigestItem as CompatibleDigestItem>::babe_seal(signature.into());

		let mut import_block = BlockImportParams::new(BlockOrigin::Own, header);
		// The seal is a post-digest; the header passed in is the one that was signed.
		import_block.post_digests.push(digest_item);
		import_block.body = Some(body);
		import_block.state_action =
			StateAction::ApplyChanges(sc_consensus::StorageChanges::Changes(storage_changes));
		import_block
			.insert_intermediate(INTERMEDIATE_KEY, BabeIntermediate::<B> { epoch_descriptor });

		Ok(import_block)
	}

	/// Returns the configured `force_authoring` flag.
	fn force_authoring(&self) -> bool {
		self.force_authoring
	}

	/// Asks the backoff strategy, if any, whether authoring at `slot` should be skipped.
	///
	/// Returns `false` when no strategy is configured or the chain head has no usable
	/// pre-digest.
	fn should_backoff(&self, slot: Slot, chain_head: &B::Header) -> bool {
		if let Some(ref strategy) = self.backoff_authoring_blocks {
			if let Ok(chain_head_slot) =
				find_pre_digest::<B>(chain_head).map(|digest| digest.slot())
			{
				return strategy.should_backoff(
					*chain_head.number(),
					chain_head_slot,
					self.client.info().finalized_number,
					slot,
					self.logging_target(),
				)
			}
		}
		false
	}

	/// Mutable access to the sync oracle.
	fn sync_oracle(&mut self) -> &mut Self::SyncOracle {
		&mut self.sync_oracle
	}

	/// Mutable access to the justification sync link.
	fn justification_sync_link(&mut self) -> &mut Self::JustificationSyncLink {
		&mut self.justification_sync_link
	}

	/// Initialises a proposer on top of `block`; environment errors are reported as
	/// `ConsensusError::ClientImport`.
	fn proposer(&mut self, block: &B::Header) -> Self::CreateProposer {
		Box::pin(self.env.init(block).map_err(|e| ConsensusError::ClientImport(e.to_string())))
	}

	/// Returns a clone of the optional telemetry handle.
	fn telemetry(&self) -> Option<TelemetryHandle> {
		self.telemetry.clone()
	}

	/// Computes how long proposing may take for this slot, using exponential slot lenience.
	fn proposing_remaining_duration(&self, slot_info: &SlotInfo<B>) -> Duration {
		// `None` when the chain head has no usable pre-digest.
		let parent_slot = find_pre_digest::<B>(&slot_info.chain_head).ok().map(|d| d.slot());

		sc_consensus_slots::proposing_remaining_duration(
			parent_slot,
			slot_info,
			&self.block_proposal_slot_portion,
			self.max_block_proposal_slot_portion.as_ref(),
			sc_consensus_slots::SlotLenienceType::Exponential,
			self.logging_target(),
		)
	}
}
900
901pub fn find_pre_digest<B: BlockT>(header: &B::Header) -> Result<PreDigest, Error<B>> {
904 if header.number().is_zero() {
907 return Ok(PreDigest::SecondaryPlain(SecondaryPlainPreDigest {
908 slot: 0.into(),
909 authority_index: 0,
910 }))
911 }
912
913 let mut pre_digest: Option<_> = None;
914 for log in header.digest().logs() {
915 trace!(target: LOG_TARGET, "Checking log {:?}, looking for pre runtime digest", log);
916 match (log.as_babe_pre_digest(), pre_digest.is_some()) {
917 (Some(_), true) => return Err(babe_err(Error::MultiplePreRuntimeDigests)),
918 (None, _) => trace!(target: LOG_TARGET, "Ignoring digest not meant for us"),
919 (s, false) => pre_digest = s,
920 }
921 }
922 pre_digest.ok_or_else(|| babe_err(Error::NoPreRuntimeDigest))
923}
924
925pub fn find_next_epoch_digest<B: BlockT>(
927 header: &B::Header,
928) -> Result<Option<NextEpochDescriptor>, Error<B>> {
929 let mut epoch_digest: Option<_> = None;
930 for log in header.digest().logs() {
931 trace!(target: LOG_TARGET, "Checking log {:?}, looking for epoch change digest.", log);
932 let log = log.try_to::<ConsensusLog>(OpaqueDigestItemId::Consensus(&BABE_ENGINE_ID));
933 match (log, epoch_digest.is_some()) {
934 (Some(ConsensusLog::NextEpochData(_)), true) =>
935 return Err(babe_err(Error::MultipleEpochChangeDigests)),
936 (Some(ConsensusLog::NextEpochData(epoch)), false) => epoch_digest = Some(epoch),
937 _ => trace!(target: LOG_TARGET, "Ignoring digest not meant for us"),
938 }
939 }
940
941 Ok(epoch_digest)
942}
943
944fn find_next_config_digest<B: BlockT>(
946 header: &B::Header,
947) -> Result<Option<NextConfigDescriptor>, Error<B>> {
948 let mut config_digest: Option<_> = None;
949 for log in header.digest().logs() {
950 trace!(target: LOG_TARGET, "Checking log {:?}, looking for epoch change digest.", log);
951 let log = log.try_to::<ConsensusLog>(OpaqueDigestItemId::Consensus(&BABE_ENGINE_ID));
952 match (log, config_digest.is_some()) {
953 (Some(ConsensusLog::NextConfigData(_)), true) =>
954 return Err(babe_err(Error::MultipleConfigChangeDigests)),
955 (Some(ConsensusLog::NextConfigData(config)), false) => config_digest = Some(config),
956 _ => trace!(target: LOG_TARGET, "Ignoring digest not meant for us"),
957 }
958 }
959
960 Ok(config_digest)
961}
962
/// Bundle of the shared epoch changes and the BABE configuration.
///
/// `start_babe` takes both values from this link when it builds the slot worker.
#[derive(Clone)]
pub struct BabeLink<Block: BlockT> {
	// Shared epoch changes.
	epoch_changes: SharedEpochChanges<Block, Epoch>,
	// BABE configuration.
	config: BabeConfiguration,
}
969
970impl<Block: BlockT> BabeLink<Block> {
971 pub fn epoch_changes(&self) -> &SharedEpochChanges<Block, Epoch> {
973 &self.epoch_changes
974 }
975
976 pub fn config(&self) -> &BabeConfiguration {
978 &self.config
979 }
980}
981
/// Verifier for BABE blocks, used by the import queue.
pub struct BabeVerifier<Block: BlockT, Client> {
	// Client used for header and gap information.
	client: Arc<Client>,
	// Slot duration used to turn the current timestamp into a slot.
	slot_duration: SlotDuration,
	// BABE configuration, passed to the epoch lookup.
	config: BabeConfiguration,
	// Shared epoch changes.
	epoch_changes: SharedEpochChanges<Block, Epoch>,
	// Optional telemetry handle.
	telemetry: Option<TelemetryHandle>,
}
990
991#[async_trait::async_trait]
992impl<Block, Client> Verifier<Block> for BabeVerifier<Block, Client>
993where
994 Block: BlockT,
995 Client: HeaderMetadata<Block, Error = sp_blockchain::Error>
996 + HeaderBackend<Block>
997 + ProvideRuntimeApi<Block>
998 + Send
999 + Sync
1000 + AuxStore,
1001 Client::Api: BlockBuilderApi<Block> + BabeApi<Block>,
1002{
1003 async fn verify(
1004 &self,
1005 mut block: BlockImportParams<Block>,
1006 ) -> Result<BlockImportParams<Block>, String> {
1007 trace!(
1008 target: LOG_TARGET,
1009 "Verifying origin: {:?} header: {:?} justification(s): {:?} body: {:?}",
1010 block.origin,
1011 block.header,
1012 block.justifications,
1013 block.body,
1014 );
1015
1016 let hash = block.header.hash();
1017 let parent_hash = *block.header.parent_hash();
1018
1019 let number = block.header.number();
1020
1021 if is_state_sync_or_gap_sync_import(&*self.client, &block) {
1022 return Ok(block)
1023 }
1024
1025 debug!(
1026 target: LOG_TARGET,
1027 "We have {:?} logs in this header",
1028 block.header.digest().logs().len()
1029 );
1030
1031 let slot_now = Slot::from_timestamp(Timestamp::current(), self.slot_duration);
1032
1033 let pre_digest = find_pre_digest::<Block>(&block.header)?;
1034 let (check_header, epoch_descriptor) = {
1035 let (epoch_descriptor, viable_epoch) = query_epoch_changes(
1036 &self.epoch_changes,
1037 self.client.as_ref(),
1038 &self.config,
1039 *number,
1040 pre_digest.slot(),
1041 parent_hash,
1042 )?;
1043
1044 let v_params = verification::VerificationParams {
1047 header: block.header.clone(),
1048 pre_digest: Some(pre_digest),
1049 slot_now: slot_now + 1,
1050 epoch: viable_epoch.as_ref(),
1051 };
1052
1053 (verification::check_header::<Block>(v_params)?, epoch_descriptor)
1054 };
1055
1056 match check_header {
1057 CheckedHeader::Checked(pre_header, verified_info) => {
1058 trace!(target: LOG_TARGET, "Checked {:?}; importing.", pre_header);
1059 telemetry!(
1060 self.telemetry;
1061 CONSENSUS_TRACE;
1062 "babe.checked_and_importing";
1063 "pre_header" => ?pre_header,
1064 );
1065
1066 block.header = pre_header;
1067 block.post_digests.push(verified_info.seal);
1068 block.insert_intermediate(
1069 INTERMEDIATE_KEY,
1070 BabeIntermediate::<Block> { epoch_descriptor },
1071 );
1072 block.post_hash = Some(hash);
1073
1074 Ok(block)
1075 },
1076 CheckedHeader::Deferred(a, b) => {
1077 debug!(target: LOG_TARGET, "Checking {:?} failed; {:?}, {:?}.", hash, a, b);
1078 telemetry!(
1079 self.telemetry;
1080 CONSENSUS_DEBUG;
1081 "babe.header_too_far_in_future";
1082 "hash" => ?hash, "a" => ?a, "b" => ?b
1083 );
1084 Err(Error::<Block>::TooFarInFuture(hash).into())
1085 },
1086 }
1087 }
1088}
1089
1090fn is_state_sync_or_gap_sync_import<B: BlockT>(
1096 client: &impl HeaderBackend<B>,
1097 block: &BlockImportParams<B>,
1098) -> bool {
1099 let number = *block.header.number();
1100 let info = client.info();
1101 info.block_gap.map_or(false, |gap| gap.start <= number && number <= gap.end) ||
1102 block.with_state()
1103}
1104
/// Block import handler for BABE that wraps an inner block import.
///
/// NOTE(review): the import logic itself lies outside the visible part of this file; the field
/// notes below only describe what is visible here.
pub struct BabeBlockImport<Block: BlockT, Client, I, CIDP, SC> {
	// Wrapped block import that performs the actual import.
	inner: I,
	// Client used for runtime API calls and the aux store.
	client: Arc<Client>,
	// Shared epoch changes.
	epoch_changes: SharedEpochChanges<Block, Epoch>,
	// Factory for the inherent data providers used when checking blocks.
	create_inherent_data_providers: CIDP,
	// BABE configuration.
	config: BabeConfiguration,
	// Chain selection strategy.
	select_chain: SC,
	// Offchain transaction pool factory.
	// NOTE(review): presumably used when submitting equivocation reports — confirm.
	offchain_tx_pool_factory: OffchainTransactionPoolFactory<Block>,
}
1129
1130impl<Block: BlockT, I: Clone, Client, CIDP: Clone, SC: Clone> Clone
1131 for BabeBlockImport<Block, Client, I, CIDP, SC>
1132{
1133 fn clone(&self) -> Self {
1134 BabeBlockImport {
1135 inner: self.inner.clone(),
1136 client: self.client.clone(),
1137 epoch_changes: self.epoch_changes.clone(),
1138 config: self.config.clone(),
1139 create_inherent_data_providers: self.create_inherent_data_providers.clone(),
1140 select_chain: self.select_chain.clone(),
1141 offchain_tx_pool_factory: self.offchain_tx_pool_factory.clone(),
1142 }
1143 }
1144}
1145
1146impl<Block: BlockT, Client, I, CIDP, SC> BabeBlockImport<Block, Client, I, CIDP, SC> {
1147 fn new(
1148 client: Arc<Client>,
1149 epoch_changes: SharedEpochChanges<Block, Epoch>,
1150 block_import: I,
1151 config: BabeConfiguration,
1152 create_inherent_data_providers: CIDP,
1153 select_chain: SC,
1154 offchain_tx_pool_factory: OffchainTransactionPoolFactory<Block>,
1155 ) -> Self {
1156 BabeBlockImport {
1157 client,
1158 inner: block_import,
1159 epoch_changes,
1160 config,
1161 create_inherent_data_providers,
1162 select_chain,
1163 offchain_tx_pool_factory,
1164 }
1165 }
1166}
1167
1168impl<Block, Client, Inner, CIDP, SC> BabeBlockImport<Block, Client, Inner, CIDP, SC>
1169where
1170 Block: BlockT,
1171 Inner: BlockImport<Block> + Send + Sync,
1172 Inner::Error: Into<ConsensusError>,
1173 Client: HeaderBackend<Block>
1174 + HeaderMetadata<Block, Error = sp_blockchain::Error>
1175 + AuxStore
1176 + ProvideRuntimeApi<Block>
1177 + Send
1178 + Sync,
1179 Client::Api: BlockBuilderApi<Block> + BabeApi<Block> + ApiExt<Block>,
1180 CIDP: CreateInherentDataProviders<Block, ()>,
1181 CIDP::InherentDataProviders: InherentDataProviderExt + Send,
1182 SC: sp_consensus::SelectChain<Block> + 'static,
1183{
	/// Imports a block that comes with its whole state and resets the epoch changes from the
	/// runtime.
	///
	/// The block is imported through the inner block import with a forced fork choice and a
	/// block weight of zero. Afterwards the current and next epoch are read from the runtime at
	/// the imported block, the shared epoch changes are reset to them and persisted to the aux
	/// store.
	///
	/// The inner import and the later `insert_aux` call are issued separately, so a failure in
	/// between leaves the block imported without the epoch changes having been updated.
	///
	/// # Errors
	///
	/// Returns `ConsensusError::ClientImport` for any inner import result other than `Imported`,
	/// for runtime API failures and for aux-store write failures; errors of the inner import are
	/// converted and forwarded.
	async fn import_state(
		&self,
		mut block: BlockImportParams<Block>,
	) -> Result<ImportResult, ConsensusError> {
		let hash = block.post_hash();
		let parent_hash = *block.header.parent_hash();
		let number = *block.header.number();

		block.fork_choice = Some(ForkChoiceStrategy::Custom(true));
		// Queue the block weight (zero) as auxiliary data written together with the block.
		aux_schema::write_block_weight(hash, 0, |values| {
			block
				.auxiliary
				.extend(values.iter().map(|(k, v)| (k.to_vec(), Some(v.to_vec()))))
		});

		let import_result = self.inner.import_block(block).await;
		// Anything other than a plain `Imported` result is treated as an error.
		let aux = match import_result {
			Ok(ImportResult::Imported(aux)) => aux,
			Ok(r) =>
				return Err(ConsensusError::ClientImport(format!(
					"Unexpected import result: {:?}",
					r
				))),
			Err(r) => return Err(r.into()),
		};

		// Read the epoch data from the state of the block that was just imported.
		let current_epoch = self.client.runtime_api().current_epoch(hash).map_err(|e| {
			ConsensusError::ClientImport(babe_err::<Block>(Error::RuntimeApi(e)).into())
		})?;
		let next_epoch = self.client.runtime_api().next_epoch(hash).map_err(|e| {
			ConsensusError::ClientImport(babe_err::<Block>(Error::RuntimeApi(e)).into())
		})?;

		let mut epoch_changes = self.epoch_changes.shared_data_locked();
		epoch_changes.reset(parent_hash, hash, number, current_epoch.into(), next_epoch.into());
		aux_schema::write_epoch_changes::<Block, _, _>(&*epoch_changes, |insert| {
			self.client.insert_aux(insert, [])
		})
		.map_err(|e| ConsensusError::ClientImport(e.to_string()))?;

		Ok(ImportResult::Imported(aux))
	}
1232
1233 async fn check_inherents_and_equivocations(
1235 &self,
1236 block: &mut BlockImportParams<Block>,
1237 ) -> Result<(), ConsensusError> {
1238 if is_state_sync_or_gap_sync_import(&*self.client, block) {
1239 return Ok(())
1240 }
1241
1242 let parent_hash = *block.header.parent_hash();
1243 let number = *block.header.number();
1244
1245 let create_inherent_data_providers = self
1246 .create_inherent_data_providers
1247 .create_inherent_data_providers(parent_hash, ())
1248 .await?;
1249
1250 let slot_now = create_inherent_data_providers.slot();
1251
1252 let babe_pre_digest = find_pre_digest::<Block>(&block.header)
1253 .map_err(|e| ConsensusError::Other(Box::new(e)))?;
1254 let slot = babe_pre_digest.slot();
1255
1256 self.check_inherents(block, parent_hash, slot, create_inherent_data_providers)
1258 .await?;
1259
1260 let author = {
1262 let viable_epoch = query_epoch_changes(
1263 &self.epoch_changes,
1264 self.client.as_ref(),
1265 &self.config,
1266 number,
1267 slot,
1268 parent_hash,
1269 )
1270 .map_err(|e| ConsensusError::Other(babe_err(e).into()))?
1271 .1;
1272 match viable_epoch
1273 .as_ref()
1274 .authorities
1275 .get(babe_pre_digest.authority_index() as usize)
1276 {
1277 Some(author) => author.0.clone(),
1278 None =>
1279 return Err(ConsensusError::Other(Error::<Block>::SlotAuthorNotFound.into())),
1280 }
1281 };
1282 if let Err(err) = self
1283 .check_and_report_equivocation(slot_now, slot, &block.header, &author, &block.origin)
1284 .await
1285 {
1286 warn!(
1287 target: LOG_TARGET,
1288 "Error checking/reporting BABE equivocation: {}", err
1289 );
1290 }
1291 Ok(())
1292 }
1293
1294 async fn check_inherents(
1295 &self,
1296 block: &mut BlockImportParams<Block>,
1297 at_hash: Block::Hash,
1298 slot: Slot,
1299 create_inherent_data_providers: CIDP::InherentDataProviders,
1300 ) -> Result<(), ConsensusError> {
1301 if block.state_action.skip_execution_checks() {
1302 return Ok(())
1303 }
1304
1305 if let Some(inner_body) = block.body.take() {
1306 let new_block = Block::new(block.header.clone(), inner_body);
1307 let mut inherent_data = create_inherent_data_providers
1311 .create_inherent_data()
1312 .await
1313 .map_err(|e| ConsensusError::Other(Box::new(e)))?;
1314 inherent_data.babe_replace_inherent_data(slot);
1315
1316 use sp_block_builder::CheckInherentsError;
1317
1318 sp_block_builder::check_inherents_with_data(
1319 self.client.clone(),
1320 at_hash,
1321 new_block.clone(),
1322 &create_inherent_data_providers,
1323 inherent_data,
1324 )
1325 .await
1326 .map_err(|e| {
1327 ConsensusError::Other(Box::new(match e {
1328 CheckInherentsError::CreateInherentData(e) =>
1329 Error::<Block>::CreateInherents(e),
1330 CheckInherentsError::Client(e) => Error::RuntimeApi(e),
1331 CheckInherentsError::CheckInherents(e) => Error::CheckInherents(e),
1332 CheckInherentsError::CheckInherentsUnknownError(id) =>
1333 Error::CheckInherentsUnhandled(id),
1334 }))
1335 })?;
1336 let (_, inner_body) = new_block.deconstruct();
1337 block.body = Some(inner_body);
1338 }
1339
1340 Ok(())
1341 }
1342
1343 async fn check_and_report_equivocation(
1344 &self,
1345 slot_now: Slot,
1346 slot: Slot,
1347 header: &Block::Header,
1348 author: &AuthorityId,
1349 origin: &BlockOrigin,
1350 ) -> Result<(), Error<Block>> {
1351 if *origin == BlockOrigin::NetworkInitialSync {
1354 return Ok(())
1355 }
1356
1357 let Some(equivocation_proof) =
1359 check_equivocation(&*self.client, slot_now, slot, header, author)
1360 .map_err(Error::Client)?
1361 else {
1362 return Ok(())
1363 };
1364
1365 info!(
1366 "Slot author {:?} is equivocating at slot {} with headers {:?} and {:?}",
1367 author,
1368 slot,
1369 equivocation_proof.first_header.hash(),
1370 equivocation_proof.second_header.hash(),
1371 );
1372
1373 let best_hash = self
1375 .select_chain
1376 .best_chain()
1377 .await
1378 .map(|h| h.hash())
1379 .map_err(|e| Error::Client(e.into()))?;
1380
1381 let generate_key_owner_proof = |at_hash: Block::Hash| {
1390 self.client
1391 .runtime_api()
1392 .generate_key_ownership_proof(at_hash, slot, equivocation_proof.offender.clone())
1393 .map_err(Error::RuntimeApi)
1394 };
1395
1396 let parent_hash = *header.parent_hash();
1397 let key_owner_proof = match generate_key_owner_proof(parent_hash)? {
1398 Some(proof) => proof,
1399 None => match generate_key_owner_proof(best_hash)? {
1400 Some(proof) => proof,
1401 None => {
1402 debug!(
1403 target: LOG_TARGET,
1404 "Equivocation offender is not part of the authority set."
1405 );
1406 return Ok(())
1407 },
1408 },
1409 };
1410
1411 let mut runtime_api = self.client.runtime_api();
1413
1414 runtime_api
1416 .register_extension(self.offchain_tx_pool_factory.offchain_transaction_pool(best_hash));
1417
1418 runtime_api
1419 .submit_report_equivocation_unsigned_extrinsic(
1420 best_hash,
1421 equivocation_proof,
1422 key_owner_proof,
1423 )
1424 .map_err(Error::RuntimeApi)?;
1425
1426 info!(target: LOG_TARGET, "Submitted equivocation report for author {:?}", author);
1427
1428 Ok(())
1429 }
1430}
1431
1432#[async_trait::async_trait]
1433impl<Block, Client, Inner, CIDP, SC> BlockImport<Block>
1434 for BabeBlockImport<Block, Client, Inner, CIDP, SC>
1435where
1436 Block: BlockT,
1437 Inner: BlockImport<Block> + Send + Sync,
1438 Inner::Error: Into<ConsensusError>,
1439 Client: HeaderBackend<Block>
1440 + HeaderMetadata<Block, Error = sp_blockchain::Error>
1441 + AuxStore
1442 + ProvideRuntimeApi<Block>
1443 + Send
1444 + Sync,
1445 Client::Api: BlockBuilderApi<Block> + BabeApi<Block> + ApiExt<Block>,
1446 CIDP: CreateInherentDataProviders<Block, ()>,
1447 CIDP::InherentDataProviders: InherentDataProviderExt + Send + Sync,
1448 SC: SelectChain<Block> + 'static,
1449{
1450 type Error = ConsensusError;
1451
1452 async fn import_block(
1453 &self,
1454 mut block: BlockImportParams<Block>,
1455 ) -> Result<ImportResult, Self::Error> {
1456 let hash = block.post_hash();
1457 let number = *block.header.number();
1458 let info = self.client.info();
1459
1460 self.check_inherents_and_equivocations(&mut block).await?;
1461
1462 let block_status = self
1463 .client
1464 .status(hash)
1465 .map_err(|e| ConsensusError::ClientImport(e.to_string()))?;
1466
1467 if info.block_gap.map_or(false, |gap| gap.start <= number && number <= gap.end) ||
1471 block_status == BlockStatus::InChain
1472 {
1473 let _ = block.remove_intermediate::<BabeIntermediate<Block>>(INTERMEDIATE_KEY);
1476 block.fork_choice = Some(ForkChoiceStrategy::Custom(false));
1477 return self.inner.import_block(block).await.map_err(Into::into)
1478 }
1479
1480 if block.with_state() {
1481 return self.import_state(block).await
1482 }
1483
1484 let pre_digest = find_pre_digest::<Block>(&block.header).expect(
1485 "valid babe headers must contain a predigest; header has been already verified; qed",
1486 );
1487 let slot = pre_digest.slot();
1488
1489 let parent_hash = *block.header.parent_hash();
1490 let parent_header = self
1491 .client
1492 .header(parent_hash)
1493 .map_err(|e| ConsensusError::ChainLookup(e.to_string()))?
1494 .ok_or_else(|| {
1495 ConsensusError::ChainLookup(
1496 babe_err(Error::<Block>::ParentUnavailable(parent_hash, hash)).into(),
1497 )
1498 })?;
1499
1500 let parent_slot = find_pre_digest::<Block>(&parent_header).map(|d| d.slot()).expect(
1501 "parent is non-genesis; valid BABE headers contain a pre-digest; header has already \
1502 been verified; qed",
1503 );
1504
1505 if slot <= parent_slot {
1507 return Err(ConsensusError::ClientImport(
1508 babe_err(Error::<Block>::SlotMustIncrease(parent_slot, slot)).into(),
1509 ))
1510 }
1511
1512 let mut old_epoch_changes = None;
1515
1516 let mut epoch_changes = {
1519 let mut epoch_changes = self.epoch_changes.shared_data_locked();
1520
1521 let (epoch_descriptor, first_in_epoch, parent_weight) = {
1527 let parent_weight = if *parent_header.number() == Zero::zero() {
1528 0
1529 } else {
1530 aux_schema::load_block_weight(&*self.client, parent_hash)
1531 .map_err(|e| ConsensusError::ClientImport(e.to_string()))?
1532 .ok_or_else(|| {
1533 ConsensusError::ClientImport(
1534 babe_err(Error::<Block>::ParentBlockNoAssociatedWeight(hash))
1535 .into(),
1536 )
1537 })?
1538 };
1539
1540 let intermediate =
1541 block.remove_intermediate::<BabeIntermediate<Block>>(INTERMEDIATE_KEY)?;
1542
1543 let epoch_descriptor = intermediate.epoch_descriptor;
1544 let first_in_epoch = parent_slot < epoch_descriptor.start_slot();
1545 (epoch_descriptor, first_in_epoch, parent_weight)
1546 };
1547
1548 let total_weight = parent_weight + pre_digest.added_weight();
1549
1550 let next_epoch_digest = find_next_epoch_digest::<Block>(&block.header)
1552 .map_err(|e| ConsensusError::ClientImport(e.to_string()))?;
1553 let next_config_digest = find_next_config_digest::<Block>(&block.header)
1554 .map_err(|e| ConsensusError::ClientImport(e.to_string()))?;
1555
1556 match (first_in_epoch, next_epoch_digest.is_some(), next_config_digest.is_some()) {
1557 (true, true, _) => {},
1558 (false, false, false) => {},
1559 (false, false, true) =>
1560 return Err(ConsensusError::ClientImport(
1561 babe_err(Error::<Block>::UnexpectedConfigChange).into(),
1562 )),
1563 (true, false, _) =>
1564 return Err(ConsensusError::ClientImport(
1565 babe_err(Error::<Block>::ExpectedEpochChange(hash, slot)).into(),
1566 )),
1567 (false, true, _) =>
1568 return Err(ConsensusError::ClientImport(
1569 babe_err(Error::<Block>::UnexpectedEpochChange).into(),
1570 )),
1571 }
1572
1573 if let Some(next_epoch_descriptor) = next_epoch_digest {
1574 old_epoch_changes = Some((*epoch_changes).clone());
1575
1576 let mut viable_epoch = epoch_changes
1577 .viable_epoch(&epoch_descriptor, |slot| Epoch::genesis(&self.config, slot))
1578 .ok_or_else(|| {
1579 ConsensusError::ClientImport(Error::<Block>::FetchEpoch(parent_hash).into())
1580 })?
1581 .into_cloned();
1582
1583 let epoch_config = next_config_digest
1584 .map(Into::into)
1585 .unwrap_or_else(|| viable_epoch.as_ref().config.clone());
1586
1587 let log_level = if block.origin == BlockOrigin::NetworkInitialSync {
1589 log::Level::Debug
1590 } else {
1591 log::Level::Info
1592 };
1593
1594 if viable_epoch.as_ref().end_slot() <= slot {
1595 let epoch = viable_epoch.as_mut();
1609 let prev_index = epoch.epoch_index;
1610 *epoch = epoch.clone_for_slot(slot);
1611
1612 warn!(
1613 target: LOG_TARGET,
1614 "๐ถ Epoch(s) skipped: from {} to {}", prev_index, epoch.epoch_index,
1615 );
1616 }
1617
1618 log!(
1619 target: LOG_TARGET,
1620 log_level,
1621 "๐ถ New epoch {} launching at block {} (block slot {} >= start slot {}).",
1622 viable_epoch.as_ref().epoch_index,
1623 hash,
1624 slot,
1625 viable_epoch.as_ref().start_slot,
1626 );
1627
1628 let next_epoch = viable_epoch.increment((next_epoch_descriptor, epoch_config));
1629
1630 log!(
1631 target: LOG_TARGET,
1632 log_level,
1633 "๐ถ Next epoch starts at slot {}",
1634 next_epoch.as_ref().start_slot,
1635 );
1636
1637 let prune_and_import = || {
1645 prune_finalized(self.client.clone(), &mut epoch_changes)?;
1646
1647 epoch_changes
1648 .import(
1649 descendent_query(&*self.client),
1650 hash,
1651 number,
1652 *block.header.parent_hash(),
1653 next_epoch,
1654 )
1655 .map_err(|e| {
1656 ConsensusError::ClientImport(format!(
1657 "Error importing epoch changes: {}",
1658 e
1659 ))
1660 })?;
1661 Ok(())
1662 };
1663
1664 if let Err(e) = prune_and_import() {
1665 debug!(target: LOG_TARGET, "Failed to launch next epoch: {}", e);
1666 *epoch_changes =
1667 old_epoch_changes.expect("set `Some` above and not taken; qed");
1668 return Err(e)
1669 }
1670
1671 crate::aux_schema::write_epoch_changes::<Block, _, _>(&*epoch_changes, |insert| {
1672 block
1673 .auxiliary
1674 .extend(insert.iter().map(|(k, v)| (k.to_vec(), Some(v.to_vec()))))
1675 });
1676 }
1677
1678 aux_schema::write_block_weight(hash, total_weight, |values| {
1679 block
1680 .auxiliary
1681 .extend(values.iter().map(|(k, v)| (k.to_vec(), Some(v.to_vec()))))
1682 });
1683
1684 block.fork_choice = {
1688 let (last_best, last_best_number) = (info.best_hash, info.best_number);
1689
1690 let last_best_weight = if &last_best == block.header.parent_hash() {
1691 parent_weight
1694 } else {
1695 aux_schema::load_block_weight(&*self.client, last_best)
1696 .map_err(|e| ConsensusError::ChainLookup(e.to_string()))?
1697 .ok_or_else(|| {
1698 ConsensusError::ChainLookup(
1699 "No block weight for parent header.".to_string(),
1700 )
1701 })?
1702 };
1703
1704 Some(ForkChoiceStrategy::Custom(if total_weight > last_best_weight {
1705 true
1706 } else if total_weight == last_best_weight {
1707 number > last_best_number
1708 } else {
1709 false
1710 }))
1711 };
1712
1713 epoch_changes.release_mutex()
1715 };
1716
1717 let import_result = self.inner.import_block(block).await;
1718
1719 if import_result.is_err() {
1722 if let Some(old_epoch_changes) = old_epoch_changes {
1723 *epoch_changes.upgrade() = old_epoch_changes;
1724 }
1725 }
1726
1727 import_result.map_err(Into::into)
1728 }
1729
1730 async fn check_block(
1731 &self,
1732 block: BlockCheckParams<Block>,
1733 ) -> Result<ImportResult, Self::Error> {
1734 self.inner.check_block(block).await.map_err(Into::into)
1735 }
1736}
1737
1738fn prune_finalized<Block, Client>(
1740 client: Arc<Client>,
1741 epoch_changes: &mut EpochChangesFor<Block, Epoch>,
1742) -> Result<(), ConsensusError>
1743where
1744 Block: BlockT,
1745 Client: HeaderBackend<Block> + HeaderMetadata<Block, Error = sp_blockchain::Error>,
1746{
1747 let info = client.info();
1748
1749 let finalized_slot = {
1750 let finalized_header = client
1751 .header(info.finalized_hash)
1752 .map_err(|e| ConsensusError::ClientImport(e.to_string()))?
1753 .expect(
1754 "best finalized hash was given by client; finalized headers must exist in db; qed",
1755 );
1756
1757 find_pre_digest::<Block>(&finalized_header)
1758 .expect("finalized header must be valid; valid blocks have a pre-digest; qed")
1759 .slot()
1760 };
1761
1762 epoch_changes
1763 .prune_finalized(
1764 descendent_query(&*client),
1765 &info.finalized_hash,
1766 info.finalized_number,
1767 finalized_slot,
1768 )
1769 .map_err(|e| ConsensusError::ClientImport(e.to_string()))?;
1770
1771 Ok(())
1772}
1773
1774pub fn block_import<Client, Block: BlockT, I, CIDP, SC>(
1780 config: BabeConfiguration,
1781 wrapped_block_import: I,
1782 client: Arc<Client>,
1783 create_inherent_data_providers: CIDP,
1784 select_chain: SC,
1785 offchain_tx_pool_factory: OffchainTransactionPoolFactory<Block>,
1786) -> ClientResult<(BabeBlockImport<Block, Client, I, CIDP, SC>, BabeLink<Block>)>
1787where
1788 Client: AuxStore
1789 + HeaderBackend<Block>
1790 + HeaderMetadata<Block, Error = sp_blockchain::Error>
1791 + PreCommitActions<Block>
1792 + 'static,
1793{
1794 let epoch_changes = aux_schema::load_epoch_changes::<Block, _>(&*client, &config)?;
1795 let link = BabeLink { epoch_changes: epoch_changes.clone(), config: config.clone() };
1796
1797 prune_finalized(client.clone(), &mut epoch_changes.shared_data())?;
1801
1802 let client_weak = Arc::downgrade(&client);
1803 let on_finality = move |summary: &FinalityNotification<Block>| {
1804 if let Some(client) = client_weak.upgrade() {
1805 aux_storage_cleanup(client.as_ref(), summary)
1806 } else {
1807 Default::default()
1808 }
1809 };
1810 client.register_finality_action(Box::new(on_finality));
1811
1812 let import = BabeBlockImport::new(
1813 client,
1814 epoch_changes,
1815 wrapped_block_import,
1816 config,
1817 create_inherent_data_providers,
1818 select_chain,
1819 offchain_tx_pool_factory,
1820 );
1821
1822 Ok((import, link))
1823}
1824
/// Parameters passed to [`import_queue`].
pub struct ImportQueueParams<'a, Block: BlockT, BI, Client, Spawn> {
    /// The BABE link providing the configuration and the shared epoch changes.
    pub link: BabeLink<Block>,
    /// The block import handed to the import queue.
    pub block_import: BI,
    /// Optional justification import handed to the import queue.
    pub justification_import: Option<BoxJustificationImport<Block>>,
    /// The client, shared by the verifier and the BABE worker.
    pub client: Arc<Client>,
    /// The slot duration used by the verifier.
    pub slot_duration: SlotDuration,
    /// Spawner used for the BABE worker and the import queue.
    pub spawner: &'a Spawn,
    /// Optional prometheus registry handed to the import queue.
    pub registry: Option<&'a Registry>,
    /// Optional telemetry handle used by the verifier.
    pub telemetry: Option<TelemetryHandle>,
}
1844
1845pub fn import_queue<Block: BlockT, Client, BI, Spawn>(
1855 ImportQueueParams {
1856 link: babe_link,
1857 block_import,
1858 justification_import,
1859 client,
1860 slot_duration,
1861 spawner,
1862 registry,
1863 telemetry,
1864 }: ImportQueueParams<'_, Block, BI, Client, Spawn>,
1865) -> ClientResult<(DefaultImportQueue<Block>, BabeWorkerHandle<Block>)>
1866where
1867 BI: BlockImport<Block, Error = ConsensusError> + Send + Sync + 'static,
1868 Client: ProvideRuntimeApi<Block>
1869 + HeaderBackend<Block>
1870 + HeaderMetadata<Block, Error = sp_blockchain::Error>
1871 + AuxStore
1872 + Send
1873 + Sync
1874 + 'static,
1875 Client::Api: BlockBuilderApi<Block> + BabeApi<Block> + ApiExt<Block>,
1876 Spawn: SpawnEssentialNamed,
1877{
1878 const HANDLE_BUFFER_SIZE: usize = 1024;
1879
1880 let verifier = BabeVerifier {
1881 slot_duration,
1882 config: babe_link.config.clone(),
1883 epoch_changes: babe_link.epoch_changes.clone(),
1884 telemetry,
1885 client: client.clone(),
1886 };
1887
1888 let (worker_tx, worker_rx) = channel(HANDLE_BUFFER_SIZE);
1889
1890 let answer_requests =
1891 answer_requests(worker_rx, babe_link.config, client, babe_link.epoch_changes);
1892
1893 spawner.spawn_essential("babe-worker", Some("babe"), answer_requests.boxed());
1894
1895 Ok((
1896 BasicQueue::new(verifier, Box::new(block_import), justification_import, spawner, registry),
1897 BabeWorkerHandle(worker_tx),
1898 ))
1899}
1900
1901pub fn revert<Block, Client, Backend>(
1905 client: Arc<Client>,
1906 backend: Arc<Backend>,
1907 blocks: NumberFor<Block>,
1908) -> ClientResult<()>
1909where
1910 Block: BlockT,
1911 Client: AuxStore
1912 + HeaderMetadata<Block, Error = sp_blockchain::Error>
1913 + HeaderBackend<Block>
1914 + ProvideRuntimeApi<Block>
1915 + UsageProvider<Block>,
1916 Client::Api: BabeApi<Block>,
1917 Backend: BackendT<Block>,
1918{
1919 let best_number = client.info().best_number;
1920 let finalized = client.info().finalized_number;
1921
1922 let revertible = blocks.min(best_number - finalized);
1923 if revertible == Zero::zero() {
1924 return Ok(())
1925 }
1926
1927 let revert_up_to_number = best_number - revertible;
1928 let revert_up_to_hash = client.hash(revert_up_to_number)?.ok_or(ClientError::Backend(
1929 format!("Unexpected hash lookup failure for block number: {}", revert_up_to_number),
1930 ))?;
1931
1932 let config = configuration(&*client)?;
1936 let epoch_changes = aux_schema::load_epoch_changes::<Block, Client>(&*client, &config)?;
1937 let mut epoch_changes = epoch_changes.shared_data();
1938
1939 if revert_up_to_number == Zero::zero() {
1940 *epoch_changes = EpochChangesFor::<Block, Epoch>::default();
1942 } else {
1943 epoch_changes.revert(descendent_query(&*client), revert_up_to_hash, revert_up_to_number);
1944 }
1945
1946 let mut weight_keys = HashSet::with_capacity(revertible.saturated_into());
1949
1950 let leaves = backend.blockchain().leaves()?.into_iter().filter(|&leaf| {
1951 sp_blockchain::tree_route(&*client, revert_up_to_hash, leaf)
1952 .map(|route| route.retracted().is_empty())
1953 .unwrap_or_default()
1954 });
1955
1956 for leaf in leaves {
1957 let mut hash = leaf;
1958 loop {
1959 let meta = client.header_metadata(hash)?;
1960 if meta.number <= revert_up_to_number ||
1961 !weight_keys.insert(aux_schema::block_weight_key(hash))
1962 {
1963 break
1965 }
1966 hash = meta.parent;
1967 }
1968 }
1969
1970 let weight_keys: Vec<_> = weight_keys.iter().map(|val| val.as_slice()).collect();
1971
1972 aux_schema::write_epoch_changes::<Block, _, _>(&epoch_changes, |values| {
1974 client.insert_aux(values, weight_keys.iter())
1975 })
1976}
1977
1978fn query_epoch_changes<Block, Client>(
1979 epoch_changes: &SharedEpochChanges<Block, Epoch>,
1980 client: &Client,
1981 config: &BabeConfiguration,
1982 block_number: NumberFor<Block>,
1983 slot: Slot,
1984 parent_hash: Block::Hash,
1985) -> Result<
1986 (ViableEpochDescriptor<Block::Hash, NumberFor<Block>, Epoch>, ViableEpoch<Epoch>),
1987 Error<Block>,
1988>
1989where
1990 Block: BlockT,
1991 Client: HeaderBackend<Block> + HeaderMetadata<Block, Error = sp_blockchain::Error>,
1992{
1993 let epoch_changes = epoch_changes.shared_data();
1994 let epoch_descriptor = epoch_changes
1995 .epoch_descriptor_for_child_of(
1996 descendent_query(client),
1997 &parent_hash,
1998 block_number - 1u32.into(),
1999 slot,
2000 )
2001 .map_err(|e| Error::<Block>::ForkTree(Box::new(e)))?
2002 .ok_or(Error::<Block>::FetchEpoch(parent_hash))?;
2003 let viable_epoch = epoch_changes
2004 .viable_epoch(&epoch_descriptor, |slot| Epoch::genesis(&config, slot))
2005 .ok_or(Error::<Block>::FetchEpoch(parent_hash))?;
2006 Ok((epoch_descriptor, viable_epoch.into_cloned()))
2007}