referrerpolicy=no-referrer-when-downgrade

relay_substrate_client/client/
subscription.rs

1// Copyright 2019-2021 Parity Technologies (UK) Ltd.
2// This file is part of Parity Bridges Common.
3
4// Parity Bridges Common is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Parity Bridges Common is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Parity Bridges Common.  If not, see <http://www.gnu.org/licenses/>.
16
17use crate::error::Result as ClientResult;
18
19use async_std::{
20	channel::{bounded, Receiver, Sender},
21	stream::StreamExt,
22};
23use futures::{FutureExt, Stream};
24use sp_runtime::DeserializeOwned;
25use std::{
26	fmt::Debug,
27	pin::Pin,
28	result::Result as StdResult,
29	task::{Context, Poll},
30};
31
/// Capacity of every bounded channel created in this module.
///
/// Once a channel reaches this capacity, the subscription breaks: the background worker
/// delivers items with `try_send`, so a subscriber whose channel is full fails to receive
/// the item and is removed from the list of subscribers. The same capacity bounds the
/// channel through which new subscribers are registered with the background worker.
const CHANNEL_CAPACITY: usize = 128;
34
/// Structure describing a stream.
///
/// It keeps the name of the stream together with the name of the chain and is used
/// to build human-readable log messages.
#[derive(Clone, Debug)]
pub struct StreamDescription {
	/// Name of the stream.
	stream_name: String,
	/// Name of the chain.
	chain_name: String,
}

impl StreamDescription {
	/// Create a new instance of `StreamDescription`.
	pub fn new(stream_name: String, chain_name: String) -> Self {
		Self { stream_name, chain_name }
	}

	/// Get a stream description in the form `<stream_name> stream of <chain_name>`.
	fn get(&self) -> String {
		format!("{} stream of {}", self.stream_name, self.chain_name)
	}
}
53
54/// Chainable stream that transforms items of type `Result<T, E>` to items of type `T`.
55///
56/// If it encounters an item of type `Err`, it returns `Poll::Ready(None)`
57/// and terminates the underlying stream.
58struct Unwrap<S: Stream<Item = StdResult<T, E>>, T, E> {
59	desc: StreamDescription,
60	stream: Option<S>,
61}
62
63impl<S: Stream<Item = StdResult<T, E>>, T, E> Unwrap<S, T, E> {
64	/// Create a new instance of `Unwrap`.
65	pub fn new(desc: StreamDescription, stream: S) -> Self {
66		Self { desc, stream: Some(stream) }
67	}
68}
69
70impl<S: Stream<Item = StdResult<T, E>> + Unpin, T: DeserializeOwned, E: Debug> Stream
71	for Unwrap<S, T, E>
72{
73	type Item = T;
74
75	fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
76		Poll::Ready(match self.stream.as_mut() {
77			Some(subscription) => match futures::ready!(Pin::new(subscription).poll_next(cx)) {
78				Some(Ok(item)) => Some(item),
79				Some(Err(e)) => {
80					self.stream.take();
81					log::debug!(
82						target: "bridge",
83						"{} has returned error: {:?}. It may need to be restarted",
84						self.desc.get(),
85						e,
86					);
87					None
88				},
89				None => {
90					self.stream.take();
91					log::debug!(
92						target: "bridge",
93						"{} has returned `None`. It may need to be restarted",
94						self.desc.get()
95					);
96					None
97				},
98			},
99			None => None,
100		})
101	}
102}
103
104/// Subscription factory that produces subscriptions, sharing the same background thread.
105#[derive(Clone)]
106pub struct SubscriptionBroadcaster<T> {
107	desc: StreamDescription,
108	subscribers_sender: Sender<Sender<T>>,
109}
110
111impl<T: 'static + Clone + DeserializeOwned + Send> SubscriptionBroadcaster<T> {
112	/// Create new subscription factory.
113	pub fn new(subscription: Subscription<T>) -> StdResult<Self, Subscription<T>> {
114		// It doesn't make sense to further broadcast a broadcasted subscription.
115		if subscription.is_broadcasted {
116			return Err(subscription)
117		}
118
119		let desc = subscription.desc().clone();
120		let (subscribers_sender, subscribers_receiver) = bounded(CHANNEL_CAPACITY);
121		async_std::task::spawn(background_worker(subscription, subscribers_receiver));
122		Ok(Self { desc, subscribers_sender })
123	}
124
125	/// Produce new subscription.
126	pub async fn subscribe(&self) -> ClientResult<Subscription<T>> {
127		let (items_sender, items_receiver) = bounded(CHANNEL_CAPACITY);
128		self.subscribers_sender.try_send(items_sender)?;
129
130		Ok(Subscription::new_broadcasted(self.desc.clone(), items_receiver))
131	}
132}
133
134/// Subscription to some chain events.
135pub struct Subscription<T> {
136	desc: StreamDescription,
137	subscription: Box<dyn Stream<Item = T> + Unpin + Send>,
138	is_broadcasted: bool,
139}
140
141impl<T: 'static + Clone + DeserializeOwned + Send> Subscription<T> {
142	/// Create new forwarded subscription.
143	pub fn new_forwarded(
144		desc: StreamDescription,
145		subscription: impl Stream<Item = StdResult<T, serde_json::Error>> + Unpin + Send + 'static,
146	) -> Self {
147		Self {
148			desc: desc.clone(),
149			subscription: Box::new(Unwrap::new(desc, subscription)),
150			is_broadcasted: false,
151		}
152	}
153
154	/// Create new broadcasted subscription.
155	pub fn new_broadcasted(
156		desc: StreamDescription,
157		subscription: impl Stream<Item = T> + Unpin + Send + 'static,
158	) -> Self {
159		Self { desc, subscription: Box::new(subscription), is_broadcasted: true }
160	}
161
162	/// Get the description of the underlying stream
163	pub fn desc(&self) -> &StreamDescription {
164		&self.desc
165	}
166}
167
168impl<T> Stream for Subscription<T> {
169	type Item = T;
170
171	fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
172		Poll::Ready(futures::ready!(Pin::new(&mut self.subscription).poll_next(cx)))
173	}
174}
175
176/// Background worker that is executed in tokio context as `jsonrpsee` requires.
177///
178/// This task may exit under some circumstances. It'll send the correspondent
179/// message (`Err` or `None`) to all known listeners. Also, when it stops, all
180/// subsequent reads and new subscribers will get the connection error (`ChannelError`).
181async fn background_worker<T: 'static + Clone + DeserializeOwned + Send>(
182	mut subscription: Subscription<T>,
183	mut subscribers_receiver: Receiver<Sender<T>>,
184) {
185	fn log_task_exit(desc: &StreamDescription, reason: &str) {
186		log::debug!(
187			target: "bridge",
188			"Background task of subscription broadcaster for {} has stopped: {}",
189			desc.get(),
190			reason,
191		);
192	}
193
194	// wait for first subscriber until actually starting subscription
195	let subscriber = match subscribers_receiver.next().await {
196		Some(subscriber) => subscriber,
197		None => {
198			// it means that the last subscriber/factory has been dropped, so we need to
199			// exit too
200			return log_task_exit(subscription.desc(), "client has stopped")
201		},
202	};
203
204	// actually subscribe
205	let mut subscribers = vec![subscriber];
206
207	// start listening for new items and receivers
208	loop {
209		futures::select! {
210			subscriber = subscribers_receiver.next().fuse() => {
211				match subscriber {
212					Some(subscriber) => subscribers.push(subscriber),
213					None => {
214						// it means that the last subscriber/factory has been dropped, so we need to
215						// exit too
216						return log_task_exit(subscription.desc(), "client has stopped")
217					},
218				}
219			},
220			maybe_item = subscription.subscription.next().fuse() => {
221				match maybe_item {
222					Some(item) => {
223						// notify subscribers
224						subscribers.retain(|subscriber| {
225							let send_result = subscriber.try_send(item.clone());
226							send_result.is_ok()
227						});
228					}
229					None => {
230						// The underlying client has dropped, so we can't do anything here
231						// and need to stop the task.
232						return log_task_exit(subscription.desc(), "stream has finished");
233					}
234				}
235			},
236		}
237	}
238}