1use crate::SubscriptionTaskExecutor;
22use futures::{
23	future::{self, Either, Fuse, FusedFuture},
24	Future, FutureExt, Stream, StreamExt, TryStream, TryStreamExt,
25};
26use jsonrpsee::{
27	types::SubscriptionId, DisconnectError, PendingSubscriptionSink, SubscriptionMessage,
28	SubscriptionSink,
29};
30use sp_runtime::Serialize;
31use std::collections::VecDeque;
32
33const DEFAULT_BUF_SIZE: usize = 16;
34
35pub trait Buffer {
38	type Item;
40
41	fn push(&mut self, item: Self::Item) -> Result<(), ()>;
45	fn pop(&mut self) -> Option<Self::Item>;
47}
48
49pub struct BoundedVecDeque<T> {
51	inner: VecDeque<T>,
52	max_cap: usize,
53}
54
55impl<T> Default for BoundedVecDeque<T> {
56	fn default() -> Self {
57		Self { inner: VecDeque::with_capacity(DEFAULT_BUF_SIZE), max_cap: DEFAULT_BUF_SIZE }
58	}
59}
60
61impl<T> BoundedVecDeque<T> {
62	pub fn new(cap: usize) -> Self {
64		Self { inner: VecDeque::with_capacity(cap), max_cap: cap }
65	}
66}
67
68impl<T> Buffer for BoundedVecDeque<T> {
69	type Item = T;
70
71	fn push(&mut self, item: Self::Item) -> Result<(), ()> {
72		if self.inner.len() >= self.max_cap {
73			Err(())
74		} else {
75			self.inner.push_back(item);
76			Ok(())
77		}
78	}
79
80	fn pop(&mut self) -> Option<T> {
81		self.inner.pop_front()
82	}
83}
84
85#[derive(Debug)]
87pub struct RingBuffer<T> {
88	inner: VecDeque<T>,
89	cap: usize,
90}
91
92impl<T> RingBuffer<T> {
93	pub fn new(cap: usize) -> Self {
95		Self { inner: VecDeque::with_capacity(cap), cap }
96	}
97}
98
99impl<T> Buffer for RingBuffer<T> {
100	type Item = T;
101
102	fn push(&mut self, item: T) -> Result<(), ()> {
103		if self.inner.len() >= self.cap {
104			self.inner.pop_front();
105		}
106
107		self.inner.push_back(item);
108
109		Ok(())
110	}
111
112	fn pop(&mut self) -> Option<T> {
113		self.inner.pop_front()
114	}
115}
116
117pub struct PendingSubscription(PendingSubscriptionSink);
119
120impl From<PendingSubscriptionSink> for PendingSubscription {
121	fn from(p: PendingSubscriptionSink) -> Self {
122		Self(p)
123	}
124}
125
126impl PendingSubscription {
127	pub async fn pipe_from_stream<S, T, B>(self, mut stream: S, mut buf: B)
130	where
131		S: Stream<Item = T> + Unpin + Send + 'static,
132		T: Serialize + Send + 'static,
133		B: Buffer<Item = T>,
134	{
135		let method = self.0.method_name().to_string();
136		let conn_id = self.0.connection_id().0;
137		let accept_fut = self.0.accept();
138
139		futures::pin_mut!(accept_fut);
140
141		let sink = loop {
145			match future::select(accept_fut, stream.next()).await {
146				Either::Left((Ok(sink), _)) => break sink,
147				Either::Right((Some(msg), f)) => {
148					if buf.push(msg).is_err() {
149						log::debug!(target: "rpc", "Subscription::accept buffer full for subscription={method} conn_id={conn_id}; dropping subscription");
150						return
151					}
152					accept_fut = f;
153				},
154				_ => return,
156			}
157		};
158
159		Subscription(sink).pipe_from_stream(stream, buf).await
160	}
161}
162
163#[derive(Clone, Debug)]
165pub struct Subscription(SubscriptionSink);
166
167impl From<SubscriptionSink> for Subscription {
168	fn from(sink: SubscriptionSink) -> Self {
169		Self(sink)
170	}
171}
172
173impl Subscription {
174	pub async fn pipe_from_stream<S, T, B>(&self, stream: S, buf: B)
177	where
178		S: Stream<Item = T> + Unpin,
179		T: Serialize + Send,
180		B: Buffer<Item = T>,
181	{
182		self.pipe_from_try_stream(stream.map(Ok::<T, ()>), buf)
183			.await
184			.expect("No Err will be ever encountered.qed");
185	}
186
187	pub async fn pipe_from_try_stream<S, T, B, E>(&self, mut stream: S, mut buf: B) -> Result<(), E>
190	where
191		S: TryStream<Ok = T, Error = E> + Unpin,
192		T: Serialize + Send,
193		B: Buffer<Item = T>,
194	{
195		let mut next_fut = Box::pin(Fuse::terminated());
196		let mut next_item = stream.try_next();
197		let closed = self.0.closed();
198
199		futures::pin_mut!(closed);
200
201		loop {
202			if next_fut.is_terminated() {
203				if let Some(v) = buf.pop() {
204					let val = self.to_sub_message(&v);
205					next_fut.set(async { self.0.send(val).await }.fuse());
206				}
207			}
208
209			match future::select(closed, future::select(next_fut, next_item)).await {
210				Either::Right((Either::Left((_, n)), c)) => {
212					next_item = n;
213					closed = c;
214					next_fut = Box::pin(Fuse::terminated());
215				},
216				Either::Right((Either::Right((Ok(Some(v)), n)), c)) => {
218					if buf.push(v).is_err() {
219						log::debug!(
220							target: "rpc",
221							"Subscription buffer full for subscription={} conn_id={}; dropping subscription",
222							self.0.method_name(),
223							self.0.connection_id().0
224						);
225						return Ok(());
226					}
227
228					next_fut = n;
229					closed = c;
230					next_item = stream.try_next();
231				},
232				Either::Right((Either::Right((Err(e), _)), _)) => return Err(e),
236				Either::Right((Either::Right((Ok(None), pending_fut)), _)) => {
240					if !pending_fut.is_terminated() && pending_fut.await.is_err() {
241						return Ok(());
242					}
243
244					while let Some(v) = buf.pop() {
245						if self.send(&v).await.is_err() {
246							return Ok(());
247						}
248					}
249
250					return Ok(());
251				},
252				Either::Left(_) => return Ok(()),
254			}
255		}
256	}
257
258	pub async fn send(&self, result: &impl Serialize) -> Result<(), DisconnectError> {
260		self.0.send(self.to_sub_message(result)).await
261	}
262
263	pub fn subscription_id(&self) -> SubscriptionId {
265		self.0.subscription_id()
266	}
267
268	pub async fn closed(&self) {
270		self.0.closed().await
271	}
272
273	fn to_sub_message(&self, result: &impl Serialize) -> SubscriptionMessage {
275		SubscriptionMessage::new(self.0.method_name(), self.0.subscription_id(), result)
276			.expect("Serialize infallible; qed")
277	}
278}
279
280pub fn spawn_subscription_task(
282	executor: &SubscriptionTaskExecutor,
283	fut: impl Future<Output = ()> + Send + 'static,
284) {
285	executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
286}
287
288#[cfg(test)]
289mod tests {
290	use super::*;
291	use futures::StreamExt;
292	use jsonrpsee::{core::EmptyServerParams, RpcModule, Subscription};
293
294	async fn subscribe() -> Subscription {
295		let mut module = RpcModule::new(());
296		module
297			.register_subscription("sub", "my_sub", "unsub", |_, pending, _, _| async move {
298				let stream = futures::stream::iter([0; 16]);
299				PendingSubscription::from(pending)
300					.pipe_from_stream(stream, BoundedVecDeque::new(16))
301					.await;
302				Ok(())
303			})
304			.unwrap();
305
306		module.subscribe("sub", EmptyServerParams::new(), 1).await.unwrap()
307	}
308
309	#[tokio::test]
310	async fn pipe_from_stream_works() {
311		let mut sub = subscribe().await;
312		let mut rx = 0;
313
314		while let Some(Ok(_)) = sub.next::<usize>().await {
315			rx += 1;
316		}
317
318		assert_eq!(rx, 16);
319	}
320
321	#[tokio::test]
322	async fn pipe_from_stream_with_bounded_vec() {
323		let (tx, mut rx) = futures::channel::mpsc::unbounded::<()>();
324
325		let mut module = RpcModule::new(tx);
326		module
327			.register_subscription("sub", "my_sub", "unsub", |_, pending, ctx, _| async move {
328				let stream = futures::stream::iter([0; 32]);
329				PendingSubscription::from(pending)
330					.pipe_from_stream(stream, BoundedVecDeque::new(16))
331					.await;
332				_ = ctx.unbounded_send(());
333				Ok(())
334			})
335			.unwrap();
336
337		let mut sub = module.subscribe("sub", EmptyServerParams::new(), 1).await.unwrap();
338
339		_ = rx.next().await.unwrap();
341		assert!(sub.next::<usize>().await.is_none());
342	}
343
344	#[tokio::test]
345	async fn subscription_is_dropped_when_stream_is_empty() {
346		let notify_rx = std::sync::Arc::new(tokio::sync::Notify::new());
347		let notify_tx = notify_rx.clone();
348
349		let mut module = RpcModule::new(notify_tx);
350		module
351			.register_subscription(
352				"sub",
353				"my_sub",
354				"unsub",
355				|_, pending, notify_tx, _| async move {
356					let stream = futures::stream::empty::<()>();
359					PendingSubscription::from(pending)
361						.pipe_from_stream(stream, BoundedVecDeque::default())
362						.await;
363					notify_tx.notify_one();
365					Ok(())
366				},
367			)
368			.unwrap();
369		module.subscribe("sub", EmptyServerParams::new(), 1).await.unwrap();
370
371		notify_rx.notified().await;
373	}
374
375	#[tokio::test]
376	async fn subscription_replace_old_messages() {
377		let mut module = RpcModule::new(());
378		module
379			.register_subscription("sub", "my_sub", "unsub", |_, pending, _, _| async move {
380				let stream = futures::stream::iter(0..20);
382				PendingSubscription::from(pending)
383					.pipe_from_stream(stream, RingBuffer::new(3))
384					.await;
385				Ok(())
386			})
387			.unwrap();
388
389		let mut sub = module.subscribe("sub", EmptyServerParams::new(), 1).await.unwrap();
390
391		tokio::time::sleep(std::time::Duration::from_secs(10)).await;
394
395		let mut res = Vec::new();
396
397		while let Some(Ok((v, _))) = sub.next::<usize>().await {
398			res.push(v);
399		}
400
401		assert_eq!(res, vec![0, 17, 18, 19]);
404	}
405}