referrerpolicy=no-referrer-when-downgrade

polkadot_availability_recovery/
futures_undead.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17//! FuturesUndead: A `FuturesUnordered` with support for semi canceled futures. Those undead
18//! futures will still get polled, but will not count towards length. So length will only count
19//! futures, which are still considered live.
20//!
//! Use case: If futures take longer than we would like them to, we may be able to request the data
//! from somewhere else as well. We don't really want to cancel the old future, because maybe it
//! was almost done, thus we would have wasted time with our impatience. By simply making them
//! not count towards length, we can make sure to have enough "live" requests ongoing, while at the
//! same time taking advantage of some maybe "late" response from the undead.
26
27use std::{
28	pin::Pin,
29	task::{Context, Poll},
30	time::Duration,
31};
32
33use futures::{future::BoxFuture, stream::FuturesUnordered, Future, Stream, StreamExt};
34use polkadot_node_subsystem_util::TimeoutExt;
35
36/// FuturesUndead - `FuturesUnordered` with semi canceled (undead) futures.
37///
38/// Limitations: Keeps track of undead futures by means of a counter, which is limited to 64
39/// bits, so after `1.8*10^19` pushed futures, this implementation will panic.
40pub struct FuturesUndead<Output> {
41	/// Actual `FuturesUnordered`.
42	inner: FuturesUnordered<Undead<Output>>,
43	/// Next sequence number to assign to the next future that gets pushed.
44	next_sequence: SequenceNumber,
45	/// Sequence number of first future considered live.
46	first_live: Option<SequenceNumber>,
47	/// How many undead are there right now.
48	undead: usize,
49}
50
/// All futures get a number, to determine which are live.
///
/// The counter is a `u64` rather than a `usize`, so it is 64 bits wide on every target
/// (including 32 bit ones) - which is what the overflow panic message of `inc` promises.
#[derive(Eq, PartialEq, Copy, Clone, Debug, PartialOrd)]
struct SequenceNumber(u64);
54
55struct Undead<Output> {
56	inner: BoxFuture<'static, Output>,
57	our_sequence: SequenceNumber,
58}
59
60impl<Output> FuturesUndead<Output> {
61	pub fn new() -> Self {
62		Self {
63			inner: FuturesUnordered::new(),
64			next_sequence: SequenceNumber(0),
65			first_live: None,
66			undead: 0,
67		}
68	}
69
70	pub fn push(&mut self, f: BoxFuture<'static, Output>) {
71		self.inner.push(Undead { inner: f, our_sequence: self.next_sequence });
72		self.next_sequence.inc();
73	}
74
75	/// Make all contained futures undead.
76	///
77	/// They will no longer be counted on a call to `len`.
78	pub fn soft_cancel(&mut self) {
79		self.undead = self.inner.len();
80		self.first_live = Some(self.next_sequence);
81	}
82
83	/// Number of contained futures minus undead.
84	pub fn len(&self) -> usize {
85		self.inner.len() - self.undead
86	}
87
88	/// Total number of futures, including undead.
89	pub fn total_len(&self) -> usize {
90		self.inner.len()
91	}
92
93	/// Wait for next future to return with timeout.
94	///
95	/// When timeout passes, return `None` and make all currently contained futures undead.
96	pub async fn next_with_timeout(&mut self, timeout: Duration) -> Option<Output> {
97		match self.next().timeout(timeout).await {
98			// Timeout:
99			None => {
100				self.soft_cancel();
101				None
102			},
103			Some(inner) => inner,
104		}
105	}
106}
107
108impl<Output> Stream for FuturesUndead<Output> {
109	type Item = Output;
110
111	fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
112		match self.inner.poll_next_unpin(cx) {
113			Poll::Pending => Poll::Pending,
114			Poll::Ready(None) => Poll::Ready(None),
115			Poll::Ready(Some((sequence, v))) => {
116				// Cleanup in case we became completely empty:
117				if self.inner.len() == 0 {
118					*self = Self::new();
119					return Poll::Ready(Some(v))
120				}
121
122				let first_live = match self.first_live {
123					None => return Poll::Ready(Some(v)),
124					Some(first_live) => first_live,
125				};
126				// An undead came back:
127				if sequence < first_live {
128					self.undead = self.undead.saturating_sub(1);
129				}
130				Poll::Ready(Some(v))
131			},
132		}
133	}
134}
135
136impl SequenceNumber {
137	pub fn inc(&mut self) {
138		self.0 = self.0.checked_add(1).expect(
139			"We don't expect an `UndeadFuture` to live long enough for 2^64 entries ever getting inserted."
140		);
141	}
142}
143
144impl<T> Future for Undead<T> {
145	type Output = (SequenceNumber, T);
146	fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
147		match self.inner.as_mut().poll(cx) {
148			Poll::Pending => Poll::Pending,
149			Poll::Ready(v) => Poll::Ready((self.our_sequence, v)),
150		}
151	}
152}
153
#[cfg(test)]
mod tests {
	use super::*;
	use futures::{executor, pending, FutureExt};

	/// Boxed future producing `value` without ever suspending.
	fn immediate(value: i32) -> BoxFuture<'static, i32> {
		async move { value }.boxed()
	}

	/// Boxed future that suspends at `pending!()` before it would produce `value`.
	fn stalled(value: i32) -> BoxFuture<'static, i32> {
		async move {
			pending!();
			value
		}
		.boxed()
	}

	/// A soft cancel turns the single contained future undead, so `len` drops to zero.
	#[test]
	fn cancel_sets_len_to_zero() {
		let mut pool = FuturesUndead::new();
		pool.push(async {}.boxed());
		assert_eq!(pool.len(), 1);
		pool.soft_cancel();
		assert_eq!(pool.len(), 0);
	}

	/// An undead future finishing must not affect the count of live futures.
	#[test]
	fn finished_undead_does_not_change_len() {
		executor::block_on(async {
			let mut pool = FuturesUndead::new();
			pool.push(immediate(1));
			pool.push(immediate(2));
			assert_eq!(pool.len(), 2);
			pool.soft_cancel();
			assert_eq!(pool.len(), 0);
			// The only live future suspends, so the next result comes from an undead one.
			pool.push(stalled(0));
			pool.next().await;
			assert_eq!(pool.len(), 1);
			pool.push(immediate(9));
			pool.soft_cancel();
			assert_eq!(pool.len(), 0);
		});
	}

	/// Live futures finishing decrement `len`, while suspended undead ones stay uncounted.
	#[test]
	fn len_stays_correct_when_live_future_ends() {
		executor::block_on(async {
			let mut pool = FuturesUndead::new();
			pool.push(stalled(1));
			pool.push(stalled(2));
			assert_eq!(pool.len(), 2);
			pool.soft_cancel();
			assert_eq!(pool.len(), 0);
			pool.push(immediate(0));
			pool.push(immediate(1));
			pool.next().await;
			assert_eq!(pool.len(), 1);
			pool.next().await;
			assert_eq!(pool.len(), 0);
			pool.push(immediate(9));
			assert_eq!(pool.len(), 1);
		});
	}

	/// Once every future has finished, the bookkeeping is reset to its initial state.
	#[test]
	fn cleanup_works() {
		executor::block_on(async {
			let mut pool = FuturesUndead::new();
			pool.push(immediate(1));
			pool.soft_cancel();
			pool.push(immediate(2));
			pool.next().await;
			pool.next().await;
			assert_eq!(pool.first_live, None);
		});
	}
}