
cumulus_pallet_parachain_system/parachain_inherent.rs

// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Cumulus.
// SPDX-License-Identifier: Apache-2.0

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// 	http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Cumulus parachain inherent related structures.

use alloc::{collections::btree_map::BTreeMap, vec, vec::Vec};
use core::fmt::Debug;
use cumulus_primitives_core::{
	relay_chain::{
		ApprovedPeerId, BlockNumber as RelayChainBlockNumber, BlockNumber, Header as RelayHeader,
	},
	InboundDownwardMessage, InboundHrmpMessage, ParaId, PersistedValidationData,
};
use cumulus_primitives_parachain_inherent::{HashedMessage, ParachainInherentData};
use frame_support::{
	defensive,
	pallet_prelude::{Decode, DecodeWithMemTracking, Encode},
};
use scale_info::TypeInfo;
use sp_core::{bounded::BoundedSlice, Get};

/// A structure that helps identify a message inside a collection of messages sorted by `sent_at`.
///
/// This structure contains a `sent_at` field and a reverse index. Using this information, we can
/// identify a message inside a sorted collection by walking back `reverse_idx` positions starting
/// from the last message that has the provided `sent_at`.
///
/// We use a reverse index instead of a normal index because the messages at the beginning of the
/// collection may already have been pruned.
///
/// # Example
///
/// For the collection
/// `msgs = [{sent_at: 0}, {sent_at: 1}, {sent_at: 1}, {sent_at: 1}, {sent_at: 1}, {sent_at: 3}]`
///
/// `InboundMessageId {sent_at: 1, reverse_idx: 0}` points to `msgs[4]`
/// `InboundMessageId {sent_at: 1, reverse_idx: 3}` points to `msgs[1]`
/// `InboundMessageId {sent_at: 1, reverse_idx: 4}` points to `msgs[0]`
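///
/// A minimal sketch of how such an id resolves to a position; the `resolve` helper below is
/// purely illustrative and not part of the pallet API:
///
/// ```ignore
/// fn resolve(sent_at_values: &[u32], sent_at: u32, reverse_idx: u32) -> Option<usize> {
/// 	// Position of the last message with the given `sent_at`.
/// 	let last = sent_at_values.iter().rposition(|&s| s == sent_at)?;
/// 	// Walk back `reverse_idx` positions from it.
/// 	last.checked_sub(reverse_idx as usize)
/// }
///
/// let msgs = [0, 1, 1, 1, 1, 3];
/// assert_eq!(resolve(&msgs, 1, 0), Some(4));
/// assert_eq!(resolve(&msgs, 1, 3), Some(1));
/// assert_eq!(resolve(&msgs, 1, 4), Some(0));
/// ```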
#[derive(
	Encode,
	Decode,
	DecodeWithMemTracking,
	Clone,
	Default,
	sp_runtime::RuntimeDebug,
	PartialEq,
	TypeInfo,
)]
pub struct InboundMessageId {
	/// The block number at which this message was added to the message passing queue
	/// on the relay chain.
	pub sent_at: BlockNumber,
	/// The reverse index of the message in the collection of messages sent at `sent_at`.
	pub reverse_idx: u32,
}

/// A message that was received by the parachain.
pub trait InboundMessage {
	/// The corresponding compressed message.
	/// This should be an equivalent message that stores the same metadata as the current message,
	/// but stores only a hash of the message data.
	type CompressedMessage: Debug;

	/// Gets the message data.
	fn data(&self) -> &[u8];

	/// Gets the relay chain block number at which the current message was pushed to the
	/// corresponding relay chain queue.
	fn sent_at(&self) -> RelayChainBlockNumber;

	/// Converts the current message into a `CompressedMessage`.
	fn to_compressed(&self) -> Self::CompressedMessage;
}

/// A collection of inbound messages.
#[derive(
	codec::Encode,
	codec::Decode,
	codec::DecodeWithMemTracking,
	sp_core::RuntimeDebug,
	Clone,
	PartialEq,
	TypeInfo,
)]
pub struct InboundMessagesCollection<Message: InboundMessage> {
	messages: Vec<Message>,
}

impl<Message: InboundMessage> InboundMessagesCollection<Message> {
	/// Creates a new instance of `InboundMessagesCollection` that contains the provided `messages`.
	pub fn new(messages: Vec<Message>) -> Self {
		Self { messages }
	}

	/// Drops all the messages up to and including `last_processed_msg`.
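	///
	/// A minimal usage sketch, assuming the same message layout as the tests in this module
	/// (messages sent at relay blocks `[0, 0, 2, 2, 2, 2, 3]`):
	///
	/// ```ignore
	/// let mut msgs = InboundDownwardMessages::new(msgs_vec);
	/// // Everything up to the second-to-last message sent at relay block 2 has already been
	/// // processed, so only the last message sent at 2 and the message sent at 3 remain.
	/// msgs.drop_processed_messages(&InboundMessageId { sent_at: 2, reverse_idx: 1 });
	/// ```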
	pub fn drop_processed_messages(&mut self, last_processed_msg: &InboundMessageId) {
		let mut last_processed_msg_idx = None;
		let messages = &mut self.messages;
		for (idx, message) in messages.iter().enumerate().rev() {
			let sent_at = message.sent_at();
			if sent_at == last_processed_msg.sent_at {
				last_processed_msg_idx = idx.checked_sub(last_processed_msg.reverse_idx as usize);
				break;
			}
			// If we build on the same relay parent twice, we will receive the same messages again
			// while `last_processed_msg` may have been increased. We need this check to make sure
			// that the old messages are dropped.
			if sent_at < last_processed_msg.sent_at {
				last_processed_msg_idx = Some(idx);
				break;
			}
		}
		if let Some(last_processed_msg_idx) = last_processed_msg_idx {
			messages.drain(..=last_processed_msg_idx);
		}
	}

	/// Converts `self` into an [`AbridgedInboundMessagesCollection`].
	///
	/// The first messages in `self` (up to the provided `size_limit`) are kept in their current
	/// form (they will contain the full message data). The messages that exceed that limit are
	/// hashed. On return, `size_limit` is reduced by the total size of the messages kept in full.
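	///
	/// A minimal usage sketch, mirroring the behaviour exercised by the tests in this module
	/// (`msgs` holds four downward messages with payload sizes `100, 100, 150, 50` bytes):
	///
	/// ```ignore
	/// let mut size_limit = 200;
	/// let abridged = msgs.into_abridged(&mut size_limit);
	/// // The first two messages (100 + 100 bytes) fit within the limit and keep their full
	/// // data, the remaining two are replaced by hashed representations. `size_limit` is
	/// // reduced to 0.
	/// ```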
	pub fn into_abridged(
		self,
		size_limit: &mut usize,
	) -> AbridgedInboundMessagesCollection<Message> {
		let mut messages = self.messages;

		let mut split_off_pos = messages.len();
		for (idx, message) in messages.iter().enumerate() {
			if *size_limit < message.data().len() {
				break;
			}
			*size_limit -= message.data().len();

			split_off_pos = idx + 1;
		}

		let extra_messages = messages.split_off(split_off_pos);
		let hashed_messages = extra_messages.iter().map(|msg| msg.to_compressed()).collect();

		AbridgedInboundMessagesCollection { full_messages: messages, hashed_messages }
	}
}

/// A compressed collection of inbound messages.
///
/// The first messages in the collection (up to a limit) contain the full message data.
/// The messages that exceed that limit are hashed.
#[derive(
	codec::Encode,
	codec::Decode,
	codec::DecodeWithMemTracking,
	sp_core::RuntimeDebug,
	Clone,
	PartialEq,
	TypeInfo,
)]
pub struct AbridgedInboundMessagesCollection<Message: InboundMessage> {
	full_messages: Vec<Message>,
	hashed_messages: Vec<Message::CompressedMessage>,
}

impl<Message: InboundMessage> AbridgedInboundMessagesCollection<Message> {
	/// Gets a tuple containing both the full messages and the hashed messages
	/// stored by the current collection.
	pub fn messages(&self) -> (&[Message], &[Message::CompressedMessage]) {
		(&self.full_messages, &self.hashed_messages)
	}

	/// Check that the current collection contains as many full messages as possible.
	///
	/// The `AbridgedInboundMessagesCollection` is provided to the runtime by a collator.
	/// A malicious collator can provide a collection that contains no full messages or fewer
	/// full messages than possible, leading to censorship.
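	///
	/// A minimal sketch of the rule, mirroring the test in this module (the fields are private,
	/// so direct access like this is only possible from within this crate):
	///
	/// ```ignore
	/// // At least one full message is present alongside the hashed ones: the check passes.
	/// messages.check_enough_messages_included("Test");
	///
	/// // Hashed messages are present but no full message is: the check panics.
	/// messages.full_messages = vec![];
	/// messages.check_enough_messages_included("Test");
	/// ```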
	pub fn check_enough_messages_included(&self, collection_name: &str) {
		if self.hashed_messages.is_empty() {
			return;
		}

		// Ideally, we should check that the collection contains as many full messages as possible
		// without exceeding the max expected size. The worst case scenario is when the first
		// message that had to be hashed is a max size message. In that case, the min expected
		// size would be `max_expected_size - max_msg_size`. However, there are multiple issues:
		// 1. The max message size config can change while we still have to process messages with
		//    the old max message size.
		// 2. We can't access the max downward message size from the parachain runtime.
		//
		// So the safest approach is to check that there is at least 1 full message.
		assert!(
			self.full_messages.len() >= 1,
			"[{}] Advancement rule violation: mandatory messages missing",
			collection_name,
		);
	}
}

impl<Message: InboundMessage> Default for AbridgedInboundMessagesCollection<Message> {
	fn default() -> Self {
		Self { full_messages: vec![], hashed_messages: vec![] }
	}
}

impl InboundMessage for InboundDownwardMessage<RelayChainBlockNumber> {
	type CompressedMessage = HashedMessage;

	fn data(&self) -> &[u8] {
		&self.msg
	}

	fn sent_at(&self) -> RelayChainBlockNumber {
		self.sent_at
	}

	fn to_compressed(&self) -> Self::CompressedMessage {
		self.into()
	}
}

pub type InboundDownwardMessages =
	InboundMessagesCollection<InboundDownwardMessage<RelayChainBlockNumber>>;

pub type AbridgedInboundDownwardMessages =
	AbridgedInboundMessagesCollection<InboundDownwardMessage<RelayChainBlockNumber>>;

impl AbridgedInboundDownwardMessages {
	/// Returns an iterator over the full messages, mapping each of them to a `BoundedSlice`.
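	///
	/// A minimal usage sketch, assuming a `ConstU32` bound for the maximum message length:
	///
	/// ```ignore
	/// use sp_core::ConstU32;
	///
	/// for bounded_msg in abridged_msgs.bounded_msgs_iter::<ConstU32<1024>>() {
	/// 	// Each item is a `BoundedSlice<u8, ConstU32<1024>>`. Messages exceeding the bound
	/// 	// are dropped with a defensive log instead of being yielded.
	/// }
	/// ```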
	pub fn bounded_msgs_iter<MaxMessageLen: Get<u32>>(
		&self,
	) -> impl Iterator<Item = BoundedSlice<u8, MaxMessageLen>> {
		self.full_messages
			.iter()
			// Note: we are not using `.defensive()` here since that prints the whole value to
			// the console. If the message is too long, this clogs up the log quite badly.
			.filter_map(|m| match BoundedSlice::try_from(&m.msg[..]) {
				Ok(bounded) => Some(bounded),
				Err(_) => {
					defensive!("Inbound Downward message was too long; dropping");
					None
				},
			})
	}
}

impl InboundMessage for (ParaId, InboundHrmpMessage) {
	type CompressedMessage = (ParaId, HashedMessage);

	fn data(&self) -> &[u8] {
		&self.1.data
	}

	fn sent_at(&self) -> RelayChainBlockNumber {
		self.1.sent_at
	}

	fn to_compressed(&self) -> Self::CompressedMessage {
		let (sender, message) = self;
		(*sender, message.into())
	}
}

pub type InboundHrmpMessages = InboundMessagesCollection<(ParaId, InboundHrmpMessage)>;

impl InboundHrmpMessages {
	/// Prepares horizontal messages for more convenient processing:
	///
	/// Instead of a mapping from a para to a list of inbound HRMP messages, we build a list of
	/// tuples `(sender, message)` ordered first by `sent_at` (the relay chain block number at
	/// which the message hit the relay chain) and then by para id, ascending.
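	///
	/// A minimal sketch of the resulting order, following the test in this module:
	///
	/// ```ignore
	/// let msgs = InboundHrmpMessages::from_map(messages_map);
	/// // All messages sent at relay block 0 come first (grouped by ascending para id),
	/// // then all messages sent at block 1, and so on.
	/// ```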
	pub fn from_map(messages_map: BTreeMap<ParaId, Vec<InboundHrmpMessage>>) -> Self {
		let mut messages = messages_map
			.into_iter()
			.flat_map(|(sender, channel_contents)| {
				channel_contents.into_iter().map(move |message| (sender, message))
			})
			.collect::<Vec<_>>();
		messages.sort_by(|(sender_a, msg_a), (sender_b, msg_b)| {
			// first sort by sent-at and then by the para id
			(msg_a.sent_at, sender_a).cmp(&(msg_b.sent_at, sender_b))
		});

		Self { messages }
	}
}

pub type AbridgedInboundHrmpMessages =
	AbridgedInboundMessagesCollection<(ParaId, InboundHrmpMessage)>;

impl AbridgedInboundHrmpMessages {
	/// Returns an iterator over the deconstructed messages.
	pub fn flat_msgs_iter(&self) -> impl Iterator<Item = (ParaId, RelayChainBlockNumber, &[u8])> {
		self.full_messages
			.iter()
			.map(|&(sender, ref message)| (sender, message.sent_at, &message.data[..]))
	}
}

/// The basic inherent data that is passed by the collator to the parachain runtime.
/// This data doesn't contain any messages.
#[derive(
	codec::Encode,
	codec::Decode,
	codec::DecodeWithMemTracking,
	sp_core::RuntimeDebug,
	Clone,
	PartialEq,
	TypeInfo,
)]
pub struct BasicParachainInherentData {
	pub validation_data: PersistedValidationData,
	pub relay_chain_state: sp_trie::StorageProof,
	pub relay_parent_descendants: Vec<RelayHeader>,
	pub collator_peer_id: Option<ApprovedPeerId>,
}

/// The messages that are passed by the collator to the parachain runtime as part of the
/// inherent data.
#[derive(
	codec::Encode,
	codec::Decode,
	codec::DecodeWithMemTracking,
	sp_core::RuntimeDebug,
	Clone,
	PartialEq,
	TypeInfo,
)]
pub struct InboundMessagesData {
	pub downward_messages: AbridgedInboundDownwardMessages,
	pub horizontal_messages: AbridgedInboundHrmpMessages,
}

impl InboundMessagesData {
	/// Creates a new instance of `InboundMessagesData` with the provided messages.
	pub fn new(
		dmq_msgs: AbridgedInboundDownwardMessages,
		hrmp_msgs: AbridgedInboundHrmpMessages,
	) -> Self {
		Self { downward_messages: dmq_msgs, horizontal_messages: hrmp_msgs }
	}
}

/// Deconstructs a `ParachainInherentData` instance.
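///
/// A minimal usage sketch (the `inherent_data` identifier is illustrative):
///
/// ```ignore
/// let (basic_data, downward_messages, horizontal_messages) =
/// 	deconstruct_parachain_inherent_data(inherent_data);
/// ```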
pub fn deconstruct_parachain_inherent_data(
	data: ParachainInherentData,
) -> (BasicParachainInherentData, InboundDownwardMessages, InboundHrmpMessages) {
	(
		BasicParachainInherentData {
			validation_data: data.validation_data,
			relay_chain_state: data.relay_chain_state,
			relay_parent_descendants: data.relay_parent_descendants,
			collator_peer_id: data.collator_peer_id,
		},
		InboundDownwardMessages::new(data.downward_messages),
		InboundHrmpMessages::from_map(data.horizontal_messages),
	)
}

#[cfg(test)]
mod tests {
	use super::*;

	fn build_inbound_dm_vec(
		info: &[(RelayChainBlockNumber, usize)],
	) -> Vec<InboundDownwardMessage<RelayChainBlockNumber>> {
		let mut messages = vec![];
		for (sent_at, size) in info.iter() {
			let data = vec![1; *size];
			messages.push(InboundDownwardMessage { sent_at: *sent_at, msg: data })
		}
		messages
	}

	#[test]
	fn drop_processed_messages_works() {
		let msgs_vec =
			build_inbound_dm_vec(&[(0, 0), (0, 0), (2, 0), (2, 0), (2, 0), (2, 0), (3, 0)]);

		let mut msgs = InboundDownwardMessages::new(msgs_vec.clone());

		msgs.drop_processed_messages(&InboundMessageId { sent_at: 3, reverse_idx: 0 });
		assert_eq!(msgs.messages, []);

		let mut msgs = InboundDownwardMessages::new(msgs_vec.clone());
		msgs.drop_processed_messages(&InboundMessageId { sent_at: 2, reverse_idx: 0 });
		assert_eq!(msgs.messages, msgs_vec[6..]);
		let mut msgs = InboundDownwardMessages::new(msgs_vec.clone());
		msgs.drop_processed_messages(&InboundMessageId { sent_at: 2, reverse_idx: 1 });
		assert_eq!(msgs.messages, msgs_vec[5..]);
		let mut msgs = InboundDownwardMessages::new(msgs_vec.clone());
		msgs.drop_processed_messages(&InboundMessageId { sent_at: 2, reverse_idx: 4 });
		assert_eq!(msgs.messages, msgs_vec[2..]);

		// Go back starting from the last message sent at block 2, with 1 more message than the
		// total number of messages sent at 2.
		let mut msgs = InboundDownwardMessages::new(msgs_vec.clone());
		msgs.drop_processed_messages(&InboundMessageId { sent_at: 2, reverse_idx: 5 });
		assert_eq!(msgs.messages, msgs_vec[1..]);

		let mut msgs = InboundDownwardMessages::new(msgs_vec.clone());
		msgs.drop_processed_messages(&InboundMessageId { sent_at: 0, reverse_idx: 1 });
		assert_eq!(msgs.messages, msgs_vec[1..]);
		// Go back starting from the last message sent at block 0, with 1 more message than the
		// total number of messages sent at 0.
		let mut msgs = InboundDownwardMessages::new(msgs_vec.clone());
		msgs.drop_processed_messages(&InboundMessageId { sent_at: 0, reverse_idx: 3 });
		assert_eq!(msgs.messages, msgs_vec);
	}

	#[test]
	fn into_abridged_works() {
		let msgs = InboundDownwardMessages::new(vec![]);
		let mut size_limit = 0;
		let abridged_msgs = msgs.into_abridged(&mut size_limit);
		assert_eq!(size_limit, 0);
		assert_eq!(&abridged_msgs.full_messages, &vec![]);
		assert_eq!(abridged_msgs.hashed_messages, vec![]);

		let msgs_vec = build_inbound_dm_vec(&[(0, 100), (0, 100), (0, 150), (0, 50)]);
		let msgs = InboundDownwardMessages::new(msgs_vec.clone());

		let mut size_limit = 150;
		let abridged_msgs = msgs.clone().into_abridged(&mut size_limit);
		assert_eq!(size_limit, 50);
		assert_eq!(&abridged_msgs.full_messages, &msgs_vec[..1]);
		assert_eq!(
			abridged_msgs.hashed_messages,
			vec![(&msgs_vec[1]).into(), (&msgs_vec[2]).into(), (&msgs_vec[3]).into()]
		);

		let mut size_limit = 200;
		let abridged_msgs = msgs.clone().into_abridged(&mut size_limit);
		assert_eq!(size_limit, 0);
		assert_eq!(&abridged_msgs.full_messages, &msgs_vec[..2]);
		assert_eq!(
			abridged_msgs.hashed_messages,
			vec![(&msgs_vec[2]).into(), (&msgs_vec[3]).into()]
		);

		let mut size_limit = 399;
		let abridged_msgs = msgs.clone().into_abridged(&mut size_limit);
		assert_eq!(size_limit, 49);
		assert_eq!(&abridged_msgs.full_messages, &msgs_vec[..3]);
		assert_eq!(abridged_msgs.hashed_messages, vec![(&msgs_vec[3]).into()]);

		let mut size_limit = 400;
		let abridged_msgs = msgs.clone().into_abridged(&mut size_limit);
		assert_eq!(size_limit, 0);
		assert_eq!(&abridged_msgs.full_messages, &msgs_vec);
		assert_eq!(abridged_msgs.hashed_messages, vec![]);
	}

	#[test]
	fn from_map_works() {
		let mut messages_map: BTreeMap<ParaId, Vec<InboundHrmpMessage>> = BTreeMap::new();
		messages_map.insert(
			1000.into(),
			vec![
				InboundHrmpMessage { sent_at: 0, data: vec![0] },
				InboundHrmpMessage { sent_at: 0, data: vec![1] },
				InboundHrmpMessage { sent_at: 1, data: vec![2] },
			],
		);
		messages_map.insert(
			2000.into(),
			vec![
				InboundHrmpMessage { sent_at: 0, data: vec![3] },
				InboundHrmpMessage { sent_at: 0, data: vec![4] },
				InboundHrmpMessage { sent_at: 1, data: vec![5] },
			],
		);
		messages_map.insert(
			3000.into(),
			vec![
				InboundHrmpMessage { sent_at: 0, data: vec![6] },
				InboundHrmpMessage { sent_at: 1, data: vec![7] },
				InboundHrmpMessage { sent_at: 2, data: vec![8] },
				InboundHrmpMessage { sent_at: 3, data: vec![9] },
				InboundHrmpMessage { sent_at: 4, data: vec![10] },
			],
		);

		let msgs = InboundHrmpMessages::from_map(messages_map);
		assert_eq!(
			msgs.messages,
			[
				(1000.into(), InboundHrmpMessage { sent_at: 0, data: vec![0] }),
				(1000.into(), InboundHrmpMessage { sent_at: 0, data: vec![1] }),
				(2000.into(), InboundHrmpMessage { sent_at: 0, data: vec![3] }),
				(2000.into(), InboundHrmpMessage { sent_at: 0, data: vec![4] }),
				(3000.into(), InboundHrmpMessage { sent_at: 0, data: vec![6] }),
				(1000.into(), InboundHrmpMessage { sent_at: 1, data: vec![2] }),
				(2000.into(), InboundHrmpMessage { sent_at: 1, data: vec![5] }),
				(3000.into(), InboundHrmpMessage { sent_at: 1, data: vec![7] }),
				(3000.into(), InboundHrmpMessage { sent_at: 2, data: vec![8] }),
				(3000.into(), InboundHrmpMessage { sent_at: 3, data: vec![9] }),
				(3000.into(), InboundHrmpMessage { sent_at: 4, data: vec![10] })
			]
		)
	}

	#[test]
	fn check_enough_messages_included_works() {
		let mut messages = AbridgedInboundHrmpMessages {
			full_messages: vec![(
				1000.into(),
				InboundHrmpMessage { sent_at: 0, data: vec![1; 100] },
			)],
			hashed_messages: vec![(
				2000.into(),
				HashedMessage { sent_at: 1, msg_hash: Default::default() },
			)],
		};

		messages.check_enough_messages_included("Test");

		messages.full_messages = vec![];
		let result = std::panic::catch_unwind(|| messages.check_enough_messages_included("Test"));
		assert!(result.is_err());

		messages.hashed_messages = vec![];
		messages.check_enough_messages_included("Test");
	}
}