referrerpolicy=no-referrer-when-downgrade

sc_rpc/
utils.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! JSON-RPC helpers.
20
21use crate::SubscriptionTaskExecutor;
22use futures::{
23	future::{self, Either, Fuse, FusedFuture},
24	Future, FutureExt, Stream, StreamExt, TryStream, TryStreamExt,
25};
26use jsonrpsee::{
27	types::SubscriptionId, DisconnectError, PendingSubscriptionSink, SubscriptionMessage,
28	SubscriptionSink,
29};
30use sp_runtime::Serialize;
31use std::collections::VecDeque;
32
/// Capacity used by `BoundedVecDeque::default()`, both for the initial
/// allocation and for the maximum number of buffered items.
const DEFAULT_BUF_SIZE: usize = 16;
34
/// A trait representing a buffer which may or may not support
/// replacing items when the buffer is full.
///
/// Implementations in this file are FIFO queues: `pop` hands back the oldest
/// item that is still stored.
pub trait Buffer {
	/// The item type that the buffer holds.
	type Item;

	/// Push an item to the buffer.
	///
	/// Returns `Err` if the buffer is full and the implementation does not
	/// support replacing older items; callers in this file treat that as a
	/// signal to drop the subscription.
	fn push(&mut self, item: Self::Item) -> Result<(), ()>;
	/// Pop the next item from the buffer.
	///
	/// Returns `None` when the buffer is empty.
	fn pop(&mut self) -> Option<Self::Item>;
}
48
/// A simple bounded buffer that will terminate the subscription if the buffer becomes full.
///
/// The buffer itself only reports the overflow: `push` returns `Err` once
/// `max_cap` items are stored, and the piping helpers in this file react to
/// that error by returning, which drops the subscription.
#[derive(Debug)]
pub struct BoundedVecDeque<T> {
	/// Queued items; new items go to the back, `pop` takes from the front.
	inner: VecDeque<T>,
	/// Maximum number of items `inner` may hold before `push` starts failing.
	max_cap: usize,
}
54
55impl<T> Default for BoundedVecDeque<T> {
56	fn default() -> Self {
57		Self { inner: VecDeque::with_capacity(DEFAULT_BUF_SIZE), max_cap: DEFAULT_BUF_SIZE }
58	}
59}
60
61impl<T> BoundedVecDeque<T> {
62	/// Create a new bounded VecDeque.
63	pub fn new(cap: usize) -> Self {
64		Self { inner: VecDeque::with_capacity(cap), max_cap: cap }
65	}
66}
67
68impl<T> Buffer for BoundedVecDeque<T> {
69	type Item = T;
70
71	fn push(&mut self, item: Self::Item) -> Result<(), ()> {
72		if self.inner.len() >= self.max_cap {
73			Err(())
74		} else {
75			self.inner.push_back(item);
76			Ok(())
77		}
78	}
79
80	fn pop(&mut self) -> Option<T> {
81		self.inner.pop_front()
82	}
83}
84
/// Fixed size ring buffer that replaces the oldest item when full.
///
/// Unlike `BoundedVecDeque`, pushing into a full `RingBuffer` never fails:
/// the item at the front is discarded to make room.
#[derive(Debug)]
pub struct RingBuffer<T> {
	/// Queued items; new items go to the back, `pop` takes from the front.
	inner: VecDeque<T>,
	/// Number of items at which `push` starts evicting the oldest one.
	cap: usize,
}
91
92impl<T> RingBuffer<T> {
93	/// Create a new ring buffer.
94	pub fn new(cap: usize) -> Self {
95		Self { inner: VecDeque::with_capacity(cap), cap }
96	}
97}
98
99impl<T> Buffer for RingBuffer<T> {
100	type Item = T;
101
102	fn push(&mut self, item: T) -> Result<(), ()> {
103		if self.inner.len() >= self.cap {
104			self.inner.pop_front();
105		}
106
107		self.inner.push_back(item);
108
109		Ok(())
110	}
111
112	fn pop(&mut self) -> Option<T> {
113		self.inner.pop_front()
114	}
115}
116
/// A pending subscription.
///
/// Newtype around jsonrpsee's `PendingSubscriptionSink`, i.e. a subscription
/// that has not been accepted yet. See `pipe_from_stream` for how it is
/// accepted and then fed from a stream.
pub struct PendingSubscription(PendingSubscriptionSink);
119
120impl From<PendingSubscriptionSink> for PendingSubscription {
121	fn from(p: PendingSubscriptionSink) -> Self {
122		Self(p)
123	}
124}
125
126impl PendingSubscription {
127	/// Feed items to the subscription from the underlying stream
128	/// with specified buffer strategy.
129	pub async fn pipe_from_stream<S, T, B>(self, mut stream: S, mut buf: B)
130	where
131		S: Stream<Item = T> + Unpin + Send + 'static,
132		T: Serialize + Send + 'static,
133		B: Buffer<Item = T>,
134	{
135		let method = self.0.method_name().to_string();
136		let conn_id = self.0.connection_id().0;
137		let accept_fut = self.0.accept();
138
139		futures::pin_mut!(accept_fut);
140
141		// Poll the stream while waiting for the subscription to be accepted
142		//
143		// If the `max_cap` is exceeded then the subscription is dropped.
144		let sink = loop {
145			match future::select(accept_fut, stream.next()).await {
146				Either::Left((Ok(sink), _)) => break sink,
147				Either::Right((Some(msg), f)) => {
148					if buf.push(msg).is_err() {
149						log::debug!(target: "rpc", "Subscription::accept buffer full for subscription={method} conn_id={conn_id}; dropping subscription");
150						return
151					}
152					accept_fut = f;
153				},
154				// The connection was closed or the stream was closed.
155				_ => return,
156			}
157		};
158
159		Subscription(sink).pipe_from_stream(stream, buf).await
160	}
161}
162
/// An active subscription.
///
/// Newtype around jsonrpsee's `SubscriptionSink`; the methods on this type
/// serialize values into subscription messages and send them through it.
#[derive(Clone, Debug)]
pub struct Subscription(SubscriptionSink);
166
167impl From<SubscriptionSink> for Subscription {
168	fn from(sink: SubscriptionSink) -> Self {
169		Self(sink)
170	}
171}
172
impl Subscription {
	/// Feed items to the subscription from the underlying stream
	/// with specified buffer strategy.
	///
	/// Infallible convenience wrapper: every stream item is wrapped in `Ok` and
	/// the work is delegated to [`Self::pipe_from_try_stream`].
	pub async fn pipe_from_stream<S, T, B>(&self, stream: S, buf: B)
	where
		S: Stream<Item = T> + Unpin,
		T: Serialize + Send,
		B: Buffer<Item = T>,
	{
		// The mapped stream only ever yields `Ok`, so the `Err` branch of the
		// result below cannot be reached.
		self.pipe_from_try_stream(stream.map(Ok::<T, ()>), buf)
			.await
			.expect("No Err will be ever encountered.qed");
	}

	/// Feed items to the subscription from the underlying fallible stream
	/// with specified buffer strategy.
	///
	/// Items pulled from `stream` are queued in `buf` and sent one at a time.
	///
	/// Returns `Err(e)` only when the stream itself yields an error. Every other
	/// way of finishing returns `Ok(())`: the subscription being closed, `buf`
	/// rejecting an item, or the stream ending (in which case the items still
	/// queued in `buf` are sent first, stopping at the first failed send).
	pub async fn pipe_from_try_stream<S, T, B, E>(&self, mut stream: S, mut buf: B) -> Result<(), E>
	where
		S: TryStream<Ok = T, Error = E> + Unpin,
		T: Serialize + Send,
		B: Buffer<Item = T>,
	{
		// `next_fut` is the send currently in flight; a terminated `Fuse` means
		// that no send is in progress.
		let mut next_fut = Box::pin(Fuse::terminated());
		// `next_item` is the pending request for the next stream item.
		let mut next_item = stream.try_next();
		// Resolves once the subscription has been closed.
		let closed = self.0.closed();

		futures::pin_mut!(closed);

		loop {
			// No send in flight: start sending the oldest buffered item, if any.
			if next_fut.is_terminated() {
				if let Some(v) = buf.pop() {
					let val = self.to_sub_message(&v);
					next_fut.set(async { self.0.send(val).await }.fuse());
				}
			}

			// Wait for the subscription to close, the in-flight send to finish or
			// the stream to produce its next result. The futures that did not
			// complete are handed back and stored again for the next iteration.
			match future::select(closed, future::select(next_fut, next_item)).await {
				// Send operation finished.
				//
				// The outcome of the send is not inspected here.
				Either::Right((Either::Left((_, n)), c)) => {
					next_item = n;
					closed = c;
					next_fut = Box::pin(Fuse::terminated());
				},
				// New item from the stream
				Either::Right((Either::Right((Ok(Some(v)), n)), c)) => {
					// A buffer that refuses the item means the subscription is dropped.
					if buf.push(v).is_err() {
						log::debug!(
							target: "rpc",
							"Subscription buffer full for subscription={} conn_id={}; dropping subscription",
							self.0.method_name(),
							self.0.connection_id().0
						);
						return Ok(());
					}

					next_fut = n;
					closed = c;
					next_item = stream.try_next();
				},
				// Error occurred while processing the stream.
				//
				// terminate the stream.
				Either::Right((Either::Right((Err(e), _)), _)) => return Err(e),
				// Stream "finished".
				//
				// Process remaining items and terminate.
				Either::Right((Either::Right((Ok(None), pending_fut)), _)) => {
					// Let a send that is still in flight complete first; give up if
					// it fails.
					if !pending_fut.is_terminated() && pending_fut.await.is_err() {
						return Ok(());
					}

					// Flush whatever is left in the buffer, in order.
					while let Some(v) = buf.pop() {
						if self.send(&v).await.is_err() {
							return Ok(());
						}
					}

					return Ok(());
				},
				// Subscription was closed.
				Either::Left(_) => return Ok(()),
			}
		}
	}

	/// Send a message on the subscription.
	///
	/// `result` is serialized into a subscription message first; the returned
	/// error is whatever the underlying sink's `send` reports.
	pub async fn send(&self, result: &impl Serialize) -> Result<(), DisconnectError> {
		self.0.send(self.to_sub_message(result)).await
	}

	/// Get the subscription id.
	pub fn subscription_id(&self) -> SubscriptionId {
		self.0.subscription_id()
	}

	/// Completes when the subscription is closed.
	pub async fn closed(&self) {
		self.0.closed().await
	}

	/// Convert a result to a subscription message.
	///
	/// Builds the message from the sink's method name and subscription id.
	///
	/// # Panics
	///
	/// Panics if `SubscriptionMessage::new` returns an error for `result`.
	fn to_sub_message(&self, result: &impl Serialize) -> SubscriptionMessage {
		SubscriptionMessage::new(self.0.method_name(), self.0.subscription_id(), result)
			.expect("Serialize infallible; qed")
	}
}
279
280/// Helper for spawning non-blocking rpc subscription task.
281pub fn spawn_subscription_task(
282	executor: &SubscriptionTaskExecutor,
283	fut: impl Future<Output = ()> + Send + 'static,
284) {
285	executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
286}
287
#[cfg(test)]
mod tests {
	use super::*;
	use futures::StreamExt;
	use jsonrpsee::{core::EmptyServerParams, RpcModule, Subscription};

	/// Registers a subscription that pipes sixteen zeros through a
	/// `BoundedVecDeque` of capacity 16 and returns the client side of it.
	async fn subscribe() -> Subscription {
		let mut module = RpcModule::new(());
		module
			.register_subscription("sub", "my_sub", "unsub", |_, pending, _, _| async move {
				let stream = futures::stream::iter([0; 16]);
				PendingSubscription::from(pending)
					.pipe_from_stream(stream, BoundedVecDeque::new(16))
					.await;
				Ok(())
			})
			.unwrap();

		module.subscribe("sub", EmptyServerParams::new(), 1).await.unwrap()
	}

	/// All sixteen items of the stream must reach the subscriber.
	#[tokio::test]
	async fn pipe_from_stream_works() {
		let mut sub = subscribe().await;
		let mut rx = 0;

		while let Some(Ok(_)) = sub.next::<usize>().await {
			rx += 1;
		}

		assert_eq!(rx, 16);
	}

	/// A stream of 32 items overflows a `BoundedVecDeque` of capacity 16, after
	/// which the subscriber must observe the end of the subscription.
	#[tokio::test]
	async fn pipe_from_stream_with_bounded_vec() {
		// Used by the subscription callback to signal that `pipe_from_stream` returned.
		let (tx, mut rx) = futures::channel::mpsc::unbounded::<()>();

		let mut module = RpcModule::new(tx);
		module
			.register_subscription("sub", "my_sub", "unsub", |_, pending, ctx, _| async move {
				let stream = futures::stream::iter([0; 32]);
				PendingSubscription::from(pending)
					.pipe_from_stream(stream, BoundedVecDeque::new(16))
					.await;
				_ = ctx.unbounded_send(());
				Ok(())
			})
			.unwrap();

		let mut sub = module.subscribe("sub", EmptyServerParams::new(), 1).await.unwrap();

		// When the 17th item arrives the subscription is dropped
		_ = rx.next().await.unwrap();
		assert!(sub.next::<usize>().await.is_none());
	}

	/// `pipe_from_stream` must return when the source stream is empty.
	#[tokio::test]
	async fn subscription_is_dropped_when_stream_is_empty() {
		let notify_rx = std::sync::Arc::new(tokio::sync::Notify::new());
		let notify_tx = notify_rx.clone();

		let mut module = RpcModule::new(notify_tx);
		module
			.register_subscription(
				"sub",
				"my_sub",
				"unsub",
				|_, pending, notify_tx, _| async move {
					// emulate empty stream for simplicity: otherwise we need some mechanism
					// to sync buffer and channel send operations
					let stream = futures::stream::empty::<()>();
					// this should exit immediately
					PendingSubscription::from(pending)
						.pipe_from_stream(stream, BoundedVecDeque::default())
						.await;
					// notify that the `pipe_from_stream` has returned
					notify_tx.notify_one();
					Ok(())
				},
			)
			.unwrap();
		module.subscribe("sub", EmptyServerParams::new(), 1).await.unwrap();

		// it should fire once `pipe_from_stream` returns
		notify_rx.notified().await;
	}

	/// With a `RingBuffer` of capacity 3 and a subscriber that does not read
	/// for a while, only the first item and the last three must be received.
	#[tokio::test]
	async fn subscription_replace_old_messages() {
		let mut module = RpcModule::new(());
		module
			.register_subscription("sub", "my_sub", "unsub", |_, pending, _, _| async move {
				// Send items 0..20 and ensure that only the last 3 are kept in the buffer.
				let stream = futures::stream::iter(0..20);
				PendingSubscription::from(pending)
					.pipe_from_stream(stream, RingBuffer::new(3))
					.await;
				Ok(())
			})
			.unwrap();

		let mut sub = module.subscribe("sub", EmptyServerParams::new(), 1).await.unwrap();

		// This is a hack to simulate a very slow client,
		// so that all older messages are replaced.
		tokio::time::sleep(std::time::Duration::from_secs(10)).await;

		let mut res = Vec::new();

		while let Some(Ok((v, _))) = sub.next::<usize>().await {
			res.push(v);
		}

		// There is no way to cancel pending send operations so
		// that's why 0 is included here.
		assert_eq!(res, vec![0, 17, 18, 19]);
	}
}
405}