#![allow(clippy::all)]
#[cfg(all(feature = "async_channel", feature = "futures_channel",))]
compile_error!("`async_channel` and `futures_channel` are mutually exclusive features");
#[cfg(not(any(feature = "async_channel", feature = "futures_channel")))]
compile_error!("Must build with either `async_channel` or `futures_channel` features");
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use derive_more::Display;
mod bounded;
pub mod oneshot;
mod unbounded;
pub use self::{bounded::*, unbounded::*};
pub use coarsetime::Duration as CoarseDuration;
use coarsetime::Instant as CoarseInstant;
#[cfg(test)]
mod tests;
/// Capacity of the bounded queue in which a `Meter` buffers time-of-flight
/// samples until they are read out.
const TOF_QUEUE_SIZE: usize = 100;
/// Shared set of counters describing the traffic of one metered channel.
///
/// Every field sits behind an `Arc`, so clones of a `Meter` observe and update
/// the same underlying counters.
#[derive(Debug, Clone)]
pub struct Meter {
/// Number of items noted as sent; decremented again when a send is retracted.
sent: Arc<AtomicUsize>,
/// Number of items noted as received.
received: Arc<AtomicUsize>,
/// Most recently recorded channel length. Only tracked explicitly with the
/// `async_channel` feature.
#[cfg(feature = "async_channel")]
channel_len: Arc<AtomicUsize>,
/// Number of times `note_blocked` was called (presumably once per send that
/// had to wait; the call sites are outside this part of the file).
blocked: Arc<AtomicUsize>,
/// Bounded buffer of time-of-flight samples waiting to be read out.
tof: Arc<crossbeam_queue::ArrayQueue<CoarseDuration>>,
}
impl Default for Meter {
    /// Creates a meter with every counter at zero and an empty time-of-flight
    /// buffer with room for `TOF_QUEUE_SIZE` samples.
    fn default() -> Self {
        // Each counter gets its own allocation; sharing only happens between
        // clones of the resulting `Meter`.
        let zeroed = || Arc::new(AtomicUsize::new(0));
        let tof = Arc::new(crossbeam_queue::ArrayQueue::new(TOF_QUEUE_SIZE));

        Self {
            sent: zeroed(),
            received: zeroed(),
            #[cfg(feature = "async_channel")]
            channel_len: zeroed(),
            blocked: zeroed(),
            tof,
        }
    }
}
/// Snapshot of a `Meter`'s counters, as produced by `Meter::read`.
///
/// The `Display` output covers only `sent` and `received`.
#[derive(Debug, Display, Clone, Default, PartialEq)]
#[display(fmt = "(sent={} received={})", sent, received)]
pub struct Readout {
/// Value of the meter's `sent` counter at the time of the readout.
pub sent: usize,
/// Value of the meter's `received` counter at the time of the readout.
pub received: usize,
/// Channel length: the last recorded value with the `async_channel` feature,
/// or `sent - received` (saturating at zero) with `futures_channel`.
pub channel_len: usize,
/// Value of the meter's `blocked` counter at the time of the readout.
pub blocked: usize,
/// Time-of-flight samples that this readout drained from the meter's buffer.
pub tof: Vec<CoarseDuration>,
}
impl Meter {
    /// Takes a snapshot of all counters.
    ///
    /// The buffered time-of-flight samples are moved into the returned
    /// [`Readout`]; they are no longer available to later reads.
    pub fn read(&self) -> Readout {
        let sent = self.sent.load(Ordering::Relaxed);
        let received = self.received.load(Ordering::Relaxed);

        // Exactly one of the two features is enabled (enforced at the top of
        // the file), so exactly one of these bindings exists.
        #[cfg(feature = "async_channel")]
        let channel_len = self.channel_len.load(Ordering::Relaxed);
        #[cfg(feature = "futures_channel")]
        let channel_len = sent.saturating_sub(received);

        let blocked = self.blocked.load(Ordering::Relaxed);

        // Pop until the queue reports empty. The initial capacity is only a
        // hint: other threads may push or pop while we drain.
        let mut tof = Vec::with_capacity(self.tof.len());
        tof.extend(std::iter::from_fn(|| self.tof.pop()));

        Readout {
            sent,
            received,
            channel_len,
            blocked,
            tof,
        }
    }

    /// Counts one more sent item and returns the count from before the increment.
    fn note_sent(&self) -> usize {
        self.sent.fetch_add(1, Ordering::Relaxed)
    }

    /// Records `len` as the current channel length.
    #[cfg(feature = "async_channel")]
    fn note_channel_len(&self, len: usize) {
        self.channel_len.store(len, Ordering::Relaxed);
    }

    /// Takes back one previously counted send.
    fn retract_sent(&self) {
        self.sent.fetch_sub(1, Ordering::Relaxed);
    }

    /// Counts one more received item.
    fn note_received(&self) {
        self.received.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts one more blocked occurrence.
    fn note_blocked(&self) {
        self.blocked.fetch_add(1, Ordering::Relaxed);
    }

    /// Buffers one time-of-flight sample.
    ///
    /// The push is forced, so it succeeds even when the buffer is full; whatever
    /// `force_push` hands back in that case is deliberately discarded.
    fn note_time_of_flight(&self, tof: CoarseDuration) {
        let _evicted = self.tof.force_push(tof);
    }

    /// Derives the channel length as `sent - received`, saturating at zero.
    #[cfg(feature = "futures_channel")]
    fn calculate_channel_len(&self) -> usize {
        self.sent
            .load(Ordering::Relaxed)
            .saturating_sub(self.received.load(Ordering::Relaxed))
    }
}
/// Decides whether the `nth` sent item should carry a time-of-flight timestamp.
///
/// Test builds pick every item with an even `nth`, which gives a predictable
/// pattern. All other builds seed a fresh `WyRand` generator with `nth` and
/// draw a single `u64`, so the decision is a pure function of `nth`. The item
/// is picked when the draw lands below `PROB`, which is roughly 5.3% of all
/// `nth` assuming the draw is uniformly distributed.
#[inline(always)]
fn measure_tof_check(nth: usize) -> bool {
    if cfg!(test) {
        nth & 0x01 == 0
    } else {
        use nanorand::Rng;
        let mut rng = nanorand::WyRand::new_seed(nth as u64);
        // Threshold at 5.3% of the `u64` range: the fraction of items to sample.
        const PROB: u64 = (u64::MAX as f64 * 0.053_f64) as u64;
        let coin = rng.generate::<u64>();
        // Sample when the coin falls *below* the threshold. The comparison used
        // to be `coin >= PROB`, which selected the complementary ~94.7%.
        coin < PROB
    }
}
/// An item on its way through a channel, optionally paired with a starting
/// instant from which its time of flight can be measured.
#[derive(Debug)]
pub enum MaybeTimeOfFlight<T> {
    /// The item on its own, without a timestamp.
    Bare(T),
    /// The item together with the instant captured when it was wrapped.
    WithTimeOfFlight(T, CoarseInstant),
}
impl<T> From<T> for MaybeTimeOfFlight<T> {
fn from(value: T) -> Self {
Self::Bare(value)
}
}
impl<T> MaybeTimeOfFlight<T> {
    /// Consumes the wrapper and returns the item, dropping the timestamp if
    /// there is one.
    pub fn into(self) -> T {
        match self {
            Self::Bare(item) | Self::WithTimeOfFlight(item, _) => item,
        }
    }
}
/// Counts `item` as sent on `meter` and wraps it for transport.
///
/// The send count from before this item is handed to `measure_tof_check`. If
/// that selects the item, the wrapper also records the current instant;
/// otherwise the item is wrapped bare.
pub fn prepare_with_tof<T>(meter: &Meter, item: T) -> MaybeTimeOfFlight<T> {
    let nth = meter.note_sent();
    if !measure_tof_check(nth) {
        return MaybeTimeOfFlight::Bare(item);
    }
    MaybeTimeOfFlight::WithTimeOfFlight(item, CoarseInstant::now())
}
impl<T> std::ops::Deref for MaybeTimeOfFlight<T> {
    type Target = T;

    /// Borrows the wrapped item, whichever variant holds it.
    fn deref(&self) -> &Self::Target {
        match self {
            Self::Bare(item) | Self::WithTimeOfFlight(item, _) => item,
        }
    }
}