mixnet/core/
packet_queues.rs

1// Copyright 2022 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Mixnet packet queues.
22
23use super::sphinx::{Packet, PeerId};
24use std::{
25	cmp::Ordering,
26	collections::{BinaryHeap, VecDeque},
27	time::Instant,
28};
29
/// A packet plus the ID of the peer it should be sent to.
pub struct AddressedPacket {
	/// ID of the peer the packet should be sent to.
	pub peer_id: PeerId,
	/// The packet contents, held behind a `Box` (heap allocation).
	pub packet: Box<Packet>,
}
37
38/// `Eq` and `Ord` are implemented for this to support use in `BinaryHeap`s. Only `deadline` is
39/// compared.
40struct ForwardPacket {
41	/// When the packet should be sent.
42	deadline: Instant,
43	/// The packet and destination.
44	packet: AddressedPacket,
45}
46
47impl PartialEq for ForwardPacket {
48	fn eq(&self, other: &Self) -> bool {
49		self.deadline == other.deadline
50	}
51}
52
53impl Eq for ForwardPacket {}
54
55impl PartialOrd for ForwardPacket {
56	fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
57		Some(self.cmp(other))
58	}
59}
60
61impl Ord for ForwardPacket {
62	fn cmp(&self, other: &Self) -> Ordering {
63		// Packets with the earliest deadline considered greatest
64		self.deadline.cmp(&other.deadline).reverse()
65	}
66}
67
68pub struct ForwardPacketQueue {
69	/// Maximum number of packets in the queue. This should match the capacity of `queue`, but we
70	/// don't rely on that.
71	capacity: usize,
72	queue: BinaryHeap<ForwardPacket>,
73}
74
75impl ForwardPacketQueue {
76	pub fn new(capacity: usize) -> Self {
77		Self { capacity, queue: BinaryHeap::with_capacity(capacity) }
78	}
79
80	pub fn next_deadline(&self) -> Option<Instant> {
81		self.queue.peek().map(|packet| packet.deadline)
82	}
83
84	pub fn has_space(&self) -> bool {
85		self.queue.len() < self.capacity
86	}
87
88	/// Insert a packet into the queue. Returns `true` iff the deadline of the item at the head of
89	/// the queue changed. Should only be called if there is space in the queue (see
90	/// [`has_space`](Self::has_space)).
91	pub fn insert(&mut self, deadline: Instant, packet: AddressedPacket) -> bool {
92		debug_assert!(self.has_space());
93		let prev_deadline = self.next_deadline();
94		self.queue.push(ForwardPacket { deadline, packet });
95		self.next_deadline() != prev_deadline
96	}
97
98	pub fn pop(&mut self) -> Option<AddressedPacket> {
99		self.queue.pop().map(|packet| packet.packet)
100	}
101}
102
/// Configuration for an authored packet queue.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct AuthoredPacketQueueConfig {
	/// Maximum number of packets in the queue. Note that cover packets do not go in the queue;
	/// they are generated on demand.
	pub capacity: usize,
	/// Allow packets for multiple messages in the queue?
	pub multiple_messages: bool,
}
111
/// Reason a space check on an authored packet queue failed.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CheckSpaceErr {
	/// There will never be enough space.
	Capacity,
	/// There are too many other packets in the queue at the moment.
	Len,
}
118
119pub struct AuthoredPacketQueue {
120	config: AuthoredPacketQueueConfig,
121	queue: VecDeque<AddressedPacket>,
122}
123
124impl AuthoredPacketQueue {
125	pub fn new(config: AuthoredPacketQueueConfig) -> Self {
126		Self { config, queue: VecDeque::with_capacity(config.capacity) }
127	}
128
129	pub fn len(&self) -> usize {
130		self.queue.len()
131	}
132
133	pub fn check_space(&self, num_packets: usize) -> Result<(), CheckSpaceErr> {
134		let Some(mut max_len) = self.config.capacity.checked_sub(num_packets) else {
135			return Err(CheckSpaceErr::Capacity)
136		};
137		if !self.config.multiple_messages {
138			max_len = 0;
139		}
140		if self.queue.len() > max_len {
141			Err(CheckSpaceErr::Len)
142		} else {
143			Ok(())
144		}
145	}
146
147	/// Push a packet onto the queue. Should only be called if there is space in the queue (see
148	/// [`check_space`](Self::check_space)).
149	pub fn push(&mut self, packet: AddressedPacket) {
150		debug_assert!(self.queue.len() < self.config.capacity);
151		self.queue.push_back(packet);
152	}
153
154	/// Pop the packet at the head of the queue and return it, or, if the queue is empty, return
155	/// `None`. Also returns `true` if [`check_space`](Self::check_space) might now succeed where
156	/// it wouldn't before.
157	pub fn pop(&mut self) -> (Option<AddressedPacket>, bool) {
158		let packet = self.queue.pop_front();
159		let space = packet.is_some() && (self.config.multiple_messages || self.queue.is_empty());
160		(packet, space)
161	}
162}