referrerpolicy=no-referrer-when-downgrade

cumulus_pallet_parachain_system/
unincluded_segment.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Cumulus.
3// SPDX-License-Identifier: Apache-2.0
4
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// 	http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
//! Primitives used for tracking message queue constraints in an unincluded block segment
//! of the parachain.
//!
//! An unincluded segment is a chain of descendants of the latest included block which have
//! not yet been included into the relay chain.
22
23use super::relay_state_snapshot::{MessagingStateSnapshot, RelayDispatchQueueRemainingCapacity};
24use alloc::collections::btree_map::BTreeMap;
25use codec::{Decode, Encode};
26use core::marker::PhantomData;
27use cumulus_primitives_core::{relay_chain, ParaId};
28use scale_info::TypeInfo;
29use sp_runtime::RuntimeDebug;
30
/// Constraints on a single outbound HRMP channel.
///
/// Both values describe what is still available, not the channel maximums.
#[derive(Clone, RuntimeDebug)]
pub struct HrmpOutboundLimits {
	/// The maximum number of bytes that can still be written to the channel.
	pub bytes_remaining: u32,
	/// The maximum number of messages that can still be written to the channel.
	pub messages_remaining: u32,
}
39
/// Limits on outbound message bandwidth.
///
/// Covers the upward message (UMP) queue as well as every outbound HRMP channel.
#[derive(Clone, RuntimeDebug)]
pub struct OutboundBandwidthLimits {
	/// The number of UMP messages that can still be sent.
	pub ump_messages_remaining: u32,
	/// The number of UMP bytes that can still be sent.
	pub ump_bytes_remaining: u32,
	/// The limitations of all registered outbound HRMP channels, keyed by the recipient.
	pub hrmp_outgoing: BTreeMap<ParaId, HrmpOutboundLimits>,
}
50
51impl OutboundBandwidthLimits {
52	/// Creates new limits from the messaging state and upward message queue maximums fetched
53	/// from the host configuration.
54	///
55	/// These will be the total bandwidth limits across the entire unincluded segment.
56	pub fn from_relay_chain_state(messaging_state: &MessagingStateSnapshot) -> Self {
57		let RelayDispatchQueueRemainingCapacity { remaining_count, remaining_size } =
58			messaging_state.relay_dispatch_queue_remaining_capacity;
59
60		let hrmp_outgoing = messaging_state
61			.egress_channels
62			.iter()
63			.map(|(id, channel)| {
64				(
65					*id,
66					HrmpOutboundLimits {
67						bytes_remaining: channel.max_total_size.saturating_sub(channel.total_size),
68						messages_remaining: channel.max_capacity.saturating_sub(channel.msg_count),
69					},
70				)
71			})
72			.collect();
73
74		Self {
75			ump_messages_remaining: remaining_count,
76			ump_bytes_remaining: remaining_size,
77			hrmp_outgoing,
78		}
79	}
80}
81
/// The error type for updating bandwidth used by a segment.
///
/// The overflow variants carry both the limit that was hit and the total that would have
/// been reached had the update been accepted.
#[derive(RuntimeDebug)]
#[cfg_attr(test, derive(PartialEq))]
pub enum BandwidthUpdateError {
	/// Too many messages submitted to HRMP channel.
	HrmpMessagesOverflow {
		/// Parachain id of the recipient.
		recipient: ParaId,
		/// The amount of remaining messages in the capacity of the channel.
		messages_remaining: u32,
		/// The total amount of messages the channel would hold after the update.
		messages_submitted: u32,
	},
	/// Too many bytes submitted to HRMP channel.
	HrmpBytesOverflow {
		/// Parachain id of the recipient.
		recipient: ParaId,
		/// The amount of remaining bytes in the capacity of the channel.
		bytes_remaining: u32,
		/// The total amount of bytes the channel would hold after the update.
		bytes_submitted: u32,
	},
	/// Too many messages submitted to UMP queue.
	UmpMessagesOverflow {
		/// The amount of remaining messages in the capacity of UMP.
		messages_remaining: u32,
		/// The total amount of UMP messages that would be used after the update.
		messages_submitted: u32,
	},
	/// Too many bytes submitted to UMP.
	UmpBytesOverflow {
		/// The amount of remaining bytes in the capacity of UMP.
		bytes_remaining: u32,
		/// The total amount of UMP bytes that would be used after the update.
		bytes_submitted: u32,
	},
	/// Invalid HRMP watermark.
	InvalidHrmpWatermark {
		/// HRMP watermark submitted by the candidate.
		submitted: relay_chain::BlockNumber,
		/// Latest tracked HRMP watermark.
		latest: relay_chain::BlockNumber,
	},
	/// Upgrade signal sent by relay chain was already processed by
	/// some ancestor from the segment.
	UpgradeGoAheadAlreadyProcessed,
}
129
/// The number of messages and size in bytes submitted to HRMP channel.
///
/// The default value is the empty update (no messages, no bytes).
#[derive(RuntimeDebug, Default, Copy, Clone, Encode, Decode, TypeInfo)]
pub struct HrmpChannelUpdate {
	/// The amount of messages submitted to the channel.
	pub msg_count: u32,
	/// The amount of bytes submitted to the channel.
	pub total_bytes: u32,
}
138
139impl HrmpChannelUpdate {
140	/// Returns `true` if the update is empty, `false` otherwise.
141	fn is_empty(&self) -> bool {
142		self.msg_count == 0 && self.total_bytes == 0
143	}
144
145	/// Tries to append another update, respecting given bandwidth limits.
146	fn append(
147		&self,
148		other: &Self,
149		recipient: ParaId,
150		limits: &OutboundBandwidthLimits,
151	) -> Result<Self, BandwidthUpdateError> {
152		let limits = limits
153			.hrmp_outgoing
154			.get(&recipient)
155			.expect("limit for declared hrmp channel must be present; qed");
156
157		let mut new = *self;
158
159		new.msg_count = new.msg_count.saturating_add(other.msg_count);
160		if new.msg_count > limits.messages_remaining {
161			return Err(BandwidthUpdateError::HrmpMessagesOverflow {
162				recipient,
163				messages_remaining: limits.messages_remaining,
164				messages_submitted: new.msg_count,
165			})
166		}
167		new.total_bytes = new.total_bytes.saturating_add(other.total_bytes);
168		if new.total_bytes > limits.bytes_remaining {
169			return Err(BandwidthUpdateError::HrmpBytesOverflow {
170				recipient,
171				bytes_remaining: limits.bytes_remaining,
172				bytes_submitted: new.total_bytes,
173			})
174		}
175
176		Ok(new)
177	}
178
179	/// Subtracts previously added channel update.
180	fn subtract(&mut self, other: &Self) {
181		self.msg_count -= other.msg_count;
182		self.total_bytes -= other.total_bytes;
183	}
184}
185
/// Bandwidth used by a parachain block(s).
///
/// This struct can be created with pub items, however, it should
/// never hit the storage directly to avoid bypassing limitations checks.
///
/// The default value represents no bandwidth used at all.
#[derive(Default, Clone, Encode, Decode, TypeInfo, RuntimeDebug)]
pub struct UsedBandwidth {
	/// The amount of UMP messages sent.
	pub ump_msg_count: u32,
	/// The amount of UMP bytes sent.
	pub ump_total_bytes: u32,
	/// Outbound HRMP channels updates, keyed by the recipient.
	pub hrmp_outgoing: BTreeMap<ParaId, HrmpChannelUpdate>,
}
199
200impl UsedBandwidth {
201	/// Tries to append another update, respecting given bandwidth limits.
202	fn append(
203		&self,
204		other: &Self,
205		limits: &OutboundBandwidthLimits,
206	) -> Result<Self, BandwidthUpdateError> {
207		let mut new = self.clone();
208
209		new.ump_msg_count = new.ump_msg_count.saturating_add(other.ump_msg_count);
210		if new.ump_msg_count > limits.ump_messages_remaining {
211			return Err(BandwidthUpdateError::UmpMessagesOverflow {
212				messages_remaining: limits.ump_messages_remaining,
213				messages_submitted: new.ump_msg_count,
214			})
215		}
216		new.ump_total_bytes = new.ump_total_bytes.saturating_add(other.ump_total_bytes);
217		if new.ump_total_bytes > limits.ump_bytes_remaining {
218			return Err(BandwidthUpdateError::UmpBytesOverflow {
219				bytes_remaining: limits.ump_bytes_remaining,
220				bytes_submitted: new.ump_total_bytes,
221			})
222		}
223
224		for (id, channel) in other.hrmp_outgoing.iter() {
225			let current = new.hrmp_outgoing.entry(*id).or_default();
226			*current = current.append(channel, *id, limits)?;
227		}
228
229		Ok(new)
230	}
231
232	/// Subtracts previously added bandwidth update.
233	fn subtract(&mut self, other: &Self) {
234		self.ump_msg_count -= other.ump_msg_count;
235		self.ump_total_bytes -= other.ump_total_bytes;
236
237		for (id, channel) in other.hrmp_outgoing.iter() {
238			let entry = self
239				.hrmp_outgoing
240				.get_mut(id)
241				.expect("entry's been inserted earlier with `append`; qed");
242			entry.subtract(channel);
243		}
244
245		self.hrmp_outgoing.retain(|_, channel| !channel.is_empty());
246	}
247}
248
/// Ancestor of the block being currently executed, not yet included
/// into the relay chain.
///
/// `H` is the type of the para head hash.
#[derive(Encode, Decode, TypeInfo, RuntimeDebug)]
pub struct Ancestor<H> {
	/// Bandwidth used by this block.
	used_bandwidth: UsedBandwidth,
	/// Output head data hash of this block. This may be optional in case the head data has not
	/// yet been posted on chain, but should be updated during initialization of the next block.
	para_head_hash: Option<H>,
	/// Optional go-ahead signal sent by the relay-chain this ancestor has processed.
	consumed_go_ahead_signal: Option<relay_chain::UpgradeGoAhead>,
}
261
262impl<H> Ancestor<H> {
263	/// Creates new ancestor without validating the bandwidth used.
264	pub fn new_unchecked(
265		used_bandwidth: UsedBandwidth,
266		consumed_go_ahead_signal: Option<relay_chain::UpgradeGoAhead>,
267	) -> Self {
268		Self { used_bandwidth, para_head_hash: None, consumed_go_ahead_signal }
269	}
270
271	/// Returns [`UsedBandwidth`] of this block.
272	pub fn used_bandwidth(&self) -> &UsedBandwidth {
273		&self.used_bandwidth
274	}
275
276	/// Returns hashed [output head data](`relay_chain::HeadData`) of this block.
277	pub fn para_head_hash(&self) -> Option<&H> {
278		self.para_head_hash.as_ref()
279	}
280
281	/// Set para head hash of this block.
282	pub fn replace_para_head_hash(&mut self, para_head_hash: H) {
283		self.para_head_hash.replace(para_head_hash);
284	}
285}
286
/// An update to the HRMP watermark. This is always a relay-chain block number,
/// but the two variants have different semantic meanings.
pub enum HrmpWatermarkUpdate {
	/// An update to the HRMP watermark where the new value is set to be equal to the
	/// relay-parent's block number, i.e. the "head" of the relay chain.
	/// This is always legal.
	Head(relay_chain::BlockNumber),
	/// An update to the HRMP watermark where the new value falls into the "trunk" of the
	/// relay-chain. In this case, the watermark must not be lower than the previous value
	/// (keeping it unchanged is accepted).
	Trunk(relay_chain::BlockNumber),
}
298
299impl HrmpWatermarkUpdate {
300	/// Create a new update based on the desired watermark value and the current
301	/// relay-parent number.
302	pub fn new(
303		watermark: relay_chain::BlockNumber,
304		relay_parent_number: relay_chain::BlockNumber,
305	) -> Self {
306		// Hard constrain the watermark to the relay-parent number.
307		if watermark >= relay_parent_number {
308			HrmpWatermarkUpdate::Head(relay_parent_number)
309		} else {
310			HrmpWatermarkUpdate::Trunk(watermark)
311		}
312	}
313}
314
/// Struct that keeps track of bandwidth used by the unincluded part of the chain
/// along with the latest HRMP watermark.
///
/// The default value tracks no bandwidth, no watermark and no consumed go-ahead signal.
#[derive(Default, Encode, Decode, TypeInfo, RuntimeDebug)]
pub struct SegmentTracker<H> {
	/// Bandwidth used by the segment.
	used_bandwidth: UsedBandwidth,
	/// The mark which specifies the block number up to which all inbound HRMP messages are
	/// processed. `None` until the first block is appended.
	hrmp_watermark: Option<relay_chain::BlockNumber>,
	/// Optional go-ahead signal sent by the relay-chain some ancestor from the segment has
	/// processed. Only single block is allowed to have this set within the whole segment.
	consumed_go_ahead_signal: Option<relay_chain::UpgradeGoAhead>,
	/// `H` is the type of para head hash.
	phantom_data: PhantomData<H>,
}
330
331impl<H> SegmentTracker<H> {
332	/// Tries to append another block to the tracker, respecting given bandwidth limits.
333	/// In practice, the bandwidth limits supplied should be the total allowed within the
334	/// block.
335	pub fn append(
336		&mut self,
337		block: &Ancestor<H>,
338		new_watermark: HrmpWatermarkUpdate,
339		limits: &OutboundBandwidthLimits,
340	) -> Result<(), BandwidthUpdateError> {
341		if self.consumed_go_ahead_signal.is_some() && block.consumed_go_ahead_signal.is_some() {
342			return Err(BandwidthUpdateError::UpgradeGoAheadAlreadyProcessed)
343		}
344		if let Some(watermark) = self.hrmp_watermark.as_ref() {
345			if let HrmpWatermarkUpdate::Trunk(new) = new_watermark {
346				if &new < watermark {
347					return Err(BandwidthUpdateError::InvalidHrmpWatermark {
348						submitted: new,
349						latest: *watermark,
350					})
351				}
352			}
353		}
354
355		self.used_bandwidth = self.used_bandwidth.append(block.used_bandwidth(), limits)?;
356
357		if let Some(consumed) = block.consumed_go_ahead_signal.as_ref() {
358			self.consumed_go_ahead_signal.replace(*consumed);
359		}
360		self.hrmp_watermark.replace(match new_watermark {
361			HrmpWatermarkUpdate::Trunk(w) | HrmpWatermarkUpdate::Head(w) => w,
362		});
363
364		Ok(())
365	}
366
367	/// Removes previously added block from the tracker.
368	pub fn subtract(&mut self, block: &Ancestor<H>) {
369		self.used_bandwidth.subtract(block.used_bandwidth());
370		if let Some(consumed) = block.consumed_go_ahead_signal.as_ref() {
371			// This is the same signal stored in the tracker.
372			let signal_in_segment = self.consumed_go_ahead_signal.take();
373			assert_eq!(signal_in_segment, Some(*consumed));
374		}
375		// Watermark doesn't need to be updated since the is always dropped
376		// from the tail of the segment.
377	}
378
379	/// Return a reference to the used bandwidth across the entire segment.
380	pub fn used_bandwidth(&self) -> &UsedBandwidth {
381		&self.used_bandwidth
382	}
383
384	/// Return go ahead signal consumed by some ancestor in a segment, if any.
385	pub fn consumed_go_ahead_signal(&self) -> Option<relay_chain::UpgradeGoAhead> {
386		self.consumed_go_ahead_signal
387	}
388}
389
390pub(crate) fn size_after_included<H: PartialEq>(included_hash: H, segment: &[Ancestor<H>]) -> u32 {
391	let pivot = segment
392		.iter()
393		.position(|ancestor| ancestor.para_head_hash() == Some(&included_hash))
394		.map(|p| p + 1)
395		.unwrap_or(0);
396
397	(segment.len() - pivot) as u32
398}
399
400#[cfg(test)]
401mod tests {
402	use super::*;
403	use alloc::{vec, vec::Vec};
404	use assert_matches::assert_matches;
405
406	#[test]
407	fn outbound_limits_constructed_correctly() {
408		let para_a = ParaId::from(0);
409		let para_a_channel = relay_chain::AbridgedHrmpChannel {
410			max_message_size: 15,
411
412			// Msg count capacity left is 2.
413			msg_count: 5,
414			max_capacity: 7,
415
416			// Bytes capacity left is 10.
417			total_size: 50,
418			max_total_size: 60,
419			mqc_head: None,
420		};
421
422		let para_b = ParaId::from(1);
423		let para_b_channel = relay_chain::AbridgedHrmpChannel {
424			max_message_size: 15,
425
426			// Msg count capacity left is 10.
427			msg_count: 40,
428			max_capacity: 50,
429
430			// Bytes capacity left is 0.
431			total_size: 500,
432			max_total_size: 500,
433			mqc_head: None,
434		};
435		let relay_dispatch_queue_remaining_capacity =
436			RelayDispatchQueueRemainingCapacity { remaining_count: 1, remaining_size: 50 };
437		let messaging_state = MessagingStateSnapshot {
438			dmq_mqc_head: relay_chain::Hash::zero(),
439			relay_dispatch_queue_remaining_capacity,
440			ingress_channels: Vec::new(),
441			egress_channels: vec![(para_a, para_a_channel), (para_b, para_b_channel)],
442		};
443
444		let limits = OutboundBandwidthLimits::from_relay_chain_state(&messaging_state);
445
446		// UMP.
447		assert_eq!(limits.ump_messages_remaining, 1);
448		assert_eq!(limits.ump_bytes_remaining, 50);
449
450		// HRMP.
451		let para_a_limits = limits.hrmp_outgoing.get(&para_a).expect("channel must be present");
452		let para_b_limits = limits.hrmp_outgoing.get(&para_b).expect("channel must be present");
453		assert_eq!(para_a_limits.bytes_remaining, 10);
454		assert_eq!(para_a_limits.messages_remaining, 2);
455		assert_eq!(para_b_limits.bytes_remaining, 0);
456		assert_eq!(para_b_limits.messages_remaining, 10);
457	}
458
459	#[test]
460	fn hrmp_msg_count_limits() {
461		let para_0 = ParaId::from(0);
462		let para_0_limits = HrmpOutboundLimits { bytes_remaining: u32::MAX, messages_remaining: 5 };
463
464		let para_1 = ParaId::from(1);
465		let para_1_limits = HrmpOutboundLimits { bytes_remaining: u32::MAX, messages_remaining: 3 };
466		let hrmp_outgoing = [(para_0, para_0_limits), (para_1, para_1_limits)].into();
467		let limits = OutboundBandwidthLimits {
468			ump_messages_remaining: 0,
469			ump_bytes_remaining: 0,
470			hrmp_outgoing,
471		};
472
473		let mut hrmp_update = HrmpChannelUpdate::default();
474		assert!(hrmp_update.is_empty());
475
476		for _ in 0..5 {
477			hrmp_update = hrmp_update
478				.append(&HrmpChannelUpdate { msg_count: 1, total_bytes: 10 }, para_0, &limits)
479				.expect("update is within the limits");
480		}
481		assert_matches!(
482			hrmp_update.append(
483				&HrmpChannelUpdate { msg_count: 1, total_bytes: 10 },
484				para_0,
485				&limits,
486			),
487			Err(BandwidthUpdateError::HrmpMessagesOverflow {
488				recipient,
489				messages_remaining,
490				messages_submitted,
491			}) if recipient == para_0 && messages_remaining == 5 && messages_submitted == 6
492		);
493
494		let mut hrmp_update = HrmpChannelUpdate::default();
495		hrmp_update = hrmp_update
496			.append(&HrmpChannelUpdate { msg_count: 2, total_bytes: 10 }, para_1, &limits)
497			.expect("update is within the limits");
498		assert_matches!(
499			hrmp_update.append(
500				&HrmpChannelUpdate { msg_count: 3, total_bytes: 10 },
501				para_1,
502				&limits,
503			),
504			Err(BandwidthUpdateError::HrmpMessagesOverflow {
505				recipient,
506				messages_remaining,
507				messages_submitted,
508			}) if recipient == para_1 && messages_remaining == 3 && messages_submitted == 5
509		);
510	}
511
512	#[test]
513	fn hrmp_bytes_limits() {
514		let para_0 = ParaId::from(0);
515		let para_0_limits =
516			HrmpOutboundLimits { bytes_remaining: 25, messages_remaining: u32::MAX };
517
518		let hrmp_outgoing = [(para_0, para_0_limits)].into();
519		let limits = OutboundBandwidthLimits {
520			ump_messages_remaining: 0,
521			ump_bytes_remaining: 0,
522			hrmp_outgoing,
523		};
524
525		let mut hrmp_update = HrmpChannelUpdate::default();
526		assert!(hrmp_update.is_empty());
527
528		for _ in 0..5 {
529			hrmp_update = hrmp_update
530				.append(&HrmpChannelUpdate { msg_count: 1, total_bytes: 4 }, para_0, &limits)
531				.expect("update is within the limits");
532		}
533		assert_matches!(
534			hrmp_update.append(
535				&HrmpChannelUpdate { msg_count: 1, total_bytes: 6 },
536				para_0,
537				&limits,
538			),
539			Err(BandwidthUpdateError::HrmpBytesOverflow {
540				recipient,
541				bytes_remaining,
542				bytes_submitted,
543			}) if recipient == para_0 && bytes_remaining == 25 && bytes_submitted == 26
544		);
545	}
546
547	#[test]
548	fn hrmp_limits_with_segment() {
549		let create_used_hrmp =
550			|hrmp_outgoing| UsedBandwidth { ump_msg_count: 0, ump_total_bytes: 0, hrmp_outgoing };
551
552		let para_0 = ParaId::from(0);
553		let para_0_limits = HrmpOutboundLimits { bytes_remaining: 30, messages_remaining: 10 };
554
555		let para_1 = ParaId::from(1);
556		let para_1_limits = HrmpOutboundLimits { bytes_remaining: 20, messages_remaining: 3 };
557		let hrmp_outgoing = [(para_0, para_0_limits), (para_1, para_1_limits)].into();
558		let limits = OutboundBandwidthLimits {
559			ump_messages_remaining: 0,
560			ump_bytes_remaining: 0,
561			hrmp_outgoing,
562		};
563
564		let mut segment = SegmentTracker::default();
565
566		let para_0_update = HrmpChannelUpdate { msg_count: 1, total_bytes: 6 };
567		let ancestor_0 = Ancestor {
568			used_bandwidth: create_used_hrmp([(para_0, para_0_update)].into()),
569			para_head_hash: None::<relay_chain::Hash>,
570			consumed_go_ahead_signal: None,
571		};
572		segment
573			.append(&ancestor_0, HrmpWatermarkUpdate::Trunk(0), &limits)
574			.expect("update is within the limits");
575
576		for watermark in 1..5 {
577			let ancestor = Ancestor {
578				used_bandwidth: create_used_hrmp([(para_0, para_0_update)].into()),
579				para_head_hash: None::<relay_chain::Hash>,
580				consumed_go_ahead_signal: None,
581			};
582			segment
583				.append(&ancestor, HrmpWatermarkUpdate::Trunk(watermark), &limits)
584				.expect("update is within the limits");
585		}
586
587		let para_0_update = HrmpChannelUpdate { msg_count: 1, total_bytes: 1 };
588		let ancestor_5 = Ancestor {
589			used_bandwidth: create_used_hrmp([(para_0, para_0_update)].into()),
590			para_head_hash: None::<relay_chain::Hash>,
591			consumed_go_ahead_signal: None,
592		};
593		assert_matches!(
594			segment.append(&ancestor_5, HrmpWatermarkUpdate::Trunk(5), &limits),
595			Err(BandwidthUpdateError::HrmpBytesOverflow {
596				recipient,
597				bytes_remaining,
598				bytes_submitted,
599			}) if recipient == para_0 && bytes_remaining == 30 && bytes_submitted == 31
600		);
601		// Remove the first ancestor from the segment to make space.
602		segment.subtract(&ancestor_0);
603		segment
604			.append(&ancestor_5, HrmpWatermarkUpdate::Trunk(5), &limits)
605			.expect("update is within the limits");
606
607		let para_1_update = HrmpChannelUpdate { msg_count: 3, total_bytes: 10 };
608		let ancestor = Ancestor {
609			used_bandwidth: create_used_hrmp([(para_1, para_1_update)].into()),
610			para_head_hash: None::<relay_chain::Hash>,
611			consumed_go_ahead_signal: None,
612		};
613		segment
614			.append(&ancestor, HrmpWatermarkUpdate::Trunk(6), &limits)
615			.expect("update is within the limits");
616
617		assert_matches!(
618			segment.append(&ancestor, HrmpWatermarkUpdate::Trunk(7), &limits),
619			Err(BandwidthUpdateError::HrmpMessagesOverflow {
620				recipient,
621				messages_remaining,
622				messages_submitted,
623			}) if recipient == para_1 && messages_remaining == 3 && messages_submitted == 6
624		);
625	}
626
627	#[test]
628	fn ump_limits_with_segment() {
629		let create_used_ump = |(ump_msg_count, ump_total_bytes)| UsedBandwidth {
630			ump_msg_count,
631			ump_total_bytes,
632			hrmp_outgoing: BTreeMap::default(),
633		};
634
635		let limits = OutboundBandwidthLimits {
636			ump_messages_remaining: 5,
637			ump_bytes_remaining: 50,
638			hrmp_outgoing: BTreeMap::default(),
639		};
640
641		let mut segment = SegmentTracker::default();
642
643		let ancestor_0 = Ancestor {
644			used_bandwidth: create_used_ump((1, 10)),
645			para_head_hash: None::<relay_chain::Hash>,
646			consumed_go_ahead_signal: None,
647		};
648		segment
649			.append(&ancestor_0, HrmpWatermarkUpdate::Trunk(0), &limits)
650			.expect("update is within the limits");
651
652		for watermark in 1..4 {
653			let ancestor = Ancestor {
654				used_bandwidth: create_used_ump((1, 10)),
655				para_head_hash: None::<relay_chain::Hash>,
656				consumed_go_ahead_signal: None,
657			};
658			segment
659				.append(&ancestor, HrmpWatermarkUpdate::Trunk(watermark), &limits)
660				.expect("update is within the limits");
661		}
662
663		let ancestor_4 = Ancestor {
664			used_bandwidth: create_used_ump((1, 30)),
665			para_head_hash: None::<relay_chain::Hash>,
666			consumed_go_ahead_signal: None,
667		};
668		assert_matches!(
669			segment.append(&ancestor_4, HrmpWatermarkUpdate::Trunk(4), &limits),
670			Err(BandwidthUpdateError::UmpBytesOverflow {
671				bytes_remaining,
672				bytes_submitted,
673			}) if bytes_remaining == 50 && bytes_submitted == 70
674		);
675
676		let ancestor = Ancestor {
677			used_bandwidth: create_used_ump((1, 5)),
678			para_head_hash: None::<relay_chain::Hash>,
679			consumed_go_ahead_signal: None,
680		};
681		segment
682			.append(&ancestor, HrmpWatermarkUpdate::Trunk(4), &limits)
683			.expect("update is within the limits");
684		assert_matches!(
685			segment.append(&ancestor, HrmpWatermarkUpdate::Trunk(5), &limits),
686			Err(BandwidthUpdateError::UmpMessagesOverflow {
687				messages_remaining,
688				messages_submitted,
689			}) if messages_remaining == 5 && messages_submitted == 6
690		);
691	}
692
693	#[test]
694	fn segment_hrmp_watermark() {
695		let mut segment = SegmentTracker::default();
696
697		let ancestor = Ancestor {
698			used_bandwidth: UsedBandwidth::default(),
699			para_head_hash: None::<relay_chain::Hash>,
700			consumed_go_ahead_signal: None,
701		};
702		let limits = OutboundBandwidthLimits {
703			ump_messages_remaining: 0,
704			ump_bytes_remaining: 0,
705			hrmp_outgoing: BTreeMap::default(),
706		};
707
708		segment
709			.append(&ancestor, HrmpWatermarkUpdate::Head(0), &limits)
710			.expect("nothing to compare the watermark with in default segment");
711		assert_matches!(segment.append(&ancestor, HrmpWatermarkUpdate::Trunk(0), &limits), Ok(()));
712
713		// Trunk updates are allowed when the new watermark is greater than the current one
714		for watermark in 1..5 {
715			segment
716				.append(&ancestor, HrmpWatermarkUpdate::Trunk(watermark), &limits)
717				.expect("hrmp watermark is valid");
718		}
719		// Trunk updates are not allowed when the new watermark is smaller than the current one
720		for watermark in 0..4 {
721			assert_eq!(
722				segment.append(&ancestor, HrmpWatermarkUpdate::Trunk(watermark), &limits),
723				Err(BandwidthUpdateError::InvalidHrmpWatermark { submitted: watermark, latest: 4 }),
724			);
725		}
726		// The current watermark is still valid.
727		segment
728			.append(&ancestor, HrmpWatermarkUpdate::Trunk(5), &limits)
729			.expect("hrmp watermark is valid");
730
731		// Head updates are allowed even if the new watermark is smaller than the current one
732		segment
733			.append(&ancestor, HrmpWatermarkUpdate::Head(4), &limits)
734			.expect("head updates always valid");
735	}
736
737	#[test]
738	fn segment_drops_empty_hrmp_channels() {
739		let create_used_hrmp =
740			|hrmp_outgoing| UsedBandwidth { ump_msg_count: 0, ump_total_bytes: 0, hrmp_outgoing };
741
742		let para_0 = ParaId::from(0);
743		let para_0_limits =
744			HrmpOutboundLimits { bytes_remaining: u32::MAX, messages_remaining: u32::MAX };
745
746		let para_1 = ParaId::from(1);
747		let para_1_limits =
748			HrmpOutboundLimits { bytes_remaining: u32::MAX, messages_remaining: u32::MAX };
749		let hrmp_outgoing = [(para_0, para_0_limits), (para_1, para_1_limits)].into();
750		let limits = OutboundBandwidthLimits {
751			ump_messages_remaining: 0,
752			ump_bytes_remaining: 0,
753			hrmp_outgoing,
754		};
755
756		let mut segment = SegmentTracker::default();
757
758		let para_0_update = HrmpChannelUpdate { msg_count: 1, total_bytes: 1 };
759		let ancestor_0 = Ancestor {
760			used_bandwidth: create_used_hrmp([(para_0, para_0_update)].into()),
761			para_head_hash: None::<relay_chain::Hash>,
762			consumed_go_ahead_signal: None,
763		};
764		segment
765			.append(&ancestor_0, HrmpWatermarkUpdate::Head(0), &limits)
766			.expect("update is within the limits");
767		let para_1_update = HrmpChannelUpdate { msg_count: 3, total_bytes: 10 };
768		let ancestor_1 = Ancestor {
769			used_bandwidth: create_used_hrmp([(para_1, para_1_update)].into()),
770			para_head_hash: None::<relay_chain::Hash>,
771			consumed_go_ahead_signal: None,
772		};
773		segment
774			.append(&ancestor_1, HrmpWatermarkUpdate::Head(1), &limits)
775			.expect("update is within the limits");
776
777		assert_eq!(segment.used_bandwidth.hrmp_outgoing.len(), 2);
778
779		segment.subtract(&ancestor_0);
780		assert_eq!(segment.used_bandwidth.hrmp_outgoing.len(), 1);
781
782		segment.subtract(&ancestor_1);
783		assert_eq!(segment.used_bandwidth.hrmp_outgoing.len(), 0);
784	}
785
786	#[test]
787	fn segment_go_ahead_signal_is_unique() {
788		let limits = OutboundBandwidthLimits {
789			ump_messages_remaining: 0,
790			ump_bytes_remaining: 0,
791			hrmp_outgoing: BTreeMap::default(),
792		};
793
794		let mut segment = SegmentTracker::default();
795
796		let ancestor_0 = Ancestor {
797			used_bandwidth: UsedBandwidth::default(),
798			para_head_hash: None::<relay_chain::Hash>,
799			consumed_go_ahead_signal: Some(relay_chain::UpgradeGoAhead::GoAhead),
800		};
801		segment
802			.append(&ancestor_0, HrmpWatermarkUpdate::Head(0), &limits)
803			.expect("update is within the limits");
804
805		let ancestor_1 = Ancestor {
806			used_bandwidth: UsedBandwidth::default(),
807			para_head_hash: None::<relay_chain::Hash>,
808			consumed_go_ahead_signal: None,
809		};
810		segment
811			.append(&ancestor_1, HrmpWatermarkUpdate::Head(1), &limits)
812			.expect("update is within the limits");
813
814		let ancestor_2 = Ancestor {
815			used_bandwidth: UsedBandwidth::default(),
816			para_head_hash: None::<relay_chain::Hash>,
817			consumed_go_ahead_signal: Some(relay_chain::UpgradeGoAhead::Abort),
818		};
819		assert_matches!(
820			segment.append(&ancestor_2, HrmpWatermarkUpdate::Head(2), &limits),
821			Err(BandwidthUpdateError::UpgradeGoAheadAlreadyProcessed)
822		);
823
824		segment.subtract(&ancestor_0);
825		segment
826			.append(&ancestor_2, HrmpWatermarkUpdate::Head(1), &limits)
827			.expect("update is within the limits");
828	}
829
830	#[test]
831	fn size_after_included_works() {
832		let segment = vec![
833			Ancestor {
834				used_bandwidth: Default::default(),
835				para_head_hash: Some("a"),
836				consumed_go_ahead_signal: None,
837			},
838			Ancestor {
839				used_bandwidth: Default::default(),
840				para_head_hash: Some("b"),
841				consumed_go_ahead_signal: None,
842			},
843			Ancestor {
844				used_bandwidth: Default::default(),
845				para_head_hash: Some("c"),
846				consumed_go_ahead_signal: None,
847			},
848		];
849
850		assert_eq!(size_after_included("a", &segment), 2,);
851		assert_eq!(size_after_included("b", &segment), 1,);
852		assert_eq!(size_after_included("c", &segment), 0,);
853		assert_eq!(size_after_included("d", &segment), 3,);
854
855		assert_eq!(size_after_included("x", &[]), 0,);
856	}
857}