sc_network_sync/
futures_stream.rs1use futures::{stream::FuturesUnordered, Future, Stream, StreamExt};
23use std::{
24 pin::Pin,
25 task::{Context, Poll, Waker},
26};
27
28pub struct FuturesStream<F> {
30 futures: FuturesUnordered<F>,
31 waker: Option<Waker>,
32}
33
34impl<F> Default for FuturesStream<F> {
36 fn default() -> FuturesStream<F> {
37 FuturesStream { futures: Default::default(), waker: None }
38 }
39}
40
41impl<F> FuturesStream<F> {
42 pub fn push(&mut self, future: F) {
44 self.futures.push(future);
45
46 if let Some(waker) = self.waker.take() {
47 waker.wake();
48 }
49 }
50
51 pub fn len(&self) -> usize {
53 self.futures.len()
54 }
55}
56
57impl<F: Future> Stream for FuturesStream<F> {
58 type Item = <F as Future>::Output;
59
60 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
61 let Poll::Ready(Some(result)) = self.futures.poll_next_unpin(cx) else {
62 self.waker = Some(cx.waker().clone());
63
64 return Poll::Pending
65 };
66
67 Poll::Ready(Some(result))
68 }
69}
70
71#[cfg(test)]
72mod tests {
73 use super::*;
74 use futures::future::{BoxFuture, FutureExt};
75
76 #[tokio::test]
80 async fn empty_futures_unordered_can_be_polled() {
81 let mut unordered = FuturesUnordered::<BoxFuture<()>>::default();
82
83 futures::future::poll_fn(|cx| {
84 assert_eq!(unordered.poll_next_unpin(cx), Poll::Ready(None));
85 assert_eq!(unordered.poll_next_unpin(cx), Poll::Ready(None));
86
87 Poll::Ready(())
88 })
89 .await;
90 }
91
92 #[tokio::test]
96 async fn deplenished_futures_unordered_can_be_polled() {
97 let mut unordered = FuturesUnordered::<BoxFuture<()>>::default();
98
99 unordered.push(futures::future::ready(()).boxed());
100 assert_eq!(unordered.next().await, Some(()));
101
102 futures::future::poll_fn(|cx| {
103 assert_eq!(unordered.poll_next_unpin(cx), Poll::Ready(None));
104 assert_eq!(unordered.poll_next_unpin(cx), Poll::Ready(None));
105
106 Poll::Ready(())
107 })
108 .await;
109 }
110
111 #[tokio::test]
112 async fn empty_futures_stream_yields_pending() {
113 let mut stream = FuturesStream::<BoxFuture<()>>::default();
114
115 futures::future::poll_fn(|cx| {
116 assert_eq!(stream.poll_next_unpin(cx), Poll::Pending);
117 Poll::Ready(())
118 })
119 .await;
120 }
121
122 #[tokio::test]
123 async fn futures_stream_resolves_futures_and_yields_pending() {
124 let mut stream = FuturesStream::default();
125 stream.push(futures::future::ready(17));
126
127 futures::future::poll_fn(|cx| {
128 assert_eq!(stream.poll_next_unpin(cx), Poll::Ready(Some(17)));
129 assert_eq!(stream.poll_next_unpin(cx), Poll::Pending);
130 Poll::Ready(())
131 })
132 .await;
133 }
134}