sc_rpc_server/
lib.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Substrate RPC servers.
20
21#![warn(missing_docs)]
22
23pub mod middleware;
24pub mod utils;
25
26use std::{error::Error as StdError, net::SocketAddr, time::Duration};
27
28use jsonrpsee::{
29	core::BoxError,
30	server::{
31		serve_with_graceful_shutdown, stop_channel, ws, PingConfig, ServerHandle, StopHandle,
32	},
33	Methods, RpcModule,
34};
35use tower::Service;
36use utils::{
37	build_rpc_api, deny_unsafe, format_listen_addrs, get_proxy_ip, ListenAddrError, RpcSettings,
38};
39
40pub use ip_network::IpNetwork;
41pub use jsonrpsee::{
42	core::id_providers::{RandomIntegerIdProvider, RandomStringIdProvider},
43	server::{middleware::rpc::RpcServiceBuilder, BatchRequestConfig},
44};
45pub use middleware::{Metrics, MiddlewareLayer, NodeHealthProxyLayer, RpcMetrics};
46pub use utils::{RpcEndpoint, RpcMethods};
47
48const MEGABYTE: u32 = 1024 * 1024;
49
50/// Type to encapsulate the server handle and listening address.
51pub struct Server {
52	/// Handle to the rpc server
53	handle: ServerHandle,
54	/// Listening address of the server
55	listen_addrs: Vec<SocketAddr>,
56}
57
58impl Server {
59	/// Creates a new Server.
60	pub fn new(handle: ServerHandle, listen_addrs: Vec<SocketAddr>) -> Server {
61		Server { handle, listen_addrs }
62	}
63
64	/// Returns the `jsonrpsee::server::ServerHandle` for this Server. Can be used to stop the
65	/// server.
66	pub fn handle(&self) -> &ServerHandle {
67		&self.handle
68	}
69
70	/// The listen address for the running RPC service.
71	pub fn listen_addrs(&self) -> &[SocketAddr] {
72		&self.listen_addrs
73	}
74}
75
76impl Drop for Server {
77	fn drop(&mut self) {
78		// This doesn't not wait for the server to be stopped but fires the signal.
79		let _ = self.handle.stop();
80	}
81}
82
83/// Trait for providing subscription IDs that can be cloned.
84pub trait SubscriptionIdProvider:
85	jsonrpsee::core::traits::IdProvider + dyn_clone::DynClone
86{
87}
88
89dyn_clone::clone_trait_object!(SubscriptionIdProvider);
90
91/// RPC server configuration.
92#[derive(Debug)]
93pub struct Config<M: Send + Sync + 'static> {
94	/// RPC interfaces to start.
95	pub endpoints: Vec<RpcEndpoint>,
96	/// Metrics.
97	pub metrics: Option<RpcMetrics>,
98	/// RPC API.
99	pub rpc_api: RpcModule<M>,
100	/// Subscription ID provider.
101	pub id_provider: Option<Box<dyn SubscriptionIdProvider>>,
102	/// Tokio runtime handle.
103	pub tokio_handle: tokio::runtime::Handle,
104}
105
106#[derive(Debug, Clone)]
107struct PerConnection {
108	methods: Methods,
109	stop_handle: StopHandle,
110	metrics: Option<RpcMetrics>,
111	tokio_handle: tokio::runtime::Handle,
112}
113
114/// Start RPC server listening on given address.
115pub async fn start_server<M>(config: Config<M>) -> Result<Server, Box<dyn StdError + Send + Sync>>
116where
117	M: Send + Sync,
118{
119	let Config { endpoints, metrics, tokio_handle, rpc_api, id_provider } = config;
120
121	let (stop_handle, server_handle) = stop_channel();
122	let cfg = PerConnection {
123		methods: build_rpc_api(rpc_api).into(),
124		metrics,
125		tokio_handle: tokio_handle.clone(),
126		stop_handle,
127	};
128
129	let mut local_addrs = Vec::new();
130
131	for endpoint in endpoints {
132		let allowed_to_fail = endpoint.is_optional;
133		let local_addr = endpoint.listen_addr;
134
135		let mut listener = match endpoint.bind().await {
136			Ok(l) => l,
137			Err(e) if allowed_to_fail => {
138				log::debug!(target: "rpc", "JSON-RPC server failed to bind optional address: {:?}, error: {:?}", local_addr, e);
139				continue;
140			},
141			Err(e) => return Err(e),
142		};
143		let local_addr = listener.local_addr();
144		local_addrs.push(local_addr);
145		let cfg = cfg.clone();
146
147		let mut id_provider2 = id_provider.clone();
148
149		tokio_handle.spawn(async move {
150			loop {
151				let (sock, remote_addr, rpc_cfg) = tokio::select! {
152					res = listener.accept() => {
153						match res {
154							Ok(s) => s,
155							Err(e) => {
156								log::debug!(target: "rpc", "Failed to accept connection: {:?}", e);
157								continue;
158							}
159						}
160					}
161					_ = cfg.stop_handle.clone().shutdown() => break,
162				};
163
164				let RpcSettings {
165					batch_config,
166					max_connections,
167					max_payload_in_mb,
168					max_payload_out_mb,
169					max_buffer_capacity_per_connection,
170					max_subscriptions_per_connection,
171					rpc_methods,
172					rate_limit_trust_proxy_headers,
173					rate_limit_whitelisted_ips,
174					host_filter,
175					cors,
176					rate_limit,
177				} = rpc_cfg;
178
179				let http_middleware = tower::ServiceBuilder::new()
180					.option_layer(host_filter)
181					// Proxy `GET /health, /health/readiness` requests to the internal
182					// `system_health` method.
183					.layer(NodeHealthProxyLayer::default())
184					.layer(cors);
185
186				let mut builder = jsonrpsee::server::Server::builder()
187					.max_request_body_size(max_payload_in_mb.saturating_mul(MEGABYTE))
188					.max_response_body_size(max_payload_out_mb.saturating_mul(MEGABYTE))
189					.max_connections(max_connections)
190					.max_subscriptions_per_connection(max_subscriptions_per_connection)
191					.enable_ws_ping(
192						PingConfig::new()
193							.ping_interval(Duration::from_secs(30))
194							.inactive_limit(Duration::from_secs(60))
195							.max_failures(3),
196					)
197					.set_http_middleware(http_middleware)
198					.set_message_buffer_capacity(max_buffer_capacity_per_connection)
199					.set_batch_request_config(batch_config)
200					.custom_tokio_runtime(cfg.tokio_handle.clone())
201					.set_id_provider(RandomStringIdProvider::new(16));
202
203				if let Some(provider) = id_provider2.take() {
204					builder = builder.set_id_provider(provider);
205				} else {
206					builder = builder.set_id_provider(RandomStringIdProvider::new(16));
207				};
208
209				let service_builder = builder.to_service_builder();
210				let deny_unsafe = deny_unsafe(&local_addr, &rpc_methods);
211
212				let ip = remote_addr.ip();
213				let cfg2 = cfg.clone();
214				let service_builder2 = service_builder.clone();
215
216				let svc =
217					tower::service_fn(move |mut req: http::Request<hyper::body::Incoming>| {
218						req.extensions_mut().insert(deny_unsafe);
219
220						let PerConnection { methods, metrics, tokio_handle, stop_handle } =
221							cfg2.clone();
222						let service_builder = service_builder2.clone();
223
224						let proxy_ip =
225							if rate_limit_trust_proxy_headers { get_proxy_ip(&req) } else { None };
226
227						let rate_limit_cfg = if rate_limit_whitelisted_ips
228							.iter()
229							.any(|ips| ips.contains(proxy_ip.unwrap_or(ip)))
230						{
231							log::debug!(target: "rpc", "ip={ip}, proxy_ip={:?} is trusted, disabling rate-limit", proxy_ip);
232							None
233						} else {
234							if !rate_limit_whitelisted_ips.is_empty() {
235								log::debug!(target: "rpc", "ip={ip}, proxy_ip={:?} is not trusted, rate-limit enabled", proxy_ip);
236							}
237							rate_limit
238						};
239
240						let is_websocket = ws::is_upgrade_request(&req);
241						let transport_label = if is_websocket { "ws" } else { "http" };
242
243						let middleware_layer = match (metrics, rate_limit_cfg) {
244							(None, None) => None,
245							(Some(metrics), None) => Some(
246								MiddlewareLayer::new()
247									.with_metrics(Metrics::new(metrics, transport_label)),
248							),
249							(None, Some(rate_limit)) =>
250								Some(MiddlewareLayer::new().with_rate_limit_per_minute(rate_limit)),
251							(Some(metrics), Some(rate_limit)) => Some(
252								MiddlewareLayer::new()
253									.with_metrics(Metrics::new(metrics, transport_label))
254									.with_rate_limit_per_minute(rate_limit),
255							),
256						};
257
258						let rpc_middleware =
259							RpcServiceBuilder::new().option_layer(middleware_layer.clone());
260						let mut svc = service_builder
261							.set_rpc_middleware(rpc_middleware)
262							.build(methods, stop_handle);
263
264						async move {
265							if is_websocket {
266								let on_disconnect = svc.on_session_closed();
267
268								// Spawn a task to handle when the connection is closed.
269								tokio_handle.spawn(async move {
270									let now = std::time::Instant::now();
271									middleware_layer.as_ref().map(|m| m.ws_connect());
272									on_disconnect.await;
273									middleware_layer.as_ref().map(|m| m.ws_disconnect(now));
274								});
275							}
276
277							// https://github.com/rust-lang/rust/issues/102211 the error type can't be inferred
278							// to be `Box<dyn std::error::Error + Send + Sync>` so we need to
279							// convert it to a concrete type as workaround.
280							svc.call(req).await.map_err(|e| BoxError::from(e))
281						}
282					});
283
284				cfg.tokio_handle.spawn(serve_with_graceful_shutdown(
285					sock,
286					svc,
287					cfg.stop_handle.clone().shutdown(),
288				));
289			}
290		});
291	}
292
293	if local_addrs.is_empty() {
294		return Err(Box::new(ListenAddrError));
295	}
296
297	// The previous logging format was before
298	// `Running JSON-RPC server: addr=127.0.0.1:9944, allowed origins=["*"]`
299	//
300	// The new format is `Running JSON-RPC server: addr=<addr1, addr2, .. addr_n>`
301	// with the exception that for a single address it will be `Running JSON-RPC server: addr=addr,`
302	// with a trailing comma.
303	//
304	// This is to make it work with old scripts/utils that parse the logs.
305	log::info!("Running JSON-RPC server: addr={}", format_listen_addrs(&local_addrs));
306
307	Ok(Server::new(server_handle, local_addrs))
308}