anvil_polkadot/substrate_node/
in_mem_rpc.rs1use futures::stream;
2use polkadot_sdk::sc_service::RpcHandlers;
3use serde_json::Value;
4use subxt::{
5 backend::rpc::{RawRpcFuture, RawRpcSubscription, RawValue, RpcClientT},
6 ext::{jsonrpsee::core::traits::ToRpcParams, subxt_rpcs::Error as SubxtRpcError},
7};
8
9pub struct InMemoryRpcClient(pub RpcHandlers);
10
11pub struct Params(Option<Box<RawValue>>);
12
13impl ToRpcParams for Params {
14 fn to_rpc_params(self) -> std::result::Result<Option<Box<RawValue>>, serde_json::Error> {
15 Ok(self.0)
16 }
17}
18
19impl RpcClientT for InMemoryRpcClient {
20 fn request_raw<'a>(
21 &'a self,
22 method: &'a str,
23 params: Option<Box<RawValue>>,
24 ) -> RawRpcFuture<'a, Box<RawValue>> {
25 Box::pin(async move {
26 self.0
27 .handle()
28 .call(method, Params(params))
29 .await
30 .map_err(|err| SubxtRpcError::Client(Box::new(err)))
31 })
32 }
33
34 fn subscribe_raw<'a>(
35 &'a self,
36 sub: &'a str,
37 params: Option<Box<RawValue>>,
38 _unsub: &'a str,
39 ) -> RawRpcFuture<'a, RawRpcSubscription> {
40 Box::pin(async move {
41 let subscription = self
42 .0
43 .handle()
44 .subscribe_unbounded(sub, Params(params))
45 .await
46 .map_err(|err| SubxtRpcError::Client(Box::new(err)))?;
47 let id = Value::from(subscription.subscription_id().to_owned())
48 .as_str()
49 .map(|s| s.to_string());
50 let raw_stream = stream::unfold(subscription, |mut sub| async move {
51 match sub.next::<Box<RawValue>>().await {
52 Some(Ok((notification, _sub_id))) => Some((Ok(notification), sub)),
53 Some(Err(e)) => Some((Err(SubxtRpcError::Client(Box::new(e))), sub)),
54 None => None, }
56 });
57 Ok(RawRpcSubscription { stream: Box::pin(raw_stream), id })
58 })
59 }
60}