jsonrpsee_http_client/
transport.rs

1// Implementation note: hyper's API is not adapted to async/await at all, and there's
2// unfortunately a lot of boilerplate here that could be removed once/if it gets reworked.
3//
4// Additionally, despite the fact that hyper is capable of performing requests to multiple different
5// servers through the same `hyper::Client`, we don't use that feature on purpose. The reason is
6// that we need to be guaranteed that hyper doesn't re-use an existing connection if we ever reset
7// the JSON-RPC request id to a value that might have already been used.
8
9use base64::Engine;
10use hyper::body::Bytes;
11use hyper::http::{HeaderMap, HeaderValue};
12use hyper_util::client::legacy::connect::HttpConnector;
13use hyper_util::client::legacy::Client;
14use hyper_util::rt::TokioExecutor;
15use jsonrpsee_core::tracing::client::{rx_log_from_bytes, tx_log_from_str};
16use jsonrpsee_core::BoxError;
17use jsonrpsee_core::{
18	http_helpers::{self, HttpError},
19	TEN_MB_SIZE_BYTES,
20};
21use std::future::Future;
22use std::pin::Pin;
23use std::task::{Context, Poll};
24use thiserror::Error;
25use tower::layer::util::Identity;
26use tower::{Layer, Service, ServiceExt};
27use url::Url;
28
29use crate::{HttpBody, HttpRequest, HttpResponse};
30
31#[cfg(feature = "tls")]
32use crate::{CertificateStore, CustomCertStore};
33
34const CONTENT_TYPE_JSON: &str = "application/json";
35
36/// Wrapper over HTTP transport and connector.
37#[derive(Debug)]
38pub enum HttpBackend<B = HttpBody> {
39	/// Hyper client with https connector.
40	#[cfg(feature = "tls")]
41	Https(Client<hyper_rustls::HttpsConnector<HttpConnector>, B>),
42	/// Hyper client with http connector.
43	Http(Client<HttpConnector, B>),
44}
45
46impl<B> Clone for HttpBackend<B> {
47	fn clone(&self) -> Self {
48		match self {
49			Self::Http(inner) => Self::Http(inner.clone()),
50			#[cfg(feature = "tls")]
51			Self::Https(inner) => Self::Https(inner.clone()),
52		}
53	}
54}
55
56impl<B> tower::Service<HttpRequest<B>> for HttpBackend<B>
57where
58	B: http_body::Body<Data = Bytes> + Send + Unpin + 'static,
59	B::Data: Send,
60	B::Error: Into<BoxError>,
61{
62	type Response = HttpResponse<hyper::body::Incoming>;
63	type Error = Error;
64	type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
65
66	fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
67		match self {
68			Self::Http(inner) => inner.poll_ready(ctx),
69			#[cfg(feature = "tls")]
70			Self::Https(inner) => inner.poll_ready(ctx),
71		}
72		.map_err(|e| Error::Http(HttpError::Stream(e.into())))
73	}
74
75	fn call(&mut self, req: HttpRequest<B>) -> Self::Future {
76		let resp = match self {
77			Self::Http(inner) => inner.call(req),
78			#[cfg(feature = "tls")]
79			Self::Https(inner) => inner.call(req),
80		};
81
82		Box::pin(async move { resp.await.map_err(|e| Error::Http(HttpError::Stream(e.into()))) })
83	}
84}
85
86/// Builder for [`HttpTransportClient`].
87#[derive(Debug)]
88pub struct HttpTransportClientBuilder<L> {
89	/// Certificate store.
90	#[cfg(feature = "tls")]
91	pub(crate) certificate_store: CertificateStore,
92	/// Configurable max request body size
93	pub(crate) max_request_size: u32,
94	/// Configurable max response body size
95	pub(crate) max_response_size: u32,
96	/// Max length for logging for requests and responses
97	///
98	/// Logs bigger than this limit will be truncated.
99	pub(crate) max_log_length: u32,
100	/// Custom headers to pass with every request.
101	pub(crate) headers: HeaderMap,
102	/// Service builder
103	pub(crate) service_builder: tower::ServiceBuilder<L>,
104	/// TCP_NODELAY
105	pub(crate) tcp_no_delay: bool,
106}
107
108impl Default for HttpTransportClientBuilder<Identity> {
109	fn default() -> Self {
110		Self::new()
111	}
112}
113
114impl HttpTransportClientBuilder<Identity> {
115	/// Create a new [`HttpTransportClientBuilder`].
116	pub fn new() -> Self {
117		Self {
118			#[cfg(feature = "tls")]
119			certificate_store: CertificateStore::Native,
120			max_request_size: TEN_MB_SIZE_BYTES,
121			max_response_size: TEN_MB_SIZE_BYTES,
122			max_log_length: 1024,
123			headers: HeaderMap::new(),
124			service_builder: tower::ServiceBuilder::new(),
125			tcp_no_delay: true,
126		}
127	}
128}
129
130impl<L> HttpTransportClientBuilder<L> {
131	/// See docs [`crate::HttpClientBuilder::with_custom_cert_store`] for more information.
132	#[cfg(feature = "tls")]
133	pub fn with_custom_cert_store(mut self, cfg: CustomCertStore) -> Self {
134		self.certificate_store = CertificateStore::Custom(cfg);
135		self
136	}
137
138	/// Set the maximum size of a request body in bytes. Default is 10 MiB.
139	pub fn max_request_size(mut self, size: u32) -> Self {
140		self.max_request_size = size;
141		self
142	}
143
144	/// Set the maximum size of a response in bytes. Default is 10 MiB.
145	pub fn max_response_size(mut self, size: u32) -> Self {
146		self.max_response_size = size;
147		self
148	}
149
150	/// Set a custom header passed to the server with every request (default is none).
151	///
152	/// The caller is responsible for checking that the headers do not conflict or are duplicated.
153	pub fn set_headers(mut self, headers: HeaderMap) -> Self {
154		self.headers = headers;
155		self
156	}
157
158	/// Configure `TCP_NODELAY` on the socket to the supplied value `nodelay`.
159	///
160	/// Default is `true`.
161	pub fn set_tcp_no_delay(mut self, no_delay: bool) -> Self {
162		self.tcp_no_delay = no_delay;
163		self
164	}
165
166	/// Max length for logging for requests and responses in number characters.
167	///
168	/// Logs bigger than this limit will be truncated.
169	pub fn set_max_logging_length(mut self, max: u32) -> Self {
170		self.max_log_length = max;
171		self
172	}
173
174	/// Configure a tower service.
175	pub fn set_service<T>(self, service: tower::ServiceBuilder<T>) -> HttpTransportClientBuilder<T> {
176		HttpTransportClientBuilder {
177			#[cfg(feature = "tls")]
178			certificate_store: self.certificate_store,
179			headers: self.headers,
180			max_log_length: self.max_log_length,
181			max_request_size: self.max_request_size,
182			max_response_size: self.max_response_size,
183			service_builder: service,
184			tcp_no_delay: self.tcp_no_delay,
185		}
186	}
187
188	/// Build a [`HttpTransportClient`].
189	pub fn build<S, B>(self, target: impl AsRef<str>) -> Result<HttpTransportClient<S>, Error>
190	where
191		L: Layer<HttpBackend, Service = S>,
192		S: Service<HttpRequest, Response = HttpResponse<B>, Error = Error> + Clone,
193		B: http_body::Body<Data = Bytes> + Send + 'static,
194		B::Data: Send,
195		B::Error: Into<BoxError>,
196	{
197		let Self {
198			#[cfg(feature = "tls")]
199			certificate_store,
200			max_request_size,
201			max_response_size,
202			max_log_length,
203			headers,
204			service_builder,
205			tcp_no_delay,
206		} = self;
207		let mut url = Url::parse(target.as_ref()).map_err(|e| Error::Url(format!("Invalid URL: {e}")))?;
208
209		if url.host_str().is_none() {
210			return Err(Error::Url("Invalid host".into()));
211		}
212		url.set_fragment(None);
213
214		let client = match url.scheme() {
215			"http" => {
216				let mut connector = HttpConnector::new();
217				connector.set_nodelay(tcp_no_delay);
218				HttpBackend::Http(Client::builder(TokioExecutor::new()).build(connector))
219			}
220			#[cfg(feature = "tls")]
221			"https" => {
222				// Make sure that the TLS provider is set. If not, set a default one.
223				// Otherwise, creating `tls` configuration may panic if there are multiple
224				// providers available due to `rustls` features (e.g. both `ring` and `aws-lc-rs`).
225				// Function returns an error if the provider is already installed, and we're fine with it.
226				let _ = rustls::crypto::ring::default_provider().install_default();
227
228				let mut http_conn = HttpConnector::new();
229				http_conn.set_nodelay(tcp_no_delay);
230				http_conn.enforce_http(false);
231
232				let https_conn = match certificate_store {
233					CertificateStore::Native => hyper_rustls::HttpsConnectorBuilder::new()
234						.with_tls_config(rustls_platform_verifier::tls_config())
235						.https_or_http()
236						.enable_all_versions()
237						.wrap_connector(http_conn),
238
239					CertificateStore::Custom(tls_config) => hyper_rustls::HttpsConnectorBuilder::new()
240						.with_tls_config(tls_config)
241						.https_or_http()
242						.enable_all_versions()
243						.wrap_connector(http_conn),
244				};
245
246				HttpBackend::Https(Client::builder(TokioExecutor::new()).build(https_conn))
247			}
248			_ => {
249				#[cfg(feature = "tls")]
250				let err = "URL scheme not supported, expects 'http' or 'https'";
251				#[cfg(not(feature = "tls"))]
252				let err = "URL scheme not supported, expects 'http'";
253				return Err(Error::Url(err.into()));
254			}
255		};
256
257		// Cache request headers: 2 default headers, followed by user custom headers.
258		// Maintain order for headers in case of duplicate keys:
259		// https://datatracker.ietf.org/doc/html/rfc7230#section-3.2.2
260		let mut cached_headers = HeaderMap::with_capacity(2 + headers.len());
261		cached_headers.insert(hyper::header::CONTENT_TYPE, HeaderValue::from_static(CONTENT_TYPE_JSON));
262		cached_headers.insert(hyper::header::ACCEPT, HeaderValue::from_static(CONTENT_TYPE_JSON));
263		for (key, value) in headers.into_iter() {
264			if let Some(key) = key {
265				cached_headers.insert(key, value);
266			}
267		}
268
269		if let Some(pwd) = url.password() {
270			if !cached_headers.contains_key(hyper::header::AUTHORIZATION) {
271				let digest = base64::engine::general_purpose::STANDARD.encode(format!("{}:{pwd}", url.username()));
272				cached_headers.insert(
273					hyper::header::AUTHORIZATION,
274					HeaderValue::from_str(&format!("Basic {digest}"))
275						.map_err(|_| Error::Url("Header value `authorization basic user:pwd` invalid".into()))?,
276				);
277			}
278		}
279
280		Ok(HttpTransportClient {
281			target: url.as_str().to_owned(),
282			client: service_builder.service(client),
283			max_request_size,
284			max_response_size,
285			max_log_length,
286			headers: cached_headers,
287		})
288	}
289}
290
291/// HTTP Transport Client.
292#[derive(Debug, Clone)]
293pub struct HttpTransportClient<S> {
294	/// Target to connect to.
295	target: String,
296	/// HTTP client
297	client: S,
298	/// Configurable max request body size
299	max_request_size: u32,
300	/// Configurable max response body size
301	max_response_size: u32,
302	/// Max length for logging for requests and responses
303	///
304	/// Logs bigger than this limit will be truncated.
305	max_log_length: u32,
306	/// Custom headers to pass with every request.
307	headers: HeaderMap,
308}
309
310impl<B, S> HttpTransportClient<S>
311where
312	S: Service<HttpRequest, Response = HttpResponse<B>, Error = Error> + Clone,
313	B: http_body::Body<Data = Bytes> + Send + 'static,
314	B::Data: Send,
315	B::Error: Into<BoxError>,
316{
317	async fn inner_send(&self, body: String) -> Result<HttpResponse<B>, Error> {
318		if body.len() > self.max_request_size as usize {
319			return Err(Error::RequestTooLarge);
320		}
321
322		let mut req = HttpRequest::post(&self.target);
323		if let Some(headers) = req.headers_mut() {
324			*headers = self.headers.clone();
325		}
326
327		let req = req.body(body.into()).expect("URI and request headers are valid; qed");
328		let response = self.client.clone().ready().await?.call(req).await?;
329
330		if response.status().is_success() {
331			Ok(response)
332		} else {
333			Err(Error::Rejected { status_code: response.status().into() })
334		}
335	}
336
337	/// Send serialized message and wait until all bytes from the HTTP message body have been read.
338	pub(crate) async fn send_and_read_body(&self, body: String) -> Result<Vec<u8>, Error> {
339		tx_log_from_str(&body, self.max_log_length);
340
341		let response = self.inner_send(body).await?;
342		let (parts, body) = response.into_parts();
343
344		let (body, _is_single) = http_helpers::read_body(&parts.headers, body, self.max_response_size).await?;
345
346		rx_log_from_bytes(&body, self.max_log_length);
347
348		Ok(body)
349	}
350
351	/// Send serialized message without reading the HTTP message body.
352	pub(crate) async fn send(&self, body: String) -> Result<(), Error> {
353		let _ = self.inner_send(body).await?;
354
355		Ok(())
356	}
357}
358
359/// Error that can happen during a request.
360#[derive(Debug, Error)]
361pub enum Error {
362	/// Invalid URL.
363	#[error("Invalid Url: {0}")]
364	Url(String),
365
366	/// Error during the HTTP request, including networking errors and HTTP protocol errors.
367	#[error(transparent)]
368	Http(#[from] HttpError),
369
370	/// Server returned a non-success status code.
371	#[error("Request rejected `{status_code}`")]
372	Rejected {
373		/// HTTP Status code returned by the server.
374		status_code: u16,
375	},
376
377	/// Request body too large.
378	#[error("The request body was too large")]
379	RequestTooLarge,
380
381	/// Invalid certificate store.
382	#[error("Invalid certificate store")]
383	InvalidCertficateStore,
384}
385
386#[cfg(test)]
387mod tests {
388	use super::*;
389
390	#[test]
391	fn invalid_http_url_rejected() {
392		let err = HttpTransportClientBuilder::new().build("ws://localhost:9933").unwrap_err();
393		assert!(matches!(err, Error::Url(_)));
394	}
395
396	#[cfg(feature = "tls")]
397	#[test]
398	fn https_works() {
399		let client = HttpTransportClientBuilder::new().build("https://localhost").unwrap();
400		assert_eq!(&client.target, "https://localhost/");
401	}
402
403	#[cfg(not(feature = "tls"))]
404	#[test]
405	fn https_fails_without_tls_feature() {
406		let err = HttpTransportClientBuilder::new().build("https://localhost").unwrap_err();
407		assert!(matches!(err, Error::Url(_)));
408	}
409
410	#[test]
411	fn faulty_port() {
412		let err = HttpTransportClientBuilder::new().build("http://localhost:-43").unwrap_err();
413		assert!(matches!(err, Error::Url(_)));
414
415		let err = HttpTransportClientBuilder::new().build("http://localhost:-99999").unwrap_err();
416		assert!(matches!(err, Error::Url(_)));
417	}
418
419	#[test]
420	fn url_with_path_works() {
421		let client = HttpTransportClientBuilder::new().build("http://localhost/my-special-path").unwrap();
422		assert_eq!(&client.target, "http://localhost/my-special-path");
423	}
424
425	#[test]
426	fn url_with_query_works() {
427		let client = HttpTransportClientBuilder::new().build("http://127.0.0.1/my?name1=value1&name2=value2").unwrap();
428		assert_eq!(&client.target, "http://127.0.0.1/my?name1=value1&name2=value2");
429	}
430
431	#[test]
432	fn url_with_fragment_is_ignored() {
433		let client = HttpTransportClientBuilder::new().build("http://127.0.0.1/my.htm#ignore").unwrap();
434		assert_eq!(&client.target, "http://127.0.0.1/my.htm");
435	}
436
437	#[test]
438	fn url_default_port_is_omitted() {
439		let client = HttpTransportClientBuilder::new().build("http://127.0.0.1:80").unwrap();
440		assert_eq!(&client.target, "http://127.0.0.1/");
441	}
442
443	#[cfg(feature = "tls")]
444	#[test]
445	fn https_custom_port_works() {
446		let client = HttpTransportClientBuilder::new().build("https://localhost:9999").unwrap();
447		assert_eq!(&client.target, "https://localhost:9999/");
448	}
449
450	#[test]
451	fn http_custom_port_works() {
452		let client = HttpTransportClientBuilder::new().build("http://localhost:9999").unwrap();
453		assert_eq!(&client.target, "http://localhost:9999/");
454	}
455
456	#[tokio::test]
457	async fn request_limit_works() {
458		let eighty_bytes_limit = 80;
459		let fifty_bytes_limit = 50;
460
461		let client = HttpTransportClientBuilder::new()
462			.max_request_size(eighty_bytes_limit)
463			.max_response_size(fifty_bytes_limit)
464			.build("http://localhost:9933")
465			.unwrap();
466
467		assert_eq!(client.max_request_size, eighty_bytes_limit);
468		assert_eq!(client.max_response_size, fifty_bytes_limit);
469
470		let body = "a".repeat(81);
471		assert_eq!(body.len(), 81);
472		let response = client.send(body).await.unwrap_err();
473		assert!(matches!(response, Error::RequestTooLarge));
474	}
475}