relay_substrate_client/client/
subscription.rs1use crate::error::Result as ClientResult;
18
19use async_std::{
20 channel::{bounded, Receiver, Sender},
21 stream::StreamExt,
22};
23use futures::{FutureExt, Stream};
24use sp_runtime::DeserializeOwned;
25use std::{
26 fmt::Debug,
27 pin::Pin,
28 result::Result as StdResult,
29 task::{Context, Poll},
30};
31
32const CHANNEL_CAPACITY: usize = 128;
34
35#[derive(Clone)]
37pub struct StreamDescription {
38 stream_name: String,
39 chain_name: String,
40}
41
42impl StreamDescription {
43 pub fn new(stream_name: String, chain_name: String) -> Self {
45 Self { stream_name, chain_name }
46 }
47
48 fn get(&self) -> String {
50 format!("{} stream of {}", self.stream_name, self.chain_name)
51 }
52}
53
54struct Unwrap<S: Stream<Item = StdResult<T, E>>, T, E> {
59 desc: StreamDescription,
60 stream: Option<S>,
61}
62
63impl<S: Stream<Item = StdResult<T, E>>, T, E> Unwrap<S, T, E> {
64 pub fn new(desc: StreamDescription, stream: S) -> Self {
66 Self { desc, stream: Some(stream) }
67 }
68}
69
70impl<S: Stream<Item = StdResult<T, E>> + Unpin, T: DeserializeOwned, E: Debug> Stream
71 for Unwrap<S, T, E>
72{
73 type Item = T;
74
75 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
76 Poll::Ready(match self.stream.as_mut() {
77 Some(subscription) => match futures::ready!(Pin::new(subscription).poll_next(cx)) {
78 Some(Ok(item)) => Some(item),
79 Some(Err(e)) => {
80 self.stream.take();
81 log::debug!(
82 target: "bridge",
83 "{} has returned error: {:?}. It may need to be restarted",
84 self.desc.get(),
85 e,
86 );
87 None
88 },
89 None => {
90 self.stream.take();
91 log::debug!(
92 target: "bridge",
93 "{} has returned `None`. It may need to be restarted",
94 self.desc.get()
95 );
96 None
97 },
98 },
99 None => None,
100 })
101 }
102}
103
104#[derive(Clone)]
106pub struct SubscriptionBroadcaster<T> {
107 desc: StreamDescription,
108 subscribers_sender: Sender<Sender<T>>,
109}
110
111impl<T: 'static + Clone + DeserializeOwned + Send> SubscriptionBroadcaster<T> {
112 pub fn new(subscription: Subscription<T>) -> StdResult<Self, Subscription<T>> {
114 if subscription.is_broadcasted {
116 return Err(subscription)
117 }
118
119 let desc = subscription.desc().clone();
120 let (subscribers_sender, subscribers_receiver) = bounded(CHANNEL_CAPACITY);
121 async_std::task::spawn(background_worker(subscription, subscribers_receiver));
122 Ok(Self { desc, subscribers_sender })
123 }
124
125 pub async fn subscribe(&self) -> ClientResult<Subscription<T>> {
127 let (items_sender, items_receiver) = bounded(CHANNEL_CAPACITY);
128 self.subscribers_sender.try_send(items_sender)?;
129
130 Ok(Subscription::new_broadcasted(self.desc.clone(), items_receiver))
131 }
132}
133
134pub struct Subscription<T> {
136 desc: StreamDescription,
137 subscription: Box<dyn Stream<Item = T> + Unpin + Send>,
138 is_broadcasted: bool,
139}
140
141impl<T: 'static + Clone + DeserializeOwned + Send> Subscription<T> {
142 pub fn new_forwarded(
144 desc: StreamDescription,
145 subscription: impl Stream<Item = StdResult<T, serde_json::Error>> + Unpin + Send + 'static,
146 ) -> Self {
147 Self {
148 desc: desc.clone(),
149 subscription: Box::new(Unwrap::new(desc, subscription)),
150 is_broadcasted: false,
151 }
152 }
153
154 pub fn new_broadcasted(
156 desc: StreamDescription,
157 subscription: impl Stream<Item = T> + Unpin + Send + 'static,
158 ) -> Self {
159 Self { desc, subscription: Box::new(subscription), is_broadcasted: true }
160 }
161
162 pub fn desc(&self) -> &StreamDescription {
164 &self.desc
165 }
166}
167
168impl<T> Stream for Subscription<T> {
169 type Item = T;
170
171 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
172 Poll::Ready(futures::ready!(Pin::new(&mut self.subscription).poll_next(cx)))
173 }
174}
175
176async fn background_worker<T: 'static + Clone + DeserializeOwned + Send>(
182 mut subscription: Subscription<T>,
183 mut subscribers_receiver: Receiver<Sender<T>>,
184) {
185 fn log_task_exit(desc: &StreamDescription, reason: &str) {
186 log::debug!(
187 target: "bridge",
188 "Background task of subscription broadcaster for {} has stopped: {}",
189 desc.get(),
190 reason,
191 );
192 }
193
194 let subscriber = match subscribers_receiver.next().await {
196 Some(subscriber) => subscriber,
197 None => {
198 return log_task_exit(subscription.desc(), "client has stopped")
201 },
202 };
203
204 let mut subscribers = vec![subscriber];
206
207 loop {
209 futures::select! {
210 subscriber = subscribers_receiver.next().fuse() => {
211 match subscriber {
212 Some(subscriber) => subscribers.push(subscriber),
213 None => {
214 return log_task_exit(subscription.desc(), "client has stopped")
217 },
218 }
219 },
220 maybe_item = subscription.subscription.next().fuse() => {
221 match maybe_item {
222 Some(item) => {
223 subscribers.retain(|subscriber| {
225 let send_result = subscriber.try_send(item.clone());
226 send_result.is_ok()
227 });
228 }
229 None => {
230 return log_task_exit(subscription.desc(), "stream has finished");
233 }
234 }
235 },
236 }
237 }
238}