referrerpolicy=no-referrer-when-downgrade

pallet_staking_async_ah_client/
lib.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: Apache-2.0
5
6// Licensed under the Apache License, Version 2.0 (the "License");
7// you may not use this file except in compliance with the License.
8// You may obtain a copy of the License at
9//
10// 	http://www.apache.org/licenses/LICENSE-2.0
11//
12// Unless required by applicable law or agreed to in writing, software
13// distributed under the License is distributed on an "AS IS" BASIS,
14// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15// See the License for the specific language governing permissions and
16// limitations under the License.
17
18//! The client for AssetHub, intended to be used in the relay chain.
19//!
20//! The counter-part for this pallet is `pallet-staking-async-rc-client` on AssetHub.
21//!
22//! This documentation is divided into the following sections:
23//!
24//! 1. Incoming messages: the messages that we receive from the relay chian.
25//! 2. Outgoing messages: the messaged that we sent to the relay chain.
26//! 3. Local interfaces: the interfaces that we expose to other pallets in the runtime.
27//!
28//! ## Incoming Messages
29//!
30//! All incoming messages are handled via [`Call`]. They are all gated to be dispatched only by
31//! [`Config::AssetHubOrigin`]. The only one is:
32//!
33//! * [`Call::validator_set`]: A new validator set for a planning session index.
34//!
35//! ## Outgoing Messages
36//!
37//! All outgoing messages are handled by a single trait
38//! [`pallet_staking_async_rc_client::SendToAssetHub`]. They match the incoming messages of the
39//! `rc-client` pallet.
40//!
41//! ## Local Interfaces:
42//!
43//! Living on the relay chain, this pallet must:
44//!
45//! * Implement [`pallet_session::SessionManager`] (and historical variant thereof) to _give_
46//!   information to the session pallet.
47//! * Implements [`SessionInterface`] to _receive_ information from the session pallet
48//! * Implement [`sp_staking::offence::OnOffenceHandler`].
49//! * Implement reward related APIs ([`frame_support::traits::RewardsReporter`]).
50//!
51//! ## Future Plans
52//!
53//! * Governance functions to force set validators.
54
55#![cfg_attr(not(feature = "std"), no_std)]
56
57pub use pallet::*;
58
59#[cfg(test)]
60pub mod mock;
61
62extern crate alloc;
63use alloc::vec::Vec;
64use frame_support::{
65	pallet_prelude::*,
66	traits::{Defensive, DefensiveSaturating, RewardsReporter},
67};
68pub use pallet_staking_async_rc_client::SendToAssetHub;
69use pallet_staking_async_rc_client::{self as rc_client};
70use sp_runtime::SaturatedConversion;
71use sp_staking::{
72	offence::{OffenceDetails, OffenceSeverity},
73	SessionIndex,
74};
75
76/// The balance type seen from this pallet's PoV.
77pub type BalanceOf<T> = <T as Config>::CurrencyBalance;
78
79/// Type alias for offence details
80pub type OffenceDetailsOf<T> = OffenceDetails<
81	<T as frame_system::Config>::AccountId,
82	(
83		<T as frame_system::Config>::AccountId,
84		sp_staking::Exposure<<T as frame_system::Config>::AccountId, BalanceOf<T>>,
85	),
86>;
87
88const LOG_TARGET: &str = "runtime::staking-async::ah-client";
89
90// syntactic sugar for logging.
91#[macro_export]
92macro_rules! log {
93	($level:tt, $patter:expr $(, $values:expr)* $(,)?) => {
94		log::$level!(
95			target: $crate::LOG_TARGET,
96			concat!("[{:?}] ⬇️ ", $patter), <frame_system::Pallet<T>>::block_number() $(, $values)*
97		)
98	};
99}
100
101/// Interface to talk to the local session pallet.
102pub trait SessionInterface {
103	/// The validator id type of the session pallet
104	type ValidatorId: Clone;
105
106	fn validators() -> Vec<Self::ValidatorId>;
107
108	/// prune up to the given session index.
109	fn prune_up_to(index: SessionIndex);
110
111	/// Report an offence.
112	///
113	/// This is used to disable validators directly on the RC, until the next validator set.
114	fn report_offence(offender: Self::ValidatorId, severity: OffenceSeverity);
115}
116
117impl<T: Config + pallet_session::Config + pallet_session::historical::Config> SessionInterface
118	for T
119{
120	type ValidatorId = <T as pallet_session::Config>::ValidatorId;
121
122	fn validators() -> Vec<Self::ValidatorId> {
123		pallet_session::Pallet::<T>::validators()
124	}
125
126	fn prune_up_to(index: SessionIndex) {
127		pallet_session::historical::Pallet::<T>::prune_up_to(index)
128	}
129	fn report_offence(offender: Self::ValidatorId, severity: OffenceSeverity) {
130		pallet_session::Pallet::<T>::report_offence(offender, severity)
131	}
132}
133
134/// Represents the operating mode of the pallet.
135#[derive(
136	Default,
137	DecodeWithMemTracking,
138	Encode,
139	Decode,
140	MaxEncodedLen,
141	TypeInfo,
142	Clone,
143	PartialEq,
144	Eq,
145	RuntimeDebug,
146	serde::Serialize,
147	serde::Deserialize,
148)]
149pub enum OperatingMode {
150	/// Fully delegated mode.
151	///
152	/// In this mode, the pallet performs no core logic and forwards all relevant operations
153	/// to the fallback implementation defined in the pallet's `Config::Fallback`.
154	///
155	/// This mode is useful when staking is in synchronous mode and waiting for the signal to
156	/// transition to asynchronous mode.
157	#[default]
158	Passive,
159
160	/// Buffered mode for deferred execution.
161	///
162	/// In this mode, offences are accepted and buffered for later transmission to AssetHub.
163	/// However, session change reports are dropped.
164	///
165	/// This mode is useful when the counterpart pallet `pallet-staking-async-rc-client` on
166	/// AssetHub is not yet ready to process incoming messages.
167	Buffered,
168
169	/// Fully active mode.
170	///
171	/// The pallet performs all core logic directly and handles messages immediately.
172	///
173	/// This mode is useful when staking is ready to execute in asynchronous mode and the
174	/// counterpart pallet `pallet-staking-async-rc-client` is ready to accept messages.
175	Active,
176}
177
178impl OperatingMode {
179	fn can_accept_validator_set(&self) -> bool {
180		matches!(self, OperatingMode::Active)
181	}
182}
183
184/// See `pallet_staking::DefaultExposureOf`. This type is the same, except it is duplicated here so
185/// that an rc-runtime can use it after `pallet-staking` is fully removed as a dependency.
186pub struct DefaultExposureOf<T>(core::marker::PhantomData<T>);
187
188impl<T: Config>
189	sp_runtime::traits::Convert<
190		T::AccountId,
191		Option<sp_staking::Exposure<T::AccountId, BalanceOf<T>>>,
192	> for DefaultExposureOf<T>
193{
194	fn convert(
195		validator: T::AccountId,
196	) -> Option<sp_staking::Exposure<T::AccountId, BalanceOf<T>>> {
197		T::SessionInterface::validators()
198			.contains(&validator)
199			.then_some(Default::default())
200	}
201}
202
203#[frame_support::pallet]
204pub mod pallet {
205	use crate::*;
206	use alloc::vec;
207	use frame_support::traits::{Hooks, UnixTime};
208	use frame_system::pallet_prelude::*;
209	use pallet_session::{historical, SessionManager};
210	use pallet_staking_async_rc_client::SessionReport;
211	use sp_runtime::{Perbill, Saturating};
212	use sp_staking::{
213		offence::{OffenceSeverity, OnOffenceHandler},
214		SessionIndex,
215	};
216
217	const STORAGE_VERSION: StorageVersion = StorageVersion::new(1);
218
219	#[pallet::config]
220	pub trait Config: frame_system::Config {
221		/// The balance type of the runtime's currency interface.
222		type CurrencyBalance: sp_runtime::traits::AtLeast32BitUnsigned
223			+ codec::FullCodec
224			+ DecodeWithMemTracking
225			+ codec::HasCompact<Type: DecodeWithMemTracking>
226			+ Copy
227			+ MaybeSerializeDeserialize
228			+ core::fmt::Debug
229			+ Default
230			+ From<u64>
231			+ TypeInfo
232			+ Send
233			+ Sync
234			+ MaxEncodedLen;
235
236		/// An origin type that ensures an incoming message is from asset hub.
237		type AssetHubOrigin: EnsureOrigin<Self::RuntimeOrigin>;
238
239		/// The origin that can control this pallet's operations.
240		type AdminOrigin: EnsureOrigin<Self::RuntimeOrigin>;
241
242		/// Our communication interface to AssetHub.
243		type SendToAssetHub: SendToAssetHub<AccountId = Self::AccountId>;
244
245		/// A safety measure that asserts an incoming validator set must be at least this large.
246		type MinimumValidatorSetSize: Get<u32>;
247
248		/// A safety measure that asserts when iterating over validator points (to be sent to AH),
249		/// we don't iterate too many times.
250		///
251		/// Validator may change session to session, and if session reports are not sent, validator
252		/// points that we store may well grow beyond the size of the validator set. Yet, a too
253		/// large of an upper bound may also exceed the maximum size of a single DMP message.
254		/// Consult the test `message_queue_sizes` for more information.
255		///
256		/// Note that in case a single session report is larger than a single DMP message, it might
257		/// still be sent over if we use
258		/// [`pallet_staking_async_rc_client::XCMSender::split_then_send`]. This will make the size
259		/// of each individual message smaller, yet, it will still try and push them all to the
260		/// queue at the same time.
261		type MaximumValidatorsWithPoints: Get<u32>;
262
263		/// A type that gives us a reliable unix timestamp.
264		type UnixTime: UnixTime;
265
266		/// Number of points to award a validator per block authored.
267		type PointsPerBlock: Get<u32>;
268
269		/// Maximum number of offences to batch in a single message to AssetHub. Actual sending
270		/// happens `on_initialize`. Offences get infinite "retries", and are never dropped.
271		///
272		/// A sensible value should be such that sending this batch is small enough to not exhaust
273		/// the DMP queue. The size of a single offence is documented in `message_queue_sizes` test
274		/// (74 bytes).
275		type MaxOffenceBatchSize: Get<u32>;
276
277		/// Interface to talk to the local Session pallet.
278		type SessionInterface: SessionInterface<ValidatorId = Self::AccountId>;
279
280		/// A fallback implementation to delegate logic to when the pallet is in
281		/// [`OperatingMode::Passive`].
282		///
283		/// This type must implement the `historical::SessionManager` and `OnOffenceHandler`
284		/// interface and is expected to behave as a stand-in for this pallet’s core logic when
285		/// delegation is active.
286		type Fallback: pallet_session::SessionManager<Self::AccountId>
287			+ OnOffenceHandler<
288				Self::AccountId,
289				(Self::AccountId, sp_staking::Exposure<Self::AccountId, BalanceOf<Self>>),
290				Weight,
291			> + frame_support::traits::RewardsReporter<Self::AccountId>
292			+ pallet_authorship::EventHandler<Self::AccountId, BlockNumberFor<Self>>;
293
294		/// Maximum number of times we try to send a session report to AssetHub, after which, if
295		/// sending still fails, we drop it.
296		type MaxSessionReportRetries: Get<u32>;
297	}
298
299	#[pallet::pallet]
300	#[pallet::storage_version(STORAGE_VERSION)]
301	pub struct Pallet<T>(_);
302
303	/// The queued validator sets for a given planning session index.
304	///
305	/// This is received via a call from AssetHub.
306	#[pallet::storage]
307	#[pallet::unbounded]
308	pub type ValidatorSet<T: Config> = StorageValue<_, (u32, Vec<T::AccountId>), OptionQuery>;
309
310	/// An incomplete validator set report.
311	#[pallet::storage]
312	#[pallet::unbounded]
313	pub type IncompleteValidatorSetReport<T: Config> =
314		StorageValue<_, rc_client::ValidatorSetReport<T::AccountId>, OptionQuery>;
315
316	/// All of the points of the validators.
317	///
318	/// This is populated during a session, and is flushed and sent over via [`SendToAssetHub`]
319	/// at each session end.
320	#[pallet::storage]
321	pub type ValidatorPoints<T: Config> =
322		StorageMap<_, Twox64Concat, T::AccountId, u32, ValueQuery>;
323
324	/// Indicates the current operating mode of the pallet.
325	///
326	/// This value determines how the pallet behaves in response to incoming and outgoing messages,
327	/// particularly whether it should execute logic directly, defer it, or delegate it entirely.
328	#[pallet::storage]
329	pub type Mode<T: Config> = StorageValue<_, OperatingMode, ValueQuery>;
330
331	/// A storage value that is set when a `new_session` gives a new validator set to the session
332	/// pallet, and is cleared on the next call.
333	///
334	/// The inner u32 is the id of the said activated validator set. While not relevant here, good
335	/// to know this is the planning era index of staking-async on AH.
336	///
337	/// Once cleared, we know a validator set has been activated, and therefore we can send a
338	/// timestamp to AH.
339	#[pallet::storage]
340	pub type NextSessionChangesValidators<T: Config> = StorageValue<_, u32, OptionQuery>;
341
342	/// The session index at which the latest elected validator set was applied.
343	///
344	/// This is used to determine if an offence, given a session index, is in the current active era
345	/// or not.
346	#[pallet::storage]
347	pub type ValidatorSetAppliedAt<T: Config> = StorageValue<_, SessionIndex, OptionQuery>;
348
349	/// A session report that is outgoing, and should be sent.
350	///
351	/// This will be attempted to be sent, possibly on every `on_initialize` call, until it is sent,
352	/// or the second value reaches zero, at which point we drop it.
353	#[pallet::storage]
354	#[pallet::unbounded]
355	pub type OutgoingSessionReport<T: Config> =
356		StorageValue<_, (SessionReport<T::AccountId>, u32), OptionQuery>;
357
358	/// Wrapper struct for storing offences, and getting them back page by page.
359	///
360	/// It has only two interfaces:
361	///
362	/// * [`OffenceSendQueue::append`], to add a single offence.
363	/// * [`OffenceSendQueue::get_and_maybe_delete`] which retrieves the last page. Depending on the
364	///   closure, it may also delete that page. The returned value is indeed
365	///   [`Config::MaxOffenceBatchSize`] or less items.
366	///
367	/// Internally, it manages `OffenceSendQueueOffences` and `OffenceSendQueueCursor`, both of
368	/// which should NEVER be used manually.
369	pub struct OffenceSendQueue<T: Config>(core::marker::PhantomData<T>);
370
371	/// A single buffered offence in [`OffenceSendQueue`].
372	pub type QueuedOffenceOf<T> =
373		(SessionIndex, rc_client::Offence<<T as frame_system::Config>::AccountId>);
374	/// A page of buffered offences in [`OffenceSendQueue`].
375	pub type QueuedOffencePageOf<T> =
376		BoundedVec<QueuedOffenceOf<T>, <T as Config>::MaxOffenceBatchSize>;
377
378	impl<T: Config> OffenceSendQueue<T> {
379		/// Add a single offence to the queue.
380		pub fn append(o: QueuedOffenceOf<T>) {
381			let mut index = OffenceSendQueueCursor::<T>::get();
382			match OffenceSendQueueOffences::<T>::try_mutate(index, |b| b.try_push(o.clone())) {
383				Ok(_) => {
384					// `index` had empty slot -- all good.
385				},
386				Err(_) => {
387					debug_assert!(
388						!OffenceSendQueueOffences::<T>::contains_key(index + 1),
389						"next page should be empty"
390					);
391					index += 1;
392					OffenceSendQueueOffences::<T>::insert(
393						index,
394						BoundedVec::<_, _>::try_from(vec![o]).defensive_unwrap_or_default(),
395					);
396					OffenceSendQueueCursor::<T>::mutate(|i| *i += 1);
397				},
398			}
399		}
400
401		// Get the last page of offences, and delete it if `op` returns `Ok(())`.
402		pub fn get_and_maybe_delete(op: impl FnOnce(QueuedOffencePageOf<T>) -> Result<(), ()>) {
403			let index = OffenceSendQueueCursor::<T>::get();
404			let page = OffenceSendQueueOffences::<T>::get(index);
405			let res = op(page);
406			match res {
407				Ok(_) => {
408					OffenceSendQueueOffences::<T>::remove(index);
409					OffenceSendQueueCursor::<T>::mutate(|i| *i = i.saturating_sub(1))
410				},
411				Err(_) => {
412					// nada
413				},
414			}
415		}
416
417		#[cfg(feature = "std")]
418		pub fn pages() -> u32 {
419			let last_page = if Self::last_page_empty() { 0 } else { 1 };
420			OffenceSendQueueCursor::<T>::get().saturating_add(last_page)
421		}
422
423		#[cfg(feature = "std")]
424		pub fn count() -> u32 {
425			let last_index = OffenceSendQueueCursor::<T>::get();
426			let last_page = OffenceSendQueueOffences::<T>::get(last_index);
427			let last_page_count = last_page.len() as u32;
428			last_index.saturating_mul(T::MaxOffenceBatchSize::get()) + last_page_count
429		}
430
431		#[cfg(feature = "std")]
432		fn last_page_empty() -> bool {
433			OffenceSendQueueOffences::<T>::get(OffenceSendQueueCursor::<T>::get()).is_empty()
434		}
435	}
436
437	/// Internal storage item of [`OffenceSendQueue`]. Should not be used manually.
438	#[pallet::storage]
439	#[pallet::unbounded]
440	pub(crate) type OffenceSendQueueOffences<T: Config> =
441		StorageMap<_, Twox64Concat, u32, QueuedOffencePageOf<T>, ValueQuery>;
442	/// Internal storage item of [`OffenceSendQueue`]. Should not be used manually.
443	#[pallet::storage]
444	pub(crate) type OffenceSendQueueCursor<T: Config> = StorageValue<_, u32, ValueQuery>;
445
446	#[pallet::genesis_config]
447	#[derive(frame_support::DefaultNoBound, frame_support::DebugNoBound)]
448	pub struct GenesisConfig<T: Config> {
449		/// The initial operating mode of the pallet.
450		pub operating_mode: OperatingMode,
451		pub _marker: core::marker::PhantomData<T>,
452	}
453
454	#[pallet::genesis_build]
455	impl<T: Config> BuildGenesisConfig for GenesisConfig<T> {
456		fn build(&self) {
457			// Set the initial operating mode of the pallet.
458			Mode::<T>::put(self.operating_mode.clone());
459		}
460	}
461
462	#[pallet::error]
463	pub enum Error<T> {
464		/// Could not process incoming message because incoming messages are blocked.
465		Blocked,
466	}
467
468	#[pallet::event]
469	#[pallet::generate_deposit(fn deposit_event)]
470	pub enum Event<T: Config> {
471		/// A new validator set has been received.
472		ValidatorSetReceived {
473			id: u32,
474			new_validator_set_count: u32,
475			prune_up_to: Option<SessionIndex>,
476			leftover: bool,
477		},
478		/// We could not merge, and therefore dropped a buffered message.
479		///
480		/// Note that this event is more resembling an error, but we use an event because in this
481		/// pallet we need to mutate storage upon some failures.
482		CouldNotMergeAndDropped,
483		/// The validator set received is way too small, as per
484		/// [`Config::MinimumValidatorSetSize`].
485		SetTooSmallAndDropped,
486		/// Something occurred that should never happen under normal operation. Logged as an event
487		/// for fail-safe observability.
488		Unexpected(UnexpectedKind),
489	}
490
491	/// Represents unexpected or invariant-breaking conditions encountered during execution.
492	///
493	/// These variants are emitted as [`Event::Unexpected`] and indicate a defensive check has
494	/// failed. While these should never occur under normal operation, they are useful for
495	/// diagnosing issues in production or test environments.
496	#[derive(Clone, Encode, Decode, DecodeWithMemTracking, PartialEq, TypeInfo, RuntimeDebug)]
497	pub enum UnexpectedKind {
498		/// A validator set was received while the pallet is in [`OperatingMode::Passive`].
499		ReceivedValidatorSetWhilePassive,
500
501		/// An unexpected transition was applied between operating modes.
502		///
503		/// Expected transitions are linear and forward-only: `Passive` → `Buffered` → `Active`.
504		UnexpectedModeTransition,
505
506		/// A session report failed to be sent.
507		///
508		/// We will store, and retry it for a number of more block.
509		SessionReportSendFailed,
510
511		/// A session report failed enough times that we should drop it.
512		///
513		/// We will retain the validator points, and send them over in the next session we receive
514		/// from pallet-session.
515		SessionReportDropped,
516
517		/// An offence report failed to be sent.
518		///
519		/// It will be retried again in the next block. We never drop them.
520		OffenceSendFailed,
521
522		/// Some validator points didn't make it to be included in the session report. Should
523		/// never happen, and means:
524		///
525		/// * a too low of a value is assigned to [`Config::MaximumValidatorsWithPoints`]
526		/// * Those who are calling into our `RewardsReporter` likely have a bad view of the
527		///   validator set, and are spamming us.
528		ValidatorPointDropped,
529	}
530
531	#[pallet::call]
532	impl<T: Config> Pallet<T> {
533		#[pallet::call_index(0)]
534		#[pallet::weight(
535			// Reads:
536			// - OperatingMode
537			// - IncompleteValidatorSetReport
538			// Writes:
539			// - IncompleteValidatorSetReport or ValidatorSet
540			// ignoring `T::SessionInterface::prune_up_to`
541			T::DbWeight::get().reads_writes(2, 1)
542		)]
543		pub fn validator_set(
544			origin: OriginFor<T>,
545			report: rc_client::ValidatorSetReport<T::AccountId>,
546		) -> DispatchResult {
547			// Ensure the origin is one of Root or whatever is representing AssetHub.
548			log!(debug, "Received new validator set report {}", report);
549			T::AssetHubOrigin::ensure_origin_or_root(origin)?;
550
551			// Check the operating mode.
552			let mode = Mode::<T>::get();
553			ensure!(mode.can_accept_validator_set(), Error::<T>::Blocked);
554
555			let maybe_merged_report = match IncompleteValidatorSetReport::<T>::take() {
556				Some(old) => old.merge(report.clone()),
557				None => Ok(report),
558			};
559
560			if maybe_merged_report.is_err() {
561				Self::deposit_event(Event::CouldNotMergeAndDropped);
562				debug_assert!(
563					IncompleteValidatorSetReport::<T>::get().is_none(),
564					"we have ::take() it above, we don't want to keep the old data"
565				);
566				return Ok(());
567			}
568
569			let report = maybe_merged_report.expect("checked above; qed");
570
571			if report.leftover {
572				// buffer it, and nothing further to do.
573				Self::deposit_event(Event::ValidatorSetReceived {
574					id: report.id,
575					new_validator_set_count: report.new_validator_set.len() as u32,
576					prune_up_to: report.prune_up_to,
577					leftover: report.leftover,
578				});
579				IncompleteValidatorSetReport::<T>::put(report);
580			} else {
581				// message is complete, process it.
582				let rc_client::ValidatorSetReport {
583					id,
584					leftover,
585					mut new_validator_set,
586					prune_up_to,
587				} = report;
588
589				// ensure the validator set, deduplicated, is not too big.
590				new_validator_set.sort();
591				new_validator_set.dedup();
592
593				if (new_validator_set.len() as u32) < T::MinimumValidatorSetSize::get() {
594					Self::deposit_event(Event::SetTooSmallAndDropped);
595					debug_assert!(
596						IncompleteValidatorSetReport::<T>::get().is_none(),
597						"we have ::take() it above, we don't want to keep the old data"
598					);
599					return Ok(());
600				}
601
602				Self::deposit_event(Event::ValidatorSetReceived {
603					id,
604					new_validator_set_count: new_validator_set.len() as u32,
605					prune_up_to,
606					leftover,
607				});
608
609				// Save the validator set.
610				ValidatorSet::<T>::put((id, new_validator_set));
611				if let Some(index) = prune_up_to {
612					T::SessionInterface::prune_up_to(index);
613				}
614			}
615
616			Ok(())
617		}
618
619		/// Allows governance to force set the operating mode of the pallet.
620		#[pallet::call_index(1)]
621		#[pallet::weight(T::DbWeight::get().writes(1))]
622		pub fn set_mode(origin: OriginFor<T>, mode: OperatingMode) -> DispatchResult {
623			T::AdminOrigin::ensure_origin(origin)?;
624			Self::do_set_mode(mode);
625			Ok(())
626		}
627
628		/// manually do what this pallet was meant to do at the end of the migration.
629		#[pallet::call_index(2)]
630		#[pallet::weight(T::DbWeight::get().writes(1))]
631		pub fn force_on_migration_end(origin: OriginFor<T>) -> DispatchResult {
632			T::AdminOrigin::ensure_origin(origin)?;
633			Self::on_migration_end();
634			Ok(())
635		}
636	}
637
638	#[pallet::hooks]
639	impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
640		fn on_initialize(_n: BlockNumberFor<T>) -> Weight {
641			let mut weight = Weight::zero();
642
643			let mode = Mode::<T>::get();
644			weight = weight.saturating_add(T::DbWeight::get().reads(1));
645			if mode != OperatingMode::Active {
646				return weight;
647			}
648
649			// if we have any pending session reports, send it.
650			weight.saturating_accrue(T::DbWeight::get().reads(1));
651			if let Some((session_report, retries_left)) = OutgoingSessionReport::<T>::take() {
652				match T::SendToAssetHub::relay_session_report(session_report.clone()) {
653					Ok(()) => {
654						// report was sent, all good, it is already deleted.
655					},
656					Err(()) => {
657						log!(error, "Failed to send session report to assethub");
658						Self::deposit_event(Event::<T>::Unexpected(
659							UnexpectedKind::SessionReportSendFailed,
660						));
661						if let Some(new_retries_left) = retries_left.checked_sub(One::one()) {
662							OutgoingSessionReport::<T>::put((session_report, new_retries_left))
663						} else {
664							// recreate the validator points, so they will be sent in the next
665							// report.
666							session_report.validator_points.into_iter().for_each(|(v, p)| {
667								ValidatorPoints::<T>::mutate(v, |existing_points| {
668									*existing_points = existing_points.defensive_saturating_add(p)
669								});
670							});
671
672							Self::deposit_event(Event::<T>::Unexpected(
673								UnexpectedKind::SessionReportDropped,
674							));
675						}
676					},
677				}
678			}
679
680			// then, take a page from our send queue, and if present, send it.
681			weight.saturating_accrue(T::DbWeight::get().reads(2));
682			OffenceSendQueue::<T>::get_and_maybe_delete(|page| {
683				if page.is_empty() {
684					return Ok(())
685				}
686				// send the page if not empty. If sending returns `Ok`, we delete this page.
687				T::SendToAssetHub::relay_new_offence_paged(page.into_inner()).inspect_err(|_| {
688					Self::deposit_event(Event::Unexpected(UnexpectedKind::OffenceSendFailed));
689				})
690			});
691
692			weight
693		}
694
695		fn integrity_test() {
696			assert!(T::MaxOffenceBatchSize::get() > 0, "Offence Batch size must be at least 1");
697		}
698	}
699
700	impl<T: Config>
701		historical::SessionManager<T::AccountId, sp_staking::Exposure<T::AccountId, BalanceOf<T>>>
702		for Pallet<T>
703	{
704		fn new_session(
705			new_index: sp_staking::SessionIndex,
706		) -> Option<
707			Vec<(
708				<T as frame_system::Config>::AccountId,
709				sp_staking::Exposure<T::AccountId, BalanceOf<T>>,
710			)>,
711		> {
712			<Self as pallet_session::SessionManager<_>>::new_session(new_index)
713				.map(|v| v.into_iter().map(|v| (v, sp_staking::Exposure::default())).collect())
714		}
715
716		fn new_session_genesis(
717			new_index: SessionIndex,
718		) -> Option<Vec<(T::AccountId, sp_staking::Exposure<T::AccountId, BalanceOf<T>>)>> {
719			if Mode::<T>::get() == OperatingMode::Passive {
720				T::Fallback::new_session_genesis(new_index).map(|validators| {
721					validators.into_iter().map(|v| (v, sp_staking::Exposure::default())).collect()
722				})
723			} else {
724				None
725			}
726		}
727
728		fn start_session(start_index: SessionIndex) {
729			<Self as pallet_session::SessionManager<_>>::start_session(start_index)
730		}
731
732		fn end_session(end_index: SessionIndex) {
733			<Self as pallet_session::SessionManager<_>>::end_session(end_index)
734		}
735	}
736
737	impl<T: Config> pallet_session::SessionManager<T::AccountId> for Pallet<T> {
738		fn new_session(session_index: u32) -> Option<Vec<T::AccountId>> {
739			match Mode::<T>::get() {
740				OperatingMode::Passive => T::Fallback::new_session(session_index),
741				// In `Buffered` mode, we drop the session report and do nothing.
742				OperatingMode::Buffered => None,
743				OperatingMode::Active => Self::do_new_session(),
744			}
745		}
746
747		fn start_session(session_index: u32) {
748			if Mode::<T>::get() == OperatingMode::Passive {
749				T::Fallback::start_session(session_index)
750			}
751		}
752
753		fn new_session_genesis(new_index: SessionIndex) -> Option<Vec<T::AccountId>> {
754			if Mode::<T>::get() == OperatingMode::Passive {
755				T::Fallback::new_session_genesis(new_index)
756			} else {
757				None
758			}
759		}
760
761		fn end_session(session_index: u32) {
762			match Mode::<T>::get() {
763				OperatingMode::Passive => T::Fallback::end_session(session_index),
764				// In `Buffered` mode, we drop the session report and do nothing.
765				OperatingMode::Buffered => (),
766				OperatingMode::Active => Self::do_end_session(session_index),
767			}
768		}
769	}
770
771	impl<T: Config>
772		OnOffenceHandler<
773			T::AccountId,
774			(T::AccountId, sp_staking::Exposure<T::AccountId, BalanceOf<T>>),
775			Weight,
776		> for Pallet<T>
777	{
778		fn on_offence(
779			offenders: &[OffenceDetails<
780				T::AccountId,
781				(T::AccountId, sp_staking::Exposure<T::AccountId, BalanceOf<T>>),
782			>],
783			slash_fraction: &[Perbill],
784			slash_session: SessionIndex,
785		) -> Weight {
786			match Mode::<T>::get() {
787				OperatingMode::Passive => {
788					// delegate to the fallback implementation.
789					T::Fallback::on_offence(offenders, slash_fraction, slash_session)
790				},
791				OperatingMode::Buffered =>
792					Self::on_offence_buffered(offenders, slash_fraction, slash_session),
793				OperatingMode::Active =>
794					Self::on_offence_active(offenders, slash_fraction, slash_session),
795			}
796		}
797	}
798
799	impl<T: Config> RewardsReporter<T::AccountId> for Pallet<T> {
800		fn reward_by_ids(rewards: impl IntoIterator<Item = (T::AccountId, u32)>) {
801			match Mode::<T>::get() {
802				OperatingMode::Passive => T::Fallback::reward_by_ids(rewards),
803				OperatingMode::Buffered | OperatingMode::Active => Self::do_reward_by_ids(rewards),
804			}
805		}
806	}
807
808	impl<T: Config> pallet_authorship::EventHandler<T::AccountId, BlockNumberFor<T>> for Pallet<T> {
809		fn note_author(author: T::AccountId) {
810			match Mode::<T>::get() {
811				OperatingMode::Passive => T::Fallback::note_author(author),
812				OperatingMode::Buffered | OperatingMode::Active => Self::do_note_author(author),
813			}
814		}
815	}
816
817	impl<T: Config> Pallet<T> {
818		/// Hook to be called when the AssetHub migration begins.
819		///
820		/// This transitions the pallet into [`OperatingMode::Buffered`], meaning it will act as the
821		/// primary staking module on the relay chain but will buffer outgoing messages instead of
822		/// sending them to AssetHub.
823		///
824		/// While in this mode, the pallet stops delegating to the fallback implementation and
825		/// temporarily accumulates events for later processing.
826		pub fn on_migration_start() {
827			debug_assert!(
828				Mode::<T>::get() == OperatingMode::Passive,
829				"we should only be called when in passive mode"
830			);
831			Self::do_set_mode(OperatingMode::Buffered);
832		}
833
834		/// Hook to be called when the AssetHub migration is complete.
835		///
836		/// This transitions the pallet into [`OperatingMode::Active`], meaning the counterpart
837		/// pallet on AssetHub is ready to accept incoming messages, and this pallet can resume
838		/// sending them.
839		///
840		/// In this mode, the pallet becomes fully active and processes all staking-related events
841		/// directly.
842		pub fn on_migration_end() {
843			debug_assert!(
844				Mode::<T>::get() == OperatingMode::Buffered,
845				"we should only be called when in buffered mode"
846			);
847			Self::do_set_mode(OperatingMode::Active);
848
849			// Buffered offences will be processed gradually by on_initialize
850			// using MaxOffenceBatchSize to prevent block overload.
851		}
852
853		fn do_set_mode(new_mode: OperatingMode) {
854			let old_mode = Mode::<T>::get();
855			let unexpected = match new_mode {
856				// `Passive` is the initial state, and not expected to be set by the user.
857				OperatingMode::Passive => true,
858				OperatingMode::Buffered => old_mode != OperatingMode::Passive,
859				OperatingMode::Active => old_mode != OperatingMode::Buffered,
860			};
861
862			// this is a defensive check, and should never happen under normal operation.
863			if unexpected {
864				log!(warn, "Unexpected mode transition from {:?} to {:?}", old_mode, new_mode);
865				Self::deposit_event(Event::Unexpected(UnexpectedKind::UnexpectedModeTransition));
866			}
867
868			// apply new mode anyway.
869			Mode::<T>::put(new_mode);
870		}
871
872		fn do_new_session() -> Option<Vec<T::AccountId>> {
873			ValidatorSet::<T>::take().map(|(id, val_set)| {
874				// store the id to be sent back in the next session back to AH
875				NextSessionChangesValidators::<T>::put(id);
876				val_set
877			})
878		}
879
880		fn do_end_session(end_index: u32) {
881			// take and delete all validator points, limited by `MaximumValidatorsWithPoints`.
882			let validator_points = ValidatorPoints::<T>::iter()
883				.drain()
884				.take(T::MaximumValidatorsWithPoints::get() as usize)
885				.collect::<Vec<_>>();
886
887			// If there were more validators than `MaximumValidatorsWithPoints`..
888			if ValidatorPoints::<T>::iter().next().is_some() {
889				// ..not much more we can do about it other than an event.
890				Self::deposit_event(Event::<T>::Unexpected(UnexpectedKind::ValidatorPointDropped))
891			}
892
893			let activation_timestamp = NextSessionChangesValidators::<T>::take().map(|id| {
894				// keep track of starting session index at which the validator set was applied.
895				ValidatorSetAppliedAt::<T>::put(end_index + 1);
896				// set the timestamp and the identifier of the validator set.
897				(T::UnixTime::now().as_millis().saturated_into::<u64>(), id)
898			});
899
900			let session_report = pallet_staking_async_rc_client::SessionReport {
901				end_index,
902				validator_points,
903				activation_timestamp,
904				leftover: false,
905			};
906
907			// queue the session report to be sent.
908			OutgoingSessionReport::<T>::put((session_report, T::MaxSessionReportRetries::get()));
909		}
910
911		fn do_reward_by_ids(rewards: impl IntoIterator<Item = (T::AccountId, u32)>) {
912			for (validator_id, points) in rewards {
913				ValidatorPoints::<T>::mutate(validator_id, |balance| {
914					balance.saturating_accrue(points);
915				});
916			}
917		}
918
919		fn do_note_author(author: T::AccountId) {
920			ValidatorPoints::<T>::mutate(author, |points| {
921				points.saturating_accrue(T::PointsPerBlock::get());
922			});
923		}
924
925		/// Check if an offence is from the active validator set.
926		fn is_ongoing_offence(slash_session: SessionIndex) -> bool {
927			ValidatorSetAppliedAt::<T>::get()
928				.map(|start_session| slash_session >= start_session)
929				.unwrap_or(false)
930		}
931
932		/// Handle offences in Buffered mode.
933		fn on_offence_buffered(
934			offenders: &[OffenceDetailsOf<T>],
935			slash_fraction: &[Perbill],
936			slash_session: SessionIndex,
937		) -> Weight {
938			let ongoing_offence = Self::is_ongoing_offence(slash_session);
939
940			offenders.iter().cloned().zip(slash_fraction).for_each(|(offence, fraction)| {
941				if ongoing_offence {
942					// report the offence to the session pallet.
943					T::SessionInterface::report_offence(
944						offence.offender.0.clone(),
945						OffenceSeverity(*fraction),
946					);
947				}
948
949				let (offender, _full_identification) = offence.offender;
950				let reporters = offence.reporters;
951
952				// In `Buffered` mode, we buffer the offences for later processing.
953				OffenceSendQueue::<T>::append((
954					slash_session,
955					rc_client::Offence {
956						offender: offender.clone(),
957						reporters: reporters.into_iter().take(1).collect(),
958						slash_fraction: *fraction,
959					},
960				));
961			});
962
963			T::DbWeight::get().reads_writes(1, 1)
964		}
965
966		/// Handle offences in Active mode.
967		fn on_offence_active(
968			offenders: &[OffenceDetailsOf<T>],
969			slash_fraction: &[Perbill],
970			slash_session: SessionIndex,
971		) -> Weight {
972			let ongoing_offence = Self::is_ongoing_offence(slash_session);
973
974			offenders.iter().cloned().zip(slash_fraction).for_each(|(offence, fraction)| {
975				if ongoing_offence {
976					// report the offence to the session pallet.
977					T::SessionInterface::report_offence(
978						offence.offender.0.clone(),
979						OffenceSeverity(*fraction),
980					);
981				}
982
983				let (offender, _full_identification) = offence.offender;
984				let reporters = offence.reporters;
985
986				// prepare an `Offence` instance for the XCM message. Note that we drop
987				// the identification.
988				let offence = rc_client::Offence {
989					offender,
990					reporters: reporters.into_iter().take(1).collect(),
991					slash_fraction: *fraction,
992				};
993				OffenceSendQueue::<T>::append((slash_session, offence))
994			});
995
996			T::DbWeight::get().reads_writes(2, 2)
997		}
998	}
999}
1000
1001#[cfg(test)]
1002mod send_queue_tests {
1003	use frame_support::hypothetically;
1004	use sp_runtime::Perbill;
1005
1006	use super::*;
1007	use crate::mock::*;
1008
1009	// (cursor, len_of_pages)
1010	fn status() -> (u32, Vec<u32>) {
1011		let mut sorted = OffenceSendQueueOffences::<Test>::iter().collect::<Vec<_>>();
1012		sorted.sort_by(|x, y| x.0.cmp(&y.0));
1013		(
1014			OffenceSendQueueCursor::<Test>::get(),
1015			sorted.into_iter().map(|(_, v)| v.len() as u32).collect(),
1016		)
1017	}
1018
1019	#[test]
1020	fn append_and_take() {
1021		new_test_ext().execute_with(|| {
1022			let o = (
1023				42,
1024				rc_client::Offence {
1025					offender: 42,
1026					reporters: vec![],
1027					slash_fraction: Perbill::from_percent(10),
1028				},
1029			);
1030			let page_size = <Test as Config>::MaxOffenceBatchSize::get();
1031			assert_eq!(page_size % 2, 0, "page size should be even");
1032
1033			assert_eq!(status(), (0, vec![]));
1034
1035			// --- when empty
1036
1037			assert_eq!(OffenceSendQueue::<Test>::count(), 0);
1038			assert_eq!(OffenceSendQueue::<Test>::pages(), 0);
1039
1040			// get and keep
1041			hypothetically!({
1042				OffenceSendQueue::<Test>::get_and_maybe_delete(|page| {
1043					assert_eq!(page.len(), 0);
1044					Err(())
1045				});
1046				assert_eq!(status(), (0, vec![]));
1047			});
1048
1049			// get and delete
1050			hypothetically!({
1051				OffenceSendQueue::<Test>::get_and_maybe_delete(|page| {
1052					assert_eq!(page.len(), 0);
1053					Ok(())
1054				});
1055				assert_eq!(status(), (0, vec![]));
1056			});
1057
1058			// -------- when 1 page half filled
1059			for _ in 0..page_size / 2 {
1060				OffenceSendQueue::<Test>::append(o.clone());
1061			}
1062			assert_eq!(status(), (0, vec![page_size / 2]));
1063			assert_eq!(OffenceSendQueue::<Test>::count(), page_size / 2);
1064			assert_eq!(OffenceSendQueue::<Test>::pages(), 1);
1065
1066			// get and keep
1067			hypothetically!({
1068				OffenceSendQueue::<Test>::get_and_maybe_delete(|page| {
1069					assert_eq!(page.len() as u32, page_size / 2);
1070					Err(())
1071				});
1072				assert_eq!(status(), (0, vec![page_size / 2]));
1073			});
1074
1075			// get and delete
1076			hypothetically!({
1077				OffenceSendQueue::<Test>::get_and_maybe_delete(|page| {
1078					assert_eq!(page.len() as u32, page_size / 2);
1079					Ok(())
1080				});
1081				assert_eq!(status(), (0, vec![]));
1082				assert_eq!(OffenceSendQueue::<Test>::count(), 0);
1083				assert_eq!(OffenceSendQueue::<Test>::pages(), 0);
1084			});
1085
1086			// -------- when 1 page full
1087			for _ in 0..page_size / 2 {
1088				OffenceSendQueue::<Test>::append(o.clone());
1089			}
1090			assert_eq!(status(), (0, vec![page_size]));
1091			assert_eq!(OffenceSendQueue::<Test>::count(), page_size);
1092			assert_eq!(OffenceSendQueue::<Test>::pages(), 1);
1093
1094			// get and keep
1095			hypothetically!({
1096				OffenceSendQueue::<Test>::get_and_maybe_delete(|page| {
1097					assert_eq!(page.len() as u32, page_size);
1098					Err(())
1099				});
1100				assert_eq!(status(), (0, vec![page_size]));
1101			});
1102
1103			// get and delete
1104			hypothetically!({
1105				OffenceSendQueue::<Test>::get_and_maybe_delete(|page| {
1106					assert_eq!(page.len() as u32, page_size);
1107					Ok(())
1108				});
1109				assert_eq!(status(), (0, vec![]));
1110			});
1111
1112			// -------- when more than 1 page full
1113			OffenceSendQueue::<Test>::append(o.clone());
1114			assert_eq!(status(), (1, vec![page_size, 1]));
1115			assert_eq!(OffenceSendQueue::<Test>::count(), page_size + 1);
1116			assert_eq!(OffenceSendQueue::<Test>::pages(), 2);
1117
1118			// get and keep
1119			hypothetically!({
1120				OffenceSendQueue::<Test>::get_and_maybe_delete(|page| {
1121					assert_eq!(page.len(), 1);
1122					Err(())
1123				});
1124				assert_eq!(status(), (1, vec![page_size, 1]));
1125			});
1126
1127			// get and delete
1128			hypothetically!({
1129				OffenceSendQueue::<Test>::get_and_maybe_delete(|page| {
1130					assert_eq!(page.len(), 1);
1131					Ok(())
1132				});
1133				assert_eq!(status(), (0, vec![page_size]));
1134			});
1135		})
1136	}
1137}