use crate::error::Result as ClientResult;
use async_std::{
channel::{bounded, Receiver, Sender},
stream::StreamExt,
};
use futures::{FutureExt, Stream};
use sp_runtime::DeserializeOwned;
use std::{
fmt::Debug,
pin::Pin,
result::Result as StdResult,
task::{Context, Poll},
};
/// Capacity of the bounded channels created by the subscription broadcaster.
///
/// It is used both for the channel that carries new-subscriber registrations to the
/// background task and for every per-subscriber items channel.
const CHANNEL_CAPACITY: usize = 128;
/// Human-readable description of a stream, used when writing log messages.
///
/// It pairs the name of the stream with the name of the chain the stream belongs to.
#[derive(Clone)]
pub struct StreamDescription {
    /// Name of the stream itself.
    stream_name: String,
    /// Name of the chain that the stream is associated with.
    chain_name: String,
}

impl StreamDescription {
    /// Builds a description from the stream name and the chain name.
    pub fn new(stream_name: String, chain_name: String) -> Self {
        StreamDescription { stream_name, chain_name }
    }

    /// Renders the description as `"<stream_name> stream of <chain_name>"`.
    fn get(&self) -> String {
        let Self { stream_name, chain_name } = self;
        format!("{} stream of {}", stream_name, chain_name)
    }
}
/// Adapter around a stream that yields `Result<T, E>` items.
///
/// The wrapped stream is kept inside an `Option` so that it can be dropped while the
/// adapter itself stays alive; `None` means that the wrapped stream is gone.
struct Unwrap<S, T, E>
where
    S: Stream<Item = StdResult<T, E>>,
{
    /// Description of the wrapped stream, used in log messages.
    desc: StreamDescription,
    /// The wrapped stream, or `None` once it has been dropped.
    stream: Option<S>,
}

impl<S, T, E> Unwrap<S, T, E>
where
    S: Stream<Item = StdResult<T, E>>,
{
    /// Wraps `stream`, remembering `desc` for later log messages.
    pub fn new(desc: StreamDescription, stream: S) -> Self {
        let stream = Some(stream);
        Self { desc, stream }
    }
}
/// Turns a stream of `Result<T, E>` into a stream of `T`.
///
/// `Ok` items are passed through. The first `Err` item, or the end of the wrapped stream,
/// is logged; the wrapped stream is then dropped and this stream yields `None` on that
/// poll and on every later poll.
impl<S, T, E> Stream for Unwrap<S, T, E>
where
    S: Stream<Item = StdResult<T, E>> + Unpin,
    T: DeserializeOwned,
    E: Debug,
{
    type Item = T;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // The wrapped stream has already been dropped: we are finished for good.
        let inner = match self.stream.as_mut() {
            Some(inner) => inner,
            None => return Poll::Ready(None),
        };

        // `ready!` returns `Poll::Pending` from this function if the inner stream is pending.
        match futures::ready!(Pin::new(inner).poll_next(cx)) {
            Some(Ok(item)) => return Poll::Ready(Some(item)),
            Some(Err(e)) => {
                // Drop the failed stream first so that it is never polled again.
                self.stream.take();
                log::debug!(
                    target: "bridge",
                    "{} has returned error: {:?}. It may need to be restarted",
                    self.desc.get(),
                    e,
                );
            },
            None => {
                // Drop the finished stream first so that it is never polled again.
                self.stream.take();
                log::debug!(
                    target: "bridge",
                    "{} has returned `None`. It may need to be restarted",
                    self.desc.get()
                );
            },
        }

        Poll::Ready(None)
    }
}
/// Handle that lets several consumers receive the items of a single subscription.
///
/// The handle only holds the sending side of the registration channel; the items are
/// distributed by a background task spawned in [`SubscriptionBroadcaster::new`].
#[derive(Clone)]
pub struct SubscriptionBroadcaster<T> {
    /// Description of the underlying stream, copied into every new subscription.
    desc: StreamDescription,
    /// Channel used to hand the items sender of a new subscriber to the background task.
    subscribers_sender: Sender<Sender<T>>,
}

impl<T> SubscriptionBroadcaster<T>
where
    T: 'static + Clone + DeserializeOwned + Send,
{
    /// Creates a broadcaster on top of `subscription` and spawns its background task.
    ///
    /// # Errors
    ///
    /// If `subscription` is itself a broadcasted subscription, it is returned back to the
    /// caller untouched inside `Err` and no task is spawned.
    pub fn new(subscription: Subscription<T>) -> StdResult<Self, Subscription<T>> {
        if subscription.is_broadcasted {
            Err(subscription)
        } else {
            // Copy the description before the subscription is moved into the task.
            let desc = subscription.desc().clone();
            let (subscribers_sender, subscribers_receiver) = bounded(CHANNEL_CAPACITY);
            async_std::task::spawn(background_worker(subscription, subscribers_receiver));
            Ok(Self { desc, subscribers_sender })
        }
    }

    /// Registers a new subscriber and returns the subscription it should read from.
    ///
    /// # Errors
    ///
    /// Propagates the error of `try_send` if the registration cannot be handed over to the
    /// background task.
    pub async fn subscribe(&self) -> ClientResult<Subscription<T>> {
        let (tx, rx) = bounded(CHANNEL_CAPACITY);
        self.subscribers_sender.try_send(tx)?;
        let desc = self.desc.clone();
        Ok(Subscription::new_broadcasted(desc, rx))
    }
}
/// Stream of items of type `T`, together with a description of where they come from.
pub struct Subscription<T> {
    /// Description of the stream, used in log messages.
    desc: StreamDescription,
    /// The boxed stream that actually produces the items.
    subscription: Box<dyn Stream<Item = T> + Unpin + Send>,
    /// `true` if the subscription was created with [`Subscription::new_broadcasted`].
    is_broadcasted: bool,
}

impl<T> Subscription<T>
where
    T: 'static + Clone + DeserializeOwned + Send,
{
    /// Creates a subscription from a stream of `Result<T, serde_json::Error>` items.
    ///
    /// The stream is wrapped into `Unwrap`, so the resulting subscription yields plain `T`
    /// items and ends on the first error. The subscription is not marked as broadcasted.
    pub fn new_forwarded(
        desc: StreamDescription,
        subscription: impl Stream<Item = StdResult<T, serde_json::Error>> + Unpin + Send + 'static,
    ) -> Self {
        let unwrapped = Unwrap::new(desc.clone(), subscription);
        Self { desc, subscription: Box::new(unwrapped), is_broadcasted: false }
    }

    /// Creates a subscription from a stream that already yields plain `T` items and marks
    /// it as broadcasted.
    pub fn new_broadcasted(
        desc: StreamDescription,
        subscription: impl Stream<Item = T> + Unpin + Send + 'static,
    ) -> Self {
        let subscription = Box::new(subscription);
        Self { desc, subscription, is_broadcasted: true }
    }

    /// Returns the description of this subscription's stream.
    pub fn desc(&self) -> &StreamDescription {
        &self.desc
    }
}
/// Polling a `Subscription` simply polls the boxed inner stream.
impl<T> Stream for Subscription<T> {
    type Item = T;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // The boxed stream is `Unpin`, so it can be re-pinned on every poll.
        let inner = Pin::new(&mut self.subscription);
        inner.poll_next(cx)
    }
}
/// Background task of the subscription broadcaster.
///
/// It first waits for one subscriber to register; only after that does it start reading
/// items from `subscription`. From then on it concurrently accepts new subscribers from
/// `subscribers_receiver` and forwards a clone of every item to each known subscriber.
/// A subscriber whose `try_send` fails is removed from the list.
///
/// The task stops when the registration channel yields `None` or when the underlying
/// subscription ends.
async fn background_worker<T>(
    mut subscription: Subscription<T>,
    mut subscribers_receiver: Receiver<Sender<T>>,
) where
    T: 'static + Clone + DeserializeOwned + Send,
{
    /// Logs the reason why the background task is exiting.
    fn log_task_exit(desc: &StreamDescription, reason: &str) {
        log::debug!(
            target: "bridge",
            "Background task of subscription broadcaster for {} has stopped: {}",
            desc.get(),
            reason,
        );
    }

    // Do not touch the underlying subscription until somebody is interested in its items.
    let mut subscribers = match subscribers_receiver.next().await {
        Some(first) => vec![first],
        None => return log_task_exit(subscription.desc(), "client has stopped"),
    };

    loop {
        futures::select! {
            registration = subscribers_receiver.next().fuse() => {
                if let Some(sender) = registration {
                    subscribers.push(sender);
                } else {
                    // The registration channel has been closed.
                    return log_task_exit(subscription.desc(), "client has stopped")
                }
            },
            next_item = subscription.subscription.next().fuse() => {
                let item = match next_item {
                    Some(item) => item,
                    None => return log_task_exit(subscription.desc(), "stream has finished"),
                };
                // Keep only the subscribers that have accepted the item.
                subscribers.retain(|sender| sender.try_send(item.clone()).is_ok());
            },
        }
    }
}