polkadot_dispute_distribution/receiver/batches/
waiting_queue.rs1use std::{cmp::Ordering, collections::BinaryHeap, time::Instant};
18
19use futures::future::pending;
20use futures_timer::Delay;
21
22pub struct WaitingQueue<Payload> {
27 pending_wakes: BinaryHeap<PendingWake<Payload>>,
29 timer: Option<Delay>,
31}
32
33#[derive(Eq, PartialEq)]
38pub struct PendingWake<Payload> {
39 pub payload: Payload,
40 pub ready_at: Instant,
41}
42
43impl<Payload: Eq + Ord> WaitingQueue<Payload> {
44 pub fn new() -> Self {
48 Self { pending_wakes: BinaryHeap::new(), timer: None }
49 }
50
51 pub fn push(&mut self, wake: PendingWake<Payload>) {
56 self.pending_wakes.push(wake);
57 self.timer = None;
59 }
60
61 pub fn pop_ready(&mut self, now: Instant) -> Option<PendingWake<Payload>> {
66 let is_ready = self.pending_wakes.peek().map_or(false, |p| p.ready_at <= now);
67 if is_ready {
68 Some(self.pending_wakes.pop().expect("We just peeked. qed."))
69 } else {
70 None
71 }
72 }
73
74 pub async fn wait_ready(&mut self, now: Instant) {
85 if let Some(timer) = &mut self.timer {
86 timer.await
88 }
89
90 let next_waiting = self.pending_wakes.peek();
91 let is_ready = next_waiting.map_or(false, |p| p.ready_at <= now);
92 if is_ready {
93 return
94 }
95
96 self.timer = next_waiting.map(|p| Delay::new(p.ready_at.duration_since(now)));
97 match &mut self.timer {
98 None => return pending().await,
99 Some(timer) => timer.await,
100 }
101 }
102}
103
104impl<Payload: Eq + Ord> PartialOrd<PendingWake<Payload>> for PendingWake<Payload> {
105 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
106 Some(self.cmp(other))
107 }
108}
109
110impl<Payload: Ord> Ord for PendingWake<Payload> {
111 fn cmp(&self, other: &Self) -> Ordering {
112 match other.ready_at.cmp(&self.ready_at) {
114 Ordering::Equal => other.payload.cmp(&self.payload),
115 o => o,
116 }
117 }
118}
119#[cfg(test)]
120mod tests {
121 use std::{
122 task::Poll,
123 time::{Duration, Instant},
124 };
125
126 use assert_matches::assert_matches;
127 use futures::{future::poll_fn, pin_mut, Future};
128
129 use crate::LOG_TARGET;
130
131 use super::{PendingWake, WaitingQueue};
132
133 #[test]
134 fn wait_ready_waits_for_earliest_event_always() {
135 sp_tracing::try_init_simple();
136 let mut queue = WaitingQueue::new();
137 let now = Instant::now();
138 let start = now;
139 queue.push(PendingWake { payload: 1u32, ready_at: now + Duration::from_millis(3) });
140 queue.push(PendingWake { payload: 2u32, ready_at: now + Duration::from_millis(5) });
142 queue.push(PendingWake { payload: 0u32, ready_at: now + Duration::from_millis(1) });
144 queue.push(PendingWake { payload: 10u32, ready_at: now + Duration::from_millis(1) });
146
147 futures::executor::block_on(async move {
148 assert!(queue.pop_ready(now).is_none(), "No time has passed, nothing should be ready");
150
151 queue.wait_ready(now).await;
153 gum::trace!(target: LOG_TARGET, "After first wait.");
154
155 let now = start + Duration::from_millis(1);
156 assert!(Instant::now() - start >= Duration::from_millis(1));
157 assert_eq!(queue.pop_ready(now).map(|p| p.payload), Some(0u32));
158 assert_eq!(queue.pop_ready(now).map(|p| p.payload), Some(10u32));
160 assert!(queue.pop_ready(now).is_none(), "No more entry expected to be ready.");
161
162 queue.wait_ready(now).await;
163 gum::trace!(target: LOG_TARGET, "After second wait.");
164 let now = start + Duration::from_millis(3);
165 assert!(Instant::now() - start >= Duration::from_millis(3));
166 assert_eq!(queue.pop_ready(now).map(|p| p.payload), Some(1u32));
167 assert!(queue.pop_ready(now).is_none(), "No more entry expected to be ready.");
168
169 poll_fn(|cx| {
171 let fut = queue.wait_ready(now);
172 pin_mut!(fut);
173 assert_matches!(fut.poll(cx), Poll::Pending);
174 Poll::Ready(())
175 })
176 .await;
177 queue.push(PendingWake { payload: 3u32, ready_at: start + Duration::from_millis(4) });
178
179 queue.wait_ready(now).await;
180 gum::trace!(target: LOG_TARGET, "After third wait.");
182 let now = start + Duration::from_millis(4);
183 assert!(Instant::now() - start >= Duration::from_millis(4));
184 assert_eq!(queue.pop_ready(now).map(|p| p.payload), Some(3u32));
185 assert!(queue.pop_ready(now).is_none(), "No more entry expected to be ready.");
186
187 queue.wait_ready(now).await;
188 gum::trace!(target: LOG_TARGET, "After fourth wait.");
189 let now = start + Duration::from_millis(5);
190 assert!(Instant::now() - start >= Duration::from_millis(5));
191 assert_eq!(queue.pop_ready(now).map(|p| p.payload), Some(2u32));
192 assert!(queue.pop_ready(now).is_none(), "No more entry expected to be ready.");
193
194 poll_fn(|cx| {
196 let fut = queue.wait_ready(now);
197 pin_mut!(fut);
198 assert_matches!(fut.poll(cx), Poll::Pending);
199 Poll::Ready(())
200 })
201 .await;
202 });
203 }
204}