1use alloc::{collections::btree_map::BTreeMap, vec, vec::Vec};
20use core::fmt::Debug;
21use cumulus_primitives_core::{
22 relay_chain::{
23 ApprovedPeerId, BlockNumber as RelayChainBlockNumber, BlockNumber, Header as RelayHeader,
24 },
25 InboundDownwardMessage, InboundHrmpMessage, ParaId, PersistedValidationData,
26};
27use cumulus_primitives_parachain_inherent::{HashedMessage, ParachainInherentData};
28use frame_support::{
29 defensive,
30 pallet_prelude::{Decode, DecodeWithMemTracking, Encode},
31};
32use scale_info::TypeInfo;
33use sp_core::{bounded::BoundedSlice, Get};
34
35#[derive(Encode, Decode, DecodeWithMemTracking, Clone, Default, Debug, PartialEq, TypeInfo)]
54pub struct InboundMessageId {
55 pub sent_at: BlockNumber,
58 pub reverse_idx: u32,
60}
61
62pub trait InboundMessage {
64 type CompressedMessage: Debug;
68
69 fn data(&self) -> &[u8];
71
72 fn sent_at(&self) -> RelayChainBlockNumber;
75
76 fn to_compressed(&self) -> Self::CompressedMessage;
78}
79
80#[derive(
82 codec::Encode, codec::Decode, codec::DecodeWithMemTracking, Debug, Clone, PartialEq, TypeInfo,
83)]
84pub struct InboundMessagesCollection<Message: InboundMessage> {
85 messages: Vec<Message>,
86}
87
88impl<Message: InboundMessage> InboundMessagesCollection<Message> {
89 pub fn new(messages: Vec<Message>) -> Self {
91 Self { messages }
92 }
93
94 pub fn drop_processed_messages(&mut self, last_processed_msg: &InboundMessageId) {
96 let mut last_processed_msg_idx = None;
97 let messages = &mut self.messages;
98 for (idx, message) in messages.iter().enumerate().rev() {
99 let sent_at = message.sent_at();
100 if sent_at == last_processed_msg.sent_at {
101 last_processed_msg_idx = idx.checked_sub(last_processed_msg.reverse_idx as usize);
102 break;
103 }
104 if sent_at < last_processed_msg.sent_at {
108 last_processed_msg_idx = Some(idx);
109 break;
110 }
111 }
112 if let Some(last_processed_msg_idx) = last_processed_msg_idx {
113 messages.drain(..=last_processed_msg_idx);
114 }
115 }
116
117 pub fn into_abridged(
123 self,
124 size_limit: &mut usize,
125 ) -> AbridgedInboundMessagesCollection<Message> {
126 let mut messages = self.messages;
127
128 let mut split_off_pos = messages.len();
129 for (idx, message) in messages.iter().enumerate() {
130 if *size_limit < message.data().len() {
131 break;
132 }
133 *size_limit -= message.data().len();
134
135 split_off_pos = idx + 1;
136 }
137
138 let extra_messages = messages.split_off(split_off_pos);
139 let hashed_messages = extra_messages.iter().map(|msg| msg.to_compressed()).collect();
140
141 AbridgedInboundMessagesCollection { full_messages: messages, hashed_messages }
142 }
143}
144
145pub struct AbridgedInboundMessagesSizeInfo {
147 pub max_full_messages_size: usize,
149 pub first_hashed_msg_max_size: usize,
151}
152
153#[derive(
158 codec::Encode, codec::Decode, codec::DecodeWithMemTracking, Debug, Clone, PartialEq, TypeInfo,
159)]
160pub struct AbridgedInboundMessagesCollection<Message: InboundMessage> {
161 full_messages: Vec<Message>,
162 hashed_messages: Vec<Message::CompressedMessage>,
163}
164
165impl<Message: InboundMessage> AbridgedInboundMessagesCollection<Message> {
166 pub fn messages(&self) -> (&[Message], &[Message::CompressedMessage]) {
169 (&self.full_messages, &self.hashed_messages)
170 }
171
172 pub fn check_enough_messages_included_basic(&self, collection_name: &str) {
174 if self.hashed_messages.is_empty() {
175 return;
176 }
177
178 assert!(
180 self.full_messages.len() >= 1,
181 "[{}] Advancement rule violation: full messages missing",
182 collection_name,
183 );
184 }
185
186 pub fn check_enough_messages_included_advanced(
193 &self,
194 collection_name: &str,
195 size_info: AbridgedInboundMessagesSizeInfo,
196 ) {
197 let AbridgedInboundMessagesSizeInfo { max_full_messages_size, first_hashed_msg_max_size } =
200 size_info;
201
202 let mut full_messages_size = 0usize;
203 for msg in &self.full_messages {
204 full_messages_size = full_messages_size.saturating_add(msg.data().len());
205 }
206
207 assert!(
210 full_messages_size.saturating_add(first_hashed_msg_max_size) > max_full_messages_size,
211 "[{}] Advancement rule violation: full messages size smaller than expected. \
212 full msgs size: {}, first hashed msg max size: {}, max full msgs size: {}",
213 collection_name,
214 full_messages_size,
215 first_hashed_msg_max_size,
216 max_full_messages_size
217 );
218 }
219}
220
221impl<Message: InboundMessage> Default for AbridgedInboundMessagesCollection<Message> {
222 fn default() -> Self {
223 Self { full_messages: vec![], hashed_messages: vec![] }
224 }
225}
226
227impl InboundMessage for InboundDownwardMessage<RelayChainBlockNumber> {
228 type CompressedMessage = HashedMessage;
229
230 fn data(&self) -> &[u8] {
231 &self.msg
232 }
233
234 fn sent_at(&self) -> RelayChainBlockNumber {
235 self.sent_at
236 }
237
238 fn to_compressed(&self) -> Self::CompressedMessage {
239 self.into()
240 }
241}
242
243pub type InboundDownwardMessages =
244 InboundMessagesCollection<InboundDownwardMessage<RelayChainBlockNumber>>;
245
246pub type AbridgedInboundDownwardMessages =
247 AbridgedInboundMessagesCollection<InboundDownwardMessage<RelayChainBlockNumber>>;
248
249impl AbridgedInboundDownwardMessages {
250 pub fn bounded_msgs_iter<MaxMessageLen: Get<u32>>(
252 &self,
253 ) -> impl Iterator<Item = BoundedSlice<'_, u8, MaxMessageLen>> {
254 self.full_messages
255 .iter()
256 .filter_map(|m| match BoundedSlice::try_from(&m.msg[..]) {
259 Ok(bounded) => Some(bounded),
260 Err(_) => {
261 defensive!("Inbound Downward message was too long; dropping");
262 None
263 },
264 })
265 }
266}
267
268impl InboundMessage for (ParaId, InboundHrmpMessage) {
269 type CompressedMessage = (ParaId, HashedMessage);
270
271 fn data(&self) -> &[u8] {
272 &self.1.data
273 }
274
275 fn sent_at(&self) -> RelayChainBlockNumber {
276 self.1.sent_at
277 }
278
279 fn to_compressed(&self) -> Self::CompressedMessage {
280 let (sender, message) = self;
281 (*sender, message.into())
282 }
283}
284
285pub type InboundHrmpMessages = InboundMessagesCollection<(ParaId, InboundHrmpMessage)>;
286
287impl InboundHrmpMessages {
288 pub fn from_map(messages_map: BTreeMap<ParaId, Vec<InboundHrmpMessage>>) -> Self {
295 let mut messages = messages_map
296 .into_iter()
297 .flat_map(|(sender, channel_contents)| {
298 channel_contents.into_iter().map(move |message| (sender, message))
299 })
300 .collect::<Vec<_>>();
301 messages.sort_by(|(sender_a, msg_a), (sender_b, msg_b)| {
302 (msg_a.sent_at, sender_a).cmp(&(msg_b.sent_at, sender_b))
304 });
305
306 Self { messages }
307 }
308}
309
310pub type AbridgedInboundHrmpMessages =
311 AbridgedInboundMessagesCollection<(ParaId, InboundHrmpMessage)>;
312
313impl AbridgedInboundHrmpMessages {
314 pub fn flat_msgs_iter(&self) -> impl Iterator<Item = (ParaId, RelayChainBlockNumber, &[u8])> {
316 self.full_messages
317 .iter()
318 .map(|&(sender, ref message)| (sender, message.sent_at, &message.data[..]))
319 }
320}
321
322#[derive(
325 codec::Encode, codec::Decode, codec::DecodeWithMemTracking, Debug, Clone, PartialEq, TypeInfo,
326)]
327pub struct BasicParachainInherentData {
328 pub validation_data: PersistedValidationData,
329 pub relay_chain_state: sp_trie::StorageProof,
330 pub relay_parent_descendants: Vec<RelayHeader>,
331 pub collator_peer_id: Option<ApprovedPeerId>,
332}
333
334#[derive(
337 codec::Encode, codec::Decode, codec::DecodeWithMemTracking, Debug, Clone, PartialEq, TypeInfo,
338)]
339pub struct InboundMessagesData {
340 pub downward_messages: AbridgedInboundDownwardMessages,
341 pub horizontal_messages: AbridgedInboundHrmpMessages,
342}
343
344impl InboundMessagesData {
345 pub fn new(
347 dmq_msgs: AbridgedInboundDownwardMessages,
348 hrmp_msgs: AbridgedInboundHrmpMessages,
349 ) -> Self {
350 Self { downward_messages: dmq_msgs, horizontal_messages: hrmp_msgs }
351 }
352}
353
354pub fn deconstruct_parachain_inherent_data(
356 data: ParachainInherentData,
357) -> (BasicParachainInherentData, InboundDownwardMessages, InboundHrmpMessages) {
358 (
359 BasicParachainInherentData {
360 validation_data: data.validation_data,
361 relay_chain_state: data.relay_chain_state,
362 relay_parent_descendants: data.relay_parent_descendants,
363 collator_peer_id: data.collator_peer_id,
364 },
365 InboundDownwardMessages::new(data.downward_messages),
366 InboundHrmpMessages::from_map(data.horizontal_messages),
367 )
368}
369
370#[cfg(test)]
371mod tests {
372 use super::*;
373
374 fn build_inbound_dm_vec(
375 info: &[(RelayChainBlockNumber, usize)],
376 ) -> Vec<InboundDownwardMessage<RelayChainBlockNumber>> {
377 let mut messages = vec![];
378 for (sent_at, size) in info.iter() {
379 let data = vec![1; *size];
380 messages.push(InboundDownwardMessage { sent_at: *sent_at, msg: data })
381 }
382 messages
383 }
384
385 #[test]
386 fn drop_processed_messages_works() {
387 let msgs_vec =
388 build_inbound_dm_vec(&[(0, 0), (0, 0), (2, 0), (2, 0), (2, 0), (2, 0), (3, 0)]);
389
390 let mut msgs = InboundDownwardMessages::new(msgs_vec.clone());
391
392 msgs.drop_processed_messages(&InboundMessageId { sent_at: 3, reverse_idx: 0 });
393 assert_eq!(msgs.messages, []);
394
395 let mut msgs = InboundDownwardMessages::new(msgs_vec.clone());
396 msgs.drop_processed_messages(&InboundMessageId { sent_at: 2, reverse_idx: 0 });
397 assert_eq!(msgs.messages, msgs_vec[6..]);
398 let mut msgs = InboundDownwardMessages::new(msgs_vec.clone());
399 msgs.drop_processed_messages(&InboundMessageId { sent_at: 2, reverse_idx: 1 });
400 assert_eq!(msgs.messages, msgs_vec[5..]);
401 let mut msgs = InboundDownwardMessages::new(msgs_vec.clone());
402 msgs.drop_processed_messages(&InboundMessageId { sent_at: 2, reverse_idx: 4 });
403 assert_eq!(msgs.messages, msgs_vec[2..]);
404
405 let mut msgs = InboundDownwardMessages::new(msgs_vec.clone());
408 msgs.drop_processed_messages(&InboundMessageId { sent_at: 2, reverse_idx: 5 });
409 assert_eq!(msgs.messages, msgs_vec[1..]);
410
411 let mut msgs = InboundDownwardMessages::new(msgs_vec.clone());
412 msgs.drop_processed_messages(&InboundMessageId { sent_at: 0, reverse_idx: 1 });
413 assert_eq!(msgs.messages, msgs_vec[1..]);
414 let mut msgs = InboundDownwardMessages::new(msgs_vec.clone());
417 msgs.drop_processed_messages(&InboundMessageId { sent_at: 0, reverse_idx: 3 });
418 assert_eq!(msgs.messages, msgs_vec);
419 }
420
421 #[test]
422 fn into_abridged_works() {
423 let msgs = InboundDownwardMessages::new(vec![]);
424 let mut size_limit = 0;
425 let abridged_msgs = msgs.into_abridged(&mut size_limit);
426 assert_eq!(size_limit, 0);
427 assert_eq!(&abridged_msgs.full_messages, &vec![]);
428 assert_eq!(abridged_msgs.hashed_messages, vec![]);
429
430 let msgs_vec = build_inbound_dm_vec(&[(0, 100), (0, 100), (0, 150), (0, 50)]);
431 let msgs = InboundDownwardMessages::new(msgs_vec.clone());
432
433 let mut size_limit = 150;
434 let abridged_msgs = msgs.clone().into_abridged(&mut size_limit);
435 assert_eq!(size_limit, 50);
436 assert_eq!(&abridged_msgs.full_messages, &msgs_vec[..1]);
437 assert_eq!(
438 abridged_msgs.hashed_messages,
439 vec![(&msgs_vec[1]).into(), (&msgs_vec[2]).into(), (&msgs_vec[3]).into()]
440 );
441
442 let mut size_limit = 200;
443 let abridged_msgs = msgs.clone().into_abridged(&mut size_limit);
444 assert_eq!(size_limit, 0);
445 assert_eq!(&abridged_msgs.full_messages, &msgs_vec[..2]);
446 assert_eq!(
447 abridged_msgs.hashed_messages,
448 vec![(&msgs_vec[2]).into(), (&msgs_vec[3]).into()]
449 );
450
451 let mut size_limit = 399;
452 let abridged_msgs = msgs.clone().into_abridged(&mut size_limit);
453 assert_eq!(size_limit, 49);
454 assert_eq!(&abridged_msgs.full_messages, &msgs_vec[..3]);
455 assert_eq!(abridged_msgs.hashed_messages, vec![(&msgs_vec[3]).into()]);
456
457 let mut size_limit = 400;
458 let abridged_msgs = msgs.clone().into_abridged(&mut size_limit);
459 assert_eq!(size_limit, 0);
460 assert_eq!(&abridged_msgs.full_messages, &msgs_vec);
461 assert_eq!(abridged_msgs.hashed_messages, vec![]);
462 }
463
464 #[test]
465 fn from_map_works() {
466 let mut messages_map: BTreeMap<ParaId, Vec<InboundHrmpMessage>> = BTreeMap::new();
467 messages_map.insert(
468 1000.into(),
469 vec![
470 InboundHrmpMessage { sent_at: 0, data: vec![0] },
471 InboundHrmpMessage { sent_at: 0, data: vec![1] },
472 InboundHrmpMessage { sent_at: 1, data: vec![2] },
473 ],
474 );
475 messages_map.insert(
476 2000.into(),
477 vec![
478 InboundHrmpMessage { sent_at: 0, data: vec![3] },
479 InboundHrmpMessage { sent_at: 0, data: vec![4] },
480 InboundHrmpMessage { sent_at: 1, data: vec![5] },
481 ],
482 );
483 messages_map.insert(
484 3000.into(),
485 vec![
486 InboundHrmpMessage { sent_at: 0, data: vec![6] },
487 InboundHrmpMessage { sent_at: 1, data: vec![7] },
488 InboundHrmpMessage { sent_at: 2, data: vec![8] },
489 InboundHrmpMessage { sent_at: 3, data: vec![9] },
490 InboundHrmpMessage { sent_at: 4, data: vec![10] },
491 ],
492 );
493
494 let msgs = InboundHrmpMessages::from_map(messages_map);
495 assert_eq!(
496 msgs.messages,
497 [
498 (1000.into(), InboundHrmpMessage { sent_at: 0, data: vec![0] }),
499 (1000.into(), InboundHrmpMessage { sent_at: 0, data: vec![1] }),
500 (2000.into(), InboundHrmpMessage { sent_at: 0, data: vec![3] }),
501 (2000.into(), InboundHrmpMessage { sent_at: 0, data: vec![4] }),
502 (3000.into(), InboundHrmpMessage { sent_at: 0, data: vec![6] }),
503 (1000.into(), InboundHrmpMessage { sent_at: 1, data: vec![2] }),
504 (2000.into(), InboundHrmpMessage { sent_at: 1, data: vec![5] }),
505 (3000.into(), InboundHrmpMessage { sent_at: 1, data: vec![7] }),
506 (3000.into(), InboundHrmpMessage { sent_at: 2, data: vec![8] }),
507 (3000.into(), InboundHrmpMessage { sent_at: 3, data: vec![9] }),
508 (3000.into(), InboundHrmpMessage { sent_at: 4, data: vec![10] })
509 ]
510 )
511 }
512
513 #[test]
514 fn check_enough_messages_included_basic_works() {
515 let mut messages = AbridgedInboundHrmpMessages {
516 full_messages: vec![(
517 1000.into(),
518 InboundHrmpMessage { sent_at: 0, data: vec![1; 100] },
519 )],
520 hashed_messages: vec![(
521 2000.into(),
522 HashedMessage { sent_at: 1, msg_hash: Default::default() },
523 )],
524 };
525
526 messages.check_enough_messages_included_basic("Test");
527
528 messages.full_messages = vec![];
529 let result =
530 std::panic::catch_unwind(|| messages.check_enough_messages_included_basic("Test"));
531 assert!(result.is_err());
532
533 messages.hashed_messages = vec![];
534 messages.check_enough_messages_included_basic("Test");
535 }
536
537 #[test]
538 fn check_enough_messages_included_advanced_works() {
539 let mixed_messages = AbridgedInboundHrmpMessages {
540 full_messages: vec![(
541 1000.into(),
542 InboundHrmpMessage { sent_at: 0, data: vec![1; 50] },
543 )],
544 hashed_messages: vec![(
545 2000.into(),
546 HashedMessage { sent_at: 1, msg_hash: Default::default() },
547 )],
548 };
549 let result = std::panic::catch_unwind(|| {
550 mixed_messages.check_enough_messages_included_advanced(
551 "Test",
552 AbridgedInboundMessagesSizeInfo {
553 max_full_messages_size: 100,
554 first_hashed_msg_max_size: 50,
555 },
556 )
557 });
558 assert!(result.is_err());
559 mixed_messages.check_enough_messages_included_advanced(
560 "Test",
561 AbridgedInboundMessagesSizeInfo {
562 max_full_messages_size: 100,
563 first_hashed_msg_max_size: 51,
564 },
565 );
566 }
567}