referrerpolicy=no-referrer-when-downgrade

pallet_im_online/
lib.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: Apache-2.0
5
6// Licensed under the Apache License, Version 2.0 (the "License");
7// you may not use this file except in compliance with the License.
8// You may obtain a copy of the License at
9//
10// 	http://www.apache.org/licenses/LICENSE-2.0
11//
12// Unless required by applicable law or agreed to in writing, software
13// distributed under the License is distributed on an "AS IS" BASIS,
14// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15// See the License for the specific language governing permissions and
16// limitations under the License.
17
18//! # I'm online Pallet
19//!
20//! If the local node is a validator (i.e. contains an authority key), this pallet
21//! gossips a heartbeat transaction with each new session. The heartbeat functions
22//! as a simple mechanism to signal that the node is online in the current session.
23//!
24//! Received heartbeats are tracked for one session and cleared when that session
25//! ends. The pallet exposes two public functions to query whether an authority has
26//! been seen online, or has had a heartbeat received, in the current session.
27//!
28//! The heartbeat is a signed transaction, which was signed using the session key
29//! and includes the recent best block number of the local validator's chain.
30//! It is submitted as an Authorized Transaction via off-chain workers.
31//!
32//! - [`Config`]
33//! - [`Call`]
34//! - [`Pallet`]
35//!
36//! ## Interface
37//!
38//! ### Public Functions
39//!
40//! - `is_online` - True if the validator sent a heartbeat or authored a block in the current session.
41//!
42//! ## Usage
43//!
44//! ```
45//! use pallet_im_online::{self as im_online};
46//!
47//! #[frame_support::pallet]
48//! pub mod pallet {
49//! 	use super::*;
50//! 	use frame_support::pallet_prelude::*;
51//! 	use frame_system::pallet_prelude::*;
52//!
53//! 	#[pallet::pallet]
54//! 	pub struct Pallet<T>(_);
55//!
56//! 	#[pallet::config]
57//! 	pub trait Config: frame_system::Config + im_online::Config {}
58//!
59//! 	#[pallet::call]
60//! 	impl<T: Config> Pallet<T> {
61//! 		#[pallet::weight(0)]
62//! 		pub fn is_online(origin: OriginFor<T>, authority_index: u32) -> DispatchResult {
63//! 			let _sender = ensure_signed(origin)?;
64//! 			let _is_online = <im_online::Pallet<T>>::is_online(authority_index);
65//! 			Ok(())
66//! 		}
67//! 	}
68//! }
69//! # fn main() { }
70//! ```
71//!
72//! ## Dependencies
73//!
74//! This pallet depends on the [Session pallet](../pallet_session/index.html).
75
76// Ensure we're `no_std` when compiling for Wasm.
77#![cfg_attr(not(feature = "std"), no_std)]
78
79mod benchmarking;
80pub mod migration;
81mod mock;
82mod tests;
83pub mod weights;
84
85extern crate alloc;
86
87use alloc::{vec, vec::Vec};
88use codec::{Decode, DecodeWithMemTracking, Encode, MaxEncodedLen};
89use frame_support::{
90	pallet_prelude::*,
91	traits::{
92		EstimateNextSessionRotation, Get, OneSessionHandler, ValidatorSet,
93		ValidatorSetWithIdentification,
94	},
95	BoundedSlice, WeakBoundedVec,
96};
97use frame_system::{
98	offchain::{CreateAuthorizedTransaction, SubmitTransaction},
99	pallet_prelude::*,
100};
101pub use pallet::*;
102use scale_info::TypeInfo;
103use sp_application_crypto::RuntimeAppPublic;
104use sp_runtime::{
105	offchain::storage::{MutateStorageError, StorageRetrievalError, StorageValueRef},
106	traits::{AtLeast32BitUnsigned, Convert, Saturating, TrailingZeroInput},
107	transaction_validity::TransactionValidityWithRefund,
108	Debug, PerThing, Perbill, Permill, SaturatedConversion,
109};
110use sp_staking::{
111	offence::{Kind, Offence, ReportOffence},
112	SessionIndex,
113};
114pub use weights::WeightInfo;
115
116pub mod sr25519 {
117	mod app_sr25519 {
118		use sp_application_crypto::{app_crypto, key_types::IM_ONLINE, sr25519};
119		app_crypto!(sr25519, IM_ONLINE);
120	}
121
122	sp_application_crypto::with_pair! {
123		/// An i'm online keypair using sr25519 as its crypto.
124		pub type AuthorityPair = app_sr25519::Pair;
125	}
126
127	/// An i'm online signature using sr25519 as its crypto.
128	pub type AuthoritySignature = app_sr25519::Signature;
129
130	/// An i'm online identifier using sr25519 as its crypto.
131	pub type AuthorityId = app_sr25519::Public;
132}
133
134pub mod ed25519 {
135	mod app_ed25519 {
136		use sp_application_crypto::{app_crypto, ed25519, key_types::IM_ONLINE};
137		app_crypto!(ed25519, IM_ONLINE);
138	}
139
140	sp_application_crypto::with_pair! {
141		/// An i'm online keypair using ed25519 as its crypto.
142		pub type AuthorityPair = app_ed25519::Pair;
143	}
144
145	/// An i'm online signature using ed25519 as its crypto.
146	pub type AuthoritySignature = app_ed25519::Signature;
147
148	/// An i'm online identifier using ed25519 as its crypto.
149	pub type AuthorityId = app_ed25519::Public;
150}
151
/// Prefix of the persistent offchain-storage key under which the heartbeat status
/// of each authority is kept (the encoded authority index is appended to it).
const DB_PREFIX: &[u8] = b"parity/im-online-heartbeat/";
/// Number of blocks we give a submitted heartbeat transaction to be included
/// before another one is sent.
const INCLUDE_THRESHOLD: u32 = 3;
156
157/// Status of the offchain worker code.
158///
159/// This stores the block number at which heartbeat was requested and when the worker
160/// has actually managed to produce it.
161/// Note we store such status for every `authority_index` separately.
162#[derive(Encode, Decode, Clone, PartialEq, Eq, Debug, TypeInfo)]
163struct HeartbeatStatus<BlockNumber> {
164	/// An index of the session that we are supposed to send heartbeat for.
165	pub session_index: SessionIndex,
166	/// A block number at which the heartbeat for that session has been actually sent.
167	///
168	/// It may be 0 in case the sending failed. In such case we should just retry
169	/// as soon as possible (i.e. in a worker running for the next block).
170	pub sent_at: BlockNumber,
171}
172
173impl<BlockNumber: PartialEq + AtLeast32BitUnsigned + Copy> HeartbeatStatus<BlockNumber> {
174	/// Returns true if heartbeat has been recently sent.
175	///
176	/// Parameters:
177	/// `session_index` - index of current session.
178	/// `now` - block at which the offchain worker is running.
179	///
180	/// This function will return `true` iff:
181	/// 1. the session index is the same (we don't care if it went up or down)
182	/// 2. the heartbeat has been sent recently (within the threshold)
183	///
184	/// The reasoning for 1. is that it's better to send an extra heartbeat than
185	/// to stall or not send one in case of a bug.
186	fn is_recent(&self, session_index: SessionIndex, now: BlockNumber) -> bool {
187		self.session_index == session_index && self.sent_at + INCLUDE_THRESHOLD.into() > now
188	}
189}
190
/// Reasons for which the off-chain code may skip or fail to send a heartbeat.
#[cfg_attr(test, derive(PartialEq))]
enum OffchainErr<BlockNumber> {
	/// It is too early to send a heartbeat.
	TooEarly,
	/// A heartbeat was already sent at the given block and is awaiting inclusion.
	WaitingForInclusion(BlockNumber),
	/// The authority with the given index is already considered online.
	AlreadyOnline(u32),
	/// The heartbeat could not be signed.
	FailedSigning,
	/// The per-authority heartbeat lock could not be acquired.
	FailedToAcquireLock,
	/// The transaction could not be submitted.
	SubmitTransaction,
}

/// Human-readable messages; these are what ends up in the worker's log output.
impl<BlockNumber: core::fmt::Debug> core::fmt::Debug for OffchainErr<BlockNumber> {
	fn fmt(&self, fmt: &mut core::fmt::Formatter) -> core::fmt::Result {
		match self {
			Self::TooEarly => fmt.write_str("Too early to send heartbeat."),
			Self::WaitingForInclusion(block) => {
				write!(fmt, "Heartbeat already sent at {:?}. Waiting for inclusion.", block)
			},
			Self::AlreadyOnline(auth_idx) => {
				write!(fmt, "Authority {} is already online", auth_idx)
			},
			Self::FailedSigning => fmt.write_str("Failed to sign heartbeat"),
			Self::FailedToAcquireLock => fmt.write_str("Failed to acquire lock"),
			Self::SubmitTransaction => fmt.write_str("Failed to submit transaction"),
		}
	}
}
218
/// Index of an authority within the list of heartbeat keys.
pub type AuthIndex = u32;
220
221/// Heartbeat which is sent/received.
222#[derive(Encode, Decode, DecodeWithMemTracking, Clone, PartialEq, Eq, Debug, TypeInfo)]
223pub struct Heartbeat<BlockNumber>
224where
225	BlockNumber: PartialEq + Eq + Decode + Encode,
226{
227	/// Block number at the time heartbeat is created..
228	pub block_number: BlockNumber,
229	/// Index of the current session.
230	pub session_index: SessionIndex,
231	/// An index of the authority on the list of validators.
232	pub authority_index: AuthIndex,
233	/// The length of session validator set
234	pub validators_len: u32,
235}
236
237/// A type for representing the validator id in a session.
238pub type ValidatorId<T> = <<T as Config>::ValidatorSet as ValidatorSet<
239	<T as frame_system::Config>::AccountId,
240>>::ValidatorId;
241
242/// A tuple of (ValidatorId, Identification) where `Identification` is the full identification of
243/// `ValidatorId`.
244pub type IdentificationTuple<T> = (
245	ValidatorId<T>,
246	<<T as Config>::ValidatorSet as ValidatorSetWithIdentification<
247		<T as frame_system::Config>::AccountId,
248	>>::Identification,
249);
250
251type OffchainResult<T, A> = Result<A, OffchainErr<BlockNumberFor<T>>>;
252
253#[frame_support::pallet]
254pub mod pallet {
255	use super::*;
256
257	/// The in-code storage version.
258	const STORAGE_VERSION: StorageVersion = StorageVersion::new(1);
259
260	#[pallet::pallet]
261	#[pallet::storage_version(STORAGE_VERSION)]
262	pub struct Pallet<T>(_);
263
264	#[pallet::config]
265	/// # Requirements
266	///
267	/// This pallet requires `frame_system::AuthorizeCall` to be included in the runtime's
268	/// transaction extension pipeline.
269	pub trait Config: CreateAuthorizedTransaction<Call<Self>> + frame_system::Config {
270		/// The identifier type for an authority.
271		type AuthorityId: Member
272			+ Parameter
273			+ RuntimeAppPublic
274			+ Ord
275			+ MaybeSerializeDeserialize
276			+ MaxEncodedLen;
277
278		/// The maximum number of keys that can be added.
279		type MaxKeys: Get<u32>;
280
281		/// The maximum number of peers to be stored in `ReceivedHeartbeats`
282		type MaxPeerInHeartbeats: Get<u32>;
283
284		/// The overarching event type.
285		#[allow(deprecated)]
286		type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;
287
288		/// A type for retrieving the validators supposed to be online in a session.
289		type ValidatorSet: ValidatorSetWithIdentification<Self::AccountId>;
290
291		/// A trait that allows us to estimate the current session progress and also the
292		/// average session length.
293		///
294		/// This parameter is used to determine the longevity of `heartbeat` transaction and a
295		/// rough time when we should start considering sending heartbeats, since the workers
296		/// avoids sending them at the very beginning of the session, assuming there is a
297		/// chance the authority will produce a block and they won't be necessary.
298		type NextSessionRotation: EstimateNextSessionRotation<BlockNumberFor<Self>>;
299
300		/// A type that gives us the ability to submit unresponsiveness offence reports.
301		type ReportUnresponsiveness: ReportOffence<
302			Self::AccountId,
303			IdentificationTuple<Self>,
304			UnresponsivenessOffence<IdentificationTuple<Self>>,
305		>;
306
307		/// A configuration for base priority of unsigned transactions.
308		///
309		/// This is exposed so that it can be tuned for particular runtime, when
310		/// multiple pallets send unsigned transactions.
311		#[pallet::constant]
312		type UnsignedPriority: Get<TransactionPriority>;
313
314		/// Weight information for extrinsics in this pallet.
315		type WeightInfo: WeightInfo;
316	}
317
318	#[pallet::event]
319	#[pallet::generate_deposit(pub(super) fn deposit_event)]
320	pub enum Event<T: Config> {
321		/// A new heartbeat was received from `AuthorityId`.
322		HeartbeatReceived { authority_id: T::AuthorityId },
323		/// At the end of the session, no offence was committed.
324		AllGood,
325		/// At the end of the session, at least one validator was found to be offline.
326		SomeOffline { offline: Vec<IdentificationTuple<T>> },
327	}
328
329	#[pallet::error]
330	pub enum Error<T> {
331		/// Non existent public key.
332		InvalidKey,
333		/// Duplicated heartbeat.
334		DuplicatedHeartbeat,
335	}
336
337	/// The block number after which it's ok to send heartbeats in the current
338	/// session.
339	///
340	/// At the beginning of each session we set this to a value that should fall
341	/// roughly in the middle of the session duration. The idea is to first wait for
342	/// the validators to produce a block in the current session, so that the
343	/// heartbeat later on will not be necessary.
344	///
345	/// This value will only be used as a fallback if we fail to get a proper session
346	/// progress estimate from `NextSessionRotation`, as those estimates should be
347	/// more accurate then the value we calculate for `HeartbeatAfter`.
348	#[pallet::storage]
349	pub type HeartbeatAfter<T: Config> = StorageValue<_, BlockNumberFor<T>, ValueQuery>;
350
351	/// The current set of keys that may issue a heartbeat.
352	#[pallet::storage]
353	pub type Keys<T: Config> =
354		StorageValue<_, WeakBoundedVec<T::AuthorityId, T::MaxKeys>, ValueQuery>;
355
356	/// For each session index, we keep a mapping of `SessionIndex` and `AuthIndex`.
357	#[pallet::storage]
358	pub type ReceivedHeartbeats<T: Config> =
359		StorageDoubleMap<_, Twox64Concat, SessionIndex, Twox64Concat, AuthIndex, bool>;
360
361	/// For each session index, we keep a mapping of `ValidatorId<T>` to the
362	/// number of blocks authored by the given authority.
363	#[pallet::storage]
364	pub type AuthoredBlocks<T: Config> = StorageDoubleMap<
365		_,
366		Twox64Concat,
367		SessionIndex,
368		Twox64Concat,
369		ValidatorId<T>,
370		u32,
371		ValueQuery,
372	>;
373
374	#[pallet::genesis_config]
375	#[derive(frame_support::DefaultNoBound)]
376	pub struct GenesisConfig<T: Config> {
377		pub keys: Vec<T::AuthorityId>,
378	}
379
380	#[pallet::genesis_build]
381	impl<T: Config> BuildGenesisConfig for GenesisConfig<T> {
382		fn build(&self) {
383			Pallet::<T>::initialize_keys(&self.keys);
384		}
385	}
386
387	#[pallet::call]
388	impl<T: Config> Pallet<T> {
389		/// ## Complexity:
390		/// - `O(K)` where K is length of `Keys` (heartbeat.validators_len)
391		///   - `O(K)`: decoding of length `K`
392		#[pallet::call_index(0)]
393		#[pallet::weight(<T as Config>::WeightInfo::heartbeat(
394			heartbeat.validators_len,
395		))]
396		#[pallet::weight_of_authorize(<T as Config>::WeightInfo::authorize_heartbeat(
397			heartbeat.validators_len,
398		))]
399		#[pallet::authorize(Self::authorize_heartbeat_call)]
400		pub fn heartbeat(
401			origin: OriginFor<T>,
402			heartbeat: Heartbeat<BlockNumberFor<T>>,
403			// since signature verification is done in `authorize`
404			// we can skip doing it here again.
405			_signature: <T::AuthorityId as RuntimeAppPublic>::Signature,
406		) -> DispatchResult {
407			ensure_authorized(origin)?;
408
409			let current_session = T::ValidatorSet::session_index();
410			let exists =
411				ReceivedHeartbeats::<T>::contains_key(current_session, heartbeat.authority_index);
412			let keys = Keys::<T>::get();
413			let public = keys.get(heartbeat.authority_index as usize);
414			if let (false, Some(public)) = (exists, public) {
415				Self::deposit_event(Event::<T>::HeartbeatReceived { authority_id: public.clone() });
416
417				ReceivedHeartbeats::<T>::insert(current_session, heartbeat.authority_index, true);
418
419				Ok(())
420			} else if exists {
421				Err(Error::<T>::DuplicatedHeartbeat.into())
422			} else {
423				Err(Error::<T>::InvalidKey.into())
424			}
425		}
426	}
427
428	#[pallet::hooks]
429	impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
430		fn offchain_worker(now: BlockNumberFor<T>) {
431			// Only send messages if we are a potential validator.
432			if sp_io::offchain::is_validator() {
433				for res in Self::send_heartbeats(now).into_iter().flatten() {
434					if let Err(e) = res {
435						log::debug!(
436							target: "runtime::im-online",
437							"Skipping heartbeat at {:?}: {:?}",
438							now,
439							e,
440						)
441					}
442				}
443			} else {
444				log::trace!(
445					target: "runtime::im-online",
446					"Skipping heartbeat at {:?}. Not a validator.",
447					now,
448				)
449			}
450		}
451	}
452
453	/// Invalid transaction custom error. Returned when validators_len field in heartbeat is
454	/// incorrect.
455	pub(crate) const INVALID_VALIDATORS_LEN: u8 = 10;
456}
457
458/// Keep track of number of authored blocks per authority, uncles are counted as
459/// well since they're a valid proof of being online.
460impl<T: Config + pallet_authorship::Config>
461	pallet_authorship::EventHandler<ValidatorId<T>, BlockNumberFor<T>> for Pallet<T>
462{
463	fn note_author(author: ValidatorId<T>) {
464		Self::note_authorship(author);
465	}
466}
467
468impl<T: Config> Pallet<T> {
469	/// Returns `true` if a heartbeat has been received for the authority at
470	/// `authority_index` in the authorities series or if the authority has
471	/// authored at least one block, during the current session. Otherwise
472	/// `false`.
473	pub fn is_online(authority_index: AuthIndex) -> bool {
474		let current_validators = T::ValidatorSet::validators();
475
476		if authority_index >= current_validators.len() as u32 {
477			return false;
478		}
479
480		let authority = &current_validators[authority_index as usize];
481
482		Self::is_online_aux(authority_index, authority)
483	}
484
485	fn is_online_aux(authority_index: AuthIndex, authority: &ValidatorId<T>) -> bool {
486		let current_session = T::ValidatorSet::session_index();
487
488		ReceivedHeartbeats::<T>::contains_key(current_session, authority_index) ||
489			AuthoredBlocks::<T>::get(current_session, authority) != 0
490	}
491
492	/// Returns `true` if a heartbeat has been received for the authority at `authority_index` in
493	/// the authorities series, during the current session. Otherwise `false`.
494	pub fn received_heartbeat_in_current_session(authority_index: AuthIndex) -> bool {
495		let current_session = T::ValidatorSet::session_index();
496		ReceivedHeartbeats::<T>::contains_key(current_session, authority_index)
497	}
498
499	/// Note that the given authority has authored a block in the current session.
500	fn note_authorship(author: ValidatorId<T>) {
501		let current_session = T::ValidatorSet::session_index();
502
503		AuthoredBlocks::<T>::mutate(current_session, author, |authored| *authored += 1);
504	}
505
506	pub(crate) fn send_heartbeats(
507		block_number: BlockNumberFor<T>,
508	) -> OffchainResult<T, impl Iterator<Item = OffchainResult<T, ()>>> {
509		const START_HEARTBEAT_RANDOM_PERIOD: Permill = Permill::from_percent(10);
510		const START_HEARTBEAT_FINAL_PERIOD: Permill = Permill::from_percent(80);
511
512		// this should give us a residual probability of 1/SESSION_LENGTH of sending an heartbeat,
513		// i.e. all heartbeats spread uniformly, over most of the session. as the session progresses
514		// the probability of sending an heartbeat starts to increase exponentially.
515		let random_choice = |progress: Permill| {
516			// given session progress `p` and session length `l`
517			// the threshold formula is: p^6 + 1/l
518			let session_length = T::NextSessionRotation::average_session_length();
519			let residual = Permill::from_rational(1u32, session_length.saturated_into());
520			let threshold: Permill = progress.saturating_pow(6).saturating_add(residual);
521
522			let seed = sp_io::offchain::random_seed();
523			let random = <u32>::decode(&mut TrailingZeroInput::new(seed.as_ref()))
524				.expect("input is padded with zeroes; qed");
525			let random = Permill::from_parts(random % Permill::ACCURACY);
526
527			random <= threshold
528		};
529
530		let should_heartbeat = if let (Some(progress), _) =
531			T::NextSessionRotation::estimate_current_session_progress(block_number)
532		{
533			// we try to get an estimate of the current session progress first since it should
534			// provide more accurate results. we will start an early heartbeat period where we'll
535			// randomly pick whether to heartbeat. after 80% of the session has elapsed, if we
536			// haven't sent an heartbeat yet we'll send one unconditionally. the idea is to prevent
537			// all nodes from sending the heartbeats at the same block and causing a temporary (but
538			// deterministic) spike in transactions.
539			progress >= START_HEARTBEAT_FINAL_PERIOD ||
540				progress >= START_HEARTBEAT_RANDOM_PERIOD && random_choice(progress)
541		} else {
542			// otherwise we fallback to using the block number calculated at the beginning
543			// of the session that should roughly correspond to the middle of the session
544			let heartbeat_after = <HeartbeatAfter<T>>::get();
545			block_number >= heartbeat_after
546		};
547
548		if !should_heartbeat {
549			return Err(OffchainErr::TooEarly);
550		}
551
552		let session_index = T::ValidatorSet::session_index();
553		let validators_len = Keys::<T>::decode_len().unwrap_or_default() as u32;
554
555		Ok(Self::local_authority_keys().map(move |(authority_index, key)| {
556			Self::send_single_heartbeat(
557				authority_index,
558				key,
559				session_index,
560				block_number,
561				validators_len,
562			)
563		}))
564	}
565
566	fn send_single_heartbeat(
567		authority_index: u32,
568		key: T::AuthorityId,
569		session_index: SessionIndex,
570		block_number: BlockNumberFor<T>,
571		validators_len: u32,
572	) -> OffchainResult<T, ()> {
573		// A helper function to prepare heartbeat call.
574		let prepare_heartbeat = || -> OffchainResult<T, Call<T>> {
575			let heartbeat =
576				Heartbeat { block_number, session_index, authority_index, validators_len };
577
578			let signature = key.sign(&heartbeat.encode()).ok_or(OffchainErr::FailedSigning)?;
579
580			Ok(Call::heartbeat { heartbeat, signature })
581		};
582
583		if Self::is_online(authority_index) {
584			return Err(OffchainErr::AlreadyOnline(authority_index));
585		}
586
587		// acquire lock for that authority at current heartbeat to make sure we don't
588		// send concurrent heartbeats.
589		Self::with_heartbeat_lock(authority_index, session_index, block_number, || {
590			let call = prepare_heartbeat()?;
591			log::info!(
592				target: "runtime::im-online",
593				"[index: {:?}] Reporting im-online at block: {:?} (session: {:?}): {:?}",
594				authority_index,
595				block_number,
596				session_index,
597				call,
598			);
599
600			let xt = T::create_authorized_transaction(call.into());
601			SubmitTransaction::<T, Call<T>>::submit_transaction(xt)
602				.map_err(|_| OffchainErr::SubmitTransaction)?;
603
604			Ok(())
605		})
606	}
607
608	fn local_authority_keys() -> impl Iterator<Item = (u32, T::AuthorityId)> {
609		// on-chain storage
610		//
611		// At index `idx`:
612		// 1. A (ImOnline) public key to be used by a validator at index `idx` to send im-online
613		//    heartbeats.
614		let authorities = Keys::<T>::get();
615
616		// local keystore
617		//
618		// All `ImOnline` public (+private) keys currently in the local keystore.
619		let mut local_keys = T::AuthorityId::all();
620
621		local_keys.sort();
622
623		authorities.into_iter().enumerate().filter_map(move |(index, authority)| {
624			local_keys
625				.binary_search(&authority)
626				.ok()
627				.map(|location| (index as u32, local_keys[location].clone()))
628		})
629	}
630
631	fn with_heartbeat_lock<R>(
632		authority_index: u32,
633		session_index: SessionIndex,
634		now: BlockNumberFor<T>,
635		f: impl FnOnce() -> OffchainResult<T, R>,
636	) -> OffchainResult<T, R> {
637		let key = {
638			let mut key = DB_PREFIX.to_vec();
639			key.extend(authority_index.encode());
640			key
641		};
642		let storage = StorageValueRef::persistent(&key);
643		let res = storage.mutate(
644			|status: Result<Option<HeartbeatStatus<BlockNumberFor<T>>>, StorageRetrievalError>| {
645				// Check if there is already a lock for that particular block.
646				// This means that the heartbeat has already been sent, and we are just waiting
647				// for it to be included. However if it doesn't get included for INCLUDE_THRESHOLD
648				// we will re-send it.
649				match status {
650					// we are still waiting for inclusion.
651					Ok(Some(status)) if status.is_recent(session_index, now) => {
652						Err(OffchainErr::WaitingForInclusion(status.sent_at))
653					},
654					// attempt to set new status
655					_ => Ok(HeartbeatStatus { session_index, sent_at: now }),
656				}
657			},
658		);
659		if let Err(MutateStorageError::ValueFunctionFailed(err)) = res {
660			return Err(err);
661		}
662
663		let mut new_status = res.map_err(|_| OffchainErr::FailedToAcquireLock)?;
664
665		// we got the lock, let's try to send the heartbeat.
666		let res = f();
667
668		// clear the lock in case we have failed to send transaction.
669		if res.is_err() {
670			new_status.sent_at = 0u32.into();
671			storage.set(&new_status);
672		}
673
674		res
675	}
676
677	fn initialize_keys(keys: &[T::AuthorityId]) {
678		if !keys.is_empty() {
679			assert!(Keys::<T>::get().is_empty(), "Keys are already initialized!");
680			let bounded_keys = <BoundedSlice<'_, _, T::MaxKeys>>::try_from(keys)
681				.expect("More than the maximum number of keys provided");
682			Keys::<T>::put(bounded_keys);
683		}
684	}
685
686	/// Authorization callback for the `heartbeat` call.
687	///
688	/// Validates that the heartbeat is from a recognized authority in the current session
689	/// and that the signature is correct. Cheap checks (staleness, session index, authority
690	/// membership) run first; the expensive signature verification runs last.
691	fn authorize_heartbeat_call(
692		_source: TransactionSource,
693		heartbeat: &Heartbeat<BlockNumberFor<T>>,
694		signature: &<T::AuthorityId as RuntimeAppPublic>::Signature,
695	) -> TransactionValidityWithRefund {
696		if Pallet::<T>::is_online(heartbeat.authority_index) {
697			// we already received a heartbeat for this authority
698			return Err(InvalidTransaction::Stale.into());
699		}
700
701		// check if session index from heartbeat is recent
702		let current_session = T::ValidatorSet::session_index();
703		if heartbeat.session_index != current_session {
704			return Err(InvalidTransaction::Stale.into());
705		}
706
707		// verify that the incoming (unverified) pubkey is actually an authority id
708		let keys = Keys::<T>::get();
709		if keys.len() as u32 != heartbeat.validators_len {
710			return Err(InvalidTransaction::Custom(INVALID_VALIDATORS_LEN).into());
711		}
712		let authority_id = match keys.get(heartbeat.authority_index as usize) {
713			Some(id) => id,
714			None => return Err(InvalidTransaction::BadProof.into()),
715		};
716
717		// check signature (this is expensive so we do it last).
718		let signature_valid = heartbeat
719			.using_encoded(|encoded_heartbeat| authority_id.verify(&encoded_heartbeat, signature));
720
721		if !signature_valid {
722			return Err(InvalidTransaction::BadProof.into());
723		}
724
725		ValidTransaction::with_tag_prefix("ImOnline")
726			.priority(T::UnsignedPriority::get())
727			.and_provides((current_session, authority_id))
728			.longevity(
729				TryInto::<u64>::try_into(
730					T::NextSessionRotation::average_session_length() / 2u32.into(),
731				)
732				.unwrap_or(64_u64),
733			)
734			.propagate(true)
735			.build()
736			.map(|v| (v, Weight::zero()))
737	}
738
739	#[cfg(test)]
740	fn set_keys(keys: Vec<T::AuthorityId>) {
741		let bounded_keys = WeakBoundedVec::<_, T::MaxKeys>::try_from(keys)
742			.expect("More than the maximum number of keys provided");
743		Keys::<T>::put(bounded_keys);
744	}
745}
746
747impl<T: Config> sp_runtime::BoundToRuntimeAppPublic for Pallet<T> {
748	type Public = T::AuthorityId;
749}
750
751impl<T: Config> OneSessionHandler<T::AccountId> for Pallet<T> {
752	type Key = T::AuthorityId;
753
754	fn on_genesis_session<'a, I: 'a>(validators: I)
755	where
756		I: Iterator<Item = (&'a T::AccountId, T::AuthorityId)>,
757	{
758		let keys = validators.map(|x| x.1).collect::<Vec<_>>();
759		Self::initialize_keys(&keys);
760	}
761
762	fn on_new_session<'a, I: 'a>(_changed: bool, validators: I, _queued_validators: I)
763	where
764		I: Iterator<Item = (&'a T::AccountId, T::AuthorityId)>,
765	{
766		// Tell the offchain worker to start making the next session's heartbeats.
767		// Since we consider producing blocks as being online,
768		// the heartbeat is deferred a bit to prevent spamming.
769		let block_number = <frame_system::Pallet<T>>::block_number();
770		let half_session = T::NextSessionRotation::average_session_length() / 2u32.into();
771		<HeartbeatAfter<T>>::put(block_number + half_session);
772
773		// Remember who the authorities are for the new session.
774		let keys = validators.map(|x| x.1).collect::<Vec<_>>();
775		let bounded_keys = WeakBoundedVec::<_, T::MaxKeys>::force_from(
776			keys,
777			Some(
778				"Warning: The session has more keys than expected. \
779  				A runtime configuration adjustment may be needed.",
780			),
781		);
782		Keys::<T>::put(bounded_keys);
783	}
784
785	fn on_before_session_ending() {
786		let session_index = T::ValidatorSet::session_index();
787		let keys = Keys::<T>::get();
788		let current_validators = T::ValidatorSet::validators();
789
790		let offenders = current_validators
791			.into_iter()
792			.enumerate()
793			.filter(|(index, id)| !Self::is_online_aux(*index as u32, id))
794			.filter_map(|(_, id)| {
795				<T::ValidatorSet as ValidatorSetWithIdentification<T::AccountId>>::IdentificationOf::convert(
796					id.clone()
797				).map(|full_id| (id, full_id))
798			})
799			.collect::<Vec<IdentificationTuple<T>>>();
800
801		// Remove all received heartbeats and number of authored blocks from the
802		// current session, they have already been processed and won't be needed
803		// anymore.
804		#[allow(deprecated)]
805		ReceivedHeartbeats::<T>::remove_prefix(T::ValidatorSet::session_index(), None);
806		#[allow(deprecated)]
807		AuthoredBlocks::<T>::remove_prefix(T::ValidatorSet::session_index(), None);
808
809		if offenders.is_empty() {
810			Self::deposit_event(Event::<T>::AllGood);
811		} else {
812			Self::deposit_event(Event::<T>::SomeOffline { offline: offenders.clone() });
813
814			let validator_set_count = keys.len() as u32;
815			let offence = UnresponsivenessOffence { session_index, validator_set_count, offenders };
816			if let Err(e) = T::ReportUnresponsiveness::report_offence(vec![], offence) {
817				sp_runtime::print(e);
818			}
819		}
820	}
821
822	fn on_disabled(_i: u32) {
823		// ignore
824	}
825}
826
827/// An offence that is filed if a validator didn't send a heartbeat message.
828#[derive(Debug, TypeInfo)]
829#[cfg_attr(feature = "std", derive(Clone, PartialEq, Eq))]
830pub struct UnresponsivenessOffence<Offender> {
831	/// The current session index in which we report the unresponsive validators.
832	///
833	/// It acts as a time measure for unresponsiveness reports and effectively will always point
834	/// at the end of the session.
835	pub session_index: SessionIndex,
836	/// The size of the validator set in current session/era.
837	pub validator_set_count: u32,
838	/// Authorities that were unresponsive during the current era.
839	pub offenders: Vec<Offender>,
840}
841
842impl<Offender: Clone> Offence<Offender> for UnresponsivenessOffence<Offender> {
843	const ID: Kind = *b"im-online:offlin";
844	type TimeSlot = SessionIndex;
845
846	fn offenders(&self) -> Vec<Offender> {
847		self.offenders.clone()
848	}
849
850	fn session_index(&self) -> SessionIndex {
851		self.session_index
852	}
853
854	fn validator_set_count(&self) -> u32 {
855		self.validator_set_count
856	}
857
858	fn time_slot(&self) -> Self::TimeSlot {
859		self.session_index
860	}
861
862	fn slash_fraction(&self, offenders: u32) -> Perbill {
863		// the formula is min((3 * (k - (n / 10 + 1))) / n, 1) * 0.07
864		// basically, 10% can be offline with no slash, but after that, it linearly climbs up to 7%
865		// when 13/30 are offline (around 5% when 1/3 are offline).
866		if let Some(threshold) = offenders.checked_sub(self.validator_set_count / 10 + 1) {
867			let x = Perbill::from_rational(3 * threshold, self.validator_set_count);
868			x.saturating_mul(Perbill::from_percent(7))
869		} else {
870			Perbill::default()
871		}
872	}
873}