jsonrpsee_core/client/async_client/
utils.rs

1// Copyright 2019-2021 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any
4// person obtaining a copy of this software and associated
5// documentation files (the "Software"), to deal in the
6// Software without restriction, including without
7// limitation the rights to use, copy, modify, merge,
8// publish, distribute, sublicense, and/or sell copies of
9// the Software, and to permit persons to whom the Software
10// is furnished to do so, subject to the following
11// conditions:
12//
13// The above copyright notice and this permission notice
14// shall be included in all copies or substantial portions
15// of the Software.
16//
17// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
18// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
19// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
20// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
21// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
22// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
23// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
24// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
25// DEALINGS IN THE SOFTWARE.
26
27use std::pin::Pin;
28use std::task::{Context, Poll, Waker};
29use std::time::Duration;
30
31use futures_util::stream::FuturesUnordered;
32use futures_util::{Stream, StreamExt, Future};
33use pin_project::pin_project;
34
35#[pin_project]
36pub(crate) struct IntervalStream<S>(#[pin] Option<S>);
37
38impl<S> IntervalStream<S> {
39	/// Creates a stream which never returns any elements.
40	pub(crate) fn pending() -> Self {
41		Self(None)
42	}
43
44	/// Creates a stream which produces elements with interval of `period`.
45	#[cfg(feature = "async-client")]
46	pub(crate) fn new(s: S) -> Self {
47		Self(Some(s))
48	}
49}
50
51impl<S: Stream> Stream for IntervalStream<S> {
52	type Item = ();
53
54	fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
55		if let Some(mut stream) = self.project().0.as_pin_mut() {
56			match stream.poll_next_unpin(cx) {
57				Poll::Pending => Poll::Pending,
58				Poll::Ready(Some(_)) => Poll::Ready(Some(())),
59				Poll::Ready(None) => Poll::Ready(None),
60			}
61		} else {
62			// NOTE: this will not be woken up again and it's by design
63			// to be a pending stream that never returns.
64			Poll::Pending
65		}
66	}
67}
68
69#[allow(unused)]
70pub(crate) enum InactivityCheck {
71	Disabled,
72	Enabled { inactive_dur: Duration, last_active: std::time::Instant, count: usize, max_count: usize }
73}
74
75impl InactivityCheck {
76	#[cfg(feature = "async-client")]
77	pub(crate) fn new(_inactive_dur: Duration, _max_count: usize) -> Self {
78		Self::Enabled { inactive_dur: _inactive_dur, last_active: std::time::Instant::now(), count: 0, max_count: _max_count }
79	}
80
81	pub(crate) fn is_inactive(&mut self) -> bool {
82		match self {
83			Self::Disabled => false,
84			Self::Enabled { inactive_dur, last_active, count, max_count, .. } => {
85				if last_active.elapsed() >= *inactive_dur {
86					*count += 1;
87				}
88
89				count >= max_count
90			}
91 		}
92	}
93
94	pub(crate) fn mark_as_active(&mut self) {
95		if let Self::Enabled { last_active, .. } = self {
96			*last_active = std::time::Instant::now();
97		}
98	}
99}
100
101
102
103/// A wrapper around `FuturesUnordered` that doesn't return `None` when it's empty.
104pub(crate) struct MaybePendingFutures<Fut> {
105	futs: FuturesUnordered<Fut>,
106	waker: Option<Waker>,
107}
108
109impl<Fut> MaybePendingFutures<Fut> {
110	pub(crate) fn new() -> Self {
111		Self { futs: FuturesUnordered::new(), waker: None }
112	}
113
114	pub(crate) fn push(&mut self, fut: Fut) {
115		self.futs.push(fut);
116
117		if let Some(w) = self.waker.take() {
118			w.wake();
119		}
120	}
121}
122
123impl<Fut: Future> Stream for MaybePendingFutures<Fut> {
124	type Item = Fut::Output;
125
126	fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
127		if self.futs.is_empty() {
128			self.waker = Some(cx.waker().clone());
129			return Poll::Pending;
130		}
131
132		self.futs.poll_next_unpin(cx)
133	}
134}