
pallet_staking_async_rc_client/lib.rs

// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: Apache-2.0

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// 	http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! The client for the relay chain, intended to be used in AssetHub.
//!
//! The counter-part for this pallet is `pallet-staking-async-ah-client` on the relay chain.
//!
//! This documentation is divided into the following sections:
//!
//! 1. Incoming messages: the messages that we receive from the relay chain.
//! 2. Outgoing messages: the messages that we send to the relay chain.
//! 3. Local interfaces: the interfaces that we expose to other pallets in the runtime.
//!
//! ## Incoming Messages
//!
//! All incoming messages are handled via [`Call`]. They are all gated to be dispatched only by the
//! relay chain origin, as per [`Config::RelayChainOrigin`].
//!
//! After potential queuing, they are passed to pallet-staking-async via [`AHStakingInterface`].
//!
//! The calls are:
//!
//! * [`Call::relay_session_report`]: A report from the relay chain, indicating the end of a
//!   session. We allow ourselves to know an implementation detail: **The ending of session `x`
//!   always implies start of session `x+1` and planning of session `x+2`.** This allows us to have
//!   just one message per session.
//!
//! > Note that in the code, due to historical reasons, planning of a new session is called
//! > `new_session`.
//!
//! * [`Call::relay_new_offence_paged`]: A report of one or more offences on the relay chain.
//!
//! ## Outgoing Messages
//!
//! The outgoing messages are expressed in [`SendToRelayChain`].
//!
//! ## Local Interfaces
//!
//! Within this pallet, we need to talk to the staking-async pallet in AH. This is done via the
//! [`AHStakingInterface`] trait.
//!
//! The staking pallet in AH has no communication with the session pallet whatsoever; therefore its
//! implementation of `SessionManager`, and its associated type `SessionInterface`, no longer
//! exist. Moreover, pallet-staking-async no longer has a local notion of timestamp, and only
//! relies on the timestamp passed in via the `SessionReport`.
//!
//! ## Shared Types
//!
//! Note that a number of types need to be shared between this crate and `ah-client`. For now, as a
//! convention, they are kept in this crate. This can later be decoupled into a shared crate, or
//! `sp-staking`.
//!
//! TODO: the rest should go to staking-async docs.
//!
//! ## Session Change
//!
//! Further details of how the session change works follow. These details are important to how
//! `pallet-staking-async` should rotate sessions/eras going forward.
//!
//! ### Synchronous Model
//!
//! Let's first consider the old-school model, where staking and session lived in the same runtime.
//! Assume 3 sessions make up one era.
//!
//! The session pallet issues the following events:
//!
//! end_session / start_session / new_session (plan session)
//!
//! * end 0, start 1, plan 2
//! * end 1, start 2, plan 3 (new validator set returned)
//! * end 2, start 3 (new validator set activated), plan 4
//! * end 3, start 4, plan 5
//! * end 4, start 5, plan 6 (ah-client to already return validator set) and so on.
//!
//! Staking should then do the following:
//!
//! * once a request to plan session 3 comes in, it must return a validator set. This is queued
//!   internally in the session pallet, and is enacted later.
//! * at the same time, staking increases its notion of `current_era` by 1. Yet, `active_era` is
//!   intact. This is because the validators elected for era n+1 are not yet active in the session
//!   pallet.
//! * once a request to _start_ session 3 comes in, staking will rotate its `active_era` to also be
//!   incremented to n+1.
//!
//! ### Asynchronous Model
//!
//! Now, if staking lives in AH and the session pallet lives in the relay chain, how will this
//! look?
//!
//! Staking knows that by the time the relay-chain session index `3` (and later on `6` and so on) is
//! _planned_, it must have already returned a validator set via XCM.
//!
//! Conceptually, staking must:
//!
//! - Listen to the [`SessionReport`]s coming in, and start a new staking election such that we can
//!   be sure it is delivered to the RC well before the message for planning session 3 is received.
//! - Know that, regardless of the timing, these validators correspond to session 3, and an
//!   upcoming era.
//! - Keep these pending validators internally within its state.
//! - Once the message to start session 3 is received, act upon it locally.
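//!
//! The sketch below illustrates this flow from staking's point of view. It is only an
//! illustration: the helper names (`activate_pending_validator_set`, `maybe_start_election`) are
//! hypothetical and not part of this pallet's API.
//!
//! ```rust,ignore
//! fn on_relay_session_report(report: SessionReport<AccountId>) {
//!     // ending of `end_index` implies start of `end_index + 1` and planning of `end_index + 2`.
//!     if let Some((timestamp, id)) = report.activation_timestamp {
//!         // the validator set previously reported with this `id` is now active on the RC, so
//!         // rotate `active_era` and use `timestamp` as the new era's start time.
//!         activate_pending_validator_set(id, timestamp);
//!     }
//!     // possibly kick off a new election now, so that the next validator set reaches the RC
//!     // well before the session in which it must be planned.
//!     maybe_start_election(report.end_index);
//! }
//! ```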

#![cfg_attr(not(feature = "std"), no_std)]

extern crate alloc;
use alloc::{vec, vec::Vec};
use core::fmt::Display;
use frame_support::{pallet_prelude::*, storage::transactional::with_transaction_opaque_err};
use sp_runtime::{traits::Convert, Perbill, TransactionOutcome};
use sp_staking::SessionIndex;
use xcm::latest::{send_xcm, Location, SendError, SendXcm, Xcm};

/// Export everything needed for the pallet to be used in the runtime.
pub use pallet::*;

const LOG_TARGET: &str = "runtime::staking-async::rc-client";

// syntactic sugar for logging.
#[macro_export]
macro_rules! log {
	($level:tt, $patter:expr $(, $values:expr)* $(,)?) => {
		log::$level!(
			target: $crate::LOG_TARGET,
			concat!("[{:?}] ⬆️ ", $patter), <frame_system::Pallet<T>>::block_number() $(, $values)*
		)
	};
}
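
// Example usage (illustrative): inside a function generic over `T: Config`,
// `log!(warn, "dropping report for session {}", idx)` logs under the
// `runtime::staking-async::rc-client` target, prefixed with the current block number.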

/// The communication trait of `pallet-staking-async-rc-client` -> `relay-chain`.
///
/// This trait should only encapsulate our _outgoing_ communication to the RC. Any incoming
/// communication comes in directly via our calls.
///
/// In a real runtime, this is implemented via XCM calls, much like how the coretime pallet works.
/// In a test runtime, it can be wired to direct function calls.
pub trait SendToRelayChain {
	/// The validator account ids.
	type AccountId;

	/// Send a new validator set report to the relay chain.
	#[allow(clippy::result_unit_err)]
	fn validator_set(report: ValidatorSetReport<Self::AccountId>) -> Result<(), ()>;
}

#[cfg(feature = "std")]
impl SendToRelayChain for () {
	type AccountId = u64;
	fn validator_set(_report: ValidatorSetReport<Self::AccountId>) -> Result<(), ()> {
		unimplemented!();
	}
}
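
/// A minimal sketch of the "direct function call" wiring mentioned above, for test runtimes.
/// This is illustrative only: the `RECORDED_VALIDATOR_SETS` cell and `DirectRcSender` type are
/// not used by this pallet; they merely record what would otherwise be sent over XCM.
#[cfg(feature = "std")]
pub mod direct_rc_sender_example {
	use super::*;

	std::thread_local! {
		/// Reports "sent" to the relay chain, kept around so tests can inspect them.
		pub static RECORDED_VALIDATOR_SETS: core::cell::RefCell<Vec<ValidatorSetReport<u64>>> =
			core::cell::RefCell::new(Vec::new());
	}

	/// A sender that records reports in memory instead of sending XCM.
	pub struct DirectRcSender;

	impl SendToRelayChain for DirectRcSender {
		type AccountId = u64;
		fn validator_set(report: ValidatorSetReport<u64>) -> Result<(), ()> {
			RECORDED_VALIDATOR_SETS.with(|q| q.borrow_mut().push(report));
			Ok(())
		}
	}
}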

/// The interface to communicate to asset hub.
///
/// This trait should only encapsulate our outgoing communications. Any incoming message is handled
/// with `Call`s.
///
/// In a real runtime, this is implemented via XCM calls, much like how the coretime pallet works.
/// In a test runtime, it can be wired to direct function calls.
pub trait SendToAssetHub {
	/// The validator account ids.
	type AccountId;

	/// Report a session change to AssetHub.
	///
	/// Returning `Err(())` means the DMP queue is full, and you should try again in the next block.
	#[allow(clippy::result_unit_err)]
	fn relay_session_report(session_report: SessionReport<Self::AccountId>) -> Result<(), ()>;

	#[allow(clippy::result_unit_err)]
	fn relay_new_offence_paged(
		offences: Vec<(SessionIndex, Offence<Self::AccountId>)>,
	) -> Result<(), ()>;
}

/// A no-op implementation of [`SendToAssetHub`].
#[cfg(feature = "std")]
impl SendToAssetHub for () {
	type AccountId = u64;

	fn relay_session_report(_session_report: SessionReport<Self::AccountId>) -> Result<(), ()> {
		unimplemented!();
	}

	fn relay_new_offence_paged(
		_offences: Vec<(SessionIndex, Offence<Self::AccountId>)>,
	) -> Result<(), ()> {
		unimplemented!()
	}
}

#[derive(Encode, Decode, DecodeWithMemTracking, Clone, PartialEq, TypeInfo)]
/// A report about a new validator set. This is sent from AH -> RC.
pub struct ValidatorSetReport<AccountId> {
	/// The new validator set.
	pub new_validator_set: Vec<AccountId>,
	/// The id of this validator set.
	///
	/// An always-incrementing identifier for this validator set, the activation of which can be
	/// later pointed to in a `SessionReport`.
	///
	/// Implementation detail: within `pallet-staking-async`, this is always set to the
	/// `planning-era` (aka. `CurrentEra`).
	pub id: u32,
	/// Signal the relay chain that it can prune up to this session, as enough eras have passed.
	///
	/// This can always have a safety buffer. For example, whatever is a sane value, it can be
	/// `value - 5`.
	pub prune_up_to: Option<SessionIndex>,
	/// Same semantics as [`SessionReport::leftover`].
	pub leftover: bool,
}

impl<AccountId: core::fmt::Debug> core::fmt::Debug for ValidatorSetReport<AccountId> {
	fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
		f.debug_struct("ValidatorSetReport")
			.field("new_validator_set", &self.new_validator_set)
			.field("id", &self.id)
			.field("prune_up_to", &self.prune_up_to)
			.field("leftover", &self.leftover)
			.finish()
	}
}

impl<AccountId> core::fmt::Display for ValidatorSetReport<AccountId> {
	fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
		f.debug_struct("ValidatorSetReport")
			.field("new_validator_set", &self.new_validator_set.len())
			.field("id", &self.id)
			.field("prune_up_to", &self.prune_up_to)
			.field("leftover", &self.leftover)
			.finish()
	}
}

impl<AccountId> ValidatorSetReport<AccountId> {
	/// A new instance of self that is terminal. This is useful when we want to send everything in
	/// one go.
	pub fn new_terminal(
		new_validator_set: Vec<AccountId>,
		id: u32,
		prune_up_to: Option<SessionIndex>,
	) -> Self {
		Self { new_validator_set, id, prune_up_to, leftover: false }
	}

	/// Merge oneself with another instance.
	pub fn merge(mut self, other: Self) -> Result<Self, UnexpectedKind> {
		if self.id != other.id || self.prune_up_to != other.prune_up_to {
			// Must be some bug -- don't merge.
			return Err(UnexpectedKind::ValidatorSetIntegrityFailed);
		}
		self.new_validator_set.extend(other.new_validator_set);
		self.leftover = other.leftover;
		Ok(self)
	}

	/// Split self into chunks of `chunk_size` elements.
	pub fn split(self, chunk_size: usize) -> Vec<Self>
	where
		AccountId: Clone,
	{
		let chunks = self.new_validator_set.chunks(chunk_size.max(1)).map(|x| x.to_vec());
		let mut parts = chunks
			.into_iter()
			.map(|new_validator_set| Self { new_validator_set, leftover: true, ..self })
			.collect::<Vec<_>>();
		if let Some(x) = parts.last_mut() {
			x.leftover = false
		}
		parts
	}
}
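
/// A small sketch of the split/merge round trip described above, for illustration: every chunk
/// except the last is marked `leftover = true`, and merging the chunks back, in order, restores
/// the original report.
#[cfg(test)]
mod splitting_example {
	use super::*;

	#[test]
	fn validator_set_report_split_and_merge_round_trip() {
		let report = ValidatorSetReport::new_terminal((0u64..10).collect(), 42, Some(7));

		// 10 validators in chunks of 4 -> 3 parts; only the last part is terminal.
		let parts = report.clone().split(4);
		assert_eq!(parts.len(), 3);
		assert!(parts[0].leftover && parts[1].leftover && !parts[2].leftover);

		// merging the parts in order reconstructs the original report.
		let merged = parts
			.into_iter()
			.reduce(|acc, next| acc.merge(next).expect("same id and prune_up_to; qed"))
			.expect("at least one part; qed");
		assert_eq!(merged, report);
	}
}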

#[derive(Encode, Decode, DecodeWithMemTracking, Clone, PartialEq, TypeInfo, MaxEncodedLen)]
/// The information that is sent from RC -> AH on session end.
pub struct SessionReport<AccountId> {
	/// The session that is ending.
	///
	/// This always implies start of `end_index + 1`, and planning of `end_index + 2`.
	pub end_index: SessionIndex,
	/// All of the points that validators have accumulated.
	///
	/// This can be either from block authoring, or from parachain consensus, or anything else.
	pub validator_points: Vec<(AccountId, u32)>,
	/// If `None`, it means no new validator set was activated as a part of this session.
	///
	/// If `Some((timestamp, id))`, it means that the new validator set was activated at the given
	/// timestamp, and the id of the validator set is `id`.
	///
	/// This `id` is what was previously communicated to the RC as a part of
	/// [`ValidatorSetReport::id`].
	pub activation_timestamp: Option<(u64, u32)>,
	/// If this session report is self-contained, then this is false.
	///
	/// If this session report has some leftover, it should not be acted upon until a subsequent
	/// message with `leftover = false` comes in. The client pallets should handle this queuing.
	///
	/// This is in place to future proof us against possibly needing to send multiple rounds of
	/// messages to convey all of the `validator_points`.
	///
	/// Upon processing, this should always be `false`, and it should be ignored.
	pub leftover: bool,
}

impl<AccountId: core::fmt::Debug> core::fmt::Debug for SessionReport<AccountId> {
	fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
		f.debug_struct("SessionReport")
			.field("end_index", &self.end_index)
			.field("validator_points", &self.validator_points)
			.field("activation_timestamp", &self.activation_timestamp)
			.field("leftover", &self.leftover)
			.finish()
	}
}

impl<AccountId> core::fmt::Display for SessionReport<AccountId> {
	fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
		f.debug_struct("SessionReport")
			.field("end_index", &self.end_index)
			.field("validator_points", &self.validator_points.len())
			.field("activation_timestamp", &self.activation_timestamp)
			.field("leftover", &self.leftover)
			.finish()
	}
}

impl<AccountId> SessionReport<AccountId> {
	/// A new instance of self that is terminal. This is useful when we want to send everything in
	/// one go.
	pub fn new_terminal(
		end_index: SessionIndex,
		validator_points: Vec<(AccountId, u32)>,
		activation_timestamp: Option<(u64, u32)>,
	) -> Self {
		Self { end_index, validator_points, activation_timestamp, leftover: false }
	}

	/// Merge oneself with another instance.
	pub fn merge(mut self, other: Self) -> Result<Self, UnexpectedKind> {
		if self.end_index != other.end_index ||
			self.activation_timestamp != other.activation_timestamp
		{
			// Must be some bug -- don't merge.
			return Err(UnexpectedKind::SessionReportIntegrityFailed);
		}
		self.validator_points.extend(other.validator_points);
		self.leftover = other.leftover;
		Ok(self)
	}

	/// Split self into chunks of `chunk_size` elements.
	pub fn split(self, chunk_size: usize) -> Vec<Self>
	where
		AccountId: Clone,
	{
		let chunks = self.validator_points.chunks(chunk_size.max(1)).map(|x| x.to_vec());
		let mut parts = chunks
			.into_iter()
			.map(|validator_points| Self { validator_points, leftover: true, ..self })
			.collect::<Vec<_>>();
		if let Some(x) = parts.last_mut() {
			x.leftover = false
		}
		parts
	}
}

/// A trait to encapsulate messages between RC and AH that can be split into smaller chunks.
///
/// Implemented for [`SessionReport`] and [`ValidatorSetReport`].
#[allow(clippy::len_without_is_empty)]
pub trait SplittableMessage: Sized {
	/// Split yourself into pieces of `chunk_size` size.
	fn split_by(self, chunk_size: usize) -> Vec<Self>;

	/// Current length of the message.
	fn len(&self) -> usize;
}

impl<AccountId: Clone> SplittableMessage for SessionReport<AccountId> {
	fn split_by(self, chunk_size: usize) -> Vec<Self> {
		self.split(chunk_size)
	}
	fn len(&self) -> usize {
		self.validator_points.len()
	}
}

impl<AccountId: Clone> SplittableMessage for ValidatorSetReport<AccountId> {
	fn split_by(self, chunk_size: usize) -> Vec<Self> {
		self.split(chunk_size)
	}
	fn len(&self) -> usize {
		self.new_validator_set.len()
	}
}

/// Common utility to send XCM messages that can use [`SplittableMessage`].
///
/// It can be used both in the RC and AH. `Message` is the splittable message type, and `ToXcm`
/// should be configured by the user, converting `message` to a valid `Xcm<()>`. It should utilize
/// the correct call indices, which we only know at the runtime level.
pub struct XCMSender<Sender, Destination, Message, ToXcm>(
	core::marker::PhantomData<(Sender, Destination, Message, ToXcm)>,
);

impl<Sender, Destination, Message, ToXcm> XCMSender<Sender, Destination, Message, ToXcm>
where
	Sender: SendXcm,
	Destination: Get<Location>,
	Message: Clone + Encode,
	ToXcm: Convert<Message, Xcm<()>>,
{
	/// Send the message single-shot; no splitting.
	///
	/// Useful for sending messages that are already paged/chunked, so we are sure that they fit in
	/// one message.
	#[allow(clippy::result_unit_err)]
	pub fn send(message: Message) -> Result<(), ()> {
		let xcm = ToXcm::convert(message);
		let dest = Destination::get();
		// send_xcm already calls validate internally
		send_xcm::<Sender>(dest, xcm).map(|_| ()).map_err(|_| ())
	}
}

impl<Sender, Destination, Message, ToXcm> XCMSender<Sender, Destination, Message, ToXcm>
where
	Sender: SendXcm,
	Destination: Get<Location>,
	Message: SplittableMessage + Display + Clone + Encode,
	ToXcm: Convert<Message, Xcm<()>>,
{
	/// Safe send method to send a `message`, while validating it and using [`SplittableMessage`] to
	/// split it into smaller pieces if XCM validation fails with `ExceedsMaxMessageSize`. It will
	/// fail on other errors.
	///
	/// Returns `Ok(())` if the message was sent using XCM, potentially with splitting up to
	/// `maybe_max_steps` times, `Err(())` otherwise.
	#[deprecated(
		note = "all staking related VMP messages should fit the single message limits. Should not be used."
	)]
	#[allow(clippy::result_unit_err)]
	pub fn split_then_send(message: Message, maybe_max_steps: Option<u32>) -> Result<(), ()> {
		let message_type_name = core::any::type_name::<Message>();
		let dest = Destination::get();
		let xcms = Self::prepare(message, maybe_max_steps).map_err(|e| {
			log::error!(target: "runtime::staking-async::rc-client", "📨 Failed to split message {}: {:?}", message_type_name, e);
		})?;

		match with_transaction_opaque_err(|| {
			let all_sent = xcms.into_iter().enumerate().try_for_each(|(idx, xcm)| {
				log::debug!(target: "runtime::staking-async::rc-client", "📨 sending {} message index {}, size: {:?}", message_type_name, idx, xcm.encoded_size());
				send_xcm::<Sender>(dest.clone(), xcm).map(|_| {
					log::debug!(target: "runtime::staking-async::rc-client", "📨 Successfully sent {} message part {} to relay chain", message_type_name, idx);
				}).inspect_err(|e| {
					log::error!(target: "runtime::staking-async::rc-client", "📨 Failed to send {} message to relay chain: {:?}", message_type_name, e);
				})
			});

			match all_sent {
				Ok(()) => TransactionOutcome::Commit(Ok(())),
				Err(send_err) => TransactionOutcome::Rollback(Err(send_err)),
			}
		}) {
			// just like https://doc.rust-lang.org/src/core/result.rs.html#1746 which I cannot use yet because not in 1.89
			Ok(inner) => inner.map_err(|_| ()),
			// unreachable; `with_transaction_opaque_err` always returns `Ok(inner)`
			Err(_) => Err(()),
		}
	}

	fn prepare(message: Message, maybe_max_steps: Option<u32>) -> Result<Vec<Xcm<()>>, SendError> {
		// initial chunk size is the entire thing, so it will be a vector of 1 item.
		let mut chunk_size = message.len();
		let mut steps = 0;

		loop {
			let current_messages = message.clone().split_by(chunk_size);

			// the first message is the heaviest, the last one might be smaller.
			let first_message = if let Some(r) = current_messages.first() {
				r
			} else {
				log::debug!(target: "runtime::staking-async::xcm", "📨 unexpected: no messages to send");
				return Ok(vec![]);
			};

			log::debug!(
				target: "runtime::staking-async::xcm",
				"📨 step: {:?}, chunk_size: {:?}, message_size: {:?}",
				steps,
				chunk_size,
				first_message.encoded_size(),
			);

			let first_xcm = ToXcm::convert(first_message.clone());
			match <Sender as SendXcm>::validate(&mut Some(Destination::get()), &mut Some(first_xcm))
			{
				Ok((_ticket, price)) => {
					log::debug!(target: "runtime::staking-async::xcm", "📨 validated, price: {:?}", price);
					return Ok(current_messages.into_iter().map(ToXcm::convert).collect::<Vec<_>>());
				},
				Err(SendError::ExceedsMaxMessageSize) => {
					log::debug!(target: "runtime::staking-async::xcm", "📨 ExceedsMaxMessageSize -- reducing chunk_size");
					chunk_size = chunk_size.saturating_div(2);
					steps += 1;
					if maybe_max_steps.is_some_and(|max_steps| steps > max_steps) ||
						chunk_size == 0
					{
						log::error!(target: "runtime::staking-async::xcm", "📨 Exceeded max steps or chunk_size = 0");
						return Err(SendError::ExceedsMaxMessageSize);
					} else {
						// try again with the new `chunk_size`
						continue;
					}
				},
				Err(other) => {
					log::error!(target: "runtime::staking-async::xcm", "📨 other error -- cannot send XCM: {:?}", other);
					return Err(other);
				},
			}
		}
	}
}
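
// For reference, a `ToXcm` converter for `XCMSender` is expected to wrap the message into a
// `Transact`-style XCM program carrying the destination runtime's call (e.g. the relay chain's
// `ah-client` call that receives the report), using call indices only known at the runtime level.
// A rough, illustrative sketch follows; the type names, call wrappers and exact `Transact` fields
// are assumptions (they differ between XCM versions), not this crate's API:
//
// pub struct ValidatorSetToXcm;
// impl Convert<ValidatorSetReport<AccountId>, Xcm<()>> for ValidatorSetToXcm {
// 	fn convert(report: ValidatorSetReport<AccountId>) -> Xcm<()> {
// 		Xcm(vec![
// 			Instruction::UnpaidExecution { weight_limit: Unlimited, check_origin: None },
// 			Instruction::Transact {
// 				// encode the relay chain runtime call that accepts the report...
// 				call: RelayChainRuntimeCall::AhClient(ah_client::Call::validator_set { report })
// 					.encode()
// 					.into(),
// 				// ...remaining `Transact` fields elided; they are XCM-version specific.
// 				..
// 			},
// 		])
// 	}
// }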

/// Our communication trait of `pallet-staking-async-rc-client` -> `pallet-staking-async`.
///
/// This is merely a shorthand to avoid tightly-coupling the staking pallet to this pallet. It
/// limits what we can say to `pallet-staking-async` to only these functions.
pub trait AHStakingInterface {
	/// The validator account id type.
	type AccountId;
	/// Maximum number of validators that the staking system may have.
	type MaxValidatorSet: Get<u32>;

	/// New session report from the relay chain.
	fn on_relay_session_report(report: SessionReport<Self::AccountId>) -> Weight;

	/// Return the weight of `on_relay_session_report` call without executing it.
	///
	/// This will return the worst case estimate of the weight. The actual execution will return the
	/// accurate amount.
	fn weigh_on_relay_session_report(report: &SessionReport<Self::AccountId>) -> Weight;

	/// Report one or more offences on the relay chain.
	fn on_new_offences(
		slash_session: SessionIndex,
		offences: Vec<Offence<Self::AccountId>>,
	) -> Weight;

	/// Return the weight of `on_new_offences` call without executing it.
	///
	/// This will return the worst case estimate of the weight. The actual execution will return the
	/// accurate amount.
	fn weigh_on_new_offences(offence_count: u32) -> Weight;

	/// Get the active era's start session index.
	///
	/// Returns the first session index of the currently active era.
	fn active_era_start_session_index() -> SessionIndex;
}

/// The communication trait of `pallet-staking-async` -> `pallet-staking-async-rc-client`.
pub trait RcClientInterface {
	/// The validator account ids.
	type AccountId;

	/// Report a new validator set.
	fn validator_set(new_validator_set: Vec<Self::AccountId>, id: u32, prune_up_to: Option<u32>);
}

/// An offence on the relay chain. Based on [`sp_staking::offence::OffenceDetails`].
#[derive(Encode, Decode, DecodeWithMemTracking, Debug, Clone, PartialEq, TypeInfo)]
pub struct Offence<AccountId> {
	/// The offender.
	pub offender: AccountId,
	/// Those who have reported this offence.
	pub reporters: Vec<AccountId>,
	/// The amount that they should be slashed.
	pub slash_fraction: Perbill,
}

#[frame_support::pallet]
pub mod pallet {
	use super::*;
	use frame_system::pallet_prelude::{BlockNumberFor, *};

	/// The in-code storage version.
	const STORAGE_VERSION: StorageVersion = StorageVersion::new(1);

	/// An incomplete incoming session report that we have not acted upon yet.
	// Note: this can remain unbounded, as the internals of `AHStakingInterface` are benchmarked
	// with the worst case in mind.
	#[pallet::storage]
	#[pallet::unbounded]
	pub type IncompleteSessionReport<T: Config> =
		StorageValue<_, SessionReport<T::AccountId>, OptionQuery>;

	/// The last session report's `end_index` that we have acted upon.
	///
	/// This allows this pallet to ensure a sequentially increasing sequence of session reports
	/// passed to staking.
	///
	/// Note that with XCM being the backbone of communication, we have a guarantee on the
	/// ordering of messages. As long as the RC sends session reports in order, we _eventually_
	/// receive them in the same correct order as well.
	#[pallet::storage]
	pub type LastSessionReportEndingIndex<T: Config> = StorageValue<_, SessionIndex, OptionQuery>;

	/// A validator set that is outgoing, and should be sent.
	///
	/// This will be attempted to be sent, possibly on every `on_initialize` call, until it is sent,
	/// or the second value reaches zero, at which point we drop it.
	#[pallet::storage]
	// TODO: for now we know this ValidatorSetReport is at most validator-count * 32, and we don't
	// need its MEL critically.
	#[pallet::unbounded]
	pub type OutgoingValidatorSet<T: Config> =
		StorageValue<_, (ValidatorSetReport<T::AccountId>, u32), OptionQuery>;

	#[pallet::pallet]
	#[pallet::storage_version(STORAGE_VERSION)]
	pub struct Pallet<T>(_);

	#[pallet::hooks]
	impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
		fn on_initialize(_: BlockNumberFor<T>) -> Weight {
			let mut weight = T::DbWeight::get().reads(1);

			// Early return if no validator set to export.
			if !OutgoingValidatorSet::<T>::exists() {
				return weight;
			}

			// Determine if we should export based on session offset.
			let should_export = if T::ValidatorSetExportSession::get() == 0 {
				// Immediate export mode.
				true
			} else {
				// Check if we've reached the target session offset.
				weight.saturating_accrue(T::DbWeight::get().reads(2));

				let last_session_end = LastSessionReportEndingIndex::<T>::get().unwrap_or(0);
				let last_era_ending_index =
					T::AHStakingInterface::active_era_start_session_index().saturating_sub(1);
				let session_offset = last_session_end.saturating_sub(last_era_ending_index);

				session_offset >= T::ValidatorSetExportSession::get()
			};

			if !should_export {
				// the validator set stays buffered until the target session offset is reached.
				return weight;
			}

			// good time to export the latest elected validator set
			weight.saturating_accrue(T::DbWeight::get().reads_writes(1, 1));
			if let Some((report, retries_left)) = OutgoingValidatorSet::<T>::take() {
				// Export the validator set
				weight.saturating_accrue(T::DbWeight::get().writes(1));
				match T::SendToRelayChain::validator_set(report.clone()) {
					Ok(()) => {
						log::debug!(
							target: LOG_TARGET,
							"Exported validator set to RC for Era: {}",
							report.id,
						);
					},
					Err(()) => {
						log!(error, "Failed to send validator set report to relay chain");
						weight.saturating_accrue(T::DbWeight::get().writes(1));
						Self::deposit_event(Event::<T>::Unexpected(
							UnexpectedKind::ValidatorSetSendFailed,
						));

						if let Some(new_retries_left) = retries_left.checked_sub(1) {
							weight.saturating_accrue(T::DbWeight::get().writes(1));
							OutgoingValidatorSet::<T>::put((report, new_retries_left));
						} else {
							weight.saturating_accrue(T::DbWeight::get().writes(1));
							Self::deposit_event(Event::<T>::Unexpected(
								UnexpectedKind::ValidatorSetDropped,
							));
						}
					},
				}
			} else {
				frame_support::defensive!("OutgoingValidatorSet checked already, must exist.");
			}

			weight
		}
	}

	#[pallet::config]
	pub trait Config: frame_system::Config {
		/// An origin type that allows us to be sure a call is being dispatched by the relay chain.
		///
		/// It can be configured to something like `Root`, a relay chain origin, or similar.
		type RelayChainOrigin: EnsureOrigin<Self::RuntimeOrigin>;

		/// Our communication handle to the local staking pallet.
		type AHStakingInterface: AHStakingInterface<AccountId = Self::AccountId>;

		/// Our communication handle to the relay chain.
		type SendToRelayChain: SendToRelayChain<AccountId = Self::AccountId>;

		/// Maximum number of times that we retry sending a validator set to RC, after which, if
		/// sending still fails, we emit an [`UnexpectedKind::ValidatorSetDropped`] event and drop
		/// it.
		type MaxValidatorSetRetries: Get<u32>;

		/// The session index within an era after which we export the validator set to the RC.
		///
		/// This is a 1-indexed session number relative to the era start:
		/// - 0 = export immediately when received from staking pallet
		/// - 1 = export at end of first session of era
		/// - 5 = export at end of 5th session of era (for 6-session eras)
		///
		/// The validator set is placed in `OutgoingValidatorSet` when election completes
		/// in `pallet-staking-async`. The XCM message is sent when BOTH conditions are met:
		/// 1. Current session offset >= `ValidatorSetExportSession`
		/// 2. `OutgoingValidatorSet` exists (validator set buffered)
		///
		/// Setting to 0 bypasses the session check and exports immediately.
		///
		/// Example: With `SessionsPerEra=6` and `ValidatorSetExportSession=4`:
		/// - Session 0: Election completes → validator set buffered in `OutgoingValidatorSet`
		/// - Sessions 1-3: Buffered (session offset < 4)
		/// - End of Session 4 and start of Session 5: Export triggered.
		///
		/// Must be < `SessionsPerEra`.
		type ValidatorSetExportSession: Get<SessionIndex>;
	}
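
	// A rough sketch of how a runtime might fill in the configuration above. The concrete types
	// (`Runtime`, `EnsureRelayChainOrigin`, `Staking`, `ValidatorSetXcmSender`) are illustrative
	// assumptions, not part of this pallet:
	//
	// parameter_types! {
	// 	pub const MaxValidatorSetRetries: u32 = 10;
	// 	// export the validator set once the 4th session of a 6-session era has ended.
	// 	pub const ValidatorSetExportSession: SessionIndex = 4;
	// }
	//
	// impl pallet_staking_async_rc_client::Config for Runtime {
	// 	type RelayChainOrigin = EnsureRelayChainOrigin;
	// 	type AHStakingInterface = Staking;
	// 	type SendToRelayChain = ValidatorSetXcmSender;
	// 	type MaxValidatorSetRetries = MaxValidatorSetRetries;
	// 	type ValidatorSetExportSession = ValidatorSetExportSession;
	// }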

	#[pallet::event]
	#[pallet::generate_deposit(pub(crate) fn deposit_event)]
	pub enum Event<T: Config> {
		/// A session report was received.
		SessionReportReceived {
			end_index: SessionIndex,
			activation_timestamp: Option<(u64, u32)>,
			validator_points_counts: u32,
			leftover: bool,
		},
		/// A new offence was reported.
		OffenceReceived { slash_session: SessionIndex, offences_count: u32 },
		/// Something occurred that should never happen under normal operation.
		/// Logged as an event for fail-safe observability.
		Unexpected(UnexpectedKind),
	}

	/// Represents unexpected or invariant-breaking conditions encountered during execution.
	///
	/// These variants are emitted as [`Event::Unexpected`] and indicate a defensive check has
	/// failed. While these should never occur under normal operation, they are useful for
	/// diagnosing issues in production or test environments.
	#[derive(Clone, Encode, Decode, DecodeWithMemTracking, PartialEq, TypeInfo, Debug)]
	pub enum UnexpectedKind {
		/// We could not merge the chunks, and therefore dropped the session report.
		SessionReportIntegrityFailed,
		/// We could not merge the chunks, and therefore dropped the validator set.
		ValidatorSetIntegrityFailed,
		/// The received session index is more than what we expected.
		SessionSkipped,
		/// A session in the past was received. This will not raise any errors, just emit an event
		/// and stop processing the report.
		SessionAlreadyProcessed,
		/// A validator set failed to be sent to RC.
		///
		/// We will store it, and retry it for [`Config::MaxValidatorSetRetries`] future blocks.
		ValidatorSetSendFailed,
		/// A validator set was dropped.
		ValidatorSetDropped,
	}

	impl<T: Config> RcClientInterface for Pallet<T> {
		type AccountId = T::AccountId;

		fn validator_set(
			new_validator_set: Vec<Self::AccountId>,
			id: u32,
			prune_up_to: Option<u32>,
		) {
			let report = ValidatorSetReport::new_terminal(new_validator_set, id, prune_up_to);
			// just store the report to be outgoing; it will be sent in the next on-init.
			OutgoingValidatorSet::<T>::put((report, T::MaxValidatorSetRetries::get()));
		}
	}

	#[pallet::call]
	impl<T: Config> Pallet<T> {
		/// Called to report the end of a session on the relay chain, which also implies the start
		/// of the next session and the planning of the one after.
		#[pallet::call_index(0)]
		#[pallet::weight(
			// `LastSessionReportEndingIndex`: rw
			// `IncompleteSessionReport`: rw
			T::DbWeight::get().reads_writes(2, 2) + T::AHStakingInterface::weigh_on_relay_session_report(report)
		)]
		pub fn relay_session_report(
			origin: OriginFor<T>,
			report: SessionReport<T::AccountId>,
		) -> DispatchResultWithPostInfo {
			log!(debug, "Received session report: {}", report);
			T::RelayChainOrigin::ensure_origin_or_root(origin)?;
			let local_weight = T::DbWeight::get().reads_writes(2, 2);

			match LastSessionReportEndingIndex::<T>::get() {
				None => {
					// first session report post genesis, okay.
				},
				Some(last) if report.end_index == last + 1 => {
					// incremental -- good
				},
				Some(last) if report.end_index > last + 1 => {
					// deposit a warning event, but proceed
					Self::deposit_event(Event::Unexpected(UnexpectedKind::SessionSkipped));
					log!(
						warn,
						"Session report end index is more than expected. last_index={:?}, report.index={:?}",
						last,
						report.end_index
					);
				},
				Some(past) => {
					log!(
						error,
						"Session report end index is not valid. last_index={:?}, report.index={:?}",
						past,
						report.end_index
					);
					Self::deposit_event(Event::Unexpected(UnexpectedKind::SessionAlreadyProcessed));
					IncompleteSessionReport::<T>::kill();
					return Ok(Some(local_weight).into());
				},
			}

			Self::deposit_event(Event::SessionReportReceived {
				end_index: report.end_index,
				activation_timestamp: report.activation_timestamp,
				validator_points_counts: report.validator_points.len() as u32,
				leftover: report.leftover,
			});

			// If we have anything previously buffered, then merge it.
			let maybe_new_session_report = match IncompleteSessionReport::<T>::take() {
				Some(old) => old.merge(report.clone()),
				None => Ok(report),
			};

			if let Err(e) = maybe_new_session_report {
				Self::deposit_event(Event::Unexpected(e));
				debug_assert!(
					IncompleteSessionReport::<T>::get().is_none(),
					"we have ::take() it above, we don't want to keep the old data"
				);
				return Ok(().into());
			}
			let new_session_report = maybe_new_session_report.expect("checked above; qed");

			if new_session_report.leftover {
				// this is still not final -- buffer it.
				IncompleteSessionReport::<T>::put(new_session_report);
				Ok(().into())
			} else {
				// this is final, report it.
				LastSessionReportEndingIndex::<T>::put(new_session_report.end_index);

				let weight = T::AHStakingInterface::on_relay_session_report(new_session_report);
				Ok((Some(local_weight + weight)).into())
			}
		}

		#[pallet::call_index(1)]
		#[pallet::weight(
			T::AHStakingInterface::weigh_on_new_offences(offences.len() as u32)
		)]
		pub fn relay_new_offence_paged(
			origin: OriginFor<T>,
			offences: Vec<(SessionIndex, Offence<T::AccountId>)>,
		) -> DispatchResultWithPostInfo {
			T::RelayChainOrigin::ensure_origin_or_root(origin)?;
			log!(info, "Received new page of {} offences", offences.len());

			let mut offences_by_session =
				alloc::collections::BTreeMap::<SessionIndex, Vec<Offence<T::AccountId>>>::new();
			for (session_index, offence) in offences {
				offences_by_session.entry(session_index).or_default().push(offence);
			}

			let mut weight: Weight = Default::default();
			for (slash_session, offences) in offences_by_session {
				Self::deposit_event(Event::OffenceReceived {
					slash_session,
					offences_count: offences.len() as u32,
				});
				let new_weight = T::AHStakingInterface::on_new_offences(slash_session, offences);
				weight.saturating_accrue(new_weight)
			}

			Ok(Some(weight).into())
		}
	}
919}