1#![cfg_attr(not(feature = "std"), no_std)]
78
79mod benchmarking;
80pub mod migration;
81mod mock;
82mod tests;
83pub mod weights;
84
85extern crate alloc;
86
87use alloc::{vec, vec::Vec};
88use codec::{Decode, DecodeWithMemTracking, Encode, MaxEncodedLen};
89use frame_support::{
90 pallet_prelude::*,
91 traits::{
92 EstimateNextSessionRotation, Get, OneSessionHandler, ValidatorSet,
93 ValidatorSetWithIdentification,
94 },
95 BoundedSlice, WeakBoundedVec,
96};
97use frame_system::{
98 offchain::{CreateAuthorizedTransaction, SubmitTransaction},
99 pallet_prelude::*,
100};
101pub use pallet::*;
102use scale_info::TypeInfo;
103use sp_application_crypto::RuntimeAppPublic;
104use sp_runtime::{
105 offchain::storage::{MutateStorageError, StorageRetrievalError, StorageValueRef},
106 traits::{AtLeast32BitUnsigned, Convert, Saturating, TrailingZeroInput},
107 transaction_validity::TransactionValidityWithRefund,
108 Debug, PerThing, Perbill, Permill, SaturatedConversion,
109};
110use sp_staking::{
111 offence::{Kind, Offence, ReportOffence},
112 SessionIndex,
113};
114pub use weights::WeightInfo;
115
116pub mod sr25519 {
117 mod app_sr25519 {
118 use sp_application_crypto::{app_crypto, key_types::IM_ONLINE, sr25519};
119 app_crypto!(sr25519, IM_ONLINE);
120 }
121
122 sp_application_crypto::with_pair! {
123 pub type AuthorityPair = app_sr25519::Pair;
125 }
126
127 pub type AuthoritySignature = app_sr25519::Signature;
129
130 pub type AuthorityId = app_sr25519::Public;
132}
133
134pub mod ed25519 {
135 mod app_ed25519 {
136 use sp_application_crypto::{app_crypto, ed25519, key_types::IM_ONLINE};
137 app_crypto!(ed25519, IM_ONLINE);
138 }
139
140 sp_application_crypto::with_pair! {
141 pub type AuthorityPair = app_ed25519::Pair;
143 }
144
145 pub type AuthoritySignature = app_ed25519::Signature;
147
148 pub type AuthorityId = app_ed25519::Public;
150}
151
152const DB_PREFIX: &[u8] = b"parity/im-online-heartbeat/";
153const INCLUDE_THRESHOLD: u32 = 3;
156
157#[derive(Encode, Decode, Clone, PartialEq, Eq, Debug, TypeInfo)]
163struct HeartbeatStatus<BlockNumber> {
164 pub session_index: SessionIndex,
166 pub sent_at: BlockNumber,
171}
172
173impl<BlockNumber: PartialEq + AtLeast32BitUnsigned + Copy> HeartbeatStatus<BlockNumber> {
174 fn is_recent(&self, session_index: SessionIndex, now: BlockNumber) -> bool {
187 self.session_index == session_index && self.sent_at + INCLUDE_THRESHOLD.into() > now
188 }
189}
190
191#[cfg_attr(test, derive(PartialEq))]
193enum OffchainErr<BlockNumber> {
194 TooEarly,
195 WaitingForInclusion(BlockNumber),
196 AlreadyOnline(u32),
197 FailedSigning,
198 FailedToAcquireLock,
199 SubmitTransaction,
200}
201
202impl<BlockNumber: core::fmt::Debug> core::fmt::Debug for OffchainErr<BlockNumber> {
203 fn fmt(&self, fmt: &mut core::fmt::Formatter) -> core::fmt::Result {
204 match *self {
205 OffchainErr::TooEarly => write!(fmt, "Too early to send heartbeat."),
206 OffchainErr::WaitingForInclusion(ref block) => {
207 write!(fmt, "Heartbeat already sent at {:?}. Waiting for inclusion.", block)
208 },
209 OffchainErr::AlreadyOnline(auth_idx) => {
210 write!(fmt, "Authority {} is already online", auth_idx)
211 },
212 OffchainErr::FailedSigning => write!(fmt, "Failed to sign heartbeat"),
213 OffchainErr::FailedToAcquireLock => write!(fmt, "Failed to acquire lock"),
214 OffchainErr::SubmitTransaction => write!(fmt, "Failed to submit transaction"),
215 }
216 }
217}
218
219pub type AuthIndex = u32;
220
221#[derive(Encode, Decode, DecodeWithMemTracking, Clone, PartialEq, Eq, Debug, TypeInfo)]
223pub struct Heartbeat<BlockNumber>
224where
225 BlockNumber: PartialEq + Eq + Decode + Encode,
226{
227 pub block_number: BlockNumber,
229 pub session_index: SessionIndex,
231 pub authority_index: AuthIndex,
233 pub validators_len: u32,
235}
236
237pub type ValidatorId<T> = <<T as Config>::ValidatorSet as ValidatorSet<
239 <T as frame_system::Config>::AccountId,
240>>::ValidatorId;
241
242pub type IdentificationTuple<T> = (
245 ValidatorId<T>,
246 <<T as Config>::ValidatorSet as ValidatorSetWithIdentification<
247 <T as frame_system::Config>::AccountId,
248 >>::Identification,
249);
250
251type OffchainResult<T, A> = Result<A, OffchainErr<BlockNumberFor<T>>>;
252
253#[frame_support::pallet]
254pub mod pallet {
255 use super::*;
256
257 const STORAGE_VERSION: StorageVersion = StorageVersion::new(1);
259
260 #[pallet::pallet]
261 #[pallet::storage_version(STORAGE_VERSION)]
262 pub struct Pallet<T>(_);
263
264 #[pallet::config]
265 pub trait Config: CreateAuthorizedTransaction<Call<Self>> + frame_system::Config {
270 type AuthorityId: Member
272 + Parameter
273 + RuntimeAppPublic
274 + Ord
275 + MaybeSerializeDeserialize
276 + MaxEncodedLen;
277
278 type MaxKeys: Get<u32>;
280
281 type MaxPeerInHeartbeats: Get<u32>;
283
284 #[allow(deprecated)]
286 type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;
287
288 type ValidatorSet: ValidatorSetWithIdentification<Self::AccountId>;
290
291 type NextSessionRotation: EstimateNextSessionRotation<BlockNumberFor<Self>>;
299
300 type ReportUnresponsiveness: ReportOffence<
302 Self::AccountId,
303 IdentificationTuple<Self>,
304 UnresponsivenessOffence<IdentificationTuple<Self>>,
305 >;
306
307 #[pallet::constant]
312 type UnsignedPriority: Get<TransactionPriority>;
313
314 type WeightInfo: WeightInfo;
316 }
317
318 #[pallet::event]
319 #[pallet::generate_deposit(pub(super) fn deposit_event)]
320 pub enum Event<T: Config> {
321 HeartbeatReceived { authority_id: T::AuthorityId },
323 AllGood,
325 SomeOffline { offline: Vec<IdentificationTuple<T>> },
327 }
328
329 #[pallet::error]
330 pub enum Error<T> {
331 InvalidKey,
333 DuplicatedHeartbeat,
335 }
336
337 #[pallet::storage]
349 pub type HeartbeatAfter<T: Config> = StorageValue<_, BlockNumberFor<T>, ValueQuery>;
350
351 #[pallet::storage]
353 pub type Keys<T: Config> =
354 StorageValue<_, WeakBoundedVec<T::AuthorityId, T::MaxKeys>, ValueQuery>;
355
356 #[pallet::storage]
358 pub type ReceivedHeartbeats<T: Config> =
359 StorageDoubleMap<_, Twox64Concat, SessionIndex, Twox64Concat, AuthIndex, bool>;
360
361 #[pallet::storage]
364 pub type AuthoredBlocks<T: Config> = StorageDoubleMap<
365 _,
366 Twox64Concat,
367 SessionIndex,
368 Twox64Concat,
369 ValidatorId<T>,
370 u32,
371 ValueQuery,
372 >;
373
374 #[pallet::genesis_config]
375 #[derive(frame_support::DefaultNoBound)]
376 pub struct GenesisConfig<T: Config> {
377 pub keys: Vec<T::AuthorityId>,
378 }
379
380 #[pallet::genesis_build]
381 impl<T: Config> BuildGenesisConfig for GenesisConfig<T> {
382 fn build(&self) {
383 Pallet::<T>::initialize_keys(&self.keys);
384 }
385 }
386
387 #[pallet::call]
388 impl<T: Config> Pallet<T> {
389 #[pallet::call_index(0)]
393 #[pallet::weight(<T as Config>::WeightInfo::heartbeat(
394 heartbeat.validators_len,
395 ))]
396 #[pallet::weight_of_authorize(<T as Config>::WeightInfo::authorize_heartbeat(
397 heartbeat.validators_len,
398 ))]
399 #[pallet::authorize(Self::authorize_heartbeat_call)]
400 pub fn heartbeat(
401 origin: OriginFor<T>,
402 heartbeat: Heartbeat<BlockNumberFor<T>>,
403 _signature: <T::AuthorityId as RuntimeAppPublic>::Signature,
406 ) -> DispatchResult {
407 ensure_authorized(origin)?;
408
409 let current_session = T::ValidatorSet::session_index();
410 let exists =
411 ReceivedHeartbeats::<T>::contains_key(current_session, heartbeat.authority_index);
412 let keys = Keys::<T>::get();
413 let public = keys.get(heartbeat.authority_index as usize);
414 if let (false, Some(public)) = (exists, public) {
415 Self::deposit_event(Event::<T>::HeartbeatReceived { authority_id: public.clone() });
416
417 ReceivedHeartbeats::<T>::insert(current_session, heartbeat.authority_index, true);
418
419 Ok(())
420 } else if exists {
421 Err(Error::<T>::DuplicatedHeartbeat.into())
422 } else {
423 Err(Error::<T>::InvalidKey.into())
424 }
425 }
426 }
427
428 #[pallet::hooks]
429 impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
430 fn offchain_worker(now: BlockNumberFor<T>) {
431 if sp_io::offchain::is_validator() {
433 for res in Self::send_heartbeats(now).into_iter().flatten() {
434 if let Err(e) = res {
435 log::debug!(
436 target: "runtime::im-online",
437 "Skipping heartbeat at {:?}: {:?}",
438 now,
439 e,
440 )
441 }
442 }
443 } else {
444 log::trace!(
445 target: "runtime::im-online",
446 "Skipping heartbeat at {:?}. Not a validator.",
447 now,
448 )
449 }
450 }
451 }
452
453 pub(crate) const INVALID_VALIDATORS_LEN: u8 = 10;
456}
457
458impl<T: Config + pallet_authorship::Config>
461 pallet_authorship::EventHandler<ValidatorId<T>, BlockNumberFor<T>> for Pallet<T>
462{
463 fn note_author(author: ValidatorId<T>) {
464 Self::note_authorship(author);
465 }
466}
467
468impl<T: Config> Pallet<T> {
469 pub fn is_online(authority_index: AuthIndex) -> bool {
474 let current_validators = T::ValidatorSet::validators();
475
476 if authority_index >= current_validators.len() as u32 {
477 return false;
478 }
479
480 let authority = ¤t_validators[authority_index as usize];
481
482 Self::is_online_aux(authority_index, authority)
483 }
484
485 fn is_online_aux(authority_index: AuthIndex, authority: &ValidatorId<T>) -> bool {
486 let current_session = T::ValidatorSet::session_index();
487
488 ReceivedHeartbeats::<T>::contains_key(current_session, authority_index) ||
489 AuthoredBlocks::<T>::get(current_session, authority) != 0
490 }
491
492 pub fn received_heartbeat_in_current_session(authority_index: AuthIndex) -> bool {
495 let current_session = T::ValidatorSet::session_index();
496 ReceivedHeartbeats::<T>::contains_key(current_session, authority_index)
497 }
498
499 fn note_authorship(author: ValidatorId<T>) {
501 let current_session = T::ValidatorSet::session_index();
502
503 AuthoredBlocks::<T>::mutate(current_session, author, |authored| *authored += 1);
504 }
505
506 pub(crate) fn send_heartbeats(
507 block_number: BlockNumberFor<T>,
508 ) -> OffchainResult<T, impl Iterator<Item = OffchainResult<T, ()>>> {
509 const START_HEARTBEAT_RANDOM_PERIOD: Permill = Permill::from_percent(10);
510 const START_HEARTBEAT_FINAL_PERIOD: Permill = Permill::from_percent(80);
511
512 let random_choice = |progress: Permill| {
516 let session_length = T::NextSessionRotation::average_session_length();
519 let residual = Permill::from_rational(1u32, session_length.saturated_into());
520 let threshold: Permill = progress.saturating_pow(6).saturating_add(residual);
521
522 let seed = sp_io::offchain::random_seed();
523 let random = <u32>::decode(&mut TrailingZeroInput::new(seed.as_ref()))
524 .expect("input is padded with zeroes; qed");
525 let random = Permill::from_parts(random % Permill::ACCURACY);
526
527 random <= threshold
528 };
529
530 let should_heartbeat = if let (Some(progress), _) =
531 T::NextSessionRotation::estimate_current_session_progress(block_number)
532 {
533 progress >= START_HEARTBEAT_FINAL_PERIOD ||
540 progress >= START_HEARTBEAT_RANDOM_PERIOD && random_choice(progress)
541 } else {
542 let heartbeat_after = <HeartbeatAfter<T>>::get();
545 block_number >= heartbeat_after
546 };
547
548 if !should_heartbeat {
549 return Err(OffchainErr::TooEarly);
550 }
551
552 let session_index = T::ValidatorSet::session_index();
553 let validators_len = Keys::<T>::decode_len().unwrap_or_default() as u32;
554
555 Ok(Self::local_authority_keys().map(move |(authority_index, key)| {
556 Self::send_single_heartbeat(
557 authority_index,
558 key,
559 session_index,
560 block_number,
561 validators_len,
562 )
563 }))
564 }
565
566 fn send_single_heartbeat(
567 authority_index: u32,
568 key: T::AuthorityId,
569 session_index: SessionIndex,
570 block_number: BlockNumberFor<T>,
571 validators_len: u32,
572 ) -> OffchainResult<T, ()> {
573 let prepare_heartbeat = || -> OffchainResult<T, Call<T>> {
575 let heartbeat =
576 Heartbeat { block_number, session_index, authority_index, validators_len };
577
578 let signature = key.sign(&heartbeat.encode()).ok_or(OffchainErr::FailedSigning)?;
579
580 Ok(Call::heartbeat { heartbeat, signature })
581 };
582
583 if Self::is_online(authority_index) {
584 return Err(OffchainErr::AlreadyOnline(authority_index));
585 }
586
587 Self::with_heartbeat_lock(authority_index, session_index, block_number, || {
590 let call = prepare_heartbeat()?;
591 log::info!(
592 target: "runtime::im-online",
593 "[index: {:?}] Reporting im-online at block: {:?} (session: {:?}): {:?}",
594 authority_index,
595 block_number,
596 session_index,
597 call,
598 );
599
600 let xt = T::create_authorized_transaction(call.into());
601 SubmitTransaction::<T, Call<T>>::submit_transaction(xt)
602 .map_err(|_| OffchainErr::SubmitTransaction)?;
603
604 Ok(())
605 })
606 }
607
608 fn local_authority_keys() -> impl Iterator<Item = (u32, T::AuthorityId)> {
609 let authorities = Keys::<T>::get();
615
616 let mut local_keys = T::AuthorityId::all();
620
621 local_keys.sort();
622
623 authorities.into_iter().enumerate().filter_map(move |(index, authority)| {
624 local_keys
625 .binary_search(&authority)
626 .ok()
627 .map(|location| (index as u32, local_keys[location].clone()))
628 })
629 }
630
631 fn with_heartbeat_lock<R>(
632 authority_index: u32,
633 session_index: SessionIndex,
634 now: BlockNumberFor<T>,
635 f: impl FnOnce() -> OffchainResult<T, R>,
636 ) -> OffchainResult<T, R> {
637 let key = {
638 let mut key = DB_PREFIX.to_vec();
639 key.extend(authority_index.encode());
640 key
641 };
642 let storage = StorageValueRef::persistent(&key);
643 let res = storage.mutate(
644 |status: Result<Option<HeartbeatStatus<BlockNumberFor<T>>>, StorageRetrievalError>| {
645 match status {
650 Ok(Some(status)) if status.is_recent(session_index, now) => {
652 Err(OffchainErr::WaitingForInclusion(status.sent_at))
653 },
654 _ => Ok(HeartbeatStatus { session_index, sent_at: now }),
656 }
657 },
658 );
659 if let Err(MutateStorageError::ValueFunctionFailed(err)) = res {
660 return Err(err);
661 }
662
663 let mut new_status = res.map_err(|_| OffchainErr::FailedToAcquireLock)?;
664
665 let res = f();
667
668 if res.is_err() {
670 new_status.sent_at = 0u32.into();
671 storage.set(&new_status);
672 }
673
674 res
675 }
676
677 fn initialize_keys(keys: &[T::AuthorityId]) {
678 if !keys.is_empty() {
679 assert!(Keys::<T>::get().is_empty(), "Keys are already initialized!");
680 let bounded_keys = <BoundedSlice<'_, _, T::MaxKeys>>::try_from(keys)
681 .expect("More than the maximum number of keys provided");
682 Keys::<T>::put(bounded_keys);
683 }
684 }
685
686 fn authorize_heartbeat_call(
692 _source: TransactionSource,
693 heartbeat: &Heartbeat<BlockNumberFor<T>>,
694 signature: &<T::AuthorityId as RuntimeAppPublic>::Signature,
695 ) -> TransactionValidityWithRefund {
696 if Pallet::<T>::is_online(heartbeat.authority_index) {
697 return Err(InvalidTransaction::Stale.into());
699 }
700
701 let current_session = T::ValidatorSet::session_index();
703 if heartbeat.session_index != current_session {
704 return Err(InvalidTransaction::Stale.into());
705 }
706
707 let keys = Keys::<T>::get();
709 if keys.len() as u32 != heartbeat.validators_len {
710 return Err(InvalidTransaction::Custom(INVALID_VALIDATORS_LEN).into());
711 }
712 let authority_id = match keys.get(heartbeat.authority_index as usize) {
713 Some(id) => id,
714 None => return Err(InvalidTransaction::BadProof.into()),
715 };
716
717 let signature_valid = heartbeat
719 .using_encoded(|encoded_heartbeat| authority_id.verify(&encoded_heartbeat, signature));
720
721 if !signature_valid {
722 return Err(InvalidTransaction::BadProof.into());
723 }
724
725 ValidTransaction::with_tag_prefix("ImOnline")
726 .priority(T::UnsignedPriority::get())
727 .and_provides((current_session, authority_id))
728 .longevity(
729 TryInto::<u64>::try_into(
730 T::NextSessionRotation::average_session_length() / 2u32.into(),
731 )
732 .unwrap_or(64_u64),
733 )
734 .propagate(true)
735 .build()
736 .map(|v| (v, Weight::zero()))
737 }
738
739 #[cfg(test)]
740 fn set_keys(keys: Vec<T::AuthorityId>) {
741 let bounded_keys = WeakBoundedVec::<_, T::MaxKeys>::try_from(keys)
742 .expect("More than the maximum number of keys provided");
743 Keys::<T>::put(bounded_keys);
744 }
745}
746
747impl<T: Config> sp_runtime::BoundToRuntimeAppPublic for Pallet<T> {
748 type Public = T::AuthorityId;
749}
750
751impl<T: Config> OneSessionHandler<T::AccountId> for Pallet<T> {
752 type Key = T::AuthorityId;
753
754 fn on_genesis_session<'a, I: 'a>(validators: I)
755 where
756 I: Iterator<Item = (&'a T::AccountId, T::AuthorityId)>,
757 {
758 let keys = validators.map(|x| x.1).collect::<Vec<_>>();
759 Self::initialize_keys(&keys);
760 }
761
762 fn on_new_session<'a, I: 'a>(_changed: bool, validators: I, _queued_validators: I)
763 where
764 I: Iterator<Item = (&'a T::AccountId, T::AuthorityId)>,
765 {
766 let block_number = <frame_system::Pallet<T>>::block_number();
770 let half_session = T::NextSessionRotation::average_session_length() / 2u32.into();
771 <HeartbeatAfter<T>>::put(block_number + half_session);
772
773 let keys = validators.map(|x| x.1).collect::<Vec<_>>();
775 let bounded_keys = WeakBoundedVec::<_, T::MaxKeys>::force_from(
776 keys,
777 Some(
778 "Warning: The session has more keys than expected. \
779 A runtime configuration adjustment may be needed.",
780 ),
781 );
782 Keys::<T>::put(bounded_keys);
783 }
784
785 fn on_before_session_ending() {
786 let session_index = T::ValidatorSet::session_index();
787 let keys = Keys::<T>::get();
788 let current_validators = T::ValidatorSet::validators();
789
790 let offenders = current_validators
791 .into_iter()
792 .enumerate()
793 .filter(|(index, id)| !Self::is_online_aux(*index as u32, id))
794 .filter_map(|(_, id)| {
795 <T::ValidatorSet as ValidatorSetWithIdentification<T::AccountId>>::IdentificationOf::convert(
796 id.clone()
797 ).map(|full_id| (id, full_id))
798 })
799 .collect::<Vec<IdentificationTuple<T>>>();
800
801 #[allow(deprecated)]
805 ReceivedHeartbeats::<T>::remove_prefix(T::ValidatorSet::session_index(), None);
806 #[allow(deprecated)]
807 AuthoredBlocks::<T>::remove_prefix(T::ValidatorSet::session_index(), None);
808
809 if offenders.is_empty() {
810 Self::deposit_event(Event::<T>::AllGood);
811 } else {
812 Self::deposit_event(Event::<T>::SomeOffline { offline: offenders.clone() });
813
814 let validator_set_count = keys.len() as u32;
815 let offence = UnresponsivenessOffence { session_index, validator_set_count, offenders };
816 if let Err(e) = T::ReportUnresponsiveness::report_offence(vec![], offence) {
817 sp_runtime::print(e);
818 }
819 }
820 }
821
822 fn on_disabled(_i: u32) {
823 }
825}
826
827#[derive(Debug, TypeInfo)]
829#[cfg_attr(feature = "std", derive(Clone, PartialEq, Eq))]
830pub struct UnresponsivenessOffence<Offender> {
831 pub session_index: SessionIndex,
836 pub validator_set_count: u32,
838 pub offenders: Vec<Offender>,
840}
841
842impl<Offender: Clone> Offence<Offender> for UnresponsivenessOffence<Offender> {
843 const ID: Kind = *b"im-online:offlin";
844 type TimeSlot = SessionIndex;
845
846 fn offenders(&self) -> Vec<Offender> {
847 self.offenders.clone()
848 }
849
850 fn session_index(&self) -> SessionIndex {
851 self.session_index
852 }
853
854 fn validator_set_count(&self) -> u32 {
855 self.validator_set_count
856 }
857
858 fn time_slot(&self) -> Self::TimeSlot {
859 self.session_index
860 }
861
862 fn slash_fraction(&self, offenders: u32) -> Perbill {
863 if let Some(threshold) = offenders.checked_sub(self.validator_set_count / 10 + 1) {
867 let x = Perbill::from_rational(3 * threshold, self.validator_set_count);
868 x.saturating_mul(Perbill::from_percent(7))
869 } else {
870 Perbill::default()
871 }
872 }
873}