// Copyright 2019-2021 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
use crate::error::Result as ClientResult;
use async_std::{
channel::{bounded, Receiver, Sender},
use futures::{FutureExt, Stream};
use sp_runtime::DeserializeOwned;
use std::{
result::Result as StdResult,
task::{Context, Poll},
/// Once channel reaches this capacity, the subscription breaks.
const CHANNEL_CAPACITY: usize = 128;
/// Structure describing a stream.
pub struct StreamDescription {
stream_name: String,
chain_name: String,
impl StreamDescription {
/// Create a new instance of `StreamDescription`.
pub fn new(stream_name: String, chain_name: String) -> Self {
Self { stream_name, chain_name }
/// Get a stream description.
fn get(&self) -> String {
format!("{} stream of {}", self.stream_name, self.chain_name)
/// Chainable stream that transforms items of type `Result<T, E>` to items of type `T`.
/// If it encounters an item of type `Err`, it returns `Poll::Ready(None)`
/// and terminates the underlying stream.
struct Unwrap<S: Stream<Item = StdResult<T, E>>, T, E> {
desc: StreamDescription,
stream: Option<S>,
impl<S: Stream<Item = StdResult<T, E>>, T, E> Unwrap<S, T, E> {
/// Create a new instance of `Unwrap`.
pub fn new(desc: StreamDescription, stream: S) -> Self {
Self { desc, stream: Some(stream) }
impl<S: Stream<Item = StdResult<T, E>> + Unpin, T: DeserializeOwned, E: Debug> Stream
for Unwrap<S, T, E>
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Ready(match self.stream.as_mut() {
Some(subscription) => match futures::ready!(Pin::new(subscription).poll_next(cx)) {
Some(Ok(item)) => Some(item),
Some(Err(e)) => {
target: "bridge",
"{} has returned error: {:?}. It may need to be restarted",
None => {
target: "bridge",
"{} has returned `None`. It may need to be restarted",
None => None,
/// Subscription factory that produces subscriptions, sharing the same background thread.
pub struct SubscriptionBroadcaster<T> {
desc: StreamDescription,
subscribers_sender: Sender<Sender<T>>,
impl<T: 'static + Clone + DeserializeOwned + Send> SubscriptionBroadcaster<T> {
/// Create new subscription factory.
pub fn new(subscription: Subscription<T>) -> StdResult<Self, Subscription<T>> {
// It doesn't make sense to further broadcast a broadcasted subscription.
if subscription.is_broadcasted {
return Err(subscription)
let desc = subscription.desc().clone();
let (subscribers_sender, subscribers_receiver) = bounded(CHANNEL_CAPACITY);
async_std::task::spawn(background_worker(subscription, subscribers_receiver));
Ok(Self { desc, subscribers_sender })
/// Produce new subscription.
pub async fn subscribe(&self) -> ClientResult<Subscription<T>> {
let (items_sender, items_receiver) = bounded(CHANNEL_CAPACITY);
Ok(Subscription::new_broadcasted(self.desc.clone(), items_receiver))
/// Subscription to some chain events.
pub struct Subscription<T> {
desc: StreamDescription,
subscription: Box<dyn Stream<Item = T> + Unpin + Send>,
is_broadcasted: bool,
impl<T: 'static + Clone + DeserializeOwned + Send> Subscription<T> {
/// Create new forwarded subscription.
pub fn new_forwarded(
desc: StreamDescription,
subscription: impl Stream<Item = StdResult<T, serde_json::Error>> + Unpin + Send + 'static,
) -> Self {
Self {
desc: desc.clone(),
subscription: Box::new(Unwrap::new(desc, subscription)),
is_broadcasted: false,
/// Create new broadcasted subscription.
pub fn new_broadcasted(
desc: StreamDescription,
subscription: impl Stream<Item = T> + Unpin + Send + 'static,
) -> Self {
Self { desc, subscription: Box::new(subscription), is_broadcasted: true }
/// Get the description of the underlying stream
pub fn desc(&self) -> &StreamDescription {
impl<T> Stream for Subscription<T> {
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Ready(futures::ready!(Pin::new(&mut self.subscription).poll_next(cx)))
/// Background worker that is executed in tokio context as `jsonrpsee` requires.
/// This task may exit under some circumstances. It'll send the correspondent
/// message (`Err` or `None`) to all known listeners. Also, when it stops, all
/// subsequent reads and new subscribers will get the connection error (`ChannelError`).
async fn background_worker<T: 'static + Clone + DeserializeOwned + Send>(
mut subscription: Subscription<T>,
mut subscribers_receiver: Receiver<Sender<T>>,
) {
fn log_task_exit(desc: &StreamDescription, reason: &str) {
target: "bridge",
"Background task of subscription broadcaster for {} has stopped: {}",
// wait for first subscriber until actually starting subscription
let subscriber = match subscribers_receiver.next().await {
Some(subscriber) => subscriber,
None => {
// it means that the last subscriber/factory has been dropped, so we need to
// exit too
return log_task_exit(subscription.desc(), "client has stopped")
// actually subscribe
let mut subscribers = vec![subscriber];
// start listening for new items and receivers
loop {
futures::select! {
subscriber = subscribers_receiver.next().fuse() => {
match subscriber {
Some(subscriber) => subscribers.push(subscriber),
None => {
// it means that the last subscriber/factory has been dropped, so we need to
// exit too
return log_task_exit(subscription.desc(), "client has stopped")
maybe_item = subscription.subscription.next().fuse() => {
match maybe_item {
Some(item) => {
// notify subscribers
subscribers.retain(|subscriber| {
let send_result = subscriber.try_send(item.clone());
None => {
// The underlying client has dropped, so we can't do anything here
// and need to stop the task.
return log_task_exit(subscription.desc(), "stream has finished");