1use alloc::{collections::btree_map::BTreeMap, vec, vec::Vec};
20use core::fmt::Debug;
21use cumulus_primitives_core::{
22 relay_chain::{
23 ApprovedPeerId, BlockNumber as RelayChainBlockNumber, BlockNumber, Header as RelayHeader,
24 },
25 InboundDownwardMessage, InboundHrmpMessage, ParaId, PersistedValidationData,
26};
27use cumulus_primitives_parachain_inherent::{HashedMessage, ParachainInherentData};
28use frame_support::{
29 defensive,
30 pallet_prelude::{Decode, DecodeWithMemTracking, Encode},
31};
32use scale_info::TypeInfo;
33use sp_core::{bounded::BoundedSlice, Get};
34
35#[derive(Encode, Decode, DecodeWithMemTracking, Clone, Default, Debug, PartialEq, TypeInfo)]
54pub struct InboundMessageId {
55 pub sent_at: BlockNumber,
58 pub reverse_idx: u32,
60}
61
62pub trait InboundMessage {
64 type CompressedMessage: Debug;
68
69 fn data(&self) -> &[u8];
71
72 fn sent_at(&self) -> RelayChainBlockNumber;
75
76 fn to_compressed(&self) -> Self::CompressedMessage;
78}
79
80#[derive(
82 codec::Encode, codec::Decode, codec::DecodeWithMemTracking, Debug, Clone, PartialEq, TypeInfo,
83)]
84pub struct InboundMessagesCollection<Message: InboundMessage> {
85 messages: Vec<Message>,
86}
87
88impl<Message: InboundMessage> InboundMessagesCollection<Message> {
89 pub fn new(messages: Vec<Message>) -> Self {
91 Self { messages }
92 }
93
94 pub fn drop_processed_messages(&mut self, last_processed_msg: &InboundMessageId) {
96 let mut last_processed_msg_idx = None;
97 let messages = &mut self.messages;
98 for (idx, message) in messages.iter().enumerate().rev() {
99 let sent_at = message.sent_at();
100 if sent_at == last_processed_msg.sent_at {
101 last_processed_msg_idx = idx.checked_sub(last_processed_msg.reverse_idx as usize);
102 break;
103 }
104 if sent_at < last_processed_msg.sent_at {
108 last_processed_msg_idx = Some(idx);
109 break;
110 }
111 }
112 if let Some(last_processed_msg_idx) = last_processed_msg_idx {
113 messages.drain(..=last_processed_msg_idx);
114 }
115 }
116
117 pub fn into_abridged(
123 self,
124 size_limit: &mut usize,
125 ) -> AbridgedInboundMessagesCollection<Message> {
126 let mut messages = self.messages;
127
128 let mut split_off_pos = messages.len();
129 for (idx, message) in messages.iter().enumerate() {
130 if *size_limit < message.data().len() {
131 break;
132 }
133 *size_limit -= message.data().len();
134
135 split_off_pos = idx + 1;
136 }
137
138 let extra_messages = messages.split_off(split_off_pos);
139 let hashed_messages = extra_messages.iter().map(|msg| msg.to_compressed()).collect();
140
141 AbridgedInboundMessagesCollection { full_messages: messages, hashed_messages }
142 }
143}
144
145#[derive(
150 codec::Encode, codec::Decode, codec::DecodeWithMemTracking, Debug, Clone, PartialEq, TypeInfo,
151)]
152pub struct AbridgedInboundMessagesCollection<Message: InboundMessage> {
153 full_messages: Vec<Message>,
154 hashed_messages: Vec<Message::CompressedMessage>,
155}
156
157impl<Message: InboundMessage> AbridgedInboundMessagesCollection<Message> {
158 pub fn messages(&self) -> (&[Message], &[Message::CompressedMessage]) {
161 (&self.full_messages, &self.hashed_messages)
162 }
163
164 pub fn check_enough_messages_included(&self, collection_name: &str) {
170 if self.hashed_messages.is_empty() {
171 return;
172 }
173
174 assert!(
184 self.full_messages.len() >= 1,
185 "[{}] Advancement rule violation: mandatory messages missing",
186 collection_name,
187 );
188 }
189}
190
191impl<Message: InboundMessage> Default for AbridgedInboundMessagesCollection<Message> {
192 fn default() -> Self {
193 Self { full_messages: vec![], hashed_messages: vec![] }
194 }
195}
196
197impl InboundMessage for InboundDownwardMessage<RelayChainBlockNumber> {
198 type CompressedMessage = HashedMessage;
199
200 fn data(&self) -> &[u8] {
201 &self.msg
202 }
203
204 fn sent_at(&self) -> RelayChainBlockNumber {
205 self.sent_at
206 }
207
208 fn to_compressed(&self) -> Self::CompressedMessage {
209 self.into()
210 }
211}
212
213pub type InboundDownwardMessages =
214 InboundMessagesCollection<InboundDownwardMessage<RelayChainBlockNumber>>;
215
216pub type AbridgedInboundDownwardMessages =
217 AbridgedInboundMessagesCollection<InboundDownwardMessage<RelayChainBlockNumber>>;
218
219impl AbridgedInboundDownwardMessages {
220 pub fn bounded_msgs_iter<MaxMessageLen: Get<u32>>(
222 &self,
223 ) -> impl Iterator<Item = BoundedSlice<'_, u8, MaxMessageLen>> {
224 self.full_messages
225 .iter()
226 .filter_map(|m| match BoundedSlice::try_from(&m.msg[..]) {
229 Ok(bounded) => Some(bounded),
230 Err(_) => {
231 defensive!("Inbound Downward message was too long; dropping");
232 None
233 },
234 })
235 }
236}
237
238impl InboundMessage for (ParaId, InboundHrmpMessage) {
239 type CompressedMessage = (ParaId, HashedMessage);
240
241 fn data(&self) -> &[u8] {
242 &self.1.data
243 }
244
245 fn sent_at(&self) -> RelayChainBlockNumber {
246 self.1.sent_at
247 }
248
249 fn to_compressed(&self) -> Self::CompressedMessage {
250 let (sender, message) = self;
251 (*sender, message.into())
252 }
253}
254
255pub type InboundHrmpMessages = InboundMessagesCollection<(ParaId, InboundHrmpMessage)>;
256
257impl InboundHrmpMessages {
258 pub fn from_map(messages_map: BTreeMap<ParaId, Vec<InboundHrmpMessage>>) -> Self {
265 let mut messages = messages_map
266 .into_iter()
267 .flat_map(|(sender, channel_contents)| {
268 channel_contents.into_iter().map(move |message| (sender, message))
269 })
270 .collect::<Vec<_>>();
271 messages.sort_by(|(sender_a, msg_a), (sender_b, msg_b)| {
272 (msg_a.sent_at, sender_a).cmp(&(msg_b.sent_at, sender_b))
274 });
275
276 Self { messages }
277 }
278}
279
280pub type AbridgedInboundHrmpMessages =
281 AbridgedInboundMessagesCollection<(ParaId, InboundHrmpMessage)>;
282
283impl AbridgedInboundHrmpMessages {
284 pub fn flat_msgs_iter(&self) -> impl Iterator<Item = (ParaId, RelayChainBlockNumber, &[u8])> {
286 self.full_messages
287 .iter()
288 .map(|&(sender, ref message)| (sender, message.sent_at, &message.data[..]))
289 }
290}
291
292#[derive(
295 codec::Encode, codec::Decode, codec::DecodeWithMemTracking, Debug, Clone, PartialEq, TypeInfo,
296)]
297pub struct BasicParachainInherentData {
298 pub validation_data: PersistedValidationData,
299 pub relay_chain_state: sp_trie::StorageProof,
300 pub relay_parent_descendants: Vec<RelayHeader>,
301 pub collator_peer_id: Option<ApprovedPeerId>,
302}
303
304#[derive(
307 codec::Encode, codec::Decode, codec::DecodeWithMemTracking, Debug, Clone, PartialEq, TypeInfo,
308)]
309pub struct InboundMessagesData {
310 pub downward_messages: AbridgedInboundDownwardMessages,
311 pub horizontal_messages: AbridgedInboundHrmpMessages,
312}
313
314impl InboundMessagesData {
315 pub fn new(
317 dmq_msgs: AbridgedInboundDownwardMessages,
318 hrmp_msgs: AbridgedInboundHrmpMessages,
319 ) -> Self {
320 Self { downward_messages: dmq_msgs, horizontal_messages: hrmp_msgs }
321 }
322}
323
324pub fn deconstruct_parachain_inherent_data(
326 data: ParachainInherentData,
327) -> (BasicParachainInherentData, InboundDownwardMessages, InboundHrmpMessages) {
328 (
329 BasicParachainInherentData {
330 validation_data: data.validation_data,
331 relay_chain_state: data.relay_chain_state,
332 relay_parent_descendants: data.relay_parent_descendants,
333 collator_peer_id: data.collator_peer_id,
334 },
335 InboundDownwardMessages::new(data.downward_messages),
336 InboundHrmpMessages::from_map(data.horizontal_messages),
337 )
338}
339
340#[cfg(test)]
341mod tests {
342 use super::*;
343
344 fn build_inbound_dm_vec(
345 info: &[(RelayChainBlockNumber, usize)],
346 ) -> Vec<InboundDownwardMessage<RelayChainBlockNumber>> {
347 let mut messages = vec![];
348 for (sent_at, size) in info.iter() {
349 let data = vec![1; *size];
350 messages.push(InboundDownwardMessage { sent_at: *sent_at, msg: data })
351 }
352 messages
353 }
354
355 #[test]
356 fn drop_processed_messages_works() {
357 let msgs_vec =
358 build_inbound_dm_vec(&[(0, 0), (0, 0), (2, 0), (2, 0), (2, 0), (2, 0), (3, 0)]);
359
360 let mut msgs = InboundDownwardMessages::new(msgs_vec.clone());
361
362 msgs.drop_processed_messages(&InboundMessageId { sent_at: 3, reverse_idx: 0 });
363 assert_eq!(msgs.messages, []);
364
365 let mut msgs = InboundDownwardMessages::new(msgs_vec.clone());
366 msgs.drop_processed_messages(&InboundMessageId { sent_at: 2, reverse_idx: 0 });
367 assert_eq!(msgs.messages, msgs_vec[6..]);
368 let mut msgs = InboundDownwardMessages::new(msgs_vec.clone());
369 msgs.drop_processed_messages(&InboundMessageId { sent_at: 2, reverse_idx: 1 });
370 assert_eq!(msgs.messages, msgs_vec[5..]);
371 let mut msgs = InboundDownwardMessages::new(msgs_vec.clone());
372 msgs.drop_processed_messages(&InboundMessageId { sent_at: 2, reverse_idx: 4 });
373 assert_eq!(msgs.messages, msgs_vec[2..]);
374
375 let mut msgs = InboundDownwardMessages::new(msgs_vec.clone());
378 msgs.drop_processed_messages(&InboundMessageId { sent_at: 2, reverse_idx: 5 });
379 assert_eq!(msgs.messages, msgs_vec[1..]);
380
381 let mut msgs = InboundDownwardMessages::new(msgs_vec.clone());
382 msgs.drop_processed_messages(&InboundMessageId { sent_at: 0, reverse_idx: 1 });
383 assert_eq!(msgs.messages, msgs_vec[1..]);
384 let mut msgs = InboundDownwardMessages::new(msgs_vec.clone());
387 msgs.drop_processed_messages(&InboundMessageId { sent_at: 0, reverse_idx: 3 });
388 assert_eq!(msgs.messages, msgs_vec);
389 }
390
391 #[test]
392 fn into_abridged_works() {
393 let msgs = InboundDownwardMessages::new(vec![]);
394 let mut size_limit = 0;
395 let abridged_msgs = msgs.into_abridged(&mut size_limit);
396 assert_eq!(size_limit, 0);
397 assert_eq!(&abridged_msgs.full_messages, &vec![]);
398 assert_eq!(abridged_msgs.hashed_messages, vec![]);
399
400 let msgs_vec = build_inbound_dm_vec(&[(0, 100), (0, 100), (0, 150), (0, 50)]);
401 let msgs = InboundDownwardMessages::new(msgs_vec.clone());
402
403 let mut size_limit = 150;
404 let abridged_msgs = msgs.clone().into_abridged(&mut size_limit);
405 assert_eq!(size_limit, 50);
406 assert_eq!(&abridged_msgs.full_messages, &msgs_vec[..1]);
407 assert_eq!(
408 abridged_msgs.hashed_messages,
409 vec![(&msgs_vec[1]).into(), (&msgs_vec[2]).into(), (&msgs_vec[3]).into()]
410 );
411
412 let mut size_limit = 200;
413 let abridged_msgs = msgs.clone().into_abridged(&mut size_limit);
414 assert_eq!(size_limit, 0);
415 assert_eq!(&abridged_msgs.full_messages, &msgs_vec[..2]);
416 assert_eq!(
417 abridged_msgs.hashed_messages,
418 vec![(&msgs_vec[2]).into(), (&msgs_vec[3]).into()]
419 );
420
421 let mut size_limit = 399;
422 let abridged_msgs = msgs.clone().into_abridged(&mut size_limit);
423 assert_eq!(size_limit, 49);
424 assert_eq!(&abridged_msgs.full_messages, &msgs_vec[..3]);
425 assert_eq!(abridged_msgs.hashed_messages, vec![(&msgs_vec[3]).into()]);
426
427 let mut size_limit = 400;
428 let abridged_msgs = msgs.clone().into_abridged(&mut size_limit);
429 assert_eq!(size_limit, 0);
430 assert_eq!(&abridged_msgs.full_messages, &msgs_vec);
431 assert_eq!(abridged_msgs.hashed_messages, vec![]);
432 }
433
434 #[test]
435 fn from_map_works() {
436 let mut messages_map: BTreeMap<ParaId, Vec<InboundHrmpMessage>> = BTreeMap::new();
437 messages_map.insert(
438 1000.into(),
439 vec![
440 InboundHrmpMessage { sent_at: 0, data: vec![0] },
441 InboundHrmpMessage { sent_at: 0, data: vec![1] },
442 InboundHrmpMessage { sent_at: 1, data: vec![2] },
443 ],
444 );
445 messages_map.insert(
446 2000.into(),
447 vec![
448 InboundHrmpMessage { sent_at: 0, data: vec![3] },
449 InboundHrmpMessage { sent_at: 0, data: vec![4] },
450 InboundHrmpMessage { sent_at: 1, data: vec![5] },
451 ],
452 );
453 messages_map.insert(
454 3000.into(),
455 vec![
456 InboundHrmpMessage { sent_at: 0, data: vec![6] },
457 InboundHrmpMessage { sent_at: 1, data: vec![7] },
458 InboundHrmpMessage { sent_at: 2, data: vec![8] },
459 InboundHrmpMessage { sent_at: 3, data: vec![9] },
460 InboundHrmpMessage { sent_at: 4, data: vec![10] },
461 ],
462 );
463
464 let msgs = InboundHrmpMessages::from_map(messages_map);
465 assert_eq!(
466 msgs.messages,
467 [
468 (1000.into(), InboundHrmpMessage { sent_at: 0, data: vec![0] }),
469 (1000.into(), InboundHrmpMessage { sent_at: 0, data: vec![1] }),
470 (2000.into(), InboundHrmpMessage { sent_at: 0, data: vec![3] }),
471 (2000.into(), InboundHrmpMessage { sent_at: 0, data: vec![4] }),
472 (3000.into(), InboundHrmpMessage { sent_at: 0, data: vec![6] }),
473 (1000.into(), InboundHrmpMessage { sent_at: 1, data: vec![2] }),
474 (2000.into(), InboundHrmpMessage { sent_at: 1, data: vec![5] }),
475 (3000.into(), InboundHrmpMessage { sent_at: 1, data: vec![7] }),
476 (3000.into(), InboundHrmpMessage { sent_at: 2, data: vec![8] }),
477 (3000.into(), InboundHrmpMessage { sent_at: 3, data: vec![9] }),
478 (3000.into(), InboundHrmpMessage { sent_at: 4, data: vec![10] })
479 ]
480 )
481 }
482
483 #[test]
484 fn check_enough_messages_included_works() {
485 let mut messages = AbridgedInboundHrmpMessages {
486 full_messages: vec![(
487 1000.into(),
488 InboundHrmpMessage { sent_at: 0, data: vec![1; 100] },
489 )],
490 hashed_messages: vec![(
491 2000.into(),
492 HashedMessage { sent_at: 1, msg_hash: Default::default() },
493 )],
494 };
495
496 messages.check_enough_messages_included("Test");
497
498 messages.full_messages = vec![];
499 let result = std::panic::catch_unwind(|| messages.check_enough_messages_included("Test"));
500 assert!(result.is_err());
501
502 messages.hashed_messages = vec![];
503 messages.check_enough_messages_included("Test");
504 }
505}