mixnet/core/
packet_queues.rs1use super::sphinx::{Packet, PeerId};
24use std::{
25 cmp::Ordering,
26 collections::{BinaryHeap, VecDeque},
27 time::Instant,
28};
29
30pub struct AddressedPacket {
32 pub peer_id: PeerId,
34 pub packet: Box<Packet>,
36}
37
38struct ForwardPacket {
41 deadline: Instant,
43 packet: AddressedPacket,
45}
46
47impl PartialEq for ForwardPacket {
48 fn eq(&self, other: &Self) -> bool {
49 self.deadline == other.deadline
50 }
51}
52
53impl Eq for ForwardPacket {}
54
55impl PartialOrd for ForwardPacket {
56 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
57 Some(self.cmp(other))
58 }
59}
60
61impl Ord for ForwardPacket {
62 fn cmp(&self, other: &Self) -> Ordering {
63 self.deadline.cmp(&other.deadline).reverse()
65 }
66}
67
68pub struct ForwardPacketQueue {
69 capacity: usize,
72 queue: BinaryHeap<ForwardPacket>,
73}
74
75impl ForwardPacketQueue {
76 pub fn new(capacity: usize) -> Self {
77 Self { capacity, queue: BinaryHeap::with_capacity(capacity) }
78 }
79
80 pub fn next_deadline(&self) -> Option<Instant> {
81 self.queue.peek().map(|packet| packet.deadline)
82 }
83
84 pub fn has_space(&self) -> bool {
85 self.queue.len() < self.capacity
86 }
87
88 pub fn insert(&mut self, deadline: Instant, packet: AddressedPacket) -> bool {
92 debug_assert!(self.has_space());
93 let prev_deadline = self.next_deadline();
94 self.queue.push(ForwardPacket { deadline, packet });
95 self.next_deadline() != prev_deadline
96 }
97
98 pub fn pop(&mut self) -> Option<AddressedPacket> {
99 self.queue.pop().map(|packet| packet.packet)
100 }
101}
102
/// Configuration for an `AuthoredPacketQueue`.
#[derive(Debug, Clone, Copy)]
pub struct AuthoredPacketQueueConfig {
    /// Capacity of the queue, in packets. A request for more packets than this is rejected
    /// outright by `AuthoredPacketQueue::check_space`.
    pub capacity: usize,
    /// When `false`, `AuthoredPacketQueue::check_space` only succeeds while the queue is
    /// completely empty. When `true`, it succeeds whenever the requested packets fit alongside
    /// those already queued.
    pub multiple_messages: bool,
}
111
/// Reason why `AuthoredPacketQueue::check_space` reported that there is no space.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CheckSpaceErr {
    /// The requested number of packets exceeds the configured capacity of the queue, so the
    /// request cannot succeed regardless of how many packets are currently queued.
    Capacity,
    /// The queue currently holds too many packets for the request to be accepted.
    Len,
}
118
119pub struct AuthoredPacketQueue {
120 config: AuthoredPacketQueueConfig,
121 queue: VecDeque<AddressedPacket>,
122}
123
124impl AuthoredPacketQueue {
125 pub fn new(config: AuthoredPacketQueueConfig) -> Self {
126 Self { config, queue: VecDeque::with_capacity(config.capacity) }
127 }
128
129 pub fn len(&self) -> usize {
130 self.queue.len()
131 }
132
133 pub fn check_space(&self, num_packets: usize) -> Result<(), CheckSpaceErr> {
134 let Some(mut max_len) = self.config.capacity.checked_sub(num_packets) else {
135 return Err(CheckSpaceErr::Capacity)
136 };
137 if !self.config.multiple_messages {
138 max_len = 0;
139 }
140 if self.queue.len() > max_len {
141 Err(CheckSpaceErr::Len)
142 } else {
143 Ok(())
144 }
145 }
146
147 pub fn push(&mut self, packet: AddressedPacket) {
150 debug_assert!(self.queue.len() < self.config.capacity);
151 self.queue.push_back(packet);
152 }
153
154 pub fn pop(&mut self) -> (Option<AddressedPacket>, bool) {
158 let packet = self.queue.pop_front();
159 let space = packet.is_some() && (self.config.multiple_messages || self.queue.is_empty());
160 (packet, space)
161 }
162}