// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//! JSON-RPC specific middleware.
2021use std::{
22 num::NonZeroU32,
23 time::{Duration, Instant},
24};
2526use futures::future::{BoxFuture, FutureExt};
27use governor::{clock::Clock, Jitter};
28use jsonrpsee::{
29 server::middleware::rpc::RpcServiceT,
30 types::{ErrorObject, Id, Request},
31 MethodResponse,
32};
3334mod metrics;
35mod node_health;
36mod rate_limit;
3738pub use metrics::*;
39pub use node_health::*;
40pub use rate_limit::*;
/// Upper bound handed to `Jitter::up_to` for the jitter that is added to every
/// rate-limit back-off sleep.
const MAX_JITTER: Duration = Duration::from_millis(50);
/// Number of rate-limit back-off attempts a call may make before it is rejected.
const MAX_RETRIES: usize = 10;
4445/// JSON-RPC middleware layer.
46#[derive(Debug, Clone, Default)]
47pub struct MiddlewareLayer {
48 rate_limit: Option<RateLimit>,
49 metrics: Option<Metrics>,
50}
5152impl MiddlewareLayer {
53/// Create an empty MiddlewareLayer.
54pub fn new() -> Self {
55Self::default()
56 }
5758/// Enable new rate limit middleware enforced per minute.
59pub fn with_rate_limit_per_minute(self, n: NonZeroU32) -> Self {
60Self { rate_limit: Some(RateLimit::per_minute(n)), metrics: self.metrics }
61 }
6263/// Enable metrics middleware.
64pub fn with_metrics(self, metrics: Metrics) -> Self {
65Self { rate_limit: self.rate_limit, metrics: Some(metrics) }
66 }
6768/// Register a new websocket connection.
69pub fn ws_connect(&self) {
70self.metrics.as_ref().map(|m| m.ws_connect());
71 }
7273/// Register that a websocket connection was closed.
74pub fn ws_disconnect(&self, now: Instant) {
75self.metrics.as_ref().map(|m| m.ws_disconnect(now));
76 }
77}
7879impl<S> tower::Layer<S> for MiddlewareLayer {
80type Service = Middleware<S>;
8182fn layer(&self, service: S) -> Self::Service {
83 Middleware { service, rate_limit: self.rate_limit.clone(), metrics: self.metrics.clone() }
84 }
85}
8687/// JSON-RPC middleware that handles metrics
88/// and rate-limiting.
89///
90/// These are part of the same middleware
91/// because the metrics needs to know whether
92/// a call was rate-limited or not because
93/// it will impact the roundtrip for a call.
94pub struct Middleware<S> {
95 service: S,
96 rate_limit: Option<RateLimit>,
97 metrics: Option<Metrics>,
98}
99100impl<'a, S> RpcServiceT<'a> for Middleware<S>
101where
102S: Send + Sync + RpcServiceT<'a> + Clone + 'static,
103{
104type Future = BoxFuture<'a, MethodResponse>;
105106fn call(&self, req: Request<'a>) -> Self::Future {
107let now = Instant::now();
108109self.metrics.as_ref().map(|m| m.on_call(&req));
110111let service = self.service.clone();
112let rate_limit = self.rate_limit.clone();
113let metrics = self.metrics.clone();
114115async move {
116let mut is_rate_limited = false;
117118if let Some(limit) = rate_limit.as_ref() {
119let mut attempts = 0;
120let jitter = Jitter::up_to(MAX_JITTER);
121122loop {
123if attempts >= MAX_RETRIES {
124return reject_too_many_calls(req.id);
125 }
126127if let Err(rejected) = limit.inner.check() {
128 tokio::time::sleep(jitter + rejected.wait_time_from(limit.clock.now()))
129 .await;
130 } else {
131break;
132 }
133134 is_rate_limited = true;
135 attempts += 1;
136 }
137 }
138139let rp = service.call(req.clone()).await;
140 metrics.as_ref().map(|m| m.on_response(&req, &rp, is_rate_limited, now));
141142 rp
143 }
144 .boxed()
145 }
146}
147148fn reject_too_many_calls(id: Id) -> MethodResponse {
149 MethodResponse::error(id, ErrorObject::owned(-32999, "RPC rate limit exceeded", None::<()>))
150}