referrerpolicy=no-referrer-when-downgrade

pallet_im_online/
lib.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: Apache-2.0
5
6// Licensed under the Apache License, Version 2.0 (the "License");
7// you may not use this file except in compliance with the License.
8// You may obtain a copy of the License at
9//
10// 	http://www.apache.org/licenses/LICENSE-2.0
11//
12// Unless required by applicable law or agreed to in writing, software
13// distributed under the License is distributed on an "AS IS" BASIS,
14// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15// See the License for the specific language governing permissions and
16// limitations under the License.
17
18//! # I'm online Pallet
19//!
20//! If the local node is a validator (i.e. contains an authority key), this pallet
21//! gossips a heartbeat transaction with each new session. The heartbeat functions
22//! as a simple mechanism to signal that the node is online in the current era.
23//!
24//! Received heartbeats are tracked for one era and reset with each new era. The
25//! pallet exposes two public functions to query if a heartbeat has been received
26//! in the current era or session.
27//!
28//! The heartbeat is a message signed with the session key; it includes the
29//! recent best block number of the local validator's chain.
30//! It is submitted as an unsigned transaction via off-chain workers.
31//!
32//! - [`Config`]
33//! - [`Call`]
34//! - [`Pallet`]
35//!
36//! ## Interface
37//!
38//! ### Public Functions
39//!
40//! - `is_online` - True if the validator sent a heartbeat in the current session.
41//!
42//! ## Usage
43//!
44//! ```
45//! use pallet_im_online::{self as im_online};
46//!
47//! #[frame_support::pallet]
48//! pub mod pallet {
49//! 	use super::*;
50//! 	use frame_support::pallet_prelude::*;
51//! 	use frame_system::pallet_prelude::*;
52//!
53//! 	#[pallet::pallet]
54//! 	pub struct Pallet<T>(_);
55//!
56//! 	#[pallet::config]
57//! 	pub trait Config: frame_system::Config + im_online::Config {}
58//!
59//! 	#[pallet::call]
60//! 	impl<T: Config> Pallet<T> {
61//! 		#[pallet::weight(0)]
62//! 		pub fn is_online(origin: OriginFor<T>, authority_index: u32) -> DispatchResult {
63//! 			let _sender = ensure_signed(origin)?;
64//! 			let _is_online = <im_online::Pallet<T>>::is_online(authority_index);
65//! 			Ok(())
66//! 		}
67//! 	}
68//! }
69//! # fn main() { }
70//! ```
71//!
72//! ## Dependencies
73//!
74//! This pallet depends on the [Session pallet](../pallet_session/index.html).
75
76// Ensure we're `no_std` when compiling for Wasm.
77#![cfg_attr(not(feature = "std"), no_std)]
78
79mod benchmarking;
80pub mod migration;
81mod mock;
82mod tests;
83pub mod weights;
84
85extern crate alloc;
86
87use alloc::{vec, vec::Vec};
88use codec::{Decode, DecodeWithMemTracking, Encode, MaxEncodedLen};
89use frame_support::{
90	pallet_prelude::*,
91	traits::{
92		EstimateNextSessionRotation, Get, OneSessionHandler, ValidatorSet,
93		ValidatorSetWithIdentification,
94	},
95	BoundedSlice, WeakBoundedVec,
96};
97use frame_system::{
98	offchain::{CreateBare, SubmitTransaction},
99	pallet_prelude::*,
100};
101pub use pallet::*;
102use scale_info::TypeInfo;
103use sp_application_crypto::RuntimeAppPublic;
104use sp_runtime::{
105	offchain::storage::{MutateStorageError, StorageRetrievalError, StorageValueRef},
106	traits::{AtLeast32BitUnsigned, Convert, Saturating, TrailingZeroInput},
107	Debug, PerThing, Perbill, Permill, SaturatedConversion,
108};
109use sp_staking::{
110	offence::{Kind, Offence, ReportOffence},
111	SessionIndex,
112};
113pub use weights::WeightInfo;
114
/// Application-crypto types for im-online authorities, backed by sr25519 keys.
pub mod sr25519 {
	mod app_sr25519 {
		use sp_application_crypto::{app_crypto, key_types::IM_ONLINE, sr25519};
		// Declares the `Public`, `Signature` and `Pair` types aliased below, tied to the
		// `IM_ONLINE` key type.
		app_crypto!(sr25519, IM_ONLINE);
	}

	sp_application_crypto::with_pair! {
		/// An i'm online keypair using sr25519 as its crypto.
		pub type AuthorityPair = app_sr25519::Pair;
	}

	/// An i'm online signature using sr25519 as its crypto.
	pub type AuthoritySignature = app_sr25519::Signature;

	/// An i'm online identifier using sr25519 as its crypto.
	pub type AuthorityId = app_sr25519::Public;
}
132
/// Application-crypto types for im-online authorities, backed by ed25519 keys.
pub mod ed25519 {
	mod app_ed25519 {
		use sp_application_crypto::{app_crypto, ed25519, key_types::IM_ONLINE};
		// Declares the `Public`, `Signature` and `Pair` types aliased below, tied to the
		// `IM_ONLINE` key type.
		app_crypto!(ed25519, IM_ONLINE);
	}

	sp_application_crypto::with_pair! {
		/// An i'm online keypair using ed25519 as its crypto.
		pub type AuthorityPair = app_ed25519::Pair;
	}

	/// An i'm online signature using ed25519 as its crypto.
	pub type AuthoritySignature = app_ed25519::Signature;

	/// An i'm online identifier using ed25519 as its crypto.
	pub type AuthorityId = app_ed25519::Public;
}
150
/// Prefix of the offchain persistent-storage keys holding the heartbeat status.
///
/// The SCALE-encoded authority index is appended to this prefix, giving one entry per
/// authority.
const DB_PREFIX: &[u8] = b"parity/im-online-heartbeat/";
/// How many blocks do we wait for heartbeat transaction to be included
/// before sending another one.
const INCLUDE_THRESHOLD: u32 = 3;
155
156/// Status of the offchain worker code.
157///
158/// This stores the block number at which heartbeat was requested and when the worker
159/// has actually managed to produce it.
160/// Note we store such status for every `authority_index` separately.
161#[derive(Encode, Decode, Clone, PartialEq, Eq, Debug, TypeInfo)]
162struct HeartbeatStatus<BlockNumber> {
163	/// An index of the session that we are supposed to send heartbeat for.
164	pub session_index: SessionIndex,
165	/// A block number at which the heartbeat for that session has been actually sent.
166	///
167	/// It may be 0 in case the sending failed. In such case we should just retry
168	/// as soon as possible (i.e. in a worker running for the next block).
169	pub sent_at: BlockNumber,
170}
171
172impl<BlockNumber: PartialEq + AtLeast32BitUnsigned + Copy> HeartbeatStatus<BlockNumber> {
173	/// Returns true if heartbeat has been recently sent.
174	///
175	/// Parameters:
176	/// `session_index` - index of current session.
177	/// `now` - block at which the offchain worker is running.
178	///
179	/// This function will return `true` iff:
180	/// 1. the session index is the same (we don't care if it went up or down)
181	/// 2. the heartbeat has been sent recently (within the threshold)
182	///
183	/// The reasoning for 1. is that it's better to send an extra heartbeat than
184	/// to stall or not send one in case of a bug.
185	fn is_recent(&self, session_index: SessionIndex, now: BlockNumber) -> bool {
186		self.session_index == session_index && self.sent_at + INCLUDE_THRESHOLD.into() > now
187	}
188}
189
/// Reasons for which the off-chain worker may skip or fail sending a heartbeat.
#[cfg_attr(test, derive(PartialEq))]
enum OffchainErr<BlockNumber> {
	TooEarly,
	WaitingForInclusion(BlockNumber),
	AlreadyOnline(u32),
	FailedSigning,
	FailedToAcquireLock,
	SubmitTransaction,
}

/// `Debug` is implemented by hand so that every variant prints as a readable sentence,
/// which is what ends up in the worker's log output.
impl<BlockNumber: core::fmt::Debug> core::fmt::Debug for OffchainErr<BlockNumber> {
	fn fmt(&self, fmt: &mut core::fmt::Formatter) -> core::fmt::Result {
		match self {
			Self::WaitingForInclusion(block) => {
				write!(fmt, "Heartbeat already sent at {:?}. Waiting for inclusion.", block)
			},
			Self::AlreadyOnline(auth_idx) => {
				write!(fmt, "Authority {} is already online", auth_idx)
			},
			// the remaining variants carry no data and map to fixed messages
			Self::TooEarly => fmt.write_str("Too early to send heartbeat."),
			Self::FailedSigning => fmt.write_str("Failed to sign heartbeat"),
			Self::FailedToAcquireLock => fmt.write_str("Failed to acquire lock"),
			Self::SubmitTransaction => fmt.write_str("Failed to submit transaction"),
		}
	}
}
217
/// Position of an authority in the list of keys that may issue heartbeats.
pub type AuthIndex = u32;

/// Heartbeat which is sent/received.
#[derive(Encode, Decode, DecodeWithMemTracking, Clone, PartialEq, Eq, Debug, TypeInfo)]
pub struct Heartbeat<BlockNumber>
where
	BlockNumber: PartialEq + Eq + Decode + Encode,
{
	/// Block number at the time heartbeat is created.
	pub block_number: BlockNumber,
	/// Index of the current session.
	pub session_index: SessionIndex,
	/// An index of the authority on the list of validators.
	pub authority_index: AuthIndex,
	/// The length of session validator set
	pub validators_len: u32,
}
235
/// A type for representing the validator id in a session.
pub type ValidatorId<T> = <<T as Config>::ValidatorSet as ValidatorSet<
	<T as frame_system::Config>::AccountId,
>>::ValidatorId;

/// A tuple of (ValidatorId, Identification) where `Identification` is the full identification of
/// `ValidatorId`.
pub type IdentificationTuple<T> = (
	ValidatorId<T>,
	<<T as Config>::ValidatorSet as ValidatorSetWithIdentification<
		<T as frame_system::Config>::AccountId,
	>>::Identification,
);

/// Result of an off-chain worker operation: `A` on success, otherwise an [`OffchainErr`]
/// parameterized with the runtime's block number type.
type OffchainResult<T, A> = Result<A, OffchainErr<BlockNumberFor<T>>>;
251
252#[frame_support::pallet]
253pub mod pallet {
254	use super::*;
255
	/// The in-code storage version.
	const STORAGE_VERSION: StorageVersion = StorageVersion::new(1);

	/// The im-online pallet type; declared with [`STORAGE_VERSION`] as its storage version.
	#[pallet::pallet]
	#[pallet::storage_version(STORAGE_VERSION)]
	pub struct Pallet<T>(_);
262
	/// Configuration of the im-online pallet.
	///
	/// Besides `frame_system::Config`, the runtime has to implement `CreateBare` for this
	/// pallet's [`Call`]: heartbeat calls are turned into extrinsics via `create_bare` before
	/// the off-chain worker submits them.
	#[pallet::config]
	pub trait Config: CreateBare<Call<Self>> + frame_system::Config {
		/// The identifier type for an authority.
		type AuthorityId: Member
			+ Parameter
			+ RuntimeAppPublic
			+ Ord
			+ MaybeSerializeDeserialize
			+ MaxEncodedLen;

		/// The maximum number of keys that can be added.
		///
		/// This bounds the `Keys` storage item.
		type MaxKeys: Get<u32>;

		/// The maximum number of peers to be stored in `ReceivedHeartbeats`
		// NOTE(review): not referenced by the code visible in this file — confirm whether it
		// is still needed (e.g. by migrations or benchmarks).
		type MaxPeerInHeartbeats: Get<u32>;

		/// The overarching event type.
		#[allow(deprecated)]
		type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;

		/// A type for retrieving the validators supposed to be online in a session.
		type ValidatorSet: ValidatorSetWithIdentification<Self::AccountId>;

		/// A trait that allows us to estimate the current session progress and also the
		/// average session length.
		///
		/// This parameter is used to determine the longevity of `heartbeat` transaction and a
		/// rough time when we should start considering sending heartbeats, since the worker
		/// avoids sending them at the very beginning of the session, assuming there is a
		/// chance the authority will produce a block and they won't be necessary.
		type NextSessionRotation: EstimateNextSessionRotation<BlockNumberFor<Self>>;

		/// A type that gives us the ability to submit unresponsiveness offence reports.
		type ReportUnresponsiveness: ReportOffence<
			Self::AccountId,
			IdentificationTuple<Self>,
			UnresponsivenessOffence<IdentificationTuple<Self>>,
		>;

		/// A configuration for base priority of unsigned transactions.
		///
		/// This is exposed so that it can be tuned for particular runtime, when
		/// multiple pallets send unsigned transactions.
		#[pallet::constant]
		type UnsignedPriority: Get<TransactionPriority>;

		/// Weight information for extrinsics in this pallet.
		type WeightInfo: WeightInfo;
	}
312
	/// Events deposited by the im-online pallet.
	#[pallet::event]
	#[pallet::generate_deposit(pub(super) fn deposit_event)]
	pub enum Event<T: Config> {
		/// A new heartbeat was received from `AuthorityId`.
		HeartbeatReceived { authority_id: T::AuthorityId },
		/// At the end of the session, no offence was committed.
		AllGood,
		/// At the end of the session, at least one validator was found to be offline.
		SomeOffline { offline: Vec<IdentificationTuple<T>> },
	}

	/// Errors returned by the `heartbeat` call.
	#[pallet::error]
	pub enum Error<T> {
		/// Non existent public key.
		///
		/// Returned when the heartbeat's authority index has no entry in `Keys`.
		InvalidKey,
		/// Duplicated heartbeat.
		///
		/// Returned when a heartbeat of that authority was already recorded for the
		/// current session.
		DuplicatedHeartbeat,
	}
331
	/// The block number after which it's ok to send heartbeats in the current
	/// session.
	///
	/// At the beginning of each session we set this to a value that should fall
	/// roughly in the middle of the session duration. The idea is to first wait for
	/// the validators to produce a block in the current session, so that the
	/// heartbeat later on will not be necessary.
	///
	/// This value will only be used as a fallback if we fail to get a proper session
	/// progress estimate from `NextSessionRotation`, as those estimates should be
	/// more accurate than the value we calculate for `HeartbeatAfter`.
	#[pallet::storage]
	pub type HeartbeatAfter<T: Config> = StorageValue<_, BlockNumberFor<T>, ValueQuery>;

	/// The current set of keys that may issue a heartbeat.
	#[pallet::storage]
	pub type Keys<T: Config> =
		StorageValue<_, WeakBoundedVec<T::AuthorityId, T::MaxKeys>, ValueQuery>;

	/// For each session index, the set of `AuthIndex` values a heartbeat was received from.
	///
	/// An entry is inserted (with value `true`) when a heartbeat is accepted; only the
	/// presence of the key is ever checked.
	#[pallet::storage]
	pub type ReceivedHeartbeats<T: Config> =
		StorageDoubleMap<_, Twox64Concat, SessionIndex, Twox64Concat, AuthIndex, bool>;

	/// For each session index, we keep a mapping of `ValidatorId<T>` to the
	/// number of blocks authored by the given authority.
	#[pallet::storage]
	pub type AuthoredBlocks<T: Config> = StorageDoubleMap<
		_,
		Twox64Concat,
		SessionIndex,
		Twox64Concat,
		ValidatorId<T>,
		u32,
		ValueQuery,
	>;
368
	/// Genesis configuration of the pallet.
	#[pallet::genesis_config]
	#[derive(frame_support::DefaultNoBound)]
	pub struct GenesisConfig<T: Config> {
		/// Authority keys to store in `Keys` at genesis; an empty list leaves storage untouched.
		pub keys: Vec<T::AuthorityId>,
	}

	#[pallet::genesis_build]
	impl<T: Config> BuildGenesisConfig for GenesisConfig<T> {
		/// Stores the configured keys through `Pallet::initialize_keys`, which panics if keys
		/// were already initialized or if more than `MaxKeys` keys are given.
		fn build(&self) {
			Pallet::<T>::initialize_keys(&self.keys);
		}
	}
381
382	#[pallet::call]
383	impl<T: Config> Pallet<T> {
384		/// ## Complexity:
385		/// - `O(K)` where K is length of `Keys` (heartbeat.validators_len)
386		///   - `O(K)`: decoding of length `K`
387		// NOTE: the weight includes the cost of validate_unsigned as it is part of the cost to
388		// import block with such an extrinsic.
389		#[pallet::call_index(0)]
390		#[pallet::weight(<T as Config>::WeightInfo::validate_unsigned_and_then_heartbeat(
391			heartbeat.validators_len,
392		))]
393		pub fn heartbeat(
394			origin: OriginFor<T>,
395			heartbeat: Heartbeat<BlockNumberFor<T>>,
396			// since signature verification is done in `validate_unsigned`
397			// we can skip doing it here again.
398			_signature: <T::AuthorityId as RuntimeAppPublic>::Signature,
399		) -> DispatchResult {
400			ensure_none(origin)?;
401
402			let current_session = T::ValidatorSet::session_index();
403			let exists =
404				ReceivedHeartbeats::<T>::contains_key(current_session, heartbeat.authority_index);
405			let keys = Keys::<T>::get();
406			let public = keys.get(heartbeat.authority_index as usize);
407			if let (false, Some(public)) = (exists, public) {
408				Self::deposit_event(Event::<T>::HeartbeatReceived { authority_id: public.clone() });
409
410				ReceivedHeartbeats::<T>::insert(current_session, heartbeat.authority_index, true);
411
412				Ok(())
413			} else if exists {
414				Err(Error::<T>::DuplicatedHeartbeat.into())
415			} else {
416				Err(Error::<T>::InvalidKey.into())
417			}
418		}
419	}
420
421	#[pallet::hooks]
422	impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
423		fn offchain_worker(now: BlockNumberFor<T>) {
424			// Only send messages if we are a potential validator.
425			if sp_io::offchain::is_validator() {
426				for res in Self::send_heartbeats(now).into_iter().flatten() {
427					if let Err(e) = res {
428						log::debug!(
429							target: "runtime::im-online",
430							"Skipping heartbeat at {:?}: {:?}",
431							now,
432							e,
433						)
434					}
435				}
436			} else {
437				log::trace!(
438					target: "runtime::im-online",
439					"Skipping heartbeat at {:?}. Not a validator.",
440					now,
441				)
442			}
443		}
444	}
445
446	/// Invalid transaction custom error. Returned when validators_len field in heartbeat is
447	/// incorrect.
448	pub(crate) const INVALID_VALIDATORS_LEN: u8 = 10;
449
450	#[allow(deprecated)]
451	#[pallet::validate_unsigned]
452	impl<T: Config> ValidateUnsigned for Pallet<T> {
453		type Call = Call<T>;
454
455		fn validate_unsigned(_source: TransactionSource, call: &Self::Call) -> TransactionValidity {
456			if let Call::heartbeat { heartbeat, signature } = call {
457				if <Pallet<T>>::is_online(heartbeat.authority_index) {
458					// we already received a heartbeat for this authority
459					return InvalidTransaction::Stale.into();
460				}
461
462				// check if session index from heartbeat is recent
463				let current_session = T::ValidatorSet::session_index();
464				if heartbeat.session_index != current_session {
465					return InvalidTransaction::Stale.into();
466				}
467
468				// verify that the incoming (unverified) pubkey is actually an authority id
469				let keys = Keys::<T>::get();
470				if keys.len() as u32 != heartbeat.validators_len {
471					return InvalidTransaction::Custom(INVALID_VALIDATORS_LEN).into();
472				}
473				let authority_id = match keys.get(heartbeat.authority_index as usize) {
474					Some(id) => id,
475					None => return InvalidTransaction::BadProof.into(),
476				};
477
478				// check signature (this is expensive so we do it last).
479				let signature_valid = heartbeat.using_encoded(|encoded_heartbeat| {
480					authority_id.verify(&encoded_heartbeat, signature)
481				});
482
483				if !signature_valid {
484					return InvalidTransaction::BadProof.into();
485				}
486
487				ValidTransaction::with_tag_prefix("ImOnline")
488					.priority(T::UnsignedPriority::get())
489					.and_provides((current_session, authority_id))
490					.longevity(
491						TryInto::<u64>::try_into(
492							T::NextSessionRotation::average_session_length() / 2u32.into(),
493						)
494						.unwrap_or(64_u64),
495					)
496					.propagate(true)
497					.build()
498			} else {
499				InvalidTransaction::Call.into()
500			}
501		}
502	}
503}
504
/// Keep track of number of authored blocks per authority: every author notification
/// bumps that authority's counter for the current session.
// NOTE(review): only `note_author` is handled here; whether uncles are reported as well
// depends on `pallet_authorship::EventHandler` — confirm.
impl<T: Config + pallet_authorship::Config>
	pallet_authorship::EventHandler<ValidatorId<T>, BlockNumberFor<T>> for Pallet<T>
{
	/// Forwards to `Pallet::note_authorship`.
	fn note_author(author: ValidatorId<T>) {
		Self::note_authorship(author);
	}
}
514
515impl<T: Config> Pallet<T> {
516	/// Returns `true` if a heartbeat has been received for the authority at
517	/// `authority_index` in the authorities series or if the authority has
518	/// authored at least one block, during the current session. Otherwise
519	/// `false`.
520	pub fn is_online(authority_index: AuthIndex) -> bool {
521		let current_validators = T::ValidatorSet::validators();
522
523		if authority_index >= current_validators.len() as u32 {
524			return false;
525		}
526
527		let authority = &current_validators[authority_index as usize];
528
529		Self::is_online_aux(authority_index, authority)
530	}
531
532	fn is_online_aux(authority_index: AuthIndex, authority: &ValidatorId<T>) -> bool {
533		let current_session = T::ValidatorSet::session_index();
534
535		ReceivedHeartbeats::<T>::contains_key(current_session, authority_index) ||
536			AuthoredBlocks::<T>::get(current_session, authority) != 0
537	}
538
539	/// Returns `true` if a heartbeat has been received for the authority at `authority_index` in
540	/// the authorities series, during the current session. Otherwise `false`.
541	pub fn received_heartbeat_in_current_session(authority_index: AuthIndex) -> bool {
542		let current_session = T::ValidatorSet::session_index();
543		ReceivedHeartbeats::<T>::contains_key(current_session, authority_index)
544	}
545
546	/// Note that the given authority has authored a block in the current session.
547	fn note_authorship(author: ValidatorId<T>) {
548		let current_session = T::ValidatorSet::session_index();
549
550		AuthoredBlocks::<T>::mutate(current_session, author, |authored| *authored += 1);
551	}
552
553	pub(crate) fn send_heartbeats(
554		block_number: BlockNumberFor<T>,
555	) -> OffchainResult<T, impl Iterator<Item = OffchainResult<T, ()>>> {
556		const START_HEARTBEAT_RANDOM_PERIOD: Permill = Permill::from_percent(10);
557		const START_HEARTBEAT_FINAL_PERIOD: Permill = Permill::from_percent(80);
558
559		// this should give us a residual probability of 1/SESSION_LENGTH of sending an heartbeat,
560		// i.e. all heartbeats spread uniformly, over most of the session. as the session progresses
561		// the probability of sending an heartbeat starts to increase exponentially.
562		let random_choice = |progress: Permill| {
563			// given session progress `p` and session length `l`
564			// the threshold formula is: p^6 + 1/l
565			let session_length = T::NextSessionRotation::average_session_length();
566			let residual = Permill::from_rational(1u32, session_length.saturated_into());
567			let threshold: Permill = progress.saturating_pow(6).saturating_add(residual);
568
569			let seed = sp_io::offchain::random_seed();
570			let random = <u32>::decode(&mut TrailingZeroInput::new(seed.as_ref()))
571				.expect("input is padded with zeroes; qed");
572			let random = Permill::from_parts(random % Permill::ACCURACY);
573
574			random <= threshold
575		};
576
577		let should_heartbeat = if let (Some(progress), _) =
578			T::NextSessionRotation::estimate_current_session_progress(block_number)
579		{
580			// we try to get an estimate of the current session progress first since it should
581			// provide more accurate results. we will start an early heartbeat period where we'll
582			// randomly pick whether to heartbeat. after 80% of the session has elapsed, if we
583			// haven't sent an heartbeat yet we'll send one unconditionally. the idea is to prevent
584			// all nodes from sending the heartbeats at the same block and causing a temporary (but
585			// deterministic) spike in transactions.
586			progress >= START_HEARTBEAT_FINAL_PERIOD ||
587				progress >= START_HEARTBEAT_RANDOM_PERIOD && random_choice(progress)
588		} else {
589			// otherwise we fallback to using the block number calculated at the beginning
590			// of the session that should roughly correspond to the middle of the session
591			let heartbeat_after = <HeartbeatAfter<T>>::get();
592			block_number >= heartbeat_after
593		};
594
595		if !should_heartbeat {
596			return Err(OffchainErr::TooEarly);
597		}
598
599		let session_index = T::ValidatorSet::session_index();
600		let validators_len = Keys::<T>::decode_len().unwrap_or_default() as u32;
601
602		Ok(Self::local_authority_keys().map(move |(authority_index, key)| {
603			Self::send_single_heartbeat(
604				authority_index,
605				key,
606				session_index,
607				block_number,
608				validators_len,
609			)
610		}))
611	}
612
613	fn send_single_heartbeat(
614		authority_index: u32,
615		key: T::AuthorityId,
616		session_index: SessionIndex,
617		block_number: BlockNumberFor<T>,
618		validators_len: u32,
619	) -> OffchainResult<T, ()> {
620		// A helper function to prepare heartbeat call.
621		let prepare_heartbeat = || -> OffchainResult<T, Call<T>> {
622			let heartbeat =
623				Heartbeat { block_number, session_index, authority_index, validators_len };
624
625			let signature = key.sign(&heartbeat.encode()).ok_or(OffchainErr::FailedSigning)?;
626
627			Ok(Call::heartbeat { heartbeat, signature })
628		};
629
630		if Self::is_online(authority_index) {
631			return Err(OffchainErr::AlreadyOnline(authority_index));
632		}
633
634		// acquire lock for that authority at current heartbeat to make sure we don't
635		// send concurrent heartbeats.
636		Self::with_heartbeat_lock(authority_index, session_index, block_number, || {
637			let call = prepare_heartbeat()?;
638			log::info!(
639				target: "runtime::im-online",
640				"[index: {:?}] Reporting im-online at block: {:?} (session: {:?}): {:?}",
641				authority_index,
642				block_number,
643				session_index,
644				call,
645			);
646
647			let xt = T::create_bare(call.into());
648			SubmitTransaction::<T, Call<T>>::submit_transaction(xt)
649				.map_err(|_| OffchainErr::SubmitTransaction)?;
650
651			Ok(())
652		})
653	}
654
655	fn local_authority_keys() -> impl Iterator<Item = (u32, T::AuthorityId)> {
656		// on-chain storage
657		//
658		// At index `idx`:
659		// 1. A (ImOnline) public key to be used by a validator at index `idx` to send im-online
660		//    heartbeats.
661		let authorities = Keys::<T>::get();
662
663		// local keystore
664		//
665		// All `ImOnline` public (+private) keys currently in the local keystore.
666		let mut local_keys = T::AuthorityId::all();
667
668		local_keys.sort();
669
670		authorities.into_iter().enumerate().filter_map(move |(index, authority)| {
671			local_keys
672				.binary_search(&authority)
673				.ok()
674				.map(|location| (index as u32, local_keys[location].clone()))
675		})
676	}
677
678	fn with_heartbeat_lock<R>(
679		authority_index: u32,
680		session_index: SessionIndex,
681		now: BlockNumberFor<T>,
682		f: impl FnOnce() -> OffchainResult<T, R>,
683	) -> OffchainResult<T, R> {
684		let key = {
685			let mut key = DB_PREFIX.to_vec();
686			key.extend(authority_index.encode());
687			key
688		};
689		let storage = StorageValueRef::persistent(&key);
690		let res = storage.mutate(
691			|status: Result<Option<HeartbeatStatus<BlockNumberFor<T>>>, StorageRetrievalError>| {
692				// Check if there is already a lock for that particular block.
693				// This means that the heartbeat has already been sent, and we are just waiting
694				// for it to be included. However if it doesn't get included for INCLUDE_THRESHOLD
695				// we will re-send it.
696				match status {
697					// we are still waiting for inclusion.
698					Ok(Some(status)) if status.is_recent(session_index, now) => {
699						Err(OffchainErr::WaitingForInclusion(status.sent_at))
700					},
701					// attempt to set new status
702					_ => Ok(HeartbeatStatus { session_index, sent_at: now }),
703				}
704			},
705		);
706		if let Err(MutateStorageError::ValueFunctionFailed(err)) = res {
707			return Err(err);
708		}
709
710		let mut new_status = res.map_err(|_| OffchainErr::FailedToAcquireLock)?;
711
712		// we got the lock, let's try to send the heartbeat.
713		let res = f();
714
715		// clear the lock in case we have failed to send transaction.
716		if res.is_err() {
717			new_status.sent_at = 0u32.into();
718			storage.set(&new_status);
719		}
720
721		res
722	}
723
724	fn initialize_keys(keys: &[T::AuthorityId]) {
725		if !keys.is_empty() {
726			assert!(Keys::<T>::get().is_empty(), "Keys are already initialized!");
727			let bounded_keys = <BoundedSlice<'_, _, T::MaxKeys>>::try_from(keys)
728				.expect("More than the maximum number of keys provided");
729			Keys::<T>::put(bounded_keys);
730		}
731	}
732
733	#[cfg(test)]
734	fn set_keys(keys: Vec<T::AuthorityId>) {
735		let bounded_keys = WeakBoundedVec::<_, T::MaxKeys>::try_from(keys)
736			.expect("More than the maximum number of keys provided");
737		Keys::<T>::put(bounded_keys);
738	}
739}
740
/// Binds the pallet to its application public key type, the configured `AuthorityId`.
impl<T: Config> sp_runtime::BoundToRuntimeAppPublic for Pallet<T> {
	type Public = T::AuthorityId;
}
744
745impl<T: Config> OneSessionHandler<T::AccountId> for Pallet<T> {
746	type Key = T::AuthorityId;
747
748	fn on_genesis_session<'a, I: 'a>(validators: I)
749	where
750		I: Iterator<Item = (&'a T::AccountId, T::AuthorityId)>,
751	{
752		let keys = validators.map(|x| x.1).collect::<Vec<_>>();
753		Self::initialize_keys(&keys);
754	}
755
756	fn on_new_session<'a, I: 'a>(_changed: bool, validators: I, _queued_validators: I)
757	where
758		I: Iterator<Item = (&'a T::AccountId, T::AuthorityId)>,
759	{
760		// Tell the offchain worker to start making the next session's heartbeats.
761		// Since we consider producing blocks as being online,
762		// the heartbeat is deferred a bit to prevent spamming.
763		let block_number = <frame_system::Pallet<T>>::block_number();
764		let half_session = T::NextSessionRotation::average_session_length() / 2u32.into();
765		<HeartbeatAfter<T>>::put(block_number + half_session);
766
767		// Remember who the authorities are for the new session.
768		let keys = validators.map(|x| x.1).collect::<Vec<_>>();
769		let bounded_keys = WeakBoundedVec::<_, T::MaxKeys>::force_from(
770			keys,
771			Some(
772				"Warning: The session has more keys than expected. \
773  				A runtime configuration adjustment may be needed.",
774			),
775		);
776		Keys::<T>::put(bounded_keys);
777	}
778
779	fn on_before_session_ending() {
780		let session_index = T::ValidatorSet::session_index();
781		let keys = Keys::<T>::get();
782		let current_validators = T::ValidatorSet::validators();
783
784		let offenders = current_validators
785			.into_iter()
786			.enumerate()
787			.filter(|(index, id)| !Self::is_online_aux(*index as u32, id))
788			.filter_map(|(_, id)| {
789				<T::ValidatorSet as ValidatorSetWithIdentification<T::AccountId>>::IdentificationOf::convert(
790					id.clone()
791				).map(|full_id| (id, full_id))
792			})
793			.collect::<Vec<IdentificationTuple<T>>>();
794
795		// Remove all received heartbeats and number of authored blocks from the
796		// current session, they have already been processed and won't be needed
797		// anymore.
798		#[allow(deprecated)]
799		ReceivedHeartbeats::<T>::remove_prefix(T::ValidatorSet::session_index(), None);
800		#[allow(deprecated)]
801		AuthoredBlocks::<T>::remove_prefix(T::ValidatorSet::session_index(), None);
802
803		if offenders.is_empty() {
804			Self::deposit_event(Event::<T>::AllGood);
805		} else {
806			Self::deposit_event(Event::<T>::SomeOffline { offline: offenders.clone() });
807
808			let validator_set_count = keys.len() as u32;
809			let offence = UnresponsivenessOffence { session_index, validator_set_count, offenders };
810			if let Err(e) = T::ReportUnresponsiveness::report_offence(vec![], offence) {
811				sp_runtime::print(e);
812			}
813		}
814	}
815
816	fn on_disabled(_i: u32) {
817		// ignore
818	}
819}
820
/// An offence that is filed if a validator didn't send a heartbeat message.
#[derive(Debug, TypeInfo)]
#[cfg_attr(feature = "std", derive(Clone, PartialEq, Eq))]
pub struct UnresponsivenessOffence<Offender> {
	/// The current session index in which we report the unresponsive validators.
	///
	/// It acts as a time measure for unresponsiveness reports and effectively will always point
	/// at the end of the session.
	pub session_index: SessionIndex,
	/// The size of the validator set in current session/era.
	pub validator_set_count: u32,
	/// Authorities that were unresponsive during the reported session.
	pub offenders: Vec<Offender>,
}
835
836impl<Offender: Clone> Offence<Offender> for UnresponsivenessOffence<Offender> {
837	const ID: Kind = *b"im-online:offlin";
838	type TimeSlot = SessionIndex;
839
	/// Returns a copy of the list of unresponsive authorities.
	fn offenders(&self) -> Vec<Offender> {
		self.offenders.clone()
	}
843
	/// Returns the index of the session the offence was reported in.
	fn session_index(&self) -> SessionIndex {
		self.session_index
	}
847
	/// Returns the size of the validator set recorded with the offence.
	fn validator_set_count(&self) -> u32 {
		self.validator_set_count
	}
851
	/// The time slot of this offence is the session index itself.
	fn time_slot(&self) -> Self::TimeSlot {
		self.session_index
	}
855
856	fn slash_fraction(&self, offenders: u32) -> Perbill {
857		// the formula is min((3 * (k - (n / 10 + 1))) / n, 1) * 0.07
858		// basically, 10% can be offline with no slash, but after that, it linearly climbs up to 7%
859		// when 13/30 are offline (around 5% when 1/3 are offline).
860		if let Some(threshold) = offenders.checked_sub(self.validator_set_count / 10 + 1) {
861			let x = Perbill::from_rational(3 * threshold, self.validator_set_count);
862			x.saturating_mul(Perbill::from_percent(7))
863		} else {
864			Perbill::default()
865		}
866	}
867}