referrerpolicy=no-referrer-when-downgrade

cumulus_pallet_parachain_system/
parachain_inherent.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Cumulus.
3// SPDX-License-Identifier: Apache-2.0
4
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// 	http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17//! Cumulus parachain inherent related structures.
18
19use alloc::{collections::btree_map::BTreeMap, vec, vec::Vec};
20use core::fmt::Debug;
21use cumulus_primitives_core::{
22	relay_chain::{
23		ApprovedPeerId, BlockNumber as RelayChainBlockNumber, BlockNumber, Header as RelayHeader,
24	},
25	InboundDownwardMessage, InboundHrmpMessage, ParaId, PersistedValidationData,
26};
27use cumulus_primitives_parachain_inherent::{HashedMessage, ParachainInherentData};
28use frame_support::{
29	defensive,
30	pallet_prelude::{Decode, DecodeWithMemTracking, Encode},
31};
32use scale_info::TypeInfo;
33use sp_core::{bounded::BoundedSlice, Get};
34
35/// A structure that helps identify a message inside a collection of messages sorted by `sent_at`.
36///
37/// This structure contains a `sent_at` field and a reverse index. Using this information, we can
38/// identify a message inside a sorted collection by walking back `reverse_idx` positions starting
39/// from the last message that has the provided `sent_at`.
40///
41/// We use a reverse index instead of a normal index because sometimes the messages at the
42/// beginning of the collection are being pruned.
43///
44/// # Example
45///
46///
47/// For the collection
48/// `msgs = [{sent_at: 0}, {sent_at: 1}, {sent_at: 1}, {sent_at: 1}, {sent_at: 1}, {sent_at: 3}]`
49///
50/// `InboundMessageId {sent_at: 1, reverse_idx: 0}` points to `msgs[4]`
51/// `InboundMessageId {sent_at: 1, reverse_idx: 3}` points to `msgs[1]`
52/// `InboundMessageId {sent_at: 1, reverse_idx: 4}` points to `msgs[0]`
53#[derive(Encode, Decode, DecodeWithMemTracking, Clone, Default, Debug, PartialEq, TypeInfo)]
54pub struct InboundMessageId {
55	/// The block number at which this message was added to the message passing queue
56	/// on the relay chain.
57	pub sent_at: BlockNumber,
58	/// The reverse index of the message in the collection of messages sent at `sent_at`.
59	pub reverse_idx: u32,
60}
61
62/// A message that was received by the parachain.
63pub trait InboundMessage {
64	/// The corresponding compressed message.
65	/// This should be an equivalent message that stores the same metadata as the current message,
66	/// but stores only a hash of the message data.
67	type CompressedMessage: Debug;
68
69	/// Gets the message data.
70	fn data(&self) -> &[u8];
71
72	/// Gets the relay chain number where the current message was pushed to the corresponding
73	/// relay chain queue.
74	fn sent_at(&self) -> RelayChainBlockNumber;
75
76	/// Converts the current message into a `CompressedMessage`
77	fn to_compressed(&self) -> Self::CompressedMessage;
78}
79
80/// A collection of inbound messages.
81#[derive(
82	codec::Encode, codec::Decode, codec::DecodeWithMemTracking, Debug, Clone, PartialEq, TypeInfo,
83)]
84pub struct InboundMessagesCollection<Message: InboundMessage> {
85	messages: Vec<Message>,
86}
87
88impl<Message: InboundMessage> InboundMessagesCollection<Message> {
89	/// Creates a new instance of `InboundMessagesCollection` that contains the provided `messages`.
90	pub fn new(messages: Vec<Message>) -> Self {
91		Self { messages }
92	}
93
94	/// Drop all the messages up to `last_processed_msg`.
95	pub fn drop_processed_messages(&mut self, last_processed_msg: &InboundMessageId) {
96		let mut last_processed_msg_idx = None;
97		let messages = &mut self.messages;
98		for (idx, message) in messages.iter().enumerate().rev() {
99			let sent_at = message.sent_at();
100			if sent_at == last_processed_msg.sent_at {
101				last_processed_msg_idx = idx.checked_sub(last_processed_msg.reverse_idx as usize);
102				break;
103			}
104			// If we build on the same relay parent twice, we will receive the same messages again
105			// while `last_processed_msg` may have been increased. We need this check to make sure
106			// that the old messages are dropped.
107			if sent_at < last_processed_msg.sent_at {
108				last_processed_msg_idx = Some(idx);
109				break;
110			}
111		}
112		if let Some(last_processed_msg_idx) = last_processed_msg_idx {
113			messages.drain(..=last_processed_msg_idx);
114		}
115	}
116
117	/// Converts `self` into an [`AbridgedInboundMessagesCollection`].
118	///
119	/// The first messages in `self` (up to the provided `size_limit`) are kept in their current
120	/// form (they will contain the full message data).
121	/// The messages that exceed that limit are hashed.
122	pub fn into_abridged(
123		self,
124		size_limit: &mut usize,
125	) -> AbridgedInboundMessagesCollection<Message> {
126		let mut messages = self.messages;
127
128		let mut split_off_pos = messages.len();
129		for (idx, message) in messages.iter().enumerate() {
130			if *size_limit < message.data().len() {
131				break;
132			}
133			*size_limit -= message.data().len();
134
135			split_off_pos = idx + 1;
136		}
137
138		let extra_messages = messages.split_off(split_off_pos);
139		let hashed_messages = extra_messages.iter().map(|msg| msg.to_compressed()).collect();
140
141		AbridgedInboundMessagesCollection { full_messages: messages, hashed_messages }
142	}
143}
144
/// Size expectations used when checking an abridged collection of inbound messages.
///
/// Both values are byte counts that are compared against the accumulated length of the
/// message data.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AbridgedInboundMessagesSizeInfo {
	/// The max size of the full messages collection.
	pub max_full_messages_size: usize,
	/// The max size of the first hashed message.
	pub first_hashed_msg_max_size: usize,
}
152
153/// A compressed collection of inbound messages.
154///
155/// The first messages in the collection (up to a limit) contain the full message data.
156/// The messages that exceed that limit are hashed.
157#[derive(
158	codec::Encode, codec::Decode, codec::DecodeWithMemTracking, Debug, Clone, PartialEq, TypeInfo,
159)]
160pub struct AbridgedInboundMessagesCollection<Message: InboundMessage> {
161	full_messages: Vec<Message>,
162	hashed_messages: Vec<Message::CompressedMessage>,
163}
164
165impl<Message: InboundMessage> AbridgedInboundMessagesCollection<Message> {
166	/// Gets a tuple containing both the full messages and the hashed messages
167	/// stored by the current collection.
168	pub fn messages(&self) -> (&[Message], &[Message::CompressedMessage]) {
169		(&self.full_messages, &self.hashed_messages)
170	}
171
172	/// Check that the current collection contains at least 1 full message if needed.
173	pub fn check_enough_messages_included_basic(&self, collection_name: &str) {
174		if self.hashed_messages.is_empty() {
175			return;
176		}
177
178		// Here we just check that there is at least 1 full message.
179		assert!(
180			self.full_messages.len() >= 1,
181			"[{}] Advancement rule violation: full messages missing",
182			collection_name,
183		);
184	}
185
186	/// Check that the current collection contains as many full messages as possible, taking into
187	/// consideration the collection constraints.
188	///
189	/// The `AbridgedInboundMessagesCollection` is provided to the runtime by a collator.
190	/// A malicious collator can provide a collection that contains no full messages or fewer
191	/// full messages than possible, leading to censorship.
192	pub fn check_enough_messages_included_advanced(
193		&self,
194		collection_name: &str,
195		size_info: AbridgedInboundMessagesSizeInfo,
196	) {
197		// We should check that the collection contains as many full messages as possible
198		// without exceeding the max expected size.
199		let AbridgedInboundMessagesSizeInfo { max_full_messages_size, first_hashed_msg_max_size } =
200			size_info;
201
202		let mut full_messages_size = 0usize;
203		for msg in &self.full_messages {
204			full_messages_size = full_messages_size.saturating_add(msg.data().len());
205		}
206
207		// The worst case scenario is that were the first message that had to be hashed
208		// is a max size message.
209		assert!(
210			full_messages_size.saturating_add(first_hashed_msg_max_size) > max_full_messages_size,
211			"[{}] Advancement rule violation: full messages size smaller than expected. \
212			full msgs size: {}, first hashed msg max size: {}, max full msgs size: {}",
213			collection_name,
214			full_messages_size,
215			first_hashed_msg_max_size,
216			max_full_messages_size
217		);
218	}
219}
220
221impl<Message: InboundMessage> Default for AbridgedInboundMessagesCollection<Message> {
222	fn default() -> Self {
223		Self { full_messages: vec![], hashed_messages: vec![] }
224	}
225}
226
227impl InboundMessage for InboundDownwardMessage<RelayChainBlockNumber> {
228	type CompressedMessage = HashedMessage;
229
230	fn data(&self) -> &[u8] {
231		&self.msg
232	}
233
234	fn sent_at(&self) -> RelayChainBlockNumber {
235		self.sent_at
236	}
237
238	fn to_compressed(&self) -> Self::CompressedMessage {
239		self.into()
240	}
241}
242
243pub type InboundDownwardMessages =
244	InboundMessagesCollection<InboundDownwardMessage<RelayChainBlockNumber>>;
245
246pub type AbridgedInboundDownwardMessages =
247	AbridgedInboundMessagesCollection<InboundDownwardMessage<RelayChainBlockNumber>>;
248
249impl AbridgedInboundDownwardMessages {
250	/// Returns an iterator over the messages that maps them to `BoundedSlices`.
251	pub fn bounded_msgs_iter<MaxMessageLen: Get<u32>>(
252		&self,
253	) -> impl Iterator<Item = BoundedSlice<'_, u8, MaxMessageLen>> {
254		self.full_messages
255			.iter()
256			// Note: we are not using `.defensive()` here since that prints the whole value to
257			// console. In case that the message is too long, this clogs up the log quite badly.
258			.filter_map(|m| match BoundedSlice::try_from(&m.msg[..]) {
259				Ok(bounded) => Some(bounded),
260				Err(_) => {
261					defensive!("Inbound Downward message was too long; dropping");
262					None
263				},
264			})
265	}
266}
267
268impl InboundMessage for (ParaId, InboundHrmpMessage) {
269	type CompressedMessage = (ParaId, HashedMessage);
270
271	fn data(&self) -> &[u8] {
272		&self.1.data
273	}
274
275	fn sent_at(&self) -> RelayChainBlockNumber {
276		self.1.sent_at
277	}
278
279	fn to_compressed(&self) -> Self::CompressedMessage {
280		let (sender, message) = self;
281		(*sender, message.into())
282	}
283}
284
285pub type InboundHrmpMessages = InboundMessagesCollection<(ParaId, InboundHrmpMessage)>;
286
287impl InboundHrmpMessages {
288	// Prepare horizontal messages for a more convenient processing:
289	//
290	// Instead of a mapping from a para to a list of inbound HRMP messages, we will have a
291	// list of tuples `(sender, message)` first ordered by `sent_at` (the relay chain block
292	// number in which the message hit the relay-chain) and second ordered by para id
293	// ascending.
294	pub fn from_map(messages_map: BTreeMap<ParaId, Vec<InboundHrmpMessage>>) -> Self {
295		let mut messages = messages_map
296			.into_iter()
297			.flat_map(|(sender, channel_contents)| {
298				channel_contents.into_iter().map(move |message| (sender, message))
299			})
300			.collect::<Vec<_>>();
301		messages.sort_by(|(sender_a, msg_a), (sender_b, msg_b)| {
302			// first sort by sent-at and then by the para id
303			(msg_a.sent_at, sender_a).cmp(&(msg_b.sent_at, sender_b))
304		});
305
306		Self { messages }
307	}
308}
309
310pub type AbridgedInboundHrmpMessages =
311	AbridgedInboundMessagesCollection<(ParaId, InboundHrmpMessage)>;
312
313impl AbridgedInboundHrmpMessages {
314	/// Returns an iterator over the deconstructed messages.
315	pub fn flat_msgs_iter(&self) -> impl Iterator<Item = (ParaId, RelayChainBlockNumber, &[u8])> {
316		self.full_messages
317			.iter()
318			.map(|&(sender, ref message)| (sender, message.sent_at, &message.data[..]))
319	}
320}
321
322/// The basic inherent data that is passed by the collator to the parachain runtime.
323/// This data doesn't contain any messages.
324#[derive(
325	codec::Encode, codec::Decode, codec::DecodeWithMemTracking, Debug, Clone, PartialEq, TypeInfo,
326)]
327pub struct BasicParachainInherentData {
328	pub validation_data: PersistedValidationData,
329	pub relay_chain_state: sp_trie::StorageProof,
330	pub relay_parent_descendants: Vec<RelayHeader>,
331	pub collator_peer_id: Option<ApprovedPeerId>,
332}
333
334/// The messages that are passed by the collator to the parachain runtime as part of the
335/// inherent data.
336#[derive(
337	codec::Encode, codec::Decode, codec::DecodeWithMemTracking, Debug, Clone, PartialEq, TypeInfo,
338)]
339pub struct InboundMessagesData {
340	pub downward_messages: AbridgedInboundDownwardMessages,
341	pub horizontal_messages: AbridgedInboundHrmpMessages,
342}
343
344impl InboundMessagesData {
345	/// Creates a new instance of `InboundMessagesData` with the provided messages.
346	pub fn new(
347		dmq_msgs: AbridgedInboundDownwardMessages,
348		hrmp_msgs: AbridgedInboundHrmpMessages,
349	) -> Self {
350		Self { downward_messages: dmq_msgs, horizontal_messages: hrmp_msgs }
351	}
352}
353
354/// Deconstructs a `ParachainInherentData` instance.
355pub fn deconstruct_parachain_inherent_data(
356	data: ParachainInherentData,
357) -> (BasicParachainInherentData, InboundDownwardMessages, InboundHrmpMessages) {
358	(
359		BasicParachainInherentData {
360			validation_data: data.validation_data,
361			relay_chain_state: data.relay_chain_state,
362			relay_parent_descendants: data.relay_parent_descendants,
363			collator_peer_id: data.collator_peer_id,
364		},
365		InboundDownwardMessages::new(data.downward_messages),
366		InboundHrmpMessages::from_map(data.horizontal_messages),
367	)
368}
369
#[cfg(test)]
mod tests {
	use super::*;

	/// Builds a list of downward messages from `(sent_at, size)` pairs. Every message carries
	/// `size` bytes of data, all set to `1`.
	fn build_inbound_dm_vec(
		info: &[(RelayChainBlockNumber, usize)],
	) -> Vec<InboundDownwardMessage<RelayChainBlockNumber>> {
		info.iter()
			.map(|&(sent_at, size)| InboundDownwardMessage { sent_at, msg: vec![1; size] })
			.collect()
	}

	#[test]
	fn drop_processed_messages_works() {
		let all =
			build_inbound_dm_vec(&[(0, 0), (0, 0), (2, 0), (2, 0), (2, 0), (2, 0), (3, 0)]);

		// Drops the processed messages from a fresh copy of `all` and returns what is left.
		let remaining_after = |sent_at, reverse_idx| {
			let mut collection = InboundDownwardMessages::new(all.clone());
			collection.drop_processed_messages(&InboundMessageId { sent_at, reverse_idx });
			collection.messages
		};

		assert_eq!(remaining_after(3, 0), []);

		assert_eq!(remaining_after(2, 0), all[6..]);
		assert_eq!(remaining_after(2, 1), all[5..]);
		assert_eq!(remaining_after(2, 4), all[2..]);

		// Go back starting from the last message sent at block 2, with 1 more message than the
		// total number of messages sent at 2.
		assert_eq!(remaining_after(2, 5), all[1..]);

		assert_eq!(remaining_after(0, 1), all[1..]);
		// Go back starting from the last message sent at block 0, with 1 more message than the
		// total number of messages sent at 0.
		assert_eq!(remaining_after(0, 3), all);
	}

	#[test]
	fn into_abridged_works() {
		// An empty collection stays empty and does not consume any of the budget.
		let empty = InboundDownwardMessages::new(vec![]);
		let mut size_limit = 0;
		let abridged = empty.into_abridged(&mut size_limit);
		assert_eq!(size_limit, 0);
		assert_eq!(&abridged.full_messages, &vec![]);
		assert_eq!(abridged.hashed_messages, vec![]);

		let msgs_vec = build_inbound_dm_vec(&[(0, 100), (0, 100), (0, 150), (0, 50)]);
		let msgs = InboundDownwardMessages::new(msgs_vec.clone());

		// (initial size limit, expected leftover size limit, expected number of full messages)
		let cases = [(150, 50, 1), (200, 0, 2), (399, 49, 3), (400, 0, 4)];
		for (initial_limit, leftover_limit, full_count) in cases {
			let mut size_limit = initial_limit;
			let abridged = msgs.clone().into_abridged(&mut size_limit);
			assert_eq!(size_limit, leftover_limit);
			assert_eq!(&abridged.full_messages, &msgs_vec[..full_count]);

			let expected_hashed: Vec<HashedMessage> =
				msgs_vec[full_count..].iter().map(HashedMessage::from).collect();
			assert_eq!(abridged.hashed_messages, expected_hashed);
		}
	}

	#[test]
	fn from_map_works() {
		// Single-byte HRMP message.
		let hrmp = |sent_at, byte| InboundHrmpMessage { sent_at, data: vec![byte] };

		let messages_map: BTreeMap<ParaId, Vec<InboundHrmpMessage>> = BTreeMap::from([
			(ParaId::from(1000u32), vec![hrmp(0, 0), hrmp(0, 1), hrmp(1, 2)]),
			(ParaId::from(2000u32), vec![hrmp(0, 3), hrmp(0, 4), hrmp(1, 5)]),
			(
				ParaId::from(3000u32),
				vec![hrmp(0, 6), hrmp(1, 7), hrmp(2, 8), hrmp(3, 9), hrmp(4, 10)],
			),
		]);

		// (sender, sent_at, data byte), in the expected order.
		let expected: Vec<(ParaId, InboundHrmpMessage)> = [
			(1000u32, 0, 0),
			(1000, 0, 1),
			(2000, 0, 3),
			(2000, 0, 4),
			(3000, 0, 6),
			(1000, 1, 2),
			(2000, 1, 5),
			(3000, 1, 7),
			(3000, 2, 8),
			(3000, 3, 9),
			(3000, 4, 10),
		]
		.into_iter()
		.map(|(sender, sent_at, byte)| (ParaId::from(sender), hrmp(sent_at, byte)))
		.collect();

		let msgs = InboundHrmpMessages::from_map(messages_map);
		assert_eq!(msgs.messages, expected)
	}

	#[test]
	fn check_enough_messages_included_basic_works() {
		let mut collection = AbridgedInboundHrmpMessages {
			full_messages: vec![(
				1000.into(),
				InboundHrmpMessage { sent_at: 0, data: vec![1; 100] },
			)],
			hashed_messages: vec![(
				2000.into(),
				HashedMessage { sent_at: 1, msg_hash: Default::default() },
			)],
		};

		// One full message next to the hashed one: the check passes.
		collection.check_enough_messages_included_basic("Test");

		// Hashed messages without any full message: the check panics.
		collection.full_messages.clear();
		let outcome =
			std::panic::catch_unwind(|| collection.check_enough_messages_included_basic("Test"));
		assert!(outcome.is_err());

		// No messages at all: the check passes.
		collection.hashed_messages.clear();
		collection.check_enough_messages_included_basic("Test");
	}

	#[test]
	fn check_enough_messages_included_advanced_works() {
		let mixed_messages = AbridgedInboundHrmpMessages {
			full_messages: vec![(
				1000.into(),
				InboundHrmpMessage { sent_at: 0, data: vec![1; 50] },
			)],
			hashed_messages: vec![(
				2000.into(),
				HashedMessage { sent_at: 1, msg_hash: Default::default() },
			)],
		};

		// 50 + 50 is not greater than 100: the check panics.
		let too_small = AbridgedInboundMessagesSizeInfo {
			max_full_messages_size: 100,
			first_hashed_msg_max_size: 50,
		};
		let outcome = std::panic::catch_unwind(|| {
			mixed_messages.check_enough_messages_included_advanced("Test", too_small)
		});
		assert!(outcome.is_err());

		// 50 + 51 is greater than 100: the check passes.
		let large_enough = AbridgedInboundMessagesSizeInfo {
			max_full_messages_size: 100,
			first_hashed_msg_max_size: 51,
		};
		mixed_messages.check_enough_messages_included_advanced("Test", large_enough);
	}
}