sc_mixnet/
extrinsic_queue.rs1use mixnet::reply_manager::ReplyContext;
24use std::{cmp::Ordering, collections::BinaryHeap, time::Instant};
25
26struct DelayedExtrinsic<E> {
29 deadline: Instant,
31 extrinsic: E,
32 reply_context: ReplyContext,
33}
34
35impl<E> PartialEq for DelayedExtrinsic<E> {
36 fn eq(&self, other: &Self) -> bool {
37 self.deadline == other.deadline
38 }
39}
40
41impl<E> Eq for DelayedExtrinsic<E> {}
42
43impl<E> PartialOrd for DelayedExtrinsic<E> {
44 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
45 Some(self.cmp(other))
46 }
47}
48
49impl<E> Ord for DelayedExtrinsic<E> {
50 fn cmp(&self, other: &Self) -> Ordering {
51 self.deadline.cmp(&other.deadline).reverse()
53 }
54}
55
56pub struct ExtrinsicQueue<E> {
57 capacity: usize,
58 queue: BinaryHeap<DelayedExtrinsic<E>>,
59 next_deadline_changed: bool,
60}
61
62impl<E> ExtrinsicQueue<E> {
63 pub fn new(capacity: usize) -> Self {
64 Self { capacity, queue: BinaryHeap::with_capacity(capacity), next_deadline_changed: false }
65 }
66
67 pub fn next_deadline(&self) -> Option<Instant> {
68 self.queue.peek().map(|extrinsic| extrinsic.deadline)
69 }
70
71 pub fn next_deadline_changed(&mut self) -> bool {
72 let changed = self.next_deadline_changed;
73 self.next_deadline_changed = false;
74 changed
75 }
76
77 pub fn has_space(&self) -> bool {
78 self.queue.len() < self.capacity
79 }
80
81 pub fn insert(&mut self, deadline: Instant, extrinsic: E, reply_context: ReplyContext) {
82 debug_assert!(self.has_space());
83 let prev_deadline = self.next_deadline();
84 self.queue.push(DelayedExtrinsic { deadline, extrinsic, reply_context });
85 if self.next_deadline() != prev_deadline {
86 self.next_deadline_changed = true;
87 }
88 }
89
90 pub fn pop(&mut self) -> Option<(E, ReplyContext)> {
91 self.next_deadline_changed = true;
92 self.queue.pop().map(|extrinsic| (extrinsic.extrinsic, extrinsic.reply_context))
93 }
94}