1use base64::Engine;
10use hyper::body::Bytes;
11use hyper::http::{HeaderMap, HeaderValue};
12use hyper_util::client::legacy::connect::HttpConnector;
13use hyper_util::client::legacy::Client;
14use hyper_util::rt::TokioExecutor;
15use jsonrpsee_core::tracing::client::{rx_log_from_bytes, tx_log_from_str};
16use jsonrpsee_core::BoxError;
17use jsonrpsee_core::{
18 http_helpers::{self, HttpError},
19 TEN_MB_SIZE_BYTES,
20};
21use std::future::Future;
22use std::pin::Pin;
23use std::task::{Context, Poll};
24use thiserror::Error;
25use tower::layer::util::Identity;
26use tower::{Layer, Service, ServiceExt};
27use url::Url;
28
29use crate::{HttpBody, HttpRequest, HttpResponse};
30
31#[cfg(feature = "tls")]
32use crate::{CertificateStore, CustomCertStore};
33
/// MIME type used for the default `Content-Type` and `Accept` request headers.
const CONTENT_TYPE_JSON: &str = "application/json";
35
36#[derive(Debug)]
38pub enum HttpBackend<B = HttpBody> {
39 #[cfg(feature = "tls")]
41 Https(Client<hyper_rustls::HttpsConnector<HttpConnector>, B>),
42 Http(Client<HttpConnector, B>),
44}
45
46impl<B> Clone for HttpBackend<B> {
47 fn clone(&self) -> Self {
48 match self {
49 Self::Http(inner) => Self::Http(inner.clone()),
50 #[cfg(feature = "tls")]
51 Self::Https(inner) => Self::Https(inner.clone()),
52 }
53 }
54}
55
56impl<B> tower::Service<HttpRequest<B>> for HttpBackend<B>
57where
58 B: http_body::Body<Data = Bytes> + Send + Unpin + 'static,
59 B::Data: Send,
60 B::Error: Into<BoxError>,
61{
62 type Response = HttpResponse<hyper::body::Incoming>;
63 type Error = Error;
64 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
65
66 fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
67 match self {
68 Self::Http(inner) => inner.poll_ready(ctx),
69 #[cfg(feature = "tls")]
70 Self::Https(inner) => inner.poll_ready(ctx),
71 }
72 .map_err(|e| Error::Http(HttpError::Stream(e.into())))
73 }
74
75 fn call(&mut self, req: HttpRequest<B>) -> Self::Future {
76 let resp = match self {
77 Self::Http(inner) => inner.call(req),
78 #[cfg(feature = "tls")]
79 Self::Https(inner) => inner.call(req),
80 };
81
82 Box::pin(async move { resp.await.map_err(|e| Error::Http(HttpError::Stream(e.into()))) })
83 }
84}
85
86#[derive(Debug)]
88pub struct HttpTransportClientBuilder<L> {
89 #[cfg(feature = "tls")]
91 pub(crate) certificate_store: CertificateStore,
92 pub(crate) max_request_size: u32,
94 pub(crate) max_response_size: u32,
96 pub(crate) max_log_length: u32,
100 pub(crate) headers: HeaderMap,
102 pub(crate) service_builder: tower::ServiceBuilder<L>,
104 pub(crate) tcp_no_delay: bool,
106}
107
108impl Default for HttpTransportClientBuilder<Identity> {
109 fn default() -> Self {
110 Self::new()
111 }
112}
113
114impl HttpTransportClientBuilder<Identity> {
115 pub fn new() -> Self {
117 Self {
118 #[cfg(feature = "tls")]
119 certificate_store: CertificateStore::Native,
120 max_request_size: TEN_MB_SIZE_BYTES,
121 max_response_size: TEN_MB_SIZE_BYTES,
122 max_log_length: 1024,
123 headers: HeaderMap::new(),
124 service_builder: tower::ServiceBuilder::new(),
125 tcp_no_delay: true,
126 }
127 }
128}
129
130impl<L> HttpTransportClientBuilder<L> {
131 #[cfg(feature = "tls")]
133 pub fn with_custom_cert_store(mut self, cfg: CustomCertStore) -> Self {
134 self.certificate_store = CertificateStore::Custom(cfg);
135 self
136 }
137
138 pub fn max_request_size(mut self, size: u32) -> Self {
140 self.max_request_size = size;
141 self
142 }
143
144 pub fn max_response_size(mut self, size: u32) -> Self {
146 self.max_response_size = size;
147 self
148 }
149
150 pub fn set_headers(mut self, headers: HeaderMap) -> Self {
154 self.headers = headers;
155 self
156 }
157
158 pub fn set_tcp_no_delay(mut self, no_delay: bool) -> Self {
162 self.tcp_no_delay = no_delay;
163 self
164 }
165
166 pub fn set_max_logging_length(mut self, max: u32) -> Self {
170 self.max_log_length = max;
171 self
172 }
173
174 pub fn set_service<T>(self, service: tower::ServiceBuilder<T>) -> HttpTransportClientBuilder<T> {
176 HttpTransportClientBuilder {
177 #[cfg(feature = "tls")]
178 certificate_store: self.certificate_store,
179 headers: self.headers,
180 max_log_length: self.max_log_length,
181 max_request_size: self.max_request_size,
182 max_response_size: self.max_response_size,
183 service_builder: service,
184 tcp_no_delay: self.tcp_no_delay,
185 }
186 }
187
188 pub fn build<S, B>(self, target: impl AsRef<str>) -> Result<HttpTransportClient<S>, Error>
190 where
191 L: Layer<HttpBackend, Service = S>,
192 S: Service<HttpRequest, Response = HttpResponse<B>, Error = Error> + Clone,
193 B: http_body::Body<Data = Bytes> + Send + 'static,
194 B::Data: Send,
195 B::Error: Into<BoxError>,
196 {
197 let Self {
198 #[cfg(feature = "tls")]
199 certificate_store,
200 max_request_size,
201 max_response_size,
202 max_log_length,
203 headers,
204 service_builder,
205 tcp_no_delay,
206 } = self;
207 let mut url = Url::parse(target.as_ref()).map_err(|e| Error::Url(format!("Invalid URL: {e}")))?;
208
209 if url.host_str().is_none() {
210 return Err(Error::Url("Invalid host".into()));
211 }
212 url.set_fragment(None);
213
214 let client = match url.scheme() {
215 "http" => {
216 let mut connector = HttpConnector::new();
217 connector.set_nodelay(tcp_no_delay);
218 HttpBackend::Http(Client::builder(TokioExecutor::new()).build(connector))
219 }
220 #[cfg(feature = "tls")]
221 "https" => {
222 let _ = rustls::crypto::ring::default_provider().install_default();
227
228 let mut http_conn = HttpConnector::new();
229 http_conn.set_nodelay(tcp_no_delay);
230 http_conn.enforce_http(false);
231
232 let https_conn = match certificate_store {
233 CertificateStore::Native => hyper_rustls::HttpsConnectorBuilder::new()
234 .with_tls_config(rustls_platform_verifier::tls_config())
235 .https_or_http()
236 .enable_all_versions()
237 .wrap_connector(http_conn),
238
239 CertificateStore::Custom(tls_config) => hyper_rustls::HttpsConnectorBuilder::new()
240 .with_tls_config(tls_config)
241 .https_or_http()
242 .enable_all_versions()
243 .wrap_connector(http_conn),
244 };
245
246 HttpBackend::Https(Client::builder(TokioExecutor::new()).build(https_conn))
247 }
248 _ => {
249 #[cfg(feature = "tls")]
250 let err = "URL scheme not supported, expects 'http' or 'https'";
251 #[cfg(not(feature = "tls"))]
252 let err = "URL scheme not supported, expects 'http'";
253 return Err(Error::Url(err.into()));
254 }
255 };
256
257 let mut cached_headers = HeaderMap::with_capacity(2 + headers.len());
261 cached_headers.insert(hyper::header::CONTENT_TYPE, HeaderValue::from_static(CONTENT_TYPE_JSON));
262 cached_headers.insert(hyper::header::ACCEPT, HeaderValue::from_static(CONTENT_TYPE_JSON));
263 for (key, value) in headers.into_iter() {
264 if let Some(key) = key {
265 cached_headers.insert(key, value);
266 }
267 }
268
269 if let Some(pwd) = url.password() {
270 if !cached_headers.contains_key(hyper::header::AUTHORIZATION) {
271 let digest = base64::engine::general_purpose::STANDARD.encode(format!("{}:{pwd}", url.username()));
272 cached_headers.insert(
273 hyper::header::AUTHORIZATION,
274 HeaderValue::from_str(&format!("Basic {digest}"))
275 .map_err(|_| Error::Url("Header value `authorization basic user:pwd` invalid".into()))?,
276 );
277 }
278 }
279
280 Ok(HttpTransportClient {
281 target: url.as_str().to_owned(),
282 client: service_builder.service(client),
283 max_request_size,
284 max_response_size,
285 max_log_length,
286 headers: cached_headers,
287 })
288 }
289}
290
291#[derive(Debug, Clone)]
293pub struct HttpTransportClient<S> {
294 target: String,
296 client: S,
298 max_request_size: u32,
300 max_response_size: u32,
302 max_log_length: u32,
306 headers: HeaderMap,
308}
309
310impl<B, S> HttpTransportClient<S>
311where
312 S: Service<HttpRequest, Response = HttpResponse<B>, Error = Error> + Clone,
313 B: http_body::Body<Data = Bytes> + Send + 'static,
314 B::Data: Send,
315 B::Error: Into<BoxError>,
316{
317 async fn inner_send(&self, body: String) -> Result<HttpResponse<B>, Error> {
318 if body.len() > self.max_request_size as usize {
319 return Err(Error::RequestTooLarge);
320 }
321
322 let mut req = HttpRequest::post(&self.target);
323 if let Some(headers) = req.headers_mut() {
324 *headers = self.headers.clone();
325 }
326
327 let req = req.body(body.into()).expect("URI and request headers are valid; qed");
328 let response = self.client.clone().ready().await?.call(req).await?;
329
330 if response.status().is_success() {
331 Ok(response)
332 } else {
333 Err(Error::Rejected { status_code: response.status().into() })
334 }
335 }
336
337 pub(crate) async fn send_and_read_body(&self, body: String) -> Result<Vec<u8>, Error> {
339 tx_log_from_str(&body, self.max_log_length);
340
341 let response = self.inner_send(body).await?;
342 let (parts, body) = response.into_parts();
343
344 let (body, _is_single) = http_helpers::read_body(&parts.headers, body, self.max_response_size).await?;
345
346 rx_log_from_bytes(&body, self.max_log_length);
347
348 Ok(body)
349 }
350
351 pub(crate) async fn send(&self, body: String) -> Result<(), Error> {
353 let _ = self.inner_send(body).await?;
354
355 Ok(())
356 }
357}
358
359#[derive(Debug, Error)]
361pub enum Error {
362 #[error("Invalid Url: {0}")]
364 Url(String),
365
366 #[error(transparent)]
368 Http(#[from] HttpError),
369
370 #[error("Request rejected `{status_code}`")]
372 Rejected {
373 status_code: u16,
375 },
376
377 #[error("The request body was too large")]
379 RequestTooLarge,
380
381 #[error("Invalid certificate store")]
383 InvalidCertficateStore,
384}
385
#[cfg(test)]
mod tests {
    use super::*;

    // A scheme other than http(s) must be refused with a URL error.
    #[test]
    fn invalid_http_url_rejected() {
        let result = HttpTransportClientBuilder::new().build("ws://localhost:9933");
        assert!(matches!(result.unwrap_err(), Error::Url(_)));
    }

    // With TLS support compiled in, an https target is accepted.
    #[cfg(feature = "tls")]
    #[test]
    fn https_works() {
        let transport = HttpTransportClientBuilder::new().build("https://localhost").unwrap();
        assert_eq!(transport.target.as_str(), "https://localhost/");
    }

    // Without TLS support, an https target is an unsupported scheme.
    #[cfg(not(feature = "tls"))]
    #[test]
    fn https_fails_without_tls_feature() {
        let result = HttpTransportClientBuilder::new().build("https://localhost");
        assert!(matches!(result.unwrap_err(), Error::Url(_)));
    }

    // Negative port numbers do not parse as a URL.
    #[test]
    fn faulty_port() {
        for url in ["http://localhost:-43", "http://localhost:-99999"] {
            let err = HttpTransportClientBuilder::new().build(url).unwrap_err();
            assert!(matches!(err, Error::Url(_)));
        }
    }

    // The path component is kept in the target.
    #[test]
    fn url_with_path_works() {
        let transport = HttpTransportClientBuilder::new().build("http://localhost/my-special-path").unwrap();
        assert_eq!(transport.target.as_str(), "http://localhost/my-special-path");
    }

    // The query string is kept in the target.
    #[test]
    fn url_with_query_works() {
        let transport =
            HttpTransportClientBuilder::new().build("http://127.0.0.1/my?name1=value1&name2=value2").unwrap();
        assert_eq!(transport.target.as_str(), "http://127.0.0.1/my?name1=value1&name2=value2");
    }

    // The fragment is stripped from the target.
    #[test]
    fn url_with_fragment_is_ignored() {
        let transport = HttpTransportClientBuilder::new().build("http://127.0.0.1/my.htm#ignore").unwrap();
        assert_eq!(transport.target.as_str(), "http://127.0.0.1/my.htm");
    }

    // The scheme's default port does not show up in the target.
    #[test]
    fn url_default_port_is_omitted() {
        let transport = HttpTransportClientBuilder::new().build("http://127.0.0.1:80").unwrap();
        assert_eq!(transport.target.as_str(), "http://127.0.0.1/");
    }

    // A non-default port is kept for https targets.
    #[cfg(feature = "tls")]
    #[test]
    fn https_custom_port_works() {
        let transport = HttpTransportClientBuilder::new().build("https://localhost:9999").unwrap();
        assert_eq!(transport.target.as_str(), "https://localhost:9999/");
    }

    // A non-default port is kept for http targets.
    #[test]
    fn http_custom_port_works() {
        let transport = HttpTransportClientBuilder::new().build("http://localhost:9999").unwrap();
        assert_eq!(transport.target.as_str(), "http://localhost:9999/");
    }

    // A body one byte over the request limit is rejected before sending.
    #[tokio::test]
    async fn request_limit_works() {
        let request_limit = 80;
        let response_limit = 50;

        let transport = HttpTransportClientBuilder::new()
            .max_request_size(request_limit)
            .max_response_size(response_limit)
            .build("http://localhost:9933")
            .unwrap();

        assert_eq!(transport.max_request_size, request_limit);
        assert_eq!(transport.max_response_size, response_limit);

        let oversized = "a".repeat(81);
        assert_eq!(oversized.len(), 81);
        let err = transport.send(oversized).await.unwrap_err();
        assert!(matches!(err, Error::RequestTooLarge));
    }
}