use crate::SubscriptionTaskExecutor;
use futures::{
future::{self, Either, Fuse, FusedFuture},
Future, FutureExt, Stream, StreamExt,
};
use jsonrpsee::{
types::SubscriptionId, DisconnectError, PendingSubscriptionSink, SubscriptionMessage,
SubscriptionSink,
};
use sp_runtime::Serialize;
use std::collections::VecDeque;
const DEFAULT_BUF_SIZE: usize = 16;
pub trait Buffer {
type Item;
fn push(&mut self, item: Self::Item) -> Result<(), ()>;
fn pop(&mut self) -> Option<Self::Item>;
}
pub struct BoundedVecDeque<T> {
inner: VecDeque<T>,
max_cap: usize,
}
impl<T> Default for BoundedVecDeque<T> {
fn default() -> Self {
Self { inner: VecDeque::with_capacity(DEFAULT_BUF_SIZE), max_cap: DEFAULT_BUF_SIZE }
}
}
impl<T> BoundedVecDeque<T> {
pub fn new(cap: usize) -> Self {
Self { inner: VecDeque::with_capacity(cap), max_cap: cap }
}
}
impl<T> Buffer for BoundedVecDeque<T> {
type Item = T;
fn push(&mut self, item: Self::Item) -> Result<(), ()> {
if self.inner.len() >= self.max_cap {
Err(())
} else {
self.inner.push_back(item);
Ok(())
}
}
fn pop(&mut self) -> Option<T> {
self.inner.pop_front()
}
}
#[derive(Debug)]
pub struct RingBuffer<T> {
inner: VecDeque<T>,
cap: usize,
}
impl<T> RingBuffer<T> {
pub fn new(cap: usize) -> Self {
Self { inner: VecDeque::with_capacity(cap), cap }
}
}
impl<T> Buffer for RingBuffer<T> {
type Item = T;
fn push(&mut self, item: T) -> Result<(), ()> {
if self.inner.len() >= self.cap {
self.inner.pop_front();
}
self.inner.push_back(item);
Ok(())
}
fn pop(&mut self) -> Option<T> {
self.inner.pop_front()
}
}
pub struct PendingSubscription(PendingSubscriptionSink);
impl From<PendingSubscriptionSink> for PendingSubscription {
fn from(p: PendingSubscriptionSink) -> Self {
Self(p)
}
}
impl PendingSubscription {
pub async fn pipe_from_stream<S, T, B>(self, mut stream: S, mut buf: B)
where
S: Stream<Item = T> + Unpin + Send + 'static,
T: Serialize + Send + 'static,
B: Buffer<Item = T>,
{
let method = self.0.method_name().to_string();
let conn_id = self.0.connection_id().0;
let accept_fut = self.0.accept();
futures::pin_mut!(accept_fut);
let sink = loop {
match future::select(accept_fut, stream.next()).await {
Either::Left((Ok(sink), _)) => break sink,
Either::Right((Some(msg), f)) => {
if buf.push(msg).is_err() {
log::debug!(target: "rpc", "Subscription::accept buffer full for subscription={method} conn_id={conn_id}; dropping subscription");
return
}
accept_fut = f;
},
_ => return,
}
};
Subscription(sink).pipe_from_stream(stream, buf).await
}
}
#[derive(Clone, Debug)]
pub struct Subscription(SubscriptionSink);
impl From<SubscriptionSink> for Subscription {
fn from(sink: SubscriptionSink) -> Self {
Self(sink)
}
}
impl Subscription {
pub async fn pipe_from_stream<S, T, B>(self, mut stream: S, mut buf: B)
where
S: Stream<Item = T> + Unpin + Send + 'static,
T: Serialize + Send + 'static,
B: Buffer<Item = T>,
{
let mut next_fut = Box::pin(Fuse::terminated());
let mut next_item = stream.next();
let closed = self.0.closed();
futures::pin_mut!(closed);
loop {
if next_fut.is_terminated() {
if let Some(v) = buf.pop() {
let val = self.to_sub_message(&v);
next_fut.set(async { self.0.send(val).await }.fuse());
}
}
match future::select(closed, future::select(next_fut, next_item)).await {
Either::Right((Either::Left((_, n)), c)) => {
next_item = n;
closed = c;
next_fut = Box::pin(Fuse::terminated());
},
Either::Right((Either::Right((Some(v), n)), c)) => {
if buf.push(v).is_err() {
log::debug!(
target: "rpc",
"Subscription buffer full for subscription={} conn_id={}; dropping subscription",
self.0.method_name(),
self.0.connection_id().0
);
return
}
next_fut = n;
closed = c;
next_item = stream.next();
},
Either::Right((Either::Right((None, pending_fut)), _)) => {
if !pending_fut.is_terminated() && pending_fut.await.is_err() {
return;
}
while let Some(v) = buf.pop() {
if self.send(&v).await.is_err() {
return;
}
}
return;
},
Either::Left(_) => return,
}
}
}
pub async fn send(&self, result: &impl Serialize) -> Result<(), DisconnectError> {
self.0.send(self.to_sub_message(result)).await
}
pub fn subscription_id(&self) -> SubscriptionId {
self.0.subscription_id()
}
pub async fn closed(&self) {
self.0.closed().await
}
fn to_sub_message(&self, result: &impl Serialize) -> SubscriptionMessage {
SubscriptionMessage::new(self.0.method_name(), self.0.subscription_id(), result)
.expect("Serialize infallible; qed")
}
}
pub fn spawn_subscription_task(
executor: &SubscriptionTaskExecutor,
fut: impl Future<Output = ()> + Send + 'static,
) {
executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
}
#[cfg(test)]
mod tests {
use super::*;
use futures::StreamExt;
use jsonrpsee::{core::EmptyServerParams, RpcModule, Subscription};
async fn subscribe() -> Subscription {
let mut module = RpcModule::new(());
module
.register_subscription("sub", "my_sub", "unsub", |_, pending, _, _| async move {
let stream = futures::stream::iter([0; 16]);
PendingSubscription::from(pending)
.pipe_from_stream(stream, BoundedVecDeque::new(16))
.await;
Ok(())
})
.unwrap();
module.subscribe("sub", EmptyServerParams::new(), 1).await.unwrap()
}
#[tokio::test]
async fn pipe_from_stream_works() {
let mut sub = subscribe().await;
let mut rx = 0;
while let Some(Ok(_)) = sub.next::<usize>().await {
rx += 1;
}
assert_eq!(rx, 16);
}
#[tokio::test]
async fn pipe_from_stream_with_bounded_vec() {
let (tx, mut rx) = futures::channel::mpsc::unbounded::<()>();
let mut module = RpcModule::new(tx);
module
.register_subscription("sub", "my_sub", "unsub", |_, pending, ctx, _| async move {
let stream = futures::stream::iter([0; 32]);
PendingSubscription::from(pending)
.pipe_from_stream(stream, BoundedVecDeque::new(16))
.await;
_ = ctx.unbounded_send(());
Ok(())
})
.unwrap();
let mut sub = module.subscribe("sub", EmptyServerParams::new(), 1).await.unwrap();
_ = rx.next().await.unwrap();
assert!(sub.next::<usize>().await.is_none());
}
#[tokio::test]
async fn subscription_is_dropped_when_stream_is_empty() {
let notify_rx = std::sync::Arc::new(tokio::sync::Notify::new());
let notify_tx = notify_rx.clone();
let mut module = RpcModule::new(notify_tx);
module
.register_subscription(
"sub",
"my_sub",
"unsub",
|_, pending, notify_tx, _| async move {
let stream = futures::stream::empty::<()>();
PendingSubscription::from(pending)
.pipe_from_stream(stream, BoundedVecDeque::default())
.await;
notify_tx.notify_one();
Ok(())
},
)
.unwrap();
module.subscribe("sub", EmptyServerParams::new(), 1).await.unwrap();
notify_rx.notified().await;
}
#[tokio::test]
async fn subscription_replace_old_messages() {
let mut module = RpcModule::new(());
module
.register_subscription("sub", "my_sub", "unsub", |_, pending, _, _| async move {
let stream = futures::stream::iter(0..20);
PendingSubscription::from(pending)
.pipe_from_stream(stream, RingBuffer::new(3))
.await;
Ok(())
})
.unwrap();
let mut sub = module.subscribe("sub", EmptyServerParams::new(), 1).await.unwrap();
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
let mut res = Vec::new();
while let Some(Ok((v, _))) = sub.next::<usize>().await {
res.push(v);
}
assert_eq!(res, vec![0, 17, 18, 19]);
}
}