1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
//! Rpc client implementation for `Deref<Target=MetaIoHandler<Metadata>>`.

use crate::{RpcChannel, RpcError};
use failure::format_err;
use futures::prelude::*;
use futures::sync::mpsc;
use jsonrpc_core::{MetaIoHandler, Metadata};
use jsonrpc_pubsub::Session;
use std::collections::VecDeque;
use std::ops::Deref;
use std::sync::Arc;

/// Implements an RPC client transport on top of a `MetaIoHandler`.
///
/// Requests pushed into the `Sink` half are handled synchronously by `handler`;
/// every response it produces is buffered in `queue` and later handed out by
/// the `Stream` half.
pub struct LocalRpc<THandler, TMetadata> {
	/// Handler that executes the incoming requests.
	handler: THandler,
	/// Metadata that is cloned and passed to the handler with every request.
	meta: TMetadata,
	/// Responses that have been produced but not yet polled, oldest first.
	queue: VecDeque<String>,
}

impl<TMetadata, THandler> LocalRpc<THandler, TMetadata>
where
	TMetadata: Metadata,
	THandler: Deref<Target = MetaIoHandler<TMetadata>>,
{
	/// Creates a new `LocalRpc` with default metadata.
	pub fn new(handler: THandler) -> Self
	where
		TMetadata: Default,
	{
		Self::with_metadata(handler, Default::default())
	}

	/// Creates a new `LocalRpc` with given handler and metadata.
	pub fn with_metadata(handler: THandler, meta: TMetadata) -> Self {
		Self {
			handler,
			meta,
			queue: Default::default(),
		}
	}
}

impl<TMetadata, THandler> Stream for LocalRpc<THandler, TMetadata>
where
	TMetadata: Metadata,
	THandler: Deref<Target = MetaIoHandler<TMetadata>>,
{
	type Item = String;
	type Error = RpcError;

	/// Hands out the oldest buffered response, if there is one.
	///
	/// This never returns an error and never terminates the stream: an empty
	/// buffer is reported as `Async::NotReady`.
	///
	/// NOTE(review): nothing here arranges a task wake-up before returning
	/// `NotReady`, so the consumer presumably has to poll again on its own
	/// after sending a request — confirm against the caller.
	fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
		let oldest = self.queue.pop_front();
		Ok(oldest.map_or(Async::NotReady, |response| Async::Ready(Some(response))))
	}
}

impl<TMetadata, THandler> Sink for LocalRpc<THandler, TMetadata>
where
	TMetadata: Metadata,
	THandler: Deref<Target = MetaIoHandler<TMetadata>>,
{
	type SinkItem = String;
	type SinkError = RpcError;

	/// Runs `request` through the handler synchronously, with a clone of the
	/// stored metadata, and buffers the response when one is produced.
	///
	/// The item is always accepted (`AsyncSink::Ready`) and no error is ever
	/// returned from here.
	fn start_send(&mut self, request: Self::SinkItem) -> Result<AsyncSink<Self::SinkItem>, Self::SinkError> {
		let outcome = self.handler.handle_request_sync(&request, self.meta.clone());
		// The handler may return `None`; in that case nothing is queued.
		if let Some(response) = outcome {
			self.queue.push_back(response);
		}
		Ok(AsyncSink::Ready)
	}

	/// Always reports completion: `start_send` has already done all the work
	/// by the time it returns.
	fn poll_complete(&mut self) -> Result<Async<()>, Self::SinkError> {
		Ok(Async::Ready(()))
	}
}

/// Connects to a `Deref<Target = MetaIoHandler<Metadata>>`.
///
/// Wraps `handler` and `meta` in a [`LocalRpc`], splits it into its sink and
/// stream halves and wires both into `crate::transports::duplex`.
///
/// Returns the client built from the duplex's `RpcChannel`, together with the
/// future produced by `duplex`.
pub fn connect_with_metadata<TClient, THandler, TMetadata>(
	handler: THandler,
	meta: TMetadata,
) -> (TClient, impl Future<Item = (), Error = RpcError>)
where
	TClient: From<RpcChannel>,
	THandler: Deref<Target = MetaIoHandler<TMetadata>>,
	TMetadata: Metadata,
{
	let local = LocalRpc::with_metadata(handler, meta);
	let (sink, stream) = local.split();
	let (rpc_client, sender) = crate::transports::duplex(sink, stream);
	(TClient::from(sender), rpc_client)
}

/// Connects to a `Deref<Target = MetaIoHandler<Metadata + Default>>`.
///
/// Shorthand for [`connect_with_metadata`] with the metadata set to
/// `TMetadata::default()`.
pub fn connect<TClient, THandler, TMetadata>(handler: THandler) -> (TClient, impl Future<Item = (), Error = RpcError>)
where
	TClient: From<RpcChannel>,
	THandler: Deref<Target = MetaIoHandler<TMetadata>>,
	TMetadata: Metadata + Default,
{
	let meta = TMetadata::default();
	connect_with_metadata(handler, meta)
}

/// Metadata for `LocalRpc` when used with pubsub: a shared (`Arc`)
/// `jsonrpc_pubsub::Session`, as created by `connect_with_pubsub`.
pub type LocalMeta = Arc<Session>;

/// Connects with pubsub.
pub fn connect_with_pubsub<TClient, THandler>(handler: THandler) -> (TClient, impl Future<Item = (), Error = RpcError>)
where
	TClient: From<RpcChannel>,
	THandler: Deref<Target = MetaIoHandler<LocalMeta>>,
{
	let (tx, rx) = mpsc::channel(0);
	let meta = Arc::new(Session::new(tx));
	let (sink, stream) = LocalRpc::with_metadata(handler, meta).split();
	let stream = stream.select(rx.map_err(|_| RpcError::Other(format_err!("Pubsub channel returned an error"))));
	let (rpc_client, sender) = crate::transports::duplex(sink, stream);
	let client = TClient::from(sender);
	(client, rpc_client)
}