use crate::BoxError;
use bytes::{Buf, Bytes};
use http_body::Frame;
use http_body_util::{BodyExt, Limited};
use std::{
pin::Pin,
task::{Context, Poll},
};
/// Alias for [`http::Request`] whose body type parameter defaults to [`Body`].
pub type Request<T = Body> = http::Request<T>;
/// Alias for [`http::Response`] whose body type parameter defaults to [`Body`].
pub type Response<T = Body> = http::Response<T>;
/// Type-erased HTTP body.
///
/// Newtype around an `UnsyncBoxBody` that yields [`Bytes`] as data and reports
/// failures as [`BoxError`].
#[derive(Debug, Default)]
pub struct Body(http_body_util::combinators::UnsyncBoxBody<Bytes, BoxError>);
impl Body {
    /// Creates a body from the `Default` value of the wrapped boxed body.
    pub fn empty() -> Self {
        Default::default()
    }

    /// Wraps any [`http_body::Body`] that produces [`Bytes`] into a [`Body`].
    ///
    /// The error type of `body` is converted into [`BoxError`] and the concrete
    /// body type is erased by boxing it.
    pub fn new<B>(body: B) -> Self
    where
        B: http_body::Body<Data = Bytes> + Send + 'static,
        B::Data: Send + 'static,
        B::Error: Into<BoxError>,
    {
        Self(body.map_err(Into::<BoxError>::into).boxed_unsync())
    }
}
impl From<String> for Body {
fn from(s: String) -> Self {
let body = http_body_util::Full::from(s);
Self::new(body)
}
}
impl From<&'static str> for Body {
fn from(s: &'static str) -> Self {
let body = http_body_util::Full::from(s);
Self::new(body)
}
}
impl From<Vec<u8>> for Body {
fn from(bytes: Vec<u8>) -> Self {
let body = http_body_util::Full::from(bytes);
Self::new(body)
}
}
/// Forwards every [`http_body::Body`] operation to the wrapped boxed body.
impl http_body::Body for Body {
    type Data = Bytes;
    type Error = BoxError;

    /// Polls the wrapped body for its next frame.
    #[inline]
    fn poll_frame(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
        let inner = Pin::new(&mut self.get_mut().0);
        http_body::Body::poll_frame(inner, cx)
    }

    /// Returns the size hint reported by the wrapped body.
    #[inline]
    fn size_hint(&self) -> http_body::SizeHint {
        http_body::Body::size_hint(&self.0)
    }

    /// Returns whether the wrapped body reports that its stream has ended.
    #[inline]
    fn is_end_stream(&self) -> bool {
        http_body::Body::is_end_stream(&self.0)
    }
}
/// Errors that can occur while reading an HTTP body.
#[derive(Debug, thiserror::Error)]
pub enum HttpError {
/// The body is larger than the configured maximum size.
#[error("The HTTP message was too big")]
TooLarge,
/// The body does not start with `{` or `[` (after leading whitespace) or is empty.
#[error("Malformed request")]
Malformed,
/// The underlying body stream failed; the original error is forwarded transparently.
#[error(transparent)]
Stream(#[from] BoxError),
}
pub async fn read_body<B>(headers: &http::HeaderMap, body: B, max_body_size: u32) -> Result<(Vec<u8>, bool), HttpError>
where
B: http_body::Body<Data = Bytes> + Send + 'static,
B::Data: Send,
B::Error: Into<BoxError>,
{
let body_size = read_header_content_length(headers).unwrap_or(0);
if body_size > max_body_size {
return Err(HttpError::TooLarge);
}
futures_util::pin_mut!(body);
let mut received_data = Vec::with_capacity(std::cmp::min(body_size as usize, 16 * 1024));
let mut limited_body = Limited::new(body, max_body_size as usize);
let mut is_single = None;
while let Some(frame_or_err) = limited_body.frame().await {
let frame = frame_or_err.map_err(HttpError::Stream)?;
let Some(data) = frame.data_ref() else {
continue;
};
if received_data.is_empty() {
let first_non_whitespace =
data.chunk().iter().enumerate().take(128).find(|(_, byte)| !byte.is_ascii_whitespace());
let skip = match first_non_whitespace {
Some((idx, b'{')) => {
is_single = Some(true);
idx
}
Some((idx, b'[')) => {
is_single = Some(false);
idx
}
_ => return Err(HttpError::Malformed),
};
received_data.extend_from_slice(&data.chunk()[skip..]);
} else {
received_data.extend_from_slice(data.chunk());
}
}
match is_single {
Some(single) if !received_data.is_empty() => {
tracing::trace!(
target: "jsonrpsee-http",
"HTTP response body: {}",
std::str::from_utf8(&received_data).unwrap_or("Invalid UTF-8 data")
);
Ok((received_data, single))
}
_ => Err(HttpError::Malformed),
}
}
/// Returns the `Content-Length` header parsed as `u32`.
///
/// Yields `None` when [`read_header_value`] yields `None` for the header or when the value
/// does not parse as a `u32`.
fn read_header_content_length(headers: &http::header::HeaderMap) -> Option<u32> {
    read_header_value(headers, http::header::CONTENT_LENGTH).and_then(|raw| raw.parse::<u32>().ok())
}
/// Returns the value of `header_name` as a string slice, but only if the header occurs
/// exactly once.
///
/// Yields `None` when the header is absent, occurs more than once, or its value cannot be
/// converted with `to_str`.
pub fn read_header_value(headers: &http::header::HeaderMap, header_name: http::header::HeaderName) -> Option<&str> {
    let mut occurrences = headers.get_all(header_name).iter();
    let first = occurrences.next()?;

    match occurrences.next() {
        // Exactly one occurrence: hand back its textual value.
        None => first.to_str().ok(),
        // Repeated header: refuse to pick one of the values.
        Some(_) => None,
    }
}
/// Returns a view over every value stored in `headers` under `header_name`.
///
/// This is a thin wrapper around `HeaderMap::get_all`.
pub fn read_header_values<'a>(
    headers: &'a http::header::HeaderMap,
    header_name: &str,
) -> http::header::GetAll<'a, http::header::HeaderValue> {
    http::header::HeaderMap::get_all(headers, header_name)
}
#[cfg(test)]
mod tests {
    use super::{read_body, read_header_content_length, HttpError};
    use http_body_util::BodyExt;

    type Body = http_body_util::Full<bytes::Bytes>;

    /// The size limit must be the only thing that decides the outcome, so the payload starts
    /// with `{` and is therefore accepted whenever it fits within the limit.
    #[tokio::test]
    async fn body_to_bytes_size_limit_works() {
        let headers = http::header::HeaderMap::new();

        let mut payload = vec![b' '; 128];
        payload[0] = b'{';

        // Exactly at the limit: accepted.
        let at_limit = Body::from(payload.clone()).map_err(|e| HttpError::Stream(e.into()));
        assert!(read_body(&headers, at_limit, 128).await.is_ok());

        // One byte over the limit: rejected.
        let over_limit = Body::from(payload).map_err(|e| HttpError::Stream(e.into()));
        assert!(read_body(&headers, over_limit, 127).await.is_err());
    }

    /// A single `Content-Length` header is parsed; a duplicated one is rejected.
    #[test]
    fn read_content_length_works() {
        let mut headers = http::header::HeaderMap::new();
        headers.insert(http::header::CONTENT_LENGTH, "177".parse().unwrap());
        assert_eq!(read_header_content_length(&headers), Some(177));

        headers.append(http::header::CONTENT_LENGTH, "999".parse().unwrap());
        assert_eq!(read_header_content_length(&headers), None);
    }

    /// A `Content-Length` that does not fit into `u32` is reported as `None`.
    #[test]
    fn read_content_length_too_big_value() {
        let mut headers = http::header::HeaderMap::new();
        headers.insert(http::header::CONTENT_LENGTH, "18446744073709551616".parse().unwrap());
        assert_eq!(read_header_content_length(&headers), None);
    }
}