referrerpolicy=no-referrer-when-downgrade

polkadot_dispute_distribution/receiver/batches/
waiting_queue.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17use std::{cmp::Ordering, collections::BinaryHeap, time::Instant};
18
19use futures::future::pending;
20use futures_timer::Delay;
21
22/// Wait asynchronously for given `Instant`s one after the other.
23///
24/// `PendingWake`s can be inserted and `WaitingQueue` makes `wait_ready()` to always wait for the
25/// next `Instant` in the queue.
26pub struct WaitingQueue<Payload> {
27	/// All pending wakes we are supposed to wait on in order.
28	pending_wakes: BinaryHeap<PendingWake<Payload>>,
29	/// Wait for next `PendingWake`.
30	timer: Option<Delay>,
31}
32
33/// Represents some event waiting to be processed at `ready_at`.
34///
35/// This is an event in `WaitingQueue`. It provides an `Ord` instance, that sorts descending with
36/// regard to `Instant` (so we get a `min-heap` with the earliest `Instant` at the top).
37#[derive(Eq, PartialEq)]
38pub struct PendingWake<Payload> {
39	pub payload: Payload,
40	pub ready_at: Instant,
41}
42
43impl<Payload: Eq + Ord> WaitingQueue<Payload> {
44	/// Get a new empty `WaitingQueue`.
45	///
46	/// If you call `pop` on this queue immediately, it will always return `Poll::Pending`.
47	pub fn new() -> Self {
48		Self { pending_wakes: BinaryHeap::new(), timer: None }
49	}
50
51	/// Push a `PendingWake`.
52	///
53	/// The next call to `wait_ready` will make sure to wake soon enough to process that new event
54	/// in a timely manner.
55	pub fn push(&mut self, wake: PendingWake<Payload>) {
56		self.pending_wakes.push(wake);
57		// Reset timer as it is potentially obsolete now:
58		self.timer = None;
59	}
60
61	/// Pop the next ready item.
62	///
63	/// This function does not wait, if nothing is ready right now as determined by the passed
64	/// `now` time stamp, this function simply returns `None`.
65	pub fn pop_ready(&mut self, now: Instant) -> Option<PendingWake<Payload>> {
66		let is_ready = self.pending_wakes.peek().map_or(false, |p| p.ready_at <= now);
67		if is_ready {
68			Some(self.pending_wakes.pop().expect("We just peeked. qed."))
69		} else {
70			None
71		}
72	}
73
74	/// Don't pop, just wait until something is ready.
75	///
76	/// Once this function returns `Poll::Ready(())` `pop_ready()` will return `Some`, if passed
77	/// the same `Instant`.
78	///
79	/// Whether ready or not is determined based on the passed time stamp `now` which should be the
80	/// current time as returned by `Instant::now()`
81	///
82	/// This function waits asynchronously for an item to become ready. If there is no more item,
83	/// this call will wait forever (return Poll::Pending without scheduling a wake).
84	pub async fn wait_ready(&mut self, now: Instant) {
85		if let Some(timer) = &mut self.timer {
86			// Previous timer was not done yet.
87			timer.await
88		}
89
90		let next_waiting = self.pending_wakes.peek();
91		let is_ready = next_waiting.map_or(false, |p| p.ready_at <= now);
92		if is_ready {
93			return
94		}
95
96		self.timer = next_waiting.map(|p| Delay::new(p.ready_at.duration_since(now)));
97		match &mut self.timer {
98			None => return pending().await,
99			Some(timer) => timer.await,
100		}
101	}
102}
103
104impl<Payload: Eq + Ord> PartialOrd<PendingWake<Payload>> for PendingWake<Payload> {
105	fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
106		Some(self.cmp(other))
107	}
108}
109
110impl<Payload: Ord> Ord for PendingWake<Payload> {
111	fn cmp(&self, other: &Self) -> Ordering {
112		// Reverse order for min-heap:
113		match other.ready_at.cmp(&self.ready_at) {
114			Ordering::Equal => other.payload.cmp(&self.payload),
115			o => o,
116		}
117	}
118}
119#[cfg(test)]
120mod tests {
121	use std::{
122		task::Poll,
123		time::{Duration, Instant},
124	};
125
126	use assert_matches::assert_matches;
127	use futures::{future::poll_fn, pin_mut, Future};
128
129	use crate::LOG_TARGET;
130
131	use super::{PendingWake, WaitingQueue};
132
133	#[test]
134	fn wait_ready_waits_for_earliest_event_always() {
135		sp_tracing::try_init_simple();
136		let mut queue = WaitingQueue::new();
137		let now = Instant::now();
138		let start = now;
139		queue.push(PendingWake { payload: 1u32, ready_at: now + Duration::from_millis(3) });
140		// Push another one in order:
141		queue.push(PendingWake { payload: 2u32, ready_at: now + Duration::from_millis(5) });
142		// Push one out of order:
143		queue.push(PendingWake { payload: 0u32, ready_at: now + Duration::from_millis(1) });
144		// Push another one at same timestamp (should become ready at the same time)
145		queue.push(PendingWake { payload: 10u32, ready_at: now + Duration::from_millis(1) });
146
147		futures::executor::block_on(async move {
148			// No time passed yet - nothing should be ready.
149			assert!(queue.pop_ready(now).is_none(), "No time has passed, nothing should be ready");
150
151			// Receive them in order at expected times:
152			queue.wait_ready(now).await;
153			gum::trace!(target: LOG_TARGET, "After first wait.");
154
155			let now = start + Duration::from_millis(1);
156			assert!(Instant::now() - start >= Duration::from_millis(1));
157			assert_eq!(queue.pop_ready(now).map(|p| p.payload), Some(0u32));
158			// One more should be ready:
159			assert_eq!(queue.pop_ready(now).map(|p| p.payload), Some(10u32));
160			assert!(queue.pop_ready(now).is_none(), "No more entry expected to be ready.");
161
162			queue.wait_ready(now).await;
163			gum::trace!(target: LOG_TARGET, "After second wait.");
164			let now = start + Duration::from_millis(3);
165			assert!(Instant::now() - start >= Duration::from_millis(3));
166			assert_eq!(queue.pop_ready(now).map(|p| p.payload), Some(1u32));
167			assert!(queue.pop_ready(now).is_none(), "No more entry expected to be ready.");
168
169			// Push in between wait:
170			poll_fn(|cx| {
171				let fut = queue.wait_ready(now);
172				pin_mut!(fut);
173				assert_matches!(fut.poll(cx), Poll::Pending);
174				Poll::Ready(())
175			})
176			.await;
177			queue.push(PendingWake { payload: 3u32, ready_at: start + Duration::from_millis(4) });
178
179			queue.wait_ready(now).await;
180			// Newly pushed element should have become ready:
181			gum::trace!(target: LOG_TARGET, "After third wait.");
182			let now = start + Duration::from_millis(4);
183			assert!(Instant::now() - start >= Duration::from_millis(4));
184			assert_eq!(queue.pop_ready(now).map(|p| p.payload), Some(3u32));
185			assert!(queue.pop_ready(now).is_none(), "No more entry expected to be ready.");
186
187			queue.wait_ready(now).await;
188			gum::trace!(target: LOG_TARGET, "After fourth wait.");
189			let now = start + Duration::from_millis(5);
190			assert!(Instant::now() - start >= Duration::from_millis(5));
191			assert_eq!(queue.pop_ready(now).map(|p| p.payload), Some(2u32));
192			assert!(queue.pop_ready(now).is_none(), "No more entry expected to be ready.");
193
194			// queue empty - should wait forever now:
195			poll_fn(|cx| {
196				let fut = queue.wait_ready(now);
197				pin_mut!(fut);
198				assert_matches!(fut.poll(cx), Poll::Pending);
199				Poll::Ready(())
200			})
201			.await;
202		});
203	}
204}