litep2p/utils/
futures_stream.rs

1// Copyright 2024 litep2p developers
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use futures::{stream::FuturesUnordered, Stream, StreamExt};
22
23use std::{
24    future::Future,
25    pin::Pin,
26    task::{Context, Poll, Waker},
27};
28
29/// Wrapper around [`FuturesUnordered`] that wakes a task up automatically.
30/// The [`Stream`] implemented by [`FuturesStream`] never terminates and can be
31/// polled when contains no futures.
32#[derive(Default)]
33pub struct FuturesStream<F> {
34    futures: FuturesUnordered<F>,
35    waker: Option<Waker>,
36}
37
38impl<F> FuturesStream<F> {
39    /// Create new [`FuturesStream`].
40    pub fn new() -> Self {
41        Self {
42            futures: FuturesUnordered::new(),
43            waker: None,
44        }
45    }
46
47    /// Number of futures in the stream.
48    pub fn len(&self) -> usize {
49        self.futures.len()
50    }
51
52    /// Check if the stream is empty.
53    pub fn is_empty(&self) -> bool {
54        self.futures.is_empty()
55    }
56
57    /// Push a future for processing.
58    pub fn push(&mut self, future: F) {
59        self.futures.push(future);
60
61        if let Some(waker) = self.waker.take() {
62            waker.wake();
63        }
64    }
65}
66
67impl<F: Future> Stream for FuturesStream<F> {
68    type Item = <F as Future>::Output;
69
70    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
71        let Poll::Ready(Some(result)) = self.futures.poll_next_unpin(cx) else {
72            // We must save the current waker to wake up the task when new futures are inserted.
73            //
74            // Otherwise, simply returning `Poll::Pending` here would cause the task to never be
75            // woken up again.
76            //
77            // We were previously relying on some other task from the `loop tokio::select!` to
78            // finish.
79            self.waker = Some(cx.waker().clone());
80
81            return Poll::Pending;
82        };
83
84        Poll::Ready(Some(result))
85    }
86}