Skip to main content

anvil_polkadot/substrate_node/
in_mem_rpc.rs

1use futures::stream;
2use polkadot_sdk::sc_service::RpcHandlers;
3use serde_json::Value;
4use subxt::{
5    backend::rpc::{RawRpcFuture, RawRpcSubscription, RawValue, RpcClientT},
6    ext::{jsonrpsee::core::traits::ToRpcParams, subxt_rpcs::Error as SubxtRpcError},
7};
8
9pub struct InMemoryRpcClient(pub RpcHandlers);
10
11pub struct Params(Option<Box<RawValue>>);
12
13impl ToRpcParams for Params {
14    fn to_rpc_params(self) -> std::result::Result<Option<Box<RawValue>>, serde_json::Error> {
15        Ok(self.0)
16    }
17}
18
19impl RpcClientT for InMemoryRpcClient {
20    fn request_raw<'a>(
21        &'a self,
22        method: &'a str,
23        params: Option<Box<RawValue>>,
24    ) -> RawRpcFuture<'a, Box<RawValue>> {
25        Box::pin(async move {
26            self.0
27                .handle()
28                .call(method, Params(params))
29                .await
30                .map_err(|err| SubxtRpcError::Client(Box::new(err)))
31        })
32    }
33
34    fn subscribe_raw<'a>(
35        &'a self,
36        sub: &'a str,
37        params: Option<Box<RawValue>>,
38        _unsub: &'a str,
39    ) -> RawRpcFuture<'a, RawRpcSubscription> {
40        Box::pin(async move {
41            let subscription = self
42                .0
43                .handle()
44                .subscribe_unbounded(sub, Params(params))
45                .await
46                .map_err(|err| SubxtRpcError::Client(Box::new(err)))?;
47            let id = Value::from(subscription.subscription_id().to_owned())
48                .as_str()
49                .map(|s| s.to_string());
50            let raw_stream = stream::unfold(subscription, |mut sub| async move {
51                match sub.next::<Box<RawValue>>().await {
52                    Some(Ok((notification, _sub_id))) => Some((Ok(notification), sub)),
53                    Some(Err(e)) => Some((Err(SubxtRpcError::Client(Box::new(e))), sub)),
54                    None => None, // Subscription ended, Do something here? :-??
55                }
56            });
57            Ok(RawRpcSubscription { stream: Box::pin(raw_stream), id })
58        })
59    }
60}