litep2p/utils/futures_stream.rs
1// Copyright 2024 litep2p developers
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use futures::{stream::FuturesUnordered, Stream, StreamExt};
22
23use std::{
24 future::Future,
25 pin::Pin,
26 task::{Context, Poll, Waker},
27};
28
29/// Wrapper around [`FuturesUnordered`] that wakes a task up automatically.
30/// The [`Stream`] implemented by [`FuturesStream`] never terminates and can be
31/// polled when contains no futures.
32#[derive(Default)]
33pub struct FuturesStream<F> {
34 futures: FuturesUnordered<F>,
35 waker: Option<Waker>,
36}
37
38impl<F> FuturesStream<F> {
39 /// Create new [`FuturesStream`].
40 pub fn new() -> Self {
41 Self {
42 futures: FuturesUnordered::new(),
43 waker: None,
44 }
45 }
46
47 /// Number of futures in the stream.
48 pub fn len(&self) -> usize {
49 self.futures.len()
50 }
51
52 /// Check if the stream is empty.
53 pub fn is_empty(&self) -> bool {
54 self.futures.is_empty()
55 }
56
57 /// Push a future for processing.
58 pub fn push(&mut self, future: F) {
59 self.futures.push(future);
60
61 if let Some(waker) = self.waker.take() {
62 waker.wake();
63 }
64 }
65}
66
67impl<F: Future> Stream for FuturesStream<F> {
68 type Item = <F as Future>::Output;
69
70 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
71 let Poll::Ready(Some(result)) = self.futures.poll_next_unpin(cx) else {
72 // We must save the current waker to wake up the task when new futures are inserted.
73 //
74 // Otherwise, simply returning `Poll::Pending` here would cause the task to never be
75 // woken up again.
76 //
77 // We were previously relying on some other task from the `loop tokio::select!` to
78 // finish.
79 self.waker = Some(cx.waker().clone());
80
81 return Poll::Pending;
82 };
83
84 Poll::Ready(Some(result))
85 }
86}