// prioritized_metered_channel/unbounded.rs
use futures::{
20 stream::Stream,
21 task::{Context, Poll},
22};
23
24#[cfg(feature = "async_channel")]
25use async_channel::{unbounded as unbounded_channel, Receiver, Sender, TryRecvError, TrySendError};
26
27#[cfg(feature = "futures_channel")]
28use futures::{
29 channel::mpsc::unbounded as unbounded_channel,
30 channel::mpsc::{
31 TryRecvError, TrySendError, UnboundedReceiver as Receiver, UnboundedSender as Sender,
32 },
33};
34
35use std::{pin::Pin, result};
36
37use super::{measure_tof_check, CoarseInstant, MaybeTimeOfFlight, Meter};
38
39pub fn unbounded<T>() -> (UnboundedMeteredSender<T>, UnboundedMeteredReceiver<T>) {
41 let (tx, rx) = unbounded_channel::<MaybeTimeOfFlight<T>>();
42 let shared_meter = Meter::default();
43 let tx = UnboundedMeteredSender { meter: shared_meter.clone(), inner: tx };
44 let rx = UnboundedMeteredReceiver { meter: shared_meter, inner: rx };
45 (tx, rx)
46}
47
48#[derive(Debug)]
50pub struct UnboundedMeteredReceiver<T> {
51 meter: Meter,
53 inner: Receiver<MaybeTimeOfFlight<T>>,
54}
55
56impl<T> std::ops::Deref for UnboundedMeteredReceiver<T> {
57 type Target = Receiver<MaybeTimeOfFlight<T>>;
58 fn deref(&self) -> &Self::Target {
59 &self.inner
60 }
61}
62
63impl<T> std::ops::DerefMut for UnboundedMeteredReceiver<T> {
64 fn deref_mut(&mut self) -> &mut Self::Target {
65 &mut self.inner
66 }
67}
68
69impl<T> Stream for UnboundedMeteredReceiver<T> {
70 type Item = T;
71 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
72 match Receiver::poll_next(Pin::new(&mut self.inner), cx) {
73 Poll::Ready(maybe_value) => Poll::Ready(self.maybe_meter_tof(maybe_value)),
74 Poll::Pending => Poll::Pending,
75 }
76 }
77
78 fn size_hint(&self) -> (usize, Option<usize>) {
80 self.inner.size_hint()
81 }
82}
83
84impl<T> UnboundedMeteredReceiver<T> {
85 fn maybe_meter_tof(&mut self, maybe_value: Option<MaybeTimeOfFlight<T>>) -> Option<T> {
86 self.meter.note_received();
87 maybe_value.map(|value| {
88 match value {
89 MaybeTimeOfFlight::<T>::WithTimeOfFlight(value, tof_start) => {
90 let duration = tof_start.elapsed();
93 self.meter.note_time_of_flight(duration);
94 value
95 },
96 MaybeTimeOfFlight::<T>::Bare(value) => value,
97 }
98 .into()
99 })
100 }
101
102 pub fn meter(&self) -> &Meter {
104 &self.meter
105 }
106
107 #[cfg(feature = "futures_channel")]
109 pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
110 match self.inner.try_next()? {
111 Some(value) => Ok(self.maybe_meter_tof(Some(value))),
112 None => Ok(None),
113 }
114 }
115
116 #[cfg(feature = "async_channel")]
118 pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
119 let result = match self.inner.try_recv() {
120 Ok(value) => Ok(self.maybe_meter_tof(Some(value))),
121 Err(err) => Err(err),
122 };
123
124 self.meter.note_channel_len(self.len());
125 result
126 }
127
128 #[cfg(feature = "async_channel")]
130 pub fn len(&self) -> usize {
131 self.inner.len()
132 }
133}
134
135impl<T> futures::stream::FusedStream for UnboundedMeteredReceiver<T> {
136 fn is_terminated(&self) -> bool {
137 self.inner.is_terminated()
138 }
139}
140
141#[derive(Debug)]
144pub struct UnboundedMeteredSender<T> {
145 meter: Meter,
146 inner: Sender<MaybeTimeOfFlight<T>>,
147}
148
149impl<T> Clone for UnboundedMeteredSender<T> {
150 fn clone(&self) -> Self {
151 Self { meter: self.meter.clone(), inner: self.inner.clone() }
152 }
153}
154
155impl<T> std::ops::Deref for UnboundedMeteredSender<T> {
156 type Target = Sender<MaybeTimeOfFlight<T>>;
157 fn deref(&self) -> &Self::Target {
158 &self.inner
159 }
160}
161
162impl<T> std::ops::DerefMut for UnboundedMeteredSender<T> {
163 fn deref_mut(&mut self) -> &mut Self::Target {
164 &mut self.inner
165 }
166}
167
168impl<T> UnboundedMeteredSender<T> {
169 fn prepare_with_tof(&self, item: T) -> MaybeTimeOfFlight<T> {
170 let previous = self.meter.note_sent();
171 let item = if measure_tof_check(previous) {
172 MaybeTimeOfFlight::WithTimeOfFlight(item, CoarseInstant::now())
173 } else {
174 MaybeTimeOfFlight::Bare(item)
175 };
176
177 item
178 }
179
180 pub fn meter(&self) -> &Meter {
182 &self.meter
183 }
184
185 #[cfg(feature = "futures_channel")]
187 pub fn unbounded_send(&self, msg: T) -> result::Result<(), TrySendError<MaybeTimeOfFlight<T>>> {
188 let msg = self.prepare_with_tof(msg);
189 self.inner.unbounded_send(msg).map_err(|e| {
190 self.meter.retract_sent();
191 e
192 })
193 }
194
195 #[cfg(feature = "async_channel")]
197 pub fn unbounded_send(&self, msg: T) -> result::Result<(), TrySendError<MaybeTimeOfFlight<T>>> {
198 let msg = self.prepare_with_tof(msg);
199 let result = self.inner.try_send(msg).map_err(|e| {
200 self.meter.retract_sent();
201 e
202 });
203
204 self.meter.note_channel_len(self.len());
205 result
206 }
207
208 #[cfg(feature = "async_channel")]
210 pub fn len(&self) -> usize {
211 self.inner.len()
212 }
213}