Struct jsonrpsee_server::ServerBuilder

source ·
pub struct ServerBuilder<HttpMiddleware, RpcMiddleware> { /* private fields */ }
Expand description

Builder to configure and create a JSON-RPC server

Implementations§

source§

impl Builder<Identity, Identity>

source

pub fn new() -> Self

Create a default server builder.

source§

impl<HttpMiddleware, RpcMiddleware> Builder<HttpMiddleware, RpcMiddleware>

source

pub fn max_request_body_size(self, size: u32) -> Self

Set the maximum size of a request body in bytes. Default is 10 MiB.

source

pub fn max_response_body_size(self, size: u32) -> Self

Set the maximum size of a response body in bytes. Default is 10 MiB.

source

pub fn max_connections(self, max: u32) -> Self

Set the maximum number of connections allowed. Default is 100.

source

pub fn set_batch_request_config(self, cfg: BatchRequestConfig) -> Self

Configure how batch requests shall be handled by the server.

Default: batch requests are allowed and can be arbitrary big but the maximum payload size is limited.

source

pub fn max_subscriptions_per_connection(self, max: u32) -> Self

Set the maximum number of connections allowed. Default is 1024.

source

pub fn set_rpc_middleware<T>( self, rpc_middleware: RpcServiceBuilder<T>, ) -> Builder<HttpMiddleware, T>

Enable middleware that is invoked on every JSON-RPC call.

The middleware itself is very similar to the tower middleware but it has a different service trait which takes &self instead &mut self which means that you can’t use built-in middleware from tower.

Another consequence of &self is that you must wrap any of the middleware state in a type which is Send and provides interior mutability such Arc<Mutex>.

The builder itself exposes a similar API as the tower::ServiceBuilder where it is possible to compose layers to the middleware.

To add a middleware crate::middleware::rpc::RpcServiceBuilder exposes a few different layer APIs that is wrapped on top of the tower::ServiceBuilder.

When the server is started these layers are wrapped in the crate::middleware::rpc::RpcService and that’s why the service APIs is not exposed.


use std::{time::Instant, net::SocketAddr, sync::Arc};
use std::sync::atomic::{Ordering, AtomicUsize};

use jsonrpsee_server::middleware::rpc::{RpcServiceT, RpcService, RpcServiceBuilder};
use jsonrpsee_server::{ServerBuilder, MethodResponse};
use jsonrpsee_core::async_trait;
use jsonrpsee_types::Request;
use futures_util::future::BoxFuture;

#[derive(Clone)]
struct MyMiddleware<S> {
    service: S,
    count: Arc<AtomicUsize>,
}

impl<'a, S> RpcServiceT<'a> for MyMiddleware<S>
where S: RpcServiceT<'a> + Send + Sync + Clone + 'static,
{
   type Future = BoxFuture<'a, MethodResponse>;

   fn call(&self, req: Request<'a>) -> Self::Future {
        tracing::info!("MyMiddleware processed call {}", req.method);
        let count = self.count.clone();
        let service = self.service.clone();

        Box::pin(async move {
            let rp = service.call(req).await;
            // Modify the state.
            count.fetch_add(1, Ordering::Relaxed);
            rp
        })
   }
}

// Create a state per connection
// NOTE: The service type can be omitted once `start` is called on the server.
let m = RpcServiceBuilder::new().layer_fn(move |service: ()| MyMiddleware { service, count: Arc::new(AtomicUsize::new(0)) });
let builder = ServerBuilder::default().set_rpc_middleware(m);
source

pub fn custom_tokio_runtime(self, rt: Handle) -> Self

Configure a custom tokio::runtime::Handle to run the server on.

Default: tokio::spawn

source

pub fn enable_ws_ping(self, config: PingConfig) -> Self

Enable WebSocket ping/pong on the server.

Default: pings are disabled.

§Examples
use std::{time::Duration, num::NonZeroUsize};
use jsonrpsee_server::{ServerBuilder, PingConfig};

// Set the ping interval to 10 seconds but terminates the connection if a client is inactive for more than 2 minutes
let ping_cfg = PingConfig::new().ping_interval(Duration::from_secs(10)).inactive_limit(Duration::from_secs(60 * 2));
let builder = ServerBuilder::default().enable_ws_ping(ping_cfg);
source

pub fn disable_ws_ping(self) -> Self

Disable WebSocket ping/pong on the server.

Default: pings are disabled.

source

pub fn set_id_provider<I: IdProvider + 'static>(self, id_provider: I) -> Self

Configure custom subscription ID provider for the server to use to when getting new subscription calls.

You may choose static dispatch or dynamic dispatch because IdProvider is implemented for Box<T>.

Default: RandomIntegerIdProvider.

§Examples
use jsonrpsee_server::{ServerBuilder, RandomStringIdProvider, IdProvider};

// static dispatch
let builder1 = ServerBuilder::default().set_id_provider(RandomStringIdProvider::new(16));

// or dynamic dispatch
let builder2 = ServerBuilder::default().set_id_provider(Box::new(RandomStringIdProvider::new(16)));
source

pub fn set_http_middleware<T>( self, http_middleware: ServiceBuilder<T>, ) -> Builder<T, RpcMiddleware>

Configure a custom tower::ServiceBuilder middleware for composing layers to be applied to the RPC service.

Default: No tower layers are applied to the RPC service.

§Examples

use std::time::Duration;
use std::net::SocketAddr;

#[tokio::main]
async fn main() {
    let builder = tower::ServiceBuilder::new().timeout(Duration::from_secs(2));

    let server = jsonrpsee_server::ServerBuilder::new()
        .set_http_middleware(builder)
        .build("127.0.0.1:0".parse::<SocketAddr>().unwrap())
        .await
        .unwrap();
}
source

pub fn set_tcp_no_delay(self, no_delay: bool) -> Self

Configure TCP_NODELAY on the socket to the supplied value nodelay.

Default is true.

source

pub fn http_only(self) -> Self

Configure the server to only serve JSON-RPC HTTP requests.

Default: both http and ws are enabled.

source

pub fn ws_only(self) -> Self

Configure the server to only serve JSON-RPC WebSocket requests.

That implies that server just denies HTTP requests which isn’t a WebSocket upgrade request

Default: both http and ws are enabled.

source

pub fn set_message_buffer_capacity(self, c: u32) -> Self

The server enforces backpressure which means that n messages can be buffered and if the client can’t keep with up the server.

This capacity is applied per connection and applies globally on the connection which implies all JSON-RPC messages.

For example if a subscription produces plenty of new items and the client can’t keep up then no new messages are handled.

If this limit is exceeded then the server will “back-off” and only accept new messages once the client reads pending messages.

§Panics

Panics if the buffer capacity is 0.

source

pub fn to_service_builder( self, ) -> TowerServiceBuilder<RpcMiddleware, HttpMiddleware>

Convert the server builder to a TowerServiceBuilder.

This can be used to utilize the TowerService from jsonrpsee.

§Examples
use jsonrpsee_server::{Methods, ServerHandle, ws, stop_channel, serve_with_graceful_shutdown};
use tower::Service;
use std::{error::Error as StdError, net::SocketAddr};
use futures_util::future::{self, Either};
use hyper_util::rt::{TokioIo, TokioExecutor};

fn run_server() -> ServerHandle {
    let (stop_handle, server_handle) = stop_channel();
    let svc_builder = jsonrpsee_server::Server::builder().max_connections(33).to_service_builder();
    let methods = Methods::new();
    let stop_handle = stop_handle.clone();

    tokio::spawn(async move {
        let listener = tokio::net::TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0))).await.unwrap();

        loop {
             // The `tokio::select!` macro is used to wait for either of the
             // listeners to accept a new connection or for the server to be
             // stopped.
             let (sock, remote_addr) = tokio::select! {
                 res = listener.accept() => {
                     match res {
                        Ok(sock) => sock,
                        Err(e) => {
                            tracing::error!("failed to accept v4 connection: {:?}", e);
                            continue;
                        }
                      }
                 }
                 _ = stop_handle.clone().shutdown() => break,
             };

             let stop_handle2 = stop_handle.clone();
             let svc_builder2 = svc_builder.clone();
             let methods2 = methods.clone();

             let svc = tower::service_fn(move |req| {
                  let stop_handle = stop_handle2.clone();
                  let svc_builder = svc_builder2.clone();
                  let methods = methods2.clone();

                  let mut svc = svc_builder.build(methods, stop_handle.clone());

                  // It's not possible to know whether the websocket upgrade handshake failed or not here.
                  let is_websocket = ws::is_upgrade_request(&req);

                  if is_websocket {
                      println!("websocket")
                  } else {
                      println!("http")
                  }

                  // Call the jsonrpsee service which
                  // may upgrade it to a WebSocket connection
                  // or treat it as "ordinary HTTP request".
                  async move { svc.call(req).await }
              });

              // Upgrade the connection to a HTTP service with graceful shutdown.
              tokio::spawn(serve_with_graceful_shutdown(sock, svc, stop_handle.clone().shutdown()));
         }
    });

    server_handle
}
source

pub async fn build( self, addrs: impl ToSocketAddrs, ) -> Result<Server<HttpMiddleware, RpcMiddleware>>

Finalize the configuration of the server. Consumes the Builder.

#[tokio::main]
async fn main() {
  let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
  let occupied_addr = listener.local_addr().unwrap();
  let addrs: &[std::net::SocketAddr] = &[
      occupied_addr,
      "127.0.0.1:0".parse().unwrap(),
  ];
  assert!(jsonrpsee_server::ServerBuilder::default().build(occupied_addr).await.is_err());
  assert!(jsonrpsee_server::ServerBuilder::default().build(addrs).await.is_ok());
}
source

pub fn build_from_tcp( self, listener: impl Into<StdTcpListener>, ) -> Result<Server<HttpMiddleware, RpcMiddleware>>

Finalizes the configuration of the server with customized TCP settings on the socket.

use jsonrpsee_server::Server;
use socket2::{Domain, Socket, Type};
use std::time::Duration;

#[tokio::main]
async fn main() {
  let addr = "127.0.0.1:0".parse().unwrap();
  let domain = Domain::for_address(addr);
  let socket = Socket::new(domain, Type::STREAM, None).unwrap();
  socket.set_nonblocking(true).unwrap();

  let address = addr.into();
  socket.bind(&address).unwrap();

  socket.listen(4096).unwrap();

  let server = Server::builder().build_from_tcp(socket).unwrap();
}

Trait Implementations§

source§

impl<HttpMiddleware: Debug, RpcMiddleware: Debug> Debug for Builder<HttpMiddleware, RpcMiddleware>

source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more
source§

impl Default for Builder<Identity, Identity>

source§

fn default() -> Self

Returns the “default value” for a type. Read more

Auto Trait Implementations§

§

impl<HttpMiddleware, RpcMiddleware> Freeze for Builder<HttpMiddleware, RpcMiddleware>
where HttpMiddleware: Freeze, RpcMiddleware: Freeze,

§

impl<HttpMiddleware, RpcMiddleware> !RefUnwindSafe for Builder<HttpMiddleware, RpcMiddleware>

§

impl<HttpMiddleware, RpcMiddleware> Send for Builder<HttpMiddleware, RpcMiddleware>
where HttpMiddleware: Send, RpcMiddleware: Send,

§

impl<HttpMiddleware, RpcMiddleware> Sync for Builder<HttpMiddleware, RpcMiddleware>
where HttpMiddleware: Sync, RpcMiddleware: Sync,

§

impl<HttpMiddleware, RpcMiddleware> Unpin for Builder<HttpMiddleware, RpcMiddleware>
where HttpMiddleware: Unpin, RpcMiddleware: Unpin,

§

impl<HttpMiddleware, RpcMiddleware> !UnwindSafe for Builder<HttpMiddleware, RpcMiddleware>

Blanket Implementations§

source§

impl<T> Any for T
where T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<T> Borrow<T> for T
where T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

source§

impl<T> Instrument for T

source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
source§

impl<T, U> Into<U> for T
where U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<T> Same for T

§

type Output = T

Should always be Self
source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

source§

fn vzip(self) -> V

source§

impl<T> WithSubscriber for T

source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more
source§

impl<T> MaybeSend for T
where T: Send,