futures_bounded/
set.rs

1use std::future::Future;
2use std::task::{ready, Context, Poll};
3use std::time::Duration;
4
5use futures_util::future::BoxFuture;
6
7use crate::{FuturesMap, PushError, Timeout};
8
9/// Represents a list of [Future]s.
10///
11/// Each future must finish within the specified time and the list never outgrows its capacity.
12pub struct FuturesSet<O> {
13    id: u32,
14    inner: FuturesMap<u32, O>,
15}
16
17impl<O> FuturesSet<O> {
18    pub fn new(timeout: Duration, capacity: usize) -> Self {
19        Self {
20            id: 0,
21            inner: FuturesMap::new(timeout, capacity),
22        }
23    }
24}
25
26impl<O> FuturesSet<O> {
27    /// Push a future into the list.
28    ///
29    /// This method adds the given future to the list.
30    /// If the length of the list is equal to the capacity, this method returns a error that contains the passed future.
31    /// In that case, the future is not added to the set.
32    pub fn try_push<F>(&mut self, future: F) -> Result<(), BoxFuture<O>>
33    where
34        F: Future<Output = O> + Send + 'static,
35    {
36        self.id = self.id.wrapping_add(1);
37
38        match self.inner.try_push(self.id, future) {
39            Ok(()) => Ok(()),
40            Err(PushError::BeyondCapacity(w)) => Err(w),
41            Err(PushError::ReplacedFuture(_)) => unreachable!("we never reuse IDs"),
42        }
43    }
44
45    pub fn is_empty(&self) -> bool {
46        self.inner.is_empty()
47    }
48
49    pub fn poll_ready_unpin(&mut self, cx: &mut Context<'_>) -> Poll<()> {
50        self.inner.poll_ready_unpin(cx)
51    }
52
53    pub fn poll_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<O, Timeout>> {
54        let (_, res) = ready!(self.inner.poll_unpin(cx));
55
56        Poll::Ready(res)
57    }
58}