1use crate::SubscriptionTaskExecutor;
22use futures::{
23 future::{self, Either, Fuse, FusedFuture},
24 Future, FutureExt, Stream, StreamExt, TryStream, TryStreamExt,
25};
26use jsonrpsee::{
27 types::SubscriptionId, DisconnectError, PendingSubscriptionSink, SubscriptionMessage,
28 SubscriptionSink,
29};
30use sp_runtime::Serialize;
31use std::collections::VecDeque;
32
33const DEFAULT_BUF_SIZE: usize = 16;
34
35pub trait Buffer {
38 type Item;
40
41 fn push(&mut self, item: Self::Item) -> Result<(), ()>;
45 fn pop(&mut self) -> Option<Self::Item>;
47}
48
49pub struct BoundedVecDeque<T> {
51 inner: VecDeque<T>,
52 max_cap: usize,
53}
54
55impl<T> Default for BoundedVecDeque<T> {
56 fn default() -> Self {
57 Self { inner: VecDeque::with_capacity(DEFAULT_BUF_SIZE), max_cap: DEFAULT_BUF_SIZE }
58 }
59}
60
61impl<T> BoundedVecDeque<T> {
62 pub fn new(cap: usize) -> Self {
64 Self { inner: VecDeque::with_capacity(cap), max_cap: cap }
65 }
66}
67
68impl<T> Buffer for BoundedVecDeque<T> {
69 type Item = T;
70
71 fn push(&mut self, item: Self::Item) -> Result<(), ()> {
72 if self.inner.len() >= self.max_cap {
73 Err(())
74 } else {
75 self.inner.push_back(item);
76 Ok(())
77 }
78 }
79
80 fn pop(&mut self) -> Option<T> {
81 self.inner.pop_front()
82 }
83}
84
85#[derive(Debug)]
87pub struct RingBuffer<T> {
88 inner: VecDeque<T>,
89 cap: usize,
90}
91
92impl<T> RingBuffer<T> {
93 pub fn new(cap: usize) -> Self {
95 Self { inner: VecDeque::with_capacity(cap), cap }
96 }
97}
98
99impl<T> Buffer for RingBuffer<T> {
100 type Item = T;
101
102 fn push(&mut self, item: T) -> Result<(), ()> {
103 if self.inner.len() >= self.cap {
104 self.inner.pop_front();
105 }
106
107 self.inner.push_back(item);
108
109 Ok(())
110 }
111
112 fn pop(&mut self) -> Option<T> {
113 self.inner.pop_front()
114 }
115}
116
117pub struct PendingSubscription(PendingSubscriptionSink);
119
120impl From<PendingSubscriptionSink> for PendingSubscription {
121 fn from(p: PendingSubscriptionSink) -> Self {
122 Self(p)
123 }
124}
125
126impl PendingSubscription {
127 pub async fn pipe_from_stream<S, T, B>(self, mut stream: S, mut buf: B)
130 where
131 S: Stream<Item = T> + Unpin + Send + 'static,
132 T: Serialize + Send + 'static,
133 B: Buffer<Item = T>,
134 {
135 let method = self.0.method_name().to_string();
136 let conn_id = self.0.connection_id().0;
137 let accept_fut = self.0.accept();
138
139 futures::pin_mut!(accept_fut);
140
141 let sink = loop {
145 match future::select(accept_fut, stream.next()).await {
146 Either::Left((Ok(sink), _)) => break sink,
147 Either::Right((Some(msg), f)) => {
148 if buf.push(msg).is_err() {
149 log::debug!(target: "rpc", "Subscription::accept buffer full for subscription={method} conn_id={conn_id}; dropping subscription");
150 return
151 }
152 accept_fut = f;
153 },
154 _ => return,
156 }
157 };
158
159 Subscription(sink).pipe_from_stream(stream, buf).await
160 }
161}
162
163#[derive(Clone, Debug)]
165pub struct Subscription(SubscriptionSink);
166
167impl From<SubscriptionSink> for Subscription {
168 fn from(sink: SubscriptionSink) -> Self {
169 Self(sink)
170 }
171}
172
173impl Subscription {
174 pub async fn pipe_from_stream<S, T, B>(&self, stream: S, buf: B)
177 where
178 S: Stream<Item = T> + Unpin,
179 T: Serialize + Send,
180 B: Buffer<Item = T>,
181 {
182 self.pipe_from_try_stream(stream.map(Ok::<T, ()>), buf)
183 .await
184 .expect("No Err will be ever encountered.qed");
185 }
186
187 pub async fn pipe_from_try_stream<S, T, B, E>(&self, mut stream: S, mut buf: B) -> Result<(), E>
190 where
191 S: TryStream<Ok = T, Error = E> + Unpin,
192 T: Serialize + Send,
193 B: Buffer<Item = T>,
194 {
195 let mut next_fut = Box::pin(Fuse::terminated());
196 let mut next_item = stream.try_next();
197 let closed = self.0.closed();
198
199 futures::pin_mut!(closed);
200
201 loop {
202 if next_fut.is_terminated() {
203 if let Some(v) = buf.pop() {
204 let val = self.to_sub_message(&v);
205 next_fut.set(async { self.0.send(val).await }.fuse());
206 }
207 }
208
209 match future::select(closed, future::select(next_fut, next_item)).await {
210 Either::Right((Either::Left((_, n)), c)) => {
212 next_item = n;
213 closed = c;
214 next_fut = Box::pin(Fuse::terminated());
215 },
216 Either::Right((Either::Right((Ok(Some(v)), n)), c)) => {
218 if buf.push(v).is_err() {
219 log::debug!(
220 target: "rpc",
221 "Subscription buffer full for subscription={} conn_id={}; dropping subscription",
222 self.0.method_name(),
223 self.0.connection_id().0
224 );
225 return Ok(());
226 }
227
228 next_fut = n;
229 closed = c;
230 next_item = stream.try_next();
231 },
232 Either::Right((Either::Right((Err(e), _)), _)) => return Err(e),
236 Either::Right((Either::Right((Ok(None), pending_fut)), _)) => {
240 if !pending_fut.is_terminated() && pending_fut.await.is_err() {
241 return Ok(());
242 }
243
244 while let Some(v) = buf.pop() {
245 if self.send(&v).await.is_err() {
246 return Ok(());
247 }
248 }
249
250 return Ok(());
251 },
252 Either::Left(_) => return Ok(()),
254 }
255 }
256 }
257
258 pub async fn send(&self, result: &impl Serialize) -> Result<(), DisconnectError> {
260 self.0.send(self.to_sub_message(result)).await
261 }
262
263 pub fn subscription_id(&self) -> SubscriptionId {
265 self.0.subscription_id()
266 }
267
268 pub async fn closed(&self) {
270 self.0.closed().await
271 }
272
273 fn to_sub_message(&self, result: &impl Serialize) -> SubscriptionMessage {
275 SubscriptionMessage::new(self.0.method_name(), self.0.subscription_id(), result)
276 .expect("Serialize infallible; qed")
277 }
278}
279
280pub fn spawn_subscription_task(
282 executor: &SubscriptionTaskExecutor,
283 fut: impl Future<Output = ()> + Send + 'static,
284) {
285 executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
286}
287
288#[cfg(test)]
289mod tests {
290 use super::*;
291 use futures::StreamExt;
292 use jsonrpsee::{core::EmptyServerParams, RpcModule, Subscription};
293
294 async fn subscribe() -> Subscription {
295 let mut module = RpcModule::new(());
296 module
297 .register_subscription("sub", "my_sub", "unsub", |_, pending, _, _| async move {
298 let stream = futures::stream::iter([0; 16]);
299 PendingSubscription::from(pending)
300 .pipe_from_stream(stream, BoundedVecDeque::new(16))
301 .await;
302 Ok(())
303 })
304 .unwrap();
305
306 module.subscribe("sub", EmptyServerParams::new(), 1).await.unwrap()
307 }
308
309 #[tokio::test]
310 async fn pipe_from_stream_works() {
311 let mut sub = subscribe().await;
312 let mut rx = 0;
313
314 while let Some(Ok(_)) = sub.next::<usize>().await {
315 rx += 1;
316 }
317
318 assert_eq!(rx, 16);
319 }
320
321 #[tokio::test]
322 async fn pipe_from_stream_with_bounded_vec() {
323 let (tx, mut rx) = futures::channel::mpsc::unbounded::<()>();
324
325 let mut module = RpcModule::new(tx);
326 module
327 .register_subscription("sub", "my_sub", "unsub", |_, pending, ctx, _| async move {
328 let stream = futures::stream::iter([0; 32]);
329 PendingSubscription::from(pending)
330 .pipe_from_stream(stream, BoundedVecDeque::new(16))
331 .await;
332 _ = ctx.unbounded_send(());
333 Ok(())
334 })
335 .unwrap();
336
337 let mut sub = module.subscribe("sub", EmptyServerParams::new(), 1).await.unwrap();
338
339 _ = rx.next().await.unwrap();
341 assert!(sub.next::<usize>().await.is_none());
342 }
343
344 #[tokio::test]
345 async fn subscription_is_dropped_when_stream_is_empty() {
346 let notify_rx = std::sync::Arc::new(tokio::sync::Notify::new());
347 let notify_tx = notify_rx.clone();
348
349 let mut module = RpcModule::new(notify_tx);
350 module
351 .register_subscription(
352 "sub",
353 "my_sub",
354 "unsub",
355 |_, pending, notify_tx, _| async move {
356 let stream = futures::stream::empty::<()>();
359 PendingSubscription::from(pending)
361 .pipe_from_stream(stream, BoundedVecDeque::default())
362 .await;
363 notify_tx.notify_one();
365 Ok(())
366 },
367 )
368 .unwrap();
369 module.subscribe("sub", EmptyServerParams::new(), 1).await.unwrap();
370
371 notify_rx.notified().await;
373 }
374
375 #[tokio::test]
376 async fn subscription_replace_old_messages() {
377 let mut module = RpcModule::new(());
378 module
379 .register_subscription("sub", "my_sub", "unsub", |_, pending, _, _| async move {
380 let stream = futures::stream::iter(0..20);
382 PendingSubscription::from(pending)
383 .pipe_from_stream(stream, RingBuffer::new(3))
384 .await;
385 Ok(())
386 })
387 .unwrap();
388
389 let mut sub = module.subscribe("sub", EmptyServerParams::new(), 1).await.unwrap();
390
391 tokio::time::sleep(std::time::Duration::from_secs(10)).await;
394
395 let mut res = Vec::new();
396
397 while let Some(Ok((v, _))) = sub.next::<usize>().await {
398 res.push(v);
399 }
400
401 assert_eq!(res, vec![0, 17, 18, 19]);
404 }
405}