use std::{cmp::Ordering, collections::BinaryHeap, time::Instant};
use futures::future::pending;
use futures_timer::Delay;
/// Queue of [`PendingWake`]s that hands entries out in order of their `ready_at` time.
///
/// Entries are added with `push`, waited for with `wait_ready` and collected with `pop_ready`.
pub struct WaitingQueue<Payload> {
/// All queued entries. `BinaryHeap` is a max-heap and `PendingWake`'s `Ord` impl compares in
/// reverse, so the top of the heap is the entry with the earliest `ready_at`.
pending_wakes: BinaryHeap<PendingWake<Payload>>,
/// Timer armed by `wait_ready` for the earliest entry; `push` resets it to `None`.
timer: Option<Delay>,
}
/// An entry of a [`WaitingQueue`]: a payload together with the instant it becomes due.
///
/// Equality is derived and therefore compares both fields.
#[derive(Eq, PartialEq)]
pub struct PendingWake<Payload> {
/// Caller-defined data carried by the entry; it is returned as part of the entry by
/// `WaitingQueue::pop_ready`.
pub payload: Payload,
/// Point in time from which on the entry counts as ready.
pub ready_at: Instant,
}
impl<Payload: Eq + Ord> WaitingQueue<Payload> {
    /// Creates an empty queue with no timer armed.
    pub fn new() -> Self {
        Self { pending_wakes: BinaryHeap::new(), timer: None }
    }

    /// Adds `wake` to the queue.
    ///
    /// Any armed timer is discarded, because the new entry may be due earlier than the entry
    /// the timer was armed for. The next call to [`Self::wait_ready`] arms a fresh one.
    pub fn push(&mut self, wake: PendingWake<Payload>) {
        self.pending_wakes.push(wake);
        self.timer = None;
    }

    /// Removes and returns the entry with the earliest `ready_at`, provided it is due at `now`.
    ///
    /// Returns `None` if the queue is empty or if its earliest entry has `ready_at > now`.
    /// Call repeatedly to drain every entry that is due.
    pub fn pop_ready(&mut self, now: Instant) -> Option<PendingWake<Payload>> {
        let is_ready = self.pending_wakes.peek().map_or(false, |next| next.ready_at <= now);
        if is_ready {
            // `pop` yields exactly the entry that was just peeked.
            self.pending_wakes.pop()
        } else {
            None
        }
    }

    /// Waits until the earliest queued entry is due.
    ///
    /// `now` is the caller's notion of the current time: readiness is judged against it and
    /// the wait duration is computed as `ready_at - now`.
    ///
    /// * If a timer from an earlier, unfinished call is still stored, it is awaited first.
    /// * If the earliest entry is already due at `now`, the function then returns.
    /// * If the queue is empty, the returned future never resolves.
    ///
    /// The timer lives in `self`, so dropping the returned future does not lose it; the next
    /// call resumes waiting on the same timer unless [`Self::push`] reset it in between.
    pub async fn wait_ready(&mut self, now: Instant) {
        // Await the stored timer in place (instead of taking it out first) so that it stays
        // in `self` if this future is dropped while waiting.
        if let Some(timer) = &mut self.timer {
            timer.await;
        }
        // Whatever was stored has fired by now. Drop it: the `Future` contract leaves polling
        // a completed future unspecified, so a finished timer must not be kept around for a
        // later call to await again.
        self.timer = None;

        let next_waiting = self.pending_wakes.peek();
        let is_ready = next_waiting.map_or(false, |next| next.ready_at <= now);
        if is_ready {
            return;
        }

        // Not due yet (or nothing queued): arm a timer for the earliest entry, if any.
        self.timer = next_waiting.map(|next| Delay::new(next.ready_at.duration_since(now)));
        match &mut self.timer {
            // Empty queue: there is nothing to wait for, so never resolve.
            None => return pending().await,
            Some(timer) => timer.await,
        }
        // The timer has fired; clear it for the same reason as above.
        self.timer = None;
    }
}
impl<Payload: Eq + Ord> PartialOrd<PendingWake<Payload>> for PendingWake<Payload> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl<Payload: Ord> Ord for PendingWake<Payload> {
    /// Orders entries in reverse: an earlier `ready_at` compares as greater, and for equal
    /// `ready_at` the smaller `payload` compares as greater.
    ///
    /// Both comparisons are made with `rhs` on the left-hand side to get that reversal, which
    /// makes a max-heap such as `BinaryHeap` surface the earliest entry first.
    fn cmp(&self, rhs: &Self) -> Ordering {
        rhs.ready_at
            .cmp(&self.ready_at)
            .then_with(|| rhs.payload.cmp(&self.payload))
    }
}
// Unit tests for `WaitingQueue`.
#[cfg(test)]
mod tests {
use std::{
task::Poll,
time::{Duration, Instant},
};
use assert_matches::assert_matches;
use futures::{future::poll_fn, pin_mut, Future};
use crate::LOG_TARGET;
use super::{PendingWake, WaitingQueue};
/// Pushes entries out of order and checks that `wait_ready`/`pop_ready` hand them out by
/// ascending `ready_at`, that entries with equal `ready_at` come out by ascending payload, and
/// that an entry pushed later with an earlier deadline is waited for first.
#[test]
fn wait_ready_waits_for_earliest_event_always() {
sp_tracing::try_init_simple();
let mut queue = WaitingQueue::new();
let now = Instant::now();
let start = now;
// Deadlines are inserted out of order (3ms, 5ms, 1ms, 1ms); the two 1ms entries exercise the
// tie-break on the payload.
queue.push(PendingWake { payload: 1u32, ready_at: now + Duration::from_millis(3) });
queue.push(PendingWake { payload: 2u32, ready_at: now + Duration::from_millis(5) });
queue.push(PendingWake { payload: 0u32, ready_at: now + Duration::from_millis(1) });
queue.push(PendingWake { payload: 10u32, ready_at: now + Duration::from_millis(1) });
futures::executor::block_on(async move {
assert!(queue.pop_ready(now).is_none(), "No time has passed, nothing should be ready");
// First wait: expected to last until the 1ms deadline.
queue.wait_ready(now).await;
gum::trace!(target: LOG_TARGET, "After first wait.");
// `now` is advanced by hand to the deadline; the wall-clock assert checks that the wait
// really took at least that long.
let now = start + Duration::from_millis(1);
assert!(Instant::now() - start >= Duration::from_millis(1));
// Both 1ms entries are due; the smaller payload comes out first.
assert_eq!(queue.pop_ready(now).map(|p| p.payload), Some(0u32));
assert_eq!(queue.pop_ready(now).map(|p| p.payload), Some(10u32));
assert!(queue.pop_ready(now).is_none(), "No more entry expected to be ready.");
// Second wait: expected to last until the 3ms deadline.
queue.wait_ready(now).await;
gum::trace!(target: LOG_TARGET, "After second wait.");
let now = start + Duration::from_millis(3);
assert!(Instant::now() - start >= Duration::from_millis(3));
assert_eq!(queue.pop_ready(now).map(|p| p.payload), Some(1u32));
assert!(queue.pop_ready(now).is_none(), "No more entry expected to be ready.");
// Only the 5ms entry is left and `now` is 3ms, so a single poll of `wait_ready` must
// report `Pending`. The future is dropped right after that one poll.
poll_fn(|cx| {
let fut = queue.wait_ready(now);
pin_mut!(fut);
assert_matches!(fut.poll(cx), Poll::Pending);
Poll::Ready(())
})
.await;
// A new entry due at 4ms, i.e. before the remaining 5ms one: the next wait has to target
// this entry instead.
queue.push(PendingWake { payload: 3u32, ready_at: start + Duration::from_millis(4) });
queue.wait_ready(now).await;
gum::trace!(target: LOG_TARGET, "After third wait.");
let now = start + Duration::from_millis(4);
assert!(Instant::now() - start >= Duration::from_millis(4));
assert_eq!(queue.pop_ready(now).map(|p| p.payload), Some(3u32));
assert!(queue.pop_ready(now).is_none(), "No more entry expected to be ready.");
// Fourth wait: expected to last until the 5ms deadline of the last entry.
queue.wait_ready(now).await;
gum::trace!(target: LOG_TARGET, "After fourth wait.");
let now = start + Duration::from_millis(5);
assert!(Instant::now() - start >= Duration::from_millis(5));
assert_eq!(queue.pop_ready(now).map(|p| p.payload), Some(2u32));
assert!(queue.pop_ready(now).is_none(), "No more entry expected to be ready.");
// The queue is empty now, so `wait_ready` has nothing to wait for and must stay pending.
poll_fn(|cx| {
let fut = queue.wait_ready(now);
pin_mut!(fut);
assert_matches!(fut.poll(cx), Poll::Pending);
Poll::Ready(())
})
.await;
});
}
}