1use crate::BatchRequestConfig;
22use std::{
23 error::Error as StdError,
24 net::{IpAddr, SocketAddr},
25 num::NonZeroU32,
26 str::FromStr,
27};
28
29use forwarded_header_value::ForwardedHeaderValue;
30use http::header::{HeaderName, HeaderValue};
31use ip_network::IpNetwork;
32use jsonrpsee::{server::middleware::http::HostFilterLayer, RpcModule};
33use sc_rpc_api::DenyUnsafe;
34use tower_http::cors::{AllowOrigin, CorsLayer};
35
36const X_FORWARDED_FOR: HeaderName = HeaderName::from_static("x-forwarded-for");
37const X_REAL_IP: HeaderName = HeaderName::from_static("x-real-ip");
38const FORWARDED: HeaderName = HeaderName::from_static("forwarded");
39
/// Error reporting that no listen address could be bound.
///
/// The type carries no data; its `Display` output is a fixed message.
#[derive(Debug)]
pub(crate) struct ListenAddrError;

impl std::fmt::Display for ListenAddrError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("No listen address was successfully bound")
    }
}

impl std::error::Error for ListenAddrError {}
50
/// Selects which RPC methods an endpoint exposes.
///
/// The variants are turned into a concrete policy by [`deny_unsafe`]; the
/// per-variant notes below describe what that function does with each one.
#[derive(Debug, Copy, Clone, Default)]
pub enum RpcMethods {
    /// Unsafe methods are denied, whatever address is passed to [`deny_unsafe`].
    Safe,
    /// Unsafe methods are allowed, whatever address is passed to [`deny_unsafe`].
    Unsafe,
    /// Unsafe methods are allowed only when the address passed to
    /// [`deny_unsafe`] is a loopback address. This is the default.
    #[default]
    Auto,
}

impl FromStr for RpcMethods {
    type Err = String;

    /// Parses one of the exact strings `"safe"`, `"unsafe"` or `"auto"`.
    ///
    /// Matching is case-sensitive and performs no trimming; any other input
    /// yields an error message that echoes the rejected value.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let parsed = match s {
            "safe" => Self::Safe,
            "unsafe" => Self::Unsafe,
            "auto" => Self::Auto,
            invalid => return Err(format!("Invalid rpc methods {invalid}")),
        };

        Ok(parsed)
    }
}
80
81#[derive(Debug, Clone)]
82pub(crate) struct RpcSettings {
83 pub(crate) batch_config: BatchRequestConfig,
84 pub(crate) max_connections: u32,
85 pub(crate) max_payload_in_mb: u32,
86 pub(crate) max_payload_out_mb: u32,
87 pub(crate) max_subscriptions_per_connection: u32,
88 pub(crate) max_buffer_capacity_per_connection: u32,
89 pub(crate) rpc_methods: RpcMethods,
90 pub(crate) rate_limit: Option<NonZeroU32>,
91 pub(crate) rate_limit_trust_proxy_headers: bool,
92 pub(crate) rate_limit_whitelisted_ips: Vec<IpNetwork>,
93 pub(crate) cors: CorsLayer,
94 pub(crate) host_filter: Option<HostFilterLayer>,
95}
96
97#[derive(Debug, Clone)]
99pub struct RpcEndpoint {
100 pub listen_addr: SocketAddr,
102 pub batch_config: BatchRequestConfig,
104 pub max_connections: u32,
106 pub max_payload_in_mb: u32,
108 pub max_payload_out_mb: u32,
110 pub max_subscriptions_per_connection: u32,
112 pub max_buffer_capacity_per_connection: u32,
114 pub rate_limit: Option<NonZeroU32>,
116 pub rate_limit_trust_proxy_headers: bool,
118 pub rate_limit_whitelisted_ips: Vec<IpNetwork>,
120 pub cors: Option<Vec<String>>,
122 pub rpc_methods: RpcMethods,
124 pub is_optional: bool,
128 pub retry_random_port: bool,
130}
131
132impl RpcEndpoint {
133 pub(crate) async fn bind(self) -> Result<Listener, Box<dyn StdError + Send + Sync>> {
135 let listener = match tokio::net::TcpListener::bind(self.listen_addr).await {
136 Ok(listener) => listener,
137 Err(_) if self.retry_random_port => {
138 let mut addr = self.listen_addr;
139 addr.set_port(0);
140
141 tokio::net::TcpListener::bind(addr).await?
142 },
143 Err(e) => return Err(e.into()),
144 };
145 let local_addr = listener.local_addr()?;
146 let host_filter = host_filtering(self.cors.is_some(), local_addr);
147 let cors = try_into_cors(self.cors)?;
148
149 Ok(Listener {
150 listener,
151 local_addr,
152 cfg: RpcSettings {
153 batch_config: self.batch_config,
154 max_connections: self.max_connections,
155 max_payload_in_mb: self.max_payload_in_mb,
156 max_payload_out_mb: self.max_payload_out_mb,
157 max_subscriptions_per_connection: self.max_subscriptions_per_connection,
158 max_buffer_capacity_per_connection: self.max_buffer_capacity_per_connection,
159 rpc_methods: self.rpc_methods,
160 rate_limit: self.rate_limit,
161 rate_limit_trust_proxy_headers: self.rate_limit_trust_proxy_headers,
162 rate_limit_whitelisted_ips: self.rate_limit_whitelisted_ips,
163 host_filter,
164 cors,
165 },
166 })
167 }
168}
169
170pub(crate) struct Listener {
172 listener: tokio::net::TcpListener,
173 local_addr: SocketAddr,
174 cfg: RpcSettings,
175}
176
177impl Listener {
178 pub(crate) async fn accept(
180 &mut self,
181 ) -> std::io::Result<(tokio::net::TcpStream, SocketAddr, RpcSettings)> {
182 let (sock, remote_addr) = self.listener.accept().await?;
183 Ok((sock, remote_addr, self.cfg.clone()))
184 }
185
186 pub fn local_addr(&self) -> SocketAddr {
188 self.local_addr
189 }
190}
191
192pub(crate) fn host_filtering(enabled: bool, addr: SocketAddr) -> Option<HostFilterLayer> {
193 if enabled {
194 let mut hosts = Vec::new();
197
198 if addr.is_ipv4() {
199 hosts.push(format!("localhost:{}", addr.port()));
200 hosts.push(format!("127.0.0.1:{}", addr.port()));
201 } else {
202 hosts.push(format!("[::1]:{}", addr.port()));
203 }
204
205 Some(HostFilterLayer::new(hosts).expect("Valid hosts; qed"))
206 } else {
207 None
208 }
209}
210
211pub(crate) fn build_rpc_api<M: Send + Sync + 'static>(mut rpc_api: RpcModule<M>) -> RpcModule<M> {
212 let mut available_methods = rpc_api.method_names().collect::<Vec<_>>();
213 available_methods.push("rpc_methods");
215 available_methods.sort();
216
217 rpc_api
218 .register_method("rpc_methods", move |_, _, _| {
219 serde_json::json!({
220 "methods": available_methods,
221 })
222 })
223 .expect("infallible all other methods have their own address space; qed");
224
225 rpc_api
226}
227
228pub(crate) fn try_into_cors(
229 maybe_cors: Option<Vec<String>>,
230) -> Result<CorsLayer, Box<dyn StdError + Send + Sync>> {
231 if let Some(cors) = maybe_cors {
232 let mut list = Vec::new();
233
234 for origin in cors {
235 list.push(HeaderValue::from_str(&origin)?)
236 }
237
238 Ok(CorsLayer::new().allow_origin(AllowOrigin::list(list)))
239 } else {
240 Ok(CorsLayer::permissive())
242 }
243}
244
245pub(crate) fn get_proxy_ip<B>(req: &http::Request<B>) -> Option<IpAddr> {
252 if let Some(ip) = req
253 .headers()
254 .get(&FORWARDED)
255 .and_then(|v| v.to_str().ok())
256 .and_then(|v| ForwardedHeaderValue::from_forwarded(v).ok())
257 .and_then(|v| v.remotest_forwarded_for_ip())
258 {
259 return Some(ip);
260 }
261
262 if let Some(ip) = req
263 .headers()
264 .get(&X_FORWARDED_FOR)
265 .and_then(|v| v.to_str().ok())
266 .and_then(|v| ForwardedHeaderValue::from_x_forwarded_for(v).ok())
267 .and_then(|v| v.remotest_forwarded_for_ip())
268 {
269 return Some(ip);
270 }
271
272 if let Some(ip) = req
273 .headers()
274 .get(&X_REAL_IP)
275 .and_then(|v| v.to_str().ok())
276 .and_then(|v| IpAddr::from_str(v).ok())
277 {
278 return Some(ip);
279 }
280
281 None
282}
283
284pub fn deny_unsafe(addr: &SocketAddr, methods: &RpcMethods) -> DenyUnsafe {
286 match (addr.ip().is_loopback(), methods) {
287 (_, RpcMethods::Unsafe) | (true, RpcMethods::Auto) => DenyUnsafe::No,
288 _ => DenyUnsafe::Yes,
289 }
290}
291
/// Renders `addr` as a comma-separated list without spaces.
///
/// A slice holding exactly one address gets a trailing comma
/// (`"127.0.0.1:9944,"`); an empty slice yields an empty string and longer
/// slices have no trailing comma.
// NOTE(review): the single-address trailing comma looks deliberate (perhaps
// for consumers that parse this output); confirm before changing it.
pub(crate) fn format_listen_addrs(addr: &[SocketAddr]) -> String {
    let mut joined = addr.iter().map(ToString::to_string).collect::<Vec<_>>().join(",");

    if addr.len() == 1 {
        joined.push(',');
    }

    joined
}
311
#[cfg(test)]
mod tests {
    use super::*;
    use hyper::header::HeaderValue;
    use jsonrpsee::server::{HttpBody, HttpRequest};

    /// Builds a request with an empty body and no headers.
    fn request() -> http::Request<HttpBody> {
        HttpRequest::builder().body(HttpBody::empty()).unwrap()
    }

    /// Builds a request carrying `values` as successive values of header `name`.
    fn request_with(name: &HeaderName, values: &[&'static str]) -> http::Request<HttpBody> {
        let mut req = request();

        for value in values {
            req.headers_mut().append(name, HeaderValue::from_static(value));
        }

        req
    }

    /// Parses `s` into the `Option<IpAddr>` shape returned by `get_proxy_ip`.
    fn expected(s: &str) -> Option<IpAddr> {
        Some(IpAddr::from_str(s).unwrap())
    }

    #[test]
    fn empty_works() {
        let req = request();
        assert!(get_proxy_ip(&req).is_none())
    }

    #[test]
    fn host_from_x_real_ip() {
        let req = request_with(&X_REAL_IP, &["127.0.0.1"]);
        assert_eq!(expected("127.0.0.1"), get_proxy_ip(&req));
    }

    #[test]
    fn ip_from_forwarded_works() {
        let req = request_with(
            &FORWARDED,
            &["for=192.0.2.60;proto=http;by=203.0.113.43;host=example.com"],
        );
        assert_eq!(expected("192.0.2.60"), get_proxy_ip(&req));
    }

    #[test]
    fn ip_from_forwarded_multiple() {
        // Three separate `forwarded` header values; the first one is used.
        let req =
            request_with(&FORWARDED, &["for=127.0.0.1", "for=192.0.2.60", "for=192.0.2.61"]);
        assert_eq!(expected("127.0.0.1"), get_proxy_ip(&req));
    }

    #[test]
    fn ip_from_x_forwarded_works() {
        let req = request_with(&X_FORWARDED_FOR, &["127.0.0.1,192.0.2.60,0.0.0.1"]);
        assert_eq!(expected("127.0.0.1"), get_proxy_ip(&req));
    }
}