pub use async_channel::{TryRecvError, TrySendError};
use crate::metrics::UNBOUNDED_CHANNELS_COUNTER;
use async_channel::{Receiver, Sender};
use futures::{
	stream::{FusedStream, Stream},
	task::{Context, Poll},
};
use log::error;
use sp_arithmetic::traits::SaturatedConversion;
use std::{
	backtrace::Backtrace,
	pin::Pin,
	sync::{
		atomic::{AtomicBool, Ordering},
		Arc,
	},
};
#[derive(Debug)]
pub struct TracingUnboundedSender<T> {
	inner: Sender<T>,
	name: &'static str,
	queue_size_warning: usize,
	warning_fired: Arc<AtomicBool>,
	creation_backtrace: Arc<Backtrace>,
}
impl<T> Clone for TracingUnboundedSender<T> {
	fn clone(&self) -> Self {
		Self {
			inner: self.inner.clone(),
			name: self.name,
			queue_size_warning: self.queue_size_warning,
			warning_fired: self.warning_fired.clone(),
			creation_backtrace: self.creation_backtrace.clone(),
		}
	}
}
#[derive(Debug)]
pub struct TracingUnboundedReceiver<T> {
	inner: Receiver<T>,
	name: &'static str,
}
pub fn tracing_unbounded<T>(
	name: &'static str,
	queue_size_warning: usize,
) -> (TracingUnboundedSender<T>, TracingUnboundedReceiver<T>) {
	let (s, r) = async_channel::unbounded();
	let sender = TracingUnboundedSender {
		inner: s,
		name,
		queue_size_warning,
		warning_fired: Arc::new(AtomicBool::new(false)),
		creation_backtrace: Arc::new(Backtrace::force_capture()),
	};
	let receiver = TracingUnboundedReceiver { inner: r, name };
	(sender, receiver)
}
impl<T> TracingUnboundedSender<T> {
	pub fn is_closed(&self) -> bool {
		self.inner.is_closed()
	}
	pub fn close(&self) -> bool {
		self.inner.close()
	}
	pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError<T>> {
		self.inner.try_send(msg).map(|s| {
			UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.name, "send"]).inc();
			if self.inner.len() >= self.queue_size_warning &&
				self.warning_fired
					.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
					.is_ok()
			{
				error!(
					"The number of unprocessed messages in channel `{}` exceeded {}.\n\
					 The channel was created at:\n{}\n
					 Last message was sent from:\n{}",
					self.name,
					self.queue_size_warning,
					self.creation_backtrace,
					Backtrace::force_capture(),
				);
			}
			s
		})
	}
}
impl<T> TracingUnboundedReceiver<T> {
	pub fn close(&mut self) -> bool {
		self.inner.close()
	}
	pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
		self.inner.try_recv().map(|s| {
			UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.name, "received"]).inc();
			s
		})
	}
}
impl<T> Drop for TracingUnboundedReceiver<T> {
	fn drop(&mut self) {
		self.close();
		let count = self.inner.len();
		if count > 0 {
			UNBOUNDED_CHANNELS_COUNTER
				.with_label_values(&[self.name, "dropped"])
				.inc_by(count.saturated_into());
		}
		while let Ok(_) = self.inner.try_recv() {}
	}
}
impl<T> Unpin for TracingUnboundedReceiver<T> {}
impl<T> Stream for TracingUnboundedReceiver<T> {
	type Item = T;
	fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
		let s = self.get_mut();
		match Pin::new(&mut s.inner).poll_next(cx) {
			Poll::Ready(msg) => {
				if msg.is_some() {
					UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[s.name, "received"]).inc();
				}
				Poll::Ready(msg)
			},
			Poll::Pending => Poll::Pending,
		}
	}
}
impl<T> FusedStream for TracingUnboundedReceiver<T> {
	fn is_terminated(&self) -> bool {
		self.inner.is_terminated()
	}
}
#[cfg(test)]
mod tests {
	use super::tracing_unbounded;
	use async_channel::{self, RecvError, TryRecvError};
	#[test]
	fn test_tracing_unbounded_receiver_drop() {
		let (tracing_unbounded_sender, tracing_unbounded_receiver) =
			tracing_unbounded("test-receiver-drop", 10);
		let (tx, rx) = async_channel::unbounded::<usize>();
		tracing_unbounded_sender.unbounded_send(tx).unwrap();
		drop(tracing_unbounded_receiver);
		assert_eq!(rx.try_recv(), Err(TryRecvError::Closed));
		assert_eq!(rx.recv_blocking(), Err(RecvError));
	}
}