referrerpolicy=no-referrer-when-downgrade

sc_network_sync/
futures_stream.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
//! A wrapper for [`FuturesUnordered`] that wakes the task up once a new future is pushed,
//! so that the future gets polled automatically. Its [`Stream`] implementation never
//! terminates.
21
22use futures::{stream::FuturesUnordered, Future, Stream, StreamExt};
23use std::{
24	pin::Pin,
25	task::{Context, Poll, Waker},
26};
27
28/// Wrapper around [`FuturesUnordered`] that wakes a task up automatically.
29pub struct FuturesStream<F> {
30	futures: FuturesUnordered<F>,
31	waker: Option<Waker>,
32}
33
34/// Surprizingly, `#[derive(Default)]` doesn't work on [`FuturesStream`].
35impl<F> Default for FuturesStream<F> {
36	fn default() -> FuturesStream<F> {
37		FuturesStream { futures: Default::default(), waker: None }
38	}
39}
40
41impl<F> FuturesStream<F> {
42	/// Push a future for processing.
43	pub fn push(&mut self, future: F) {
44		self.futures.push(future);
45
46		if let Some(waker) = self.waker.take() {
47			waker.wake();
48		}
49	}
50
51	/// The number of futures in the stream.
52	pub fn len(&self) -> usize {
53		self.futures.len()
54	}
55}
56
57impl<F: Future> Stream for FuturesStream<F> {
58	type Item = <F as Future>::Output;
59
60	fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
61		let Poll::Ready(Some(result)) = self.futures.poll_next_unpin(cx) else {
62			self.waker = Some(cx.waker().clone());
63
64			return Poll::Pending
65		};
66
67		Poll::Ready(Some(result))
68	}
69}
70
#[cfg(test)]
mod tests {
	use super::*;
	use futures::future::{BoxFuture, FutureExt};
	use std::{
		sync::{
			atomic::{AtomicBool, Ordering},
			Arc,
		},
		task::Wake,
	};

	/// Waker backend that records whether it has been woken.
	struct FlagWaker(AtomicBool);

	impl Wake for FlagWaker {
		fn wake(self: Arc<Self>) {
			self.0.store(true, Ordering::SeqCst);
		}
	}

	/// [`Stream`] implementation for [`FuturesStream`] relies on the undocumented
	/// feature that [`FuturesUnordered`] can be polled and repeatedly yield
	/// `Poll::Ready(None)` before any futures are added into it.
	#[tokio::test]
	async fn empty_futures_unordered_can_be_polled() {
		let mut unordered = FuturesUnordered::<BoxFuture<()>>::default();

		futures::future::poll_fn(|cx| {
			assert_eq!(unordered.poll_next_unpin(cx), Poll::Ready(None));
			assert_eq!(unordered.poll_next_unpin(cx), Poll::Ready(None));

			Poll::Ready(())
		})
		.await;
	}

	/// [`Stream`] implementation for [`FuturesStream`] relies on the undocumented
	/// feature that [`FuturesUnordered`] can be polled and repeatedly yield
	/// `Poll::Ready(None)` after all the futures in it have resolved.
	#[tokio::test]
	async fn deplenished_futures_unordered_can_be_polled() {
		let mut unordered = FuturesUnordered::<BoxFuture<()>>::default();

		unordered.push(futures::future::ready(()).boxed());
		assert_eq!(unordered.next().await, Some(()));

		futures::future::poll_fn(|cx| {
			assert_eq!(unordered.poll_next_unpin(cx), Poll::Ready(None));
			assert_eq!(unordered.poll_next_unpin(cx), Poll::Ready(None));

			Poll::Ready(())
		})
		.await;
	}

	/// An empty [`FuturesStream`] must report `Poll::Pending` rather than terminate.
	#[tokio::test]
	async fn empty_futures_stream_yields_pending() {
		let mut stream = FuturesStream::<BoxFuture<()>>::default();

		futures::future::poll_fn(|cx| {
			assert_eq!(stream.poll_next_unpin(cx), Poll::Pending);
			Poll::Ready(())
		})
		.await;
	}

	/// Once its only future has resolved, the stream goes back to `Poll::Pending`.
	#[tokio::test]
	async fn futures_stream_resolves_futures_and_yields_pending() {
		let mut stream = FuturesStream::default();
		stream.push(futures::future::ready(17));

		futures::future::poll_fn(|cx| {
			assert_eq!(stream.poll_next_unpin(cx), Poll::Ready(Some(17)));
			assert_eq!(stream.poll_next_unpin(cx), Poll::Pending);
			Poll::Ready(())
		})
		.await;
	}

	/// Pushing a future must wake the waker saved by an earlier pending poll, and the
	/// pushed future must be yielded by the following poll.
	#[test]
	fn push_wakes_task_that_polled_pending_stream() {
		let flag = Arc::new(FlagWaker(AtomicBool::new(false)));
		let waker = Waker::from(Arc::clone(&flag));
		let mut cx = Context::from_waker(&waker);

		let mut stream = FuturesStream::<BoxFuture<i32>>::default();

		// The pending poll stores the waker but must not wake it.
		assert_eq!(stream.poll_next_unpin(&mut cx), Poll::Pending);
		assert!(!flag.0.load(Ordering::SeqCst));

		stream.push(futures::future::ready(17).boxed());
		assert!(flag.0.load(Ordering::SeqCst));

		assert_eq!(stream.poll_next_unpin(&mut cx), Poll::Ready(Some(17)));
	}
}