cumulus_pallet_parachain_system/parachain_inherent.rs

// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Cumulus.
// SPDX-License-Identifier: Apache-2.0

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// 	http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Cumulus parachain inherent related structures.

use alloc::{collections::btree_map::BTreeMap, vec, vec::Vec};
use core::fmt::Debug;
use cumulus_primitives_core::{
	relay_chain::{
		ApprovedPeerId, BlockNumber as RelayChainBlockNumber, BlockNumber, Header as RelayHeader,
	},
	InboundDownwardMessage, InboundHrmpMessage, ParaId, PersistedValidationData,
};
use cumulus_primitives_parachain_inherent::{HashedMessage, ParachainInherentData};
use frame_support::{
	defensive,
	pallet_prelude::{Decode, DecodeWithMemTracking, Encode},
};
use scale_info::TypeInfo;
use sp_core::{bounded::BoundedSlice, Get};

/// A structure that helps identify a message inside a collection of messages sorted by `sent_at`.
///
/// This structure contains a `sent_at` field and a reverse index. Using this information, we can
/// identify a message inside a sorted collection by walking back `reverse_idx` positions starting
/// from the last message that has the provided `sent_at`.
///
/// We use a reverse index instead of a normal index because the messages at the beginning of the
/// collection may already have been pruned.
///
/// # Example
///
/// For the collection
/// `msgs = [{sent_at: 0}, {sent_at: 1}, {sent_at: 1}, {sent_at: 1}, {sent_at: 1}, {sent_at: 3}]`
///
/// - `InboundMessageId {sent_at: 1, reverse_idx: 0}` points to `msgs[4]`
/// - `InboundMessageId {sent_at: 1, reverse_idx: 3}` points to `msgs[1]`
/// - `InboundMessageId {sent_at: 1, reverse_idx: 4}` points to `msgs[0]`
#[derive(Encode, Decode, DecodeWithMemTracking, Clone, Default, Debug, PartialEq, TypeInfo)]
pub struct InboundMessageId {
	/// The block number at which this message was added to the message passing queue
	/// on the relay chain.
	pub sent_at: BlockNumber,
	/// The reverse index of the message in the collection of messages sent at `sent_at`.
	pub reverse_idx: u32,
}

/// A message that was received by the parachain.
pub trait InboundMessage {
	/// The corresponding compressed message.
	/// This should be an equivalent message that stores the same metadata as the current message,
	/// but stores only a hash of the message data.
	type CompressedMessage: Debug;

	/// Gets the message data.
	fn data(&self) -> &[u8];

	/// Gets the relay chain block number at which the current message was pushed to the
	/// corresponding relay chain queue.
	fn sent_at(&self) -> RelayChainBlockNumber;

	/// Converts the current message into a `CompressedMessage`.
	fn to_compressed(&self) -> Self::CompressedMessage;
}

/// A collection of inbound messages.
#[derive(
	codec::Encode, codec::Decode, codec::DecodeWithMemTracking, Debug, Clone, PartialEq, TypeInfo,
)]
pub struct InboundMessagesCollection<Message: InboundMessage> {
	messages: Vec<Message>,
}

impl<Message: InboundMessage> InboundMessagesCollection<Message> {
	/// Creates a new instance of `InboundMessagesCollection` that contains the provided `messages`.
	pub fn new(messages: Vec<Message>) -> Self {
		Self { messages }
	}

	/// Drops all the messages up to and including `last_processed_msg`.
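	///
	/// A minimal sketch, assuming `msgs` holds messages sent at relay blocks `[0, 0, 2, 2, 3]`
	/// (the numbers are illustrative):
	///
	/// ```ignore
	/// // `reverse_idx: 1` walks back one position from the last message sent at block 2,
	/// // so the first message sent at block 2 is the last processed one.
	/// msgs.drop_processed_messages(&InboundMessageId { sent_at: 2, reverse_idx: 1 });
	/// // Both messages sent at block 0 and the first one sent at block 2 are now dropped.
	/// ```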
	pub fn drop_processed_messages(&mut self, last_processed_msg: &InboundMessageId) {
		let mut last_processed_msg_idx = None;
		let messages = &mut self.messages;
		for (idx, message) in messages.iter().enumerate().rev() {
			let sent_at = message.sent_at();
			if sent_at == last_processed_msg.sent_at {
				last_processed_msg_idx = idx.checked_sub(last_processed_msg.reverse_idx as usize);
				break;
			}
			// If we build on the same relay parent twice, we will receive the same messages again
			// while `last_processed_msg` may have been increased. We need this check to make sure
			// that the old messages are dropped.
			if sent_at < last_processed_msg.sent_at {
				last_processed_msg_idx = Some(idx);
				break;
			}
		}
		if let Some(last_processed_msg_idx) = last_processed_msg_idx {
			messages.drain(..=last_processed_msg_idx);
		}
	}

	/// Converts `self` into an [`AbridgedInboundMessagesCollection`].
	///
	/// The first messages in `self` (up to the provided `size_limit`) are kept in their current
	/// form (they will contain the full message data). The messages that exceed that limit are
	/// hashed. On return, `size_limit` is decreased by the total size of the messages kept in full.
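	///
	/// A minimal usage sketch, assuming `msgs` holds two 100-byte downward messages:
	///
	/// ```ignore
	/// // With only 150 bytes of budget, the first message is kept in full and the
	/// // second one is hashed.
	/// let mut size_limit = 150;
	/// let abridged = msgs.into_abridged(&mut size_limit);
	/// // `size_limit` is now 50: only the first message's 100 bytes were consumed.
	/// ```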
	pub fn into_abridged(
		self,
		size_limit: &mut usize,
	) -> AbridgedInboundMessagesCollection<Message> {
		let mut messages = self.messages;

		let mut split_off_pos = messages.len();
		for (idx, message) in messages.iter().enumerate() {
			if *size_limit < message.data().len() {
				break;
			}
			*size_limit -= message.data().len();

			split_off_pos = idx + 1;
		}

		let extra_messages = messages.split_off(split_off_pos);
		let hashed_messages = extra_messages.iter().map(|msg| msg.to_compressed()).collect();

		AbridgedInboundMessagesCollection { full_messages: messages, hashed_messages }
	}
}

/// A compressed collection of inbound messages.
///
/// The first messages in the collection (up to a limit) contain the full message data.
/// The messages that exceed that limit are hashed.
#[derive(
	codec::Encode, codec::Decode, codec::DecodeWithMemTracking, Debug, Clone, PartialEq, TypeInfo,
)]
pub struct AbridgedInboundMessagesCollection<Message: InboundMessage> {
	full_messages: Vec<Message>,
	hashed_messages: Vec<Message::CompressedMessage>,
}

impl<Message: InboundMessage> AbridgedInboundMessagesCollection<Message> {
	/// Gets a tuple containing both the full messages and the hashed messages
	/// stored by the current collection.
	pub fn messages(&self) -> (&[Message], &[Message::CompressedMessage]) {
		(&self.full_messages, &self.hashed_messages)
	}

	/// Checks that the current collection contains as many full messages as possible.
	///
	/// The `AbridgedInboundMessagesCollection` is provided to the runtime by a collator.
	/// A malicious collator could provide a collection that contains no full messages, or fewer
	/// full messages than possible, leading to censorship.
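	///
	/// # Panics
	///
	/// Panics if the collection contains hashed messages but no full messages.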
	pub fn check_enough_messages_included(&self, collection_name: &str) {
		if self.hashed_messages.is_empty() {
			return;
		}

		// Ideally, we should check that the collection contains as many full messages as possible
		// without exceeding the max expected size. The worst case scenario is when the first
		// message that had to be hashed is a max size message. In this case, the min expected
		// size would be `max_expected_size - max_msg_size`. However, there are multiple issues:
		// 1. The max message size config can change while we still have to process messages with
		//    the old max message size.
		// 2. We can't access the max downward message size from the parachain runtime.
		//
		// So the safest approach is to check that there is at least 1 full message.
		assert!(
			!self.full_messages.is_empty(),
			"[{}] Advancement rule violation: mandatory messages missing",
			collection_name,
		);
	}
}

impl<Message: InboundMessage> Default for AbridgedInboundMessagesCollection<Message> {
	fn default() -> Self {
		Self { full_messages: vec![], hashed_messages: vec![] }
	}
}

impl InboundMessage for InboundDownwardMessage<RelayChainBlockNumber> {
	type CompressedMessage = HashedMessage;

	fn data(&self) -> &[u8] {
		&self.msg
	}

	fn sent_at(&self) -> RelayChainBlockNumber {
		self.sent_at
	}

	fn to_compressed(&self) -> Self::CompressedMessage {
		self.into()
	}
}

/// A collection of inbound downward messages containing the full message data.
pub type InboundDownwardMessages =
	InboundMessagesCollection<InboundDownwardMessage<RelayChainBlockNumber>>;

/// An abridged collection of inbound downward messages.
pub type AbridgedInboundDownwardMessages =
	AbridgedInboundMessagesCollection<InboundDownwardMessage<RelayChainBlockNumber>>;

impl AbridgedInboundDownwardMessages {
	/// Returns an iterator over the full messages, mapping each one to a [`BoundedSlice`] over
	/// its payload. Messages longer than `MaxMessageLen` are dropped defensively.
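	///
	/// A minimal sketch, assuming `dmq` is an `AbridgedInboundDownwardMessages` and `MaxLen`
	/// implements `Get<u32>`:
	///
	/// ```ignore
	/// for bounded in dmq.bounded_msgs_iter::<MaxLen>() {
	///     // `bounded` is a `BoundedSlice<u8, MaxLen>` over the message payload.
	/// }
	/// ```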
	pub fn bounded_msgs_iter<MaxMessageLen: Get<u32>>(
		&self,
	) -> impl Iterator<Item = BoundedSlice<'_, u8, MaxMessageLen>> {
		self.full_messages
			.iter()
			// Note: we are not using `.defensive()` here since that prints the whole value to
			// the console. In case the message is too long, this clogs up the log quite badly.
			.filter_map(|m| match BoundedSlice::try_from(&m.msg[..]) {
				Ok(bounded) => Some(bounded),
				Err(_) => {
					defensive!("Inbound Downward message was too long; dropping");
					None
				},
			})
	}
}

impl InboundMessage for (ParaId, InboundHrmpMessage) {
	type CompressedMessage = (ParaId, HashedMessage);

	fn data(&self) -> &[u8] {
		&self.1.data
	}

	fn sent_at(&self) -> RelayChainBlockNumber {
		self.1.sent_at
	}

	fn to_compressed(&self) -> Self::CompressedMessage {
		let (sender, message) = self;
		(*sender, message.into())
	}
}

/// A collection of inbound HRMP messages, paired with their senders, containing the full
/// message data.
pub type InboundHrmpMessages = InboundMessagesCollection<(ParaId, InboundHrmpMessage)>;

impl InboundHrmpMessages {
	/// Prepares the horizontal messages for more convenient processing.
	///
	/// Instead of a mapping from a para to a list of inbound HRMP messages, we build a list of
	/// tuples `(sender, message)`, ordered first by `sent_at` (the relay chain block number in
	/// which the message hit the relay chain) and then by para id, ascending.
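	///
	/// A minimal sketch of the resulting order (para ids and relay block numbers are
	/// illustrative):
	///
	/// ```ignore
	/// // Given a map with messages from paras 1000 and 2000, each sent at relay blocks 0 and 1,
	/// // the flattened collection is ordered as:
	/// // [(1000, msg@0), (2000, msg@0), (1000, msg@1), (2000, msg@1)]
	/// let msgs = InboundHrmpMessages::from_map(messages_map);
	/// ```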
	pub fn from_map(messages_map: BTreeMap<ParaId, Vec<InboundHrmpMessage>>) -> Self {
		let mut messages = messages_map
			.into_iter()
			.flat_map(|(sender, channel_contents)| {
				channel_contents.into_iter().map(move |message| (sender, message))
			})
			.collect::<Vec<_>>();
		messages.sort_by(|(sender_a, msg_a), (sender_b, msg_b)| {
			// first sort by sent-at and then by the para id
			(msg_a.sent_at, sender_a).cmp(&(msg_b.sent_at, sender_b))
		});

		Self { messages }
	}
}

/// An abridged collection of inbound HRMP messages, paired with their senders.
pub type AbridgedInboundHrmpMessages =
	AbridgedInboundMessagesCollection<(ParaId, InboundHrmpMessage)>;

impl AbridgedInboundHrmpMessages {
	/// Returns an iterator over the deconstructed full messages as `(sender, sent_at, payload)`
	/// tuples.
	pub fn flat_msgs_iter(&self) -> impl Iterator<Item = (ParaId, RelayChainBlockNumber, &[u8])> {
		self.full_messages
			.iter()
			.map(|&(sender, ref message)| (sender, message.sent_at, &message.data[..]))
	}
}

/// The basic inherent data that is passed by the collator to the parachain runtime.
///
/// This data doesn't contain any messages.
#[derive(
	codec::Encode, codec::Decode, codec::DecodeWithMemTracking, Debug, Clone, PartialEq, TypeInfo,
)]
pub struct BasicParachainInherentData {
	pub validation_data: PersistedValidationData,
	pub relay_chain_state: sp_trie::StorageProof,
	pub relay_parent_descendants: Vec<RelayHeader>,
	pub collator_peer_id: Option<ApprovedPeerId>,
}

/// The messages that are passed by the collator to the parachain runtime as part of the
/// inherent data.
#[derive(
	codec::Encode, codec::Decode, codec::DecodeWithMemTracking, Debug, Clone, PartialEq, TypeInfo,
)]
pub struct InboundMessagesData {
	/// The abridged collection of inbound downward messages.
	pub downward_messages: AbridgedInboundDownwardMessages,
	/// The abridged collection of inbound HRMP messages.
	pub horizontal_messages: AbridgedInboundHrmpMessages,
}

impl InboundMessagesData {
	/// Creates a new instance of `InboundMessagesData` with the provided messages.
	pub fn new(
		dmq_msgs: AbridgedInboundDownwardMessages,
		hrmp_msgs: AbridgedInboundHrmpMessages,
	) -> Self {
		Self { downward_messages: dmq_msgs, horizontal_messages: hrmp_msgs }
	}
}

/// Deconstructs a `ParachainInherentData` instance into its basic, message-free part and the
/// inbound downward and HRMP message collections.
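///
/// A minimal usage sketch, assuming `inherent_data` is a `ParachainInherentData`:
///
/// ```ignore
/// let (basic_data, dmq_msgs, hrmp_msgs) = deconstruct_parachain_inherent_data(inherent_data);
/// ```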
pub fn deconstruct_parachain_inherent_data(
	data: ParachainInherentData,
) -> (BasicParachainInherentData, InboundDownwardMessages, InboundHrmpMessages) {
	(
		BasicParachainInherentData {
			validation_data: data.validation_data,
			relay_chain_state: data.relay_chain_state,
			relay_parent_descendants: data.relay_parent_descendants,
			collator_peer_id: data.collator_peer_id,
		},
		InboundDownwardMessages::new(data.downward_messages),
		InboundHrmpMessages::from_map(data.horizontal_messages),
	)
}

#[cfg(test)]
mod tests {
	use super::*;

	fn build_inbound_dm_vec(
		info: &[(RelayChainBlockNumber, usize)],
	) -> Vec<InboundDownwardMessage<RelayChainBlockNumber>> {
		let mut messages = vec![];
		for (sent_at, size) in info.iter() {
			let data = vec![1; *size];
			messages.push(InboundDownwardMessage { sent_at: *sent_at, msg: data })
		}
		messages
	}

	#[test]
	fn drop_processed_messages_works() {
		let msgs_vec =
			build_inbound_dm_vec(&[(0, 0), (0, 0), (2, 0), (2, 0), (2, 0), (2, 0), (3, 0)]);

		let mut msgs = InboundDownwardMessages::new(msgs_vec.clone());
		msgs.drop_processed_messages(&InboundMessageId { sent_at: 3, reverse_idx: 0 });
		assert_eq!(msgs.messages, []);

		let mut msgs = InboundDownwardMessages::new(msgs_vec.clone());
		msgs.drop_processed_messages(&InboundMessageId { sent_at: 2, reverse_idx: 0 });
		assert_eq!(msgs.messages, msgs_vec[6..]);

		let mut msgs = InboundDownwardMessages::new(msgs_vec.clone());
		msgs.drop_processed_messages(&InboundMessageId { sent_at: 2, reverse_idx: 1 });
		assert_eq!(msgs.messages, msgs_vec[5..]);

		let mut msgs = InboundDownwardMessages::new(msgs_vec.clone());
		msgs.drop_processed_messages(&InboundMessageId { sent_at: 2, reverse_idx: 4 });
		assert_eq!(msgs.messages, msgs_vec[2..]);

		// Go back starting from the last message sent at block 2, with 1 more message than the
		// total number of messages sent at 2.
		let mut msgs = InboundDownwardMessages::new(msgs_vec.clone());
		msgs.drop_processed_messages(&InboundMessageId { sent_at: 2, reverse_idx: 5 });
		assert_eq!(msgs.messages, msgs_vec[1..]);

		let mut msgs = InboundDownwardMessages::new(msgs_vec.clone());
		msgs.drop_processed_messages(&InboundMessageId { sent_at: 0, reverse_idx: 1 });
		assert_eq!(msgs.messages, msgs_vec[1..]);

		// Go back starting from the last message sent at block 0, with 1 more message than the
		// total number of messages sent at 0.
		let mut msgs = InboundDownwardMessages::new(msgs_vec.clone());
		msgs.drop_processed_messages(&InboundMessageId { sent_at: 0, reverse_idx: 3 });
		assert_eq!(msgs.messages, msgs_vec);
	}

	#[test]
	fn into_abridged_works() {
		let msgs = InboundDownwardMessages::new(vec![]);
		let mut size_limit = 0;
		let abridged_msgs = msgs.into_abridged(&mut size_limit);
		assert_eq!(size_limit, 0);
		assert_eq!(&abridged_msgs.full_messages, &vec![]);
		assert_eq!(abridged_msgs.hashed_messages, vec![]);

		let msgs_vec = build_inbound_dm_vec(&[(0, 100), (0, 100), (0, 150), (0, 50)]);
		let msgs = InboundDownwardMessages::new(msgs_vec.clone());

		let mut size_limit = 150;
		let abridged_msgs = msgs.clone().into_abridged(&mut size_limit);
		assert_eq!(size_limit, 50);
		assert_eq!(&abridged_msgs.full_messages, &msgs_vec[..1]);
		assert_eq!(
			abridged_msgs.hashed_messages,
			vec![(&msgs_vec[1]).into(), (&msgs_vec[2]).into(), (&msgs_vec[3]).into()]
		);

		let mut size_limit = 200;
		let abridged_msgs = msgs.clone().into_abridged(&mut size_limit);
		assert_eq!(size_limit, 0);
		assert_eq!(&abridged_msgs.full_messages, &msgs_vec[..2]);
		assert_eq!(
			abridged_msgs.hashed_messages,
			vec![(&msgs_vec[2]).into(), (&msgs_vec[3]).into()]
		);

		let mut size_limit = 399;
		let abridged_msgs = msgs.clone().into_abridged(&mut size_limit);
		assert_eq!(size_limit, 49);
		assert_eq!(&abridged_msgs.full_messages, &msgs_vec[..3]);
		assert_eq!(abridged_msgs.hashed_messages, vec![(&msgs_vec[3]).into()]);

		let mut size_limit = 400;
		let abridged_msgs = msgs.clone().into_abridged(&mut size_limit);
		assert_eq!(size_limit, 0);
		assert_eq!(&abridged_msgs.full_messages, &msgs_vec);
		assert_eq!(abridged_msgs.hashed_messages, vec![]);
	}

	#[test]
	fn from_map_works() {
		let mut messages_map: BTreeMap<ParaId, Vec<InboundHrmpMessage>> = BTreeMap::new();
		messages_map.insert(
			1000.into(),
			vec![
				InboundHrmpMessage { sent_at: 0, data: vec![0] },
				InboundHrmpMessage { sent_at: 0, data: vec![1] },
				InboundHrmpMessage { sent_at: 1, data: vec![2] },
			],
		);
		messages_map.insert(
			2000.into(),
			vec![
				InboundHrmpMessage { sent_at: 0, data: vec![3] },
				InboundHrmpMessage { sent_at: 0, data: vec![4] },
				InboundHrmpMessage { sent_at: 1, data: vec![5] },
			],
		);
		messages_map.insert(
			3000.into(),
			vec![
				InboundHrmpMessage { sent_at: 0, data: vec![6] },
				InboundHrmpMessage { sent_at: 1, data: vec![7] },
				InboundHrmpMessage { sent_at: 2, data: vec![8] },
				InboundHrmpMessage { sent_at: 3, data: vec![9] },
				InboundHrmpMessage { sent_at: 4, data: vec![10] },
			],
		);

		let msgs = InboundHrmpMessages::from_map(messages_map);
		assert_eq!(
			msgs.messages,
			[
				(1000.into(), InboundHrmpMessage { sent_at: 0, data: vec![0] }),
				(1000.into(), InboundHrmpMessage { sent_at: 0, data: vec![1] }),
				(2000.into(), InboundHrmpMessage { sent_at: 0, data: vec![3] }),
				(2000.into(), InboundHrmpMessage { sent_at: 0, data: vec![4] }),
				(3000.into(), InboundHrmpMessage { sent_at: 0, data: vec![6] }),
				(1000.into(), InboundHrmpMessage { sent_at: 1, data: vec![2] }),
				(2000.into(), InboundHrmpMessage { sent_at: 1, data: vec![5] }),
				(3000.into(), InboundHrmpMessage { sent_at: 1, data: vec![7] }),
				(3000.into(), InboundHrmpMessage { sent_at: 2, data: vec![8] }),
				(3000.into(), InboundHrmpMessage { sent_at: 3, data: vec![9] }),
				(3000.into(), InboundHrmpMessage { sent_at: 4, data: vec![10] })
			]
		)
	}

	#[test]
	fn check_enough_messages_included_works() {
		let mut messages = AbridgedInboundHrmpMessages {
			full_messages: vec![(
				1000.into(),
				InboundHrmpMessage { sent_at: 0, data: vec![1; 100] },
			)],
			hashed_messages: vec![(
				2000.into(),
				HashedMessage { sent_at: 1, msg_hash: Default::default() },
			)],
		};

		messages.check_enough_messages_included("Test");

		messages.full_messages = vec![];
		let result = std::panic::catch_unwind(|| messages.check_enough_messages_included("Test"));
		assert!(result.is_err());

		messages.hashed_messages = vec![];
		messages.check_enough_messages_included("Test");
	}
}