jsonrpsee_core/client/async_client/
utils.rs1use std::pin::Pin;
28use std::task::{Context, Poll, Waker};
29use std::time::Duration;
30
31use futures_util::stream::FuturesUnordered;
32use futures_util::{Stream, StreamExt, Future};
33use pin_project::pin_project;
34
35#[pin_project]
36pub(crate) struct IntervalStream<S>(#[pin] Option<S>);
37
38impl<S> IntervalStream<S> {
39 pub(crate) fn pending() -> Self {
41 Self(None)
42 }
43
44 #[cfg(feature = "async-client")]
46 pub(crate) fn new(s: S) -> Self {
47 Self(Some(s))
48 }
49}
50
51impl<S: Stream> Stream for IntervalStream<S> {
52 type Item = ();
53
54 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
55 if let Some(mut stream) = self.project().0.as_pin_mut() {
56 match stream.poll_next_unpin(cx) {
57 Poll::Pending => Poll::Pending,
58 Poll::Ready(Some(_)) => Poll::Ready(Some(())),
59 Poll::Ready(None) => Poll::Ready(None),
60 }
61 } else {
62 Poll::Pending
65 }
66 }
67}
68
/// Tracks how often a connection was found idle and whether the allowed
/// number of idle observations has been reached.
#[allow(unused)]
pub(crate) enum InactivityCheck {
    /// No tracking: `is_inactive` always answers `false`.
    Disabled,
    /// Tracking is on.
    Enabled {
        /// Minimum time since `last_active` for a check to count as a miss.
        inactive_dur: Duration,
        /// Moment of the most recent `mark_as_active` call (or of construction).
        last_active: std::time::Instant,
        /// Number of misses recorded so far.
        count: usize,
        /// Number of misses at which the check reports inactivity.
        max_count: usize,
    },
}

impl InactivityCheck {
    /// Creates an enabled check with zero recorded misses, treating "now" as
    /// the last moment of activity.
    #[cfg(feature = "async-client")]
    pub(crate) fn new(_inactive_dur: Duration, _max_count: usize) -> Self {
        let started_at = std::time::Instant::now();
        Self::Enabled {
            inactive_dur: _inactive_dur,
            last_active: started_at,
            count: 0,
            max_count: _max_count,
        }
    }

    /// Records a miss if at least `inactive_dur` has passed since the last
    /// activity, then reports whether the miss count has reached `max_count`.
    ///
    /// Always `false` for `Disabled`. Note that this call mutates the miss
    /// counter, so each invocation during an idle period adds another miss.
    pub(crate) fn is_inactive(&mut self) -> bool {
        if let Self::Enabled { inactive_dur, last_active, count, max_count } = self {
            let idle_too_long = last_active.elapsed() >= *inactive_dur;
            if idle_too_long {
                *count += 1;
            }

            *count >= *max_count
        } else {
            false
        }
    }

    /// Stamps the current instant as the last moment of activity.
    ///
    /// Only the timestamp is refreshed; the miss counter is left as it is.
    pub(crate) fn mark_as_active(&mut self) {
        match self {
            Self::Enabled { last_active, .. } => *last_active = std::time::Instant::now(),
            Self::Disabled => {}
        }
    }
}
100
101
102
103pub(crate) struct MaybePendingFutures<Fut> {
105 futs: FuturesUnordered<Fut>,
106 waker: Option<Waker>,
107}
108
109impl<Fut> MaybePendingFutures<Fut> {
110 pub(crate) fn new() -> Self {
111 Self { futs: FuturesUnordered::new(), waker: None }
112 }
113
114 pub(crate) fn push(&mut self, fut: Fut) {
115 self.futs.push(fut);
116
117 if let Some(w) = self.waker.take() {
118 w.wake();
119 }
120 }
121}
122
123impl<Fut: Future> Stream for MaybePendingFutures<Fut> {
124 type Item = Fut::Output;
125
126 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
127 if self.futs.is_empty() {
128 self.waker = Some(cx.waker().clone());
129 return Poll::Pending;
130 }
131
132 self.futs.poll_next_unpin(cx)
133 }
134}