use std::pin::Pin;
use std::task::{Context, Poll, Waker};
use std::time::Duration;
use futures_util::stream::FuturesUnordered;
use futures_util::{Stream, StreamExt, Future};
use pin_project::pin_project;
#[pin_project]
pub(crate) struct IntervalStream<S>(#[pin] Option<S>);
impl<S> IntervalStream<S> {
pub(crate) fn pending() -> Self {
Self(None)
}
#[cfg(feature = "async-client")]
pub(crate) fn new(s: S) -> Self {
Self(Some(s))
}
}
impl<S: Stream> Stream for IntervalStream<S> {
type Item = ();
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Some(mut stream) = self.project().0.as_pin_mut() {
match stream.poll_next_unpin(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Some(_)) => Poll::Ready(Some(())),
Poll::Ready(None) => Poll::Ready(None),
}
} else {
Poll::Pending
}
}
}
#[allow(unused)]
pub(crate) enum InactivityCheck {
Disabled,
Enabled { inactive_dur: Duration, last_active: std::time::Instant, count: usize, max_count: usize }
}
impl InactivityCheck {
#[cfg(feature = "async-client")]
pub(crate) fn new(_inactive_dur: Duration, _max_count: usize) -> Self {
Self::Enabled { inactive_dur: _inactive_dur, last_active: std::time::Instant::now(), count: 0, max_count: _max_count }
}
pub(crate) fn is_inactive(&mut self) -> bool {
match self {
Self::Disabled => false,
Self::Enabled { inactive_dur, last_active, count, max_count, .. } => {
if last_active.elapsed() >= *inactive_dur {
*count += 1;
}
count >= max_count
}
}
}
pub(crate) fn mark_as_active(&mut self) {
if let Self::Enabled { last_active, .. } = self {
*last_active = std::time::Instant::now();
}
}
}
pub(crate) struct MaybePendingFutures<Fut> {
futs: FuturesUnordered<Fut>,
waker: Option<Waker>,
}
impl<Fut> MaybePendingFutures<Fut> {
pub(crate) fn new() -> Self {
Self { futs: FuturesUnordered::new(), waker: None }
}
pub(crate) fn push(&mut self, fut: Fut) {
self.futs.push(fut);
if let Some(w) = self.waker.take() {
w.wake();
}
}
}
impl<Fut: Future> Stream for MaybePendingFutures<Fut> {
type Item = Fut::Output;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.futs.is_empty() {
self.waker = Some(cx.waker().clone());
return Poll::Pending;
}
self.futs.poll_next_unpin(cx)
}
}