#![deny(missing_docs)]
use jsonrpc_server_utils as server_utils;
use net2;
pub use hyper;
pub use jsonrpc_core;
#[macro_use]
extern crate log;
mod handler;
mod response;
#[cfg(test)]
mod tests;
mod utils;
use std::io;
use std::net::SocketAddr;
use std::sync::{mpsc, Arc};
use std::thread;
use parking_lot::Mutex;
use crate::jsonrpc::futures::sync::oneshot;
use crate::jsonrpc::futures::{self, Future, Stream};
use crate::jsonrpc::MetaIoHandler;
use crate::server_utils::reactor::{Executor, UninitializedExecutor};
use hyper::{server, Body};
use jsonrpc_core as jsonrpc;
pub use crate::handler::ServerHandler;
pub use crate::response::Response;
pub use crate::server_utils::cors::{self, AccessControlAllowOrigin, AllowCors, Origin};
pub use crate::server_utils::hosts::{DomainsValidation, Host};
pub use crate::server_utils::{tokio, SuspendableStream};
pub use crate::utils::{cors_allow_headers, cors_allow_origin, is_host_allowed};
pub enum RequestMiddlewareAction {
Proceed {
should_continue_on_invalid_cors: bool,
request: hyper::Request<Body>,
},
Respond {
should_validate_hosts: bool,
response: Box<dyn Future<Item = hyper::Response<Body>, Error = hyper::Error> + Send>,
},
}
impl From<Response> for RequestMiddlewareAction {
fn from(o: Response) -> Self {
RequestMiddlewareAction::Respond {
should_validate_hosts: true,
response: Box::new(futures::future::ok(o.into())),
}
}
}
impl From<hyper::Response<Body>> for RequestMiddlewareAction {
fn from(response: hyper::Response<Body>) -> Self {
RequestMiddlewareAction::Respond {
should_validate_hosts: true,
response: Box::new(futures::future::ok(response)),
}
}
}
impl From<hyper::Request<Body>> for RequestMiddlewareAction {
fn from(request: hyper::Request<Body>) -> Self {
RequestMiddlewareAction::Proceed {
should_continue_on_invalid_cors: false,
request,
}
}
}
pub trait RequestMiddleware: Send + Sync + 'static {
fn on_request(&self, request: hyper::Request<hyper::Body>) -> RequestMiddlewareAction;
}
impl<F> RequestMiddleware for F
where
F: Fn(hyper::Request<Body>) -> RequestMiddlewareAction + Sync + Send + 'static,
{
fn on_request(&self, request: hyper::Request<hyper::Body>) -> RequestMiddlewareAction {
(*self)(request)
}
}
#[derive(Default)]
struct NoopRequestMiddleware;
impl RequestMiddleware for NoopRequestMiddleware {
fn on_request(&self, request: hyper::Request<Body>) -> RequestMiddlewareAction {
RequestMiddlewareAction::Proceed {
should_continue_on_invalid_cors: false,
request,
}
}
}
pub trait MetaExtractor<M: jsonrpc::Metadata>: Sync + Send + 'static {
fn read_metadata(&self, _: &hyper::Request<Body>) -> M;
}
impl<M, F> MetaExtractor<M> for F
where
M: jsonrpc::Metadata,
F: Fn(&hyper::Request<Body>) -> M + Sync + Send + 'static,
{
fn read_metadata(&self, req: &hyper::Request<Body>) -> M {
(*self)(req)
}
}
#[derive(Default)]
struct NoopExtractor;
impl<M: jsonrpc::Metadata + Default> MetaExtractor<M> for NoopExtractor {
fn read_metadata(&self, _: &hyper::Request<Body>) -> M {
M::default()
}
}
pub struct Rpc<M: jsonrpc::Metadata = (), S: jsonrpc::Middleware<M> = jsonrpc::middleware::Noop> {
pub handler: Arc<MetaIoHandler<M, S>>,
pub extractor: Arc<dyn MetaExtractor<M>>,
}
impl<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>> Clone for Rpc<M, S> {
fn clone(&self) -> Self {
Rpc {
handler: self.handler.clone(),
extractor: self.extractor.clone(),
}
}
}
type AllowedHosts = Option<Vec<Host>>;
type CorsDomains = Option<Vec<AccessControlAllowOrigin>>;
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum RestApi {
Secure,
Unsecure,
Disabled,
}
pub struct ServerBuilder<M: jsonrpc::Metadata = (), S: jsonrpc::Middleware<M> = jsonrpc::middleware::Noop> {
handler: Arc<MetaIoHandler<M, S>>,
executor: UninitializedExecutor,
meta_extractor: Arc<dyn MetaExtractor<M>>,
request_middleware: Arc<dyn RequestMiddleware>,
cors_domains: CorsDomains,
cors_max_age: Option<u32>,
allowed_headers: cors::AccessControlAllowHeaders,
allowed_hosts: AllowedHosts,
rest_api: RestApi,
health_api: Option<(String, String)>,
keep_alive: bool,
threads: usize,
max_request_body_size: usize,
}
impl<M: jsonrpc::Metadata + Default, S: jsonrpc::Middleware<M>> ServerBuilder<M, S> {
pub fn new<T>(handler: T) -> Self
where
T: Into<MetaIoHandler<M, S>>,
{
Self::with_meta_extractor(handler, NoopExtractor)
}
}
impl<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>> ServerBuilder<M, S> {
pub fn with_meta_extractor<T, E>(handler: T, extractor: E) -> Self
where
T: Into<MetaIoHandler<M, S>>,
E: MetaExtractor<M>,
{
ServerBuilder {
handler: Arc::new(handler.into()),
executor: UninitializedExecutor::Unspawned,
meta_extractor: Arc::new(extractor),
request_middleware: Arc::new(NoopRequestMiddleware::default()),
cors_domains: None,
cors_max_age: None,
allowed_headers: cors::AccessControlAllowHeaders::Any,
allowed_hosts: None,
rest_api: RestApi::Disabled,
health_api: None,
keep_alive: true,
threads: 1,
max_request_body_size: 5 * 1024 * 1024,
}
}
pub fn event_loop_executor(mut self, executor: tokio::runtime::TaskExecutor) -> Self {
self.executor = UninitializedExecutor::Shared(executor);
self
}
pub fn rest_api(mut self, rest_api: RestApi) -> Self {
self.rest_api = rest_api;
self
}
pub fn health_api<A, B, T>(mut self, health_api: T) -> Self
where
T: Into<Option<(A, B)>>,
A: Into<String>,
B: Into<String>,
{
self.health_api = health_api.into().map(|(a, b)| (a.into(), b.into()));
self
}
pub fn keep_alive(mut self, val: bool) -> Self {
self.keep_alive = val;
self
}
#[cfg(not(unix))]
#[allow(unused_mut)]
pub fn threads(mut self, _threads: usize) -> Self {
warn!("Multi-threaded server is not available on Windows. Falling back to single thread.");
self
}
#[cfg(unix)]
pub fn threads(mut self, threads: usize) -> Self {
self.threads = threads;
self
}
pub fn cors(mut self, cors_domains: DomainsValidation<AccessControlAllowOrigin>) -> Self {
self.cors_domains = cors_domains.into();
self
}
pub fn cors_max_age<T: Into<Option<u32>>>(mut self, cors_max_age: T) -> Self {
self.cors_max_age = cors_max_age.into();
self
}
pub fn cors_allow_headers(mut self, allowed_headers: cors::AccessControlAllowHeaders) -> Self {
self.allowed_headers = allowed_headers;
self
}
pub fn request_middleware<T: RequestMiddleware>(mut self, middleware: T) -> Self {
self.request_middleware = Arc::new(middleware);
self
}
pub fn meta_extractor<T: MetaExtractor<M>>(mut self, extractor: T) -> Self {
self.meta_extractor = Arc::new(extractor);
self
}
pub fn allow_only_bind_host(mut self) -> Self {
self.allowed_hosts = Some(Vec::new());
self
}
pub fn allowed_hosts(mut self, allowed_hosts: DomainsValidation<Host>) -> Self {
self.allowed_hosts = allowed_hosts.into();
self
}
pub fn max_request_body_size(mut self, val: usize) -> Self {
self.max_request_body_size = val;
self
}
pub fn start_http(self, addr: &SocketAddr) -> io::Result<Server> {
let cors_domains = self.cors_domains;
let cors_max_age = self.cors_max_age;
let allowed_headers = self.allowed_headers;
let request_middleware = self.request_middleware;
let allowed_hosts = self.allowed_hosts;
let jsonrpc_handler = Rpc {
handler: self.handler,
extractor: self.meta_extractor,
};
let rest_api = self.rest_api;
let health_api = self.health_api;
let keep_alive = self.keep_alive;
let reuse_port = self.threads > 1;
let (local_addr_tx, local_addr_rx) = mpsc::channel();
let (close, shutdown_signal) = oneshot::channel();
let (done_tx, done_rx) = oneshot::channel();
let eloop = self.executor.init_with_name("http.worker0")?;
let req_max_size = self.max_request_body_size;
serve(
(shutdown_signal, local_addr_tx, done_tx),
eloop.executor(),
addr.to_owned(),
cors_domains.clone(),
cors_max_age,
allowed_headers.clone(),
request_middleware.clone(),
allowed_hosts.clone(),
jsonrpc_handler.clone(),
rest_api,
health_api.clone(),
keep_alive,
reuse_port,
req_max_size,
);
let handles = (0..self.threads - 1)
.map(|i| {
let (local_addr_tx, local_addr_rx) = mpsc::channel();
let (close, shutdown_signal) = oneshot::channel();
let (done_tx, done_rx) = oneshot::channel();
let eloop = UninitializedExecutor::Unspawned.init_with_name(format!("http.worker{}", i + 1))?;
serve(
(shutdown_signal, local_addr_tx, done_tx),
eloop.executor(),
addr.to_owned(),
cors_domains.clone(),
cors_max_age,
allowed_headers.clone(),
request_middleware.clone(),
allowed_hosts.clone(),
jsonrpc_handler.clone(),
rest_api,
health_api.clone(),
keep_alive,
reuse_port,
req_max_size,
);
Ok((eloop, close, local_addr_rx, done_rx))
})
.collect::<io::Result<Vec<_>>>()?;
let local_addr = recv_address(local_addr_rx);
let mut handles: Vec<(Executor, oneshot::Sender<()>, oneshot::Receiver<()>)> = handles
.into_iter()
.map(|(eloop, close, local_addr_rx, done_rx)| {
let _ = recv_address(local_addr_rx)?;
Ok((eloop, close, done_rx))
})
.collect::<io::Result<(Vec<_>)>>()?;
handles.push((eloop, close, done_rx));
let (executors, done_rxs) = handles
.into_iter()
.fold((vec![], vec![]), |mut acc, (eloop, closer, done_rx)| {
acc.0.push((eloop, closer));
acc.1.push(done_rx);
acc
});
Ok(Server {
address: local_addr?,
executors: Arc::new(Mutex::new(Some(executors))),
done: Some(done_rxs),
})
}
}
fn recv_address(local_addr_rx: mpsc::Receiver<io::Result<SocketAddr>>) -> io::Result<SocketAddr> {
local_addr_rx
.recv()
.map_err(|_| io::Error::new(io::ErrorKind::Interrupted, ""))?
}
fn serve<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>>(
signals: (
oneshot::Receiver<()>,
mpsc::Sender<io::Result<SocketAddr>>,
oneshot::Sender<()>,
),
executor: tokio::runtime::TaskExecutor,
addr: SocketAddr,
cors_domains: CorsDomains,
cors_max_age: Option<u32>,
allowed_headers: cors::AccessControlAllowHeaders,
request_middleware: Arc<dyn RequestMiddleware>,
allowed_hosts: AllowedHosts,
jsonrpc_handler: Rpc<M, S>,
rest_api: RestApi,
health_api: Option<(String, String)>,
keep_alive: bool,
reuse_port: bool,
max_request_body_size: usize,
) {
let (shutdown_signal, local_addr_tx, done_tx) = signals;
executor.spawn({
let handle = tokio::reactor::Handle::default();
let bind = move || {
let listener = match addr {
SocketAddr::V4(_) => net2::TcpBuilder::new_v4()?,
SocketAddr::V6(_) => net2::TcpBuilder::new_v6()?,
};
configure_port(reuse_port, &listener)?;
listener.reuse_address(true)?;
listener.bind(&addr)?;
let listener = listener.listen(1024)?;
let listener = tokio::net::TcpListener::from_std(listener, &handle)?;
let local_addr = listener.local_addr()?;
Ok((listener, local_addr))
};
let bind_result = match bind() {
Ok((listener, local_addr)) => {
match local_addr_tx.send(Ok(local_addr)) {
Ok(_) => futures::future::ok((listener, local_addr)),
Err(_) => {
warn!(
"Thread {:?} unable to reach receiver, closing server",
thread::current().name()
);
futures::future::err(())
}
}
}
Err(err) => {
let _send_result = local_addr_tx.send(Err(err));
futures::future::err(())
}
};
bind_result
.and_then(move |(listener, local_addr)| {
let allowed_hosts = server_utils::hosts::update(allowed_hosts, &local_addr);
let mut http = server::conn::Http::new();
http.keep_alive(keep_alive);
let tcp_stream = SuspendableStream::new(listener.incoming());
tcp_stream
.map(move |socket| {
let service = ServerHandler::new(
jsonrpc_handler.clone(),
cors_domains.clone(),
cors_max_age,
allowed_headers.clone(),
allowed_hosts.clone(),
request_middleware.clone(),
rest_api,
health_api.clone(),
max_request_body_size,
keep_alive,
);
http.serve_connection(socket, service)
.map_err(|e| error!("Error serving connection: {:?}", e))
})
.buffer_unordered(1024)
.for_each(|_| Ok(()))
.map_err(|e| {
warn!("Incoming streams error, closing sever: {:?}", e);
})
.select(shutdown_signal.map_err(|e| {
debug!("Shutdown signaller dropped, closing server: {:?}", e);
}))
.map_err(|_| ())
})
.and_then(|_| done_tx.send(()))
});
}
#[cfg(unix)]
fn configure_port(reuse: bool, tcp: &net2::TcpBuilder) -> io::Result<()> {
use net2::unix::*;
if reuse {
tcp.reuse_port(true)?;
}
Ok(())
}
#[cfg(not(unix))]
fn configure_port(_reuse: bool, _tcp: &net2::TcpBuilder) -> io::Result<()> {
Ok(())
}
#[derive(Clone)]
pub struct CloseHandle(Arc<Mutex<Option<Vec<(Executor, oneshot::Sender<()>)>>>>);
impl CloseHandle {
pub fn close(self) {
if let Some(executors) = self.0.lock().take() {
for (executor, closer) in executors {
executor.close();
let _ = closer.send(());
}
}
}
}
pub struct Server {
address: SocketAddr,
executors: Arc<Mutex<Option<Vec<(Executor, oneshot::Sender<()>)>>>>,
done: Option<Vec<oneshot::Receiver<()>>>,
}
impl Server {
pub fn address(&self) -> &SocketAddr {
&self.address
}
pub fn close(self) {
self.close_handle().close()
}
pub fn wait(mut self) {
if let Some(receivers) = self.done.take() {
for receiver in receivers {
let _ = receiver.wait();
}
}
}
pub fn close_handle(&self) -> CloseHandle {
CloseHandle(self.executors.clone())
}
}
impl Drop for Server {
fn drop(&mut self) {
self.close_handle().close();
}
}