1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
use crate::{RpcChannel, RpcError};
use failure::format_err;
use futures::prelude::*;
use futures::sync::mpsc;
use jsonrpc_core::{MetaIoHandler, Metadata};
use jsonrpc_pubsub::Session;
use std::collections::VecDeque;
use std::ops::Deref;
use std::sync::Arc;
pub struct LocalRpc<THandler, TMetadata> {
handler: THandler,
meta: TMetadata,
queue: VecDeque<String>,
}
impl<TMetadata, THandler> LocalRpc<THandler, TMetadata>
where
TMetadata: Metadata,
THandler: Deref<Target = MetaIoHandler<TMetadata>>,
{
pub fn new(handler: THandler) -> Self
where
TMetadata: Default,
{
Self::with_metadata(handler, Default::default())
}
pub fn with_metadata(handler: THandler, meta: TMetadata) -> Self {
Self {
handler,
meta,
queue: Default::default(),
}
}
}
impl<TMetadata, THandler> Stream for LocalRpc<THandler, TMetadata>
where
TMetadata: Metadata,
THandler: Deref<Target = MetaIoHandler<TMetadata>>,
{
type Item = String;
type Error = RpcError;
fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
match self.queue.pop_front() {
Some(response) => Ok(Async::Ready(Some(response))),
None => Ok(Async::NotReady),
}
}
}
impl<TMetadata, THandler> Sink for LocalRpc<THandler, TMetadata>
where
TMetadata: Metadata,
THandler: Deref<Target = MetaIoHandler<TMetadata>>,
{
type SinkItem = String;
type SinkError = RpcError;
fn start_send(&mut self, request: Self::SinkItem) -> Result<AsyncSink<Self::SinkItem>, Self::SinkError> {
match self.handler.handle_request_sync(&request, self.meta.clone()) {
Some(response) => self.queue.push_back(response),
None => {}
};
Ok(AsyncSink::Ready)
}
fn poll_complete(&mut self) -> Result<Async<()>, Self::SinkError> {
Ok(Async::Ready(()))
}
}
pub fn connect_with_metadata<TClient, THandler, TMetadata>(
handler: THandler,
meta: TMetadata,
) -> (TClient, impl Future<Item = (), Error = RpcError>)
where
TClient: From<RpcChannel>,
THandler: Deref<Target = MetaIoHandler<TMetadata>>,
TMetadata: Metadata,
{
let (sink, stream) = LocalRpc::with_metadata(handler, meta).split();
let (rpc_client, sender) = crate::transports::duplex(sink, stream);
let client = TClient::from(sender);
(client, rpc_client)
}
pub fn connect<TClient, THandler, TMetadata>(handler: THandler) -> (TClient, impl Future<Item = (), Error = RpcError>)
where
TClient: From<RpcChannel>,
THandler: Deref<Target = MetaIoHandler<TMetadata>>,
TMetadata: Metadata + Default,
{
connect_with_metadata(handler, Default::default())
}
pub type LocalMeta = Arc<Session>;
pub fn connect_with_pubsub<TClient, THandler>(handler: THandler) -> (TClient, impl Future<Item = (), Error = RpcError>)
where
TClient: From<RpcChannel>,
THandler: Deref<Target = MetaIoHandler<LocalMeta>>,
{
let (tx, rx) = mpsc::channel(0);
let meta = Arc::new(Session::new(tx));
let (sink, stream) = LocalRpc::with_metadata(handler, meta).split();
let stream = stream.select(rx.map_err(|_| RpcError::Other(format_err!("Pubsub channel returned an error"))));
let (rpc_client, sender) = crate::transports::duplex(sink, stream);
let client = TClient::from(sender);
(client, rpc_client)
}