1use alloc::{collections::btree_map::BTreeMap, vec, vec::Vec};
20use core::fmt::Debug;
21use cumulus_primitives_core::{
22 relay_chain::{
23 ApprovedPeerId, BlockNumber as RelayChainBlockNumber, BlockNumber, Header as RelayHeader,
24 },
25 InboundDownwardMessage, InboundHrmpMessage, ParaId, PersistedValidationData,
26};
27use cumulus_primitives_parachain_inherent::{HashedMessage, ParachainInherentData};
28use frame_support::{
29 defensive,
30 pallet_prelude::{Decode, DecodeWithMemTracking, Encode},
31};
32use scale_info::TypeInfo;
33use sp_core::{bounded::BoundedSlice, Get};
34
35#[derive(
54 Encode,
55 Decode,
56 DecodeWithMemTracking,
57 Clone,
58 Default,
59 sp_runtime::RuntimeDebug,
60 PartialEq,
61 TypeInfo,
62)]
63pub struct InboundMessageId {
64 pub sent_at: BlockNumber,
67 pub reverse_idx: u32,
69}
70
71pub trait InboundMessage {
73 type CompressedMessage: Debug;
77
78 fn data(&self) -> &[u8];
80
81 fn sent_at(&self) -> RelayChainBlockNumber;
84
85 fn to_compressed(&self) -> Self::CompressedMessage;
87}
88
89#[derive(
91 codec::Encode,
92 codec::Decode,
93 codec::DecodeWithMemTracking,
94 sp_core::RuntimeDebug,
95 Clone,
96 PartialEq,
97 TypeInfo,
98)]
99pub struct InboundMessagesCollection<Message: InboundMessage> {
100 messages: Vec<Message>,
101}
102
103impl<Message: InboundMessage> InboundMessagesCollection<Message> {
104 pub fn new(messages: Vec<Message>) -> Self {
106 Self { messages }
107 }
108
109 pub fn drop_processed_messages(&mut self, last_processed_msg: &InboundMessageId) {
111 let mut last_processed_msg_idx = None;
112 let messages = &mut self.messages;
113 for (idx, message) in messages.iter().enumerate().rev() {
114 let sent_at = message.sent_at();
115 if sent_at == last_processed_msg.sent_at {
116 last_processed_msg_idx = idx.checked_sub(last_processed_msg.reverse_idx as usize);
117 break;
118 }
119 if sent_at < last_processed_msg.sent_at {
123 last_processed_msg_idx = Some(idx);
124 break;
125 }
126 }
127 if let Some(last_processed_msg_idx) = last_processed_msg_idx {
128 messages.drain(..=last_processed_msg_idx);
129 }
130 }
131
132 pub fn into_abridged(
138 self,
139 size_limit: &mut usize,
140 ) -> AbridgedInboundMessagesCollection<Message> {
141 let mut messages = self.messages;
142
143 let mut split_off_pos = messages.len();
144 for (idx, message) in messages.iter().enumerate() {
145 if *size_limit < message.data().len() {
146 break;
147 }
148 *size_limit -= message.data().len();
149
150 split_off_pos = idx + 1;
151 }
152
153 let extra_messages = messages.split_off(split_off_pos);
154 let hashed_messages = extra_messages.iter().map(|msg| msg.to_compressed()).collect();
155
156 AbridgedInboundMessagesCollection { full_messages: messages, hashed_messages }
157 }
158}
159
160#[derive(
165 codec::Encode,
166 codec::Decode,
167 codec::DecodeWithMemTracking,
168 sp_core::RuntimeDebug,
169 Clone,
170 PartialEq,
171 TypeInfo,
172)]
173pub struct AbridgedInboundMessagesCollection<Message: InboundMessage> {
174 full_messages: Vec<Message>,
175 hashed_messages: Vec<Message::CompressedMessage>,
176}
177
178impl<Message: InboundMessage> AbridgedInboundMessagesCollection<Message> {
179 pub fn messages(&self) -> (&[Message], &[Message::CompressedMessage]) {
182 (&self.full_messages, &self.hashed_messages)
183 }
184
185 pub fn check_enough_messages_included(&self, collection_name: &str) {
191 if self.hashed_messages.is_empty() {
192 return;
193 }
194
195 assert!(
205 self.full_messages.len() >= 1,
206 "[{}] Advancement rule violation: mandatory messages missing",
207 collection_name,
208 );
209 }
210}
211
212impl<Message: InboundMessage> Default for AbridgedInboundMessagesCollection<Message> {
213 fn default() -> Self {
214 Self { full_messages: vec![], hashed_messages: vec![] }
215 }
216}
217
218impl InboundMessage for InboundDownwardMessage<RelayChainBlockNumber> {
219 type CompressedMessage = HashedMessage;
220
221 fn data(&self) -> &[u8] {
222 &self.msg
223 }
224
225 fn sent_at(&self) -> RelayChainBlockNumber {
226 self.sent_at
227 }
228
229 fn to_compressed(&self) -> Self::CompressedMessage {
230 self.into()
231 }
232}
233
234pub type InboundDownwardMessages =
235 InboundMessagesCollection<InboundDownwardMessage<RelayChainBlockNumber>>;
236
237pub type AbridgedInboundDownwardMessages =
238 AbridgedInboundMessagesCollection<InboundDownwardMessage<RelayChainBlockNumber>>;
239
240impl AbridgedInboundDownwardMessages {
241 pub fn bounded_msgs_iter<MaxMessageLen: Get<u32>>(
243 &self,
244 ) -> impl Iterator<Item = BoundedSlice<u8, MaxMessageLen>> {
245 self.full_messages
246 .iter()
247 .filter_map(|m| match BoundedSlice::try_from(&m.msg[..]) {
250 Ok(bounded) => Some(bounded),
251 Err(_) => {
252 defensive!("Inbound Downward message was too long; dropping");
253 None
254 },
255 })
256 }
257}
258
259impl InboundMessage for (ParaId, InboundHrmpMessage) {
260 type CompressedMessage = (ParaId, HashedMessage);
261
262 fn data(&self) -> &[u8] {
263 &self.1.data
264 }
265
266 fn sent_at(&self) -> RelayChainBlockNumber {
267 self.1.sent_at
268 }
269
270 fn to_compressed(&self) -> Self::CompressedMessage {
271 let (sender, message) = self;
272 (*sender, message.into())
273 }
274}
275
276pub type InboundHrmpMessages = InboundMessagesCollection<(ParaId, InboundHrmpMessage)>;
277
278impl InboundHrmpMessages {
279 pub fn from_map(messages_map: BTreeMap<ParaId, Vec<InboundHrmpMessage>>) -> Self {
286 let mut messages = messages_map
287 .into_iter()
288 .flat_map(|(sender, channel_contents)| {
289 channel_contents.into_iter().map(move |message| (sender, message))
290 })
291 .collect::<Vec<_>>();
292 messages.sort_by(|(sender_a, msg_a), (sender_b, msg_b)| {
293 (msg_a.sent_at, sender_a).cmp(&(msg_b.sent_at, sender_b))
295 });
296
297 Self { messages }
298 }
299}
300
301pub type AbridgedInboundHrmpMessages =
302 AbridgedInboundMessagesCollection<(ParaId, InboundHrmpMessage)>;
303
304impl AbridgedInboundHrmpMessages {
305 pub fn flat_msgs_iter(&self) -> impl Iterator<Item = (ParaId, RelayChainBlockNumber, &[u8])> {
307 self.full_messages
308 .iter()
309 .map(|&(sender, ref message)| (sender, message.sent_at, &message.data[..]))
310 }
311}
312
313#[derive(
316 codec::Encode,
317 codec::Decode,
318 codec::DecodeWithMemTracking,
319 sp_core::RuntimeDebug,
320 Clone,
321 PartialEq,
322 TypeInfo,
323)]
324pub struct BasicParachainInherentData {
325 pub validation_data: PersistedValidationData,
326 pub relay_chain_state: sp_trie::StorageProof,
327 pub relay_parent_descendants: Vec<RelayHeader>,
328 pub collator_peer_id: Option<ApprovedPeerId>,
329}
330
331#[derive(
334 codec::Encode,
335 codec::Decode,
336 codec::DecodeWithMemTracking,
337 sp_core::RuntimeDebug,
338 Clone,
339 PartialEq,
340 TypeInfo,
341)]
342pub struct InboundMessagesData {
343 pub downward_messages: AbridgedInboundDownwardMessages,
344 pub horizontal_messages: AbridgedInboundHrmpMessages,
345}
346
347impl InboundMessagesData {
348 pub fn new(
350 dmq_msgs: AbridgedInboundDownwardMessages,
351 hrmp_msgs: AbridgedInboundHrmpMessages,
352 ) -> Self {
353 Self { downward_messages: dmq_msgs, horizontal_messages: hrmp_msgs }
354 }
355}
356
357pub fn deconstruct_parachain_inherent_data(
359 data: ParachainInherentData,
360) -> (BasicParachainInherentData, InboundDownwardMessages, InboundHrmpMessages) {
361 (
362 BasicParachainInherentData {
363 validation_data: data.validation_data,
364 relay_chain_state: data.relay_chain_state,
365 relay_parent_descendants: data.relay_parent_descendants,
366 collator_peer_id: data.collator_peer_id,
367 },
368 InboundDownwardMessages::new(data.downward_messages),
369 InboundHrmpMessages::from_map(data.horizontal_messages),
370 )
371}
372
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds one downward message per `(sent_at, size)` entry; every payload consists of `size`
    /// bytes with value `1`.
    fn build_inbound_dm_vec(
        info: &[(RelayChainBlockNumber, usize)],
    ) -> Vec<InboundDownwardMessage<RelayChainBlockNumber>> {
        info.iter()
            .map(|&(sent_at, size)| InboundDownwardMessage { sent_at, msg: vec![1; size] })
            .collect()
    }

    #[test]
    fn drop_processed_messages_works() {
        let msgs_vec =
            build_inbound_dm_vec(&[(0, 0), (0, 0), (2, 0), (2, 0), (2, 0), (2, 0), (3, 0)]);

        // Each case is `(sent_at, reverse_idx, index of the first message expected to remain)`.
        let cases: [(RelayChainBlockNumber, u32, usize); 7] = [
            // The very last message was processed: nothing remains.
            (3, 0, 7),
            (2, 0, 6),
            (2, 1, 5),
            (2, 4, 2),
            // The reverse index reaches past the messages sent at block 2.
            (2, 5, 1),
            (0, 1, 1),
            // The reverse index reaches past the start of the collection: nothing is dropped.
            (0, 3, 0),
        ];

        for &(sent_at, reverse_idx, first_remaining) in cases.iter() {
            let mut msgs = InboundDownwardMessages::new(msgs_vec.clone());
            msgs.drop_processed_messages(&InboundMessageId { sent_at, reverse_idx });
            assert_eq!(
                msgs.messages,
                msgs_vec[first_remaining..],
                "unexpected remainder for sent_at={}, reverse_idx={}",
                sent_at,
                reverse_idx,
            );
        }
    }

    #[test]
    fn into_abridged_works() {
        // An empty collection stays empty and leaves the size limit untouched.
        let mut size_limit = 0;
        let abridged_msgs = InboundDownwardMessages::new(vec![]).into_abridged(&mut size_limit);
        assert_eq!(size_limit, 0);
        assert_eq!(&abridged_msgs.full_messages, &vec![]);
        assert_eq!(abridged_msgs.hashed_messages, vec![]);

        let msgs_vec = build_inbound_dm_vec(&[(0, 100), (0, 100), (0, 150), (0, 50)]);
        let msgs = InboundDownwardMessages::new(msgs_vec.clone());

        // Each case is `(initial size limit, size limit left over, number of full messages)`.
        let cases: [(usize, usize, usize); 4] =
            [(150, 50, 1), (200, 0, 2), (399, 49, 3), (400, 0, 4)];

        for &(initial_limit, remaining_limit, full_count) in cases.iter() {
            let mut size_limit = initial_limit;
            let abridged_msgs = msgs.clone().into_abridged(&mut size_limit);
            let expected_hashed: Vec<HashedMessage> =
                msgs_vec[full_count..].iter().map(|msg| HashedMessage::from(msg)).collect();

            assert_eq!(size_limit, remaining_limit);
            assert_eq!(&abridged_msgs.full_messages, &msgs_vec[..full_count]);
            assert_eq!(abridged_msgs.hashed_messages, expected_hashed);
        }
    }

    #[test]
    fn from_map_works() {
        let para = |id: u32| ParaId::from(id);
        // Builds an HRMP message whose payload is the single byte `payload`.
        let hrmp_msg = |sent_at: RelayChainBlockNumber, payload: u8| InboundHrmpMessage {
            sent_at,
            data: vec![payload],
        };

        let mut messages_map: BTreeMap<ParaId, Vec<InboundHrmpMessage>> = BTreeMap::new();
        messages_map.insert(para(1000), vec![hrmp_msg(0, 0), hrmp_msg(0, 1), hrmp_msg(1, 2)]);
        messages_map.insert(para(2000), vec![hrmp_msg(0, 3), hrmp_msg(0, 4), hrmp_msg(1, 5)]);
        messages_map.insert(
            para(3000),
            vec![hrmp_msg(0, 6), hrmp_msg(1, 7), hrmp_msg(2, 8), hrmp_msg(3, 9), hrmp_msg(4, 10)],
        );

        // `(sender, sent_at, payload byte)` in the expected output order.
        let expected_order: [(u32, RelayChainBlockNumber, u8); 11] = [
            (1000, 0, 0),
            (1000, 0, 1),
            (2000, 0, 3),
            (2000, 0, 4),
            (3000, 0, 6),
            (1000, 1, 2),
            (2000, 1, 5),
            (3000, 1, 7),
            (3000, 2, 8),
            (3000, 3, 9),
            (3000, 4, 10),
        ];
        let expected: Vec<(ParaId, InboundHrmpMessage)> = expected_order
            .iter()
            .map(|&(sender, sent_at, payload)| (para(sender), hrmp_msg(sent_at, payload)))
            .collect();

        let msgs = InboundHrmpMessages::from_map(messages_map);
        assert_eq!(msgs.messages, expected);
    }

    #[test]
    fn check_enough_messages_included_works() {
        let mut messages = AbridgedInboundHrmpMessages {
            full_messages: vec![(
                ParaId::from(1000u32),
                InboundHrmpMessage { sent_at: 0, data: vec![1; 100] },
            )],
            hashed_messages: vec![(
                ParaId::from(2000u32),
                HashedMessage { sent_at: 1, msg_hash: Default::default() },
            )],
        };

        // One full message next to the hashed one is enough.
        messages.check_enough_messages_included("Test");

        // Hashed messages without any full message must be rejected.
        messages.full_messages.clear();
        let outcome = std::panic::catch_unwind(|| messages.check_enough_messages_included("Test"));
        assert!(outcome.is_err());

        // With nothing hashed there is nothing to enforce.
        messages.hashed_messages.clear();
        messages.check_enough_messages_included("Test");
    }
}
538}