use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use crate::{HttpBody, HttpRequest};
use futures_util::future::{self, Either};
use hyper_util::rt::{TokioExecutor, TokioIo};
use jsonrpsee_core::BoxError;
use pin_project::pin_project;
use tower::util::Oneshot;
use tower::ServiceExt;
/// Adapter that wraps a [`tower::Service`] so it can be used where a
/// [`hyper::service::Service`] is expected.
///
/// hyper hands requests over with a `hyper::body::Incoming` body; the
/// `hyper::service::Service` impl for this type converts that body into
/// [`HttpBody`] before calling the wrapped service.
#[derive(Debug, Copy, Clone)]
pub(crate) struct TowerToHyperService<S> {
/// The wrapped tower service. It is cloned for every incoming request.
service: S,
}
impl<S> TowerToHyperService<S> {
pub(crate) fn new(service: S) -> Self {
Self { service }
}
}
impl<S> hyper::service::Service<HttpRequest<hyper::body::Incoming>> for TowerToHyperService<S>
where
S: tower::Service<HttpRequest> + Clone,
{
type Response = S::Response;
type Error = S::Error;
type Future = TowerToHyperServiceFuture<S, HttpRequest>;
fn call(&self, req: HttpRequest<hyper::body::Incoming>) -> Self::Future {
let req = req.map(HttpBody::new);
TowerToHyperServiceFuture { future: self.service.clone().oneshot(req) }
}
}
/// Future returned by the `hyper::service::Service` impl of
/// [`TowerToHyperService`].
///
/// It wraps the [`Oneshot`] future that sends one request `R` through the
/// service `S`.
#[pin_project]
pub(crate) struct TowerToHyperServiceFuture<S, R>
where
S: tower::Service<R>,
{
/// The in-flight `oneshot` call; marked `#[pin]` so `poll` can reach it
/// through the pin projection.
#[pin]
future: Oneshot<S, R>,
}
/// Resolves to whatever the wrapped [`Oneshot`] future resolves to: the
/// service's response on success, or the service's error.
impl<S, R> Future for TowerToHyperServiceFuture<S, R>
where
    S: tower::Service<R>,
{
    type Output = Result<S::Response, S::Error>;

    /// Forwards the poll to the inner `oneshot` future via the pin projection.
    #[inline]
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        this.future.poll(cx)
    }
}
/// Serves `service` over the connection `io` until the connection completes.
///
/// The tower `service` is adapted with hyper-util's `TowerToHyperService`,
/// `io` is wrapped in [`TokioIo`], and the connection is driven by
/// hyper-util's `auto` connection builder with upgrade support, using a
/// [`TokioExecutor`].
///
/// # Errors
///
/// Returns the error produced by the connection future, if any.
pub async fn serve<S, B, I>(io: I, service: S) -> Result<(), BoxError>
where
    S: tower::Service<http::Request<hyper::body::Incoming>, Response = http::Response<B>> + Clone + Send + 'static,
    S::Future: Send,
    S::Response: Send,
    S::Error: Into<BoxError>,
    B: http_body::Body<Data = hyper::body::Bytes> + Send + 'static,
    B::Error: Into<BoxError>,
    I: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin + 'static,
{
    // The builder must be a named local: the connection future borrows it.
    let executor = TokioExecutor::new();
    let builder = hyper_util::server::conn::auto::Builder::new(executor);

    let connection = builder.serve_connection_with_upgrades(
        TokioIo::new(io),
        hyper_util::service::TowerToHyperService::new(service),
    );

    connection.await
}
/// Serves `service` over the connection `io`, starting a graceful shutdown of
/// the connection once `stopped` resolves.
///
/// The connection is set up exactly as in [`serve`]. It is then raced against
/// `stopped`:
///
/// * if the connection finishes first, its result is returned;
/// * if `stopped` finishes first, `graceful_shutdown` is called on the
///   connection, which is then awaited until it completes.
///
/// # Errors
///
/// Returns the error produced by the connection future, if any.
pub async fn serve_with_graceful_shutdown<S, B, I>(
    io: I,
    service: S,
    stopped: impl Future<Output = ()>,
) -> Result<(), BoxError>
where
    S: tower::Service<http::Request<hyper::body::Incoming>, Response = http::Response<B>> + Clone + Send + 'static,
    S::Future: Send,
    S::Response: Send,
    S::Error: Into<BoxError>,
    B: http_body::Body<Data = hyper::body::Bytes> + Send + 'static,
    B::Error: Into<BoxError>,
    I: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin + 'static,
{
    // The builder must be a named local: the connection future borrows it.
    let executor = TokioExecutor::new();
    let builder = hyper_util::server::conn::auto::Builder::new(executor);

    let conn = builder.serve_connection_with_upgrades(
        TokioIo::new(io),
        hyper_util::service::TowerToHyperService::new(service),
    );

    // Pin both futures on the stack so they can be handed to `select`.
    tokio::pin!(stopped, conn);

    let mut pending = match future::select(conn, stopped).await {
        // The connection ran to completion before a stop was signalled.
        Either::Left((outcome, _)) => return outcome,
        // Stop was signalled first; keep the still-running connection.
        Either::Right(((), pending)) => pending,
    };

    // Ask the connection to shut down gracefully, then drive it to the end.
    pending.as_mut().graceful_shutdown();
    pending.await
}
/// Helpers that deserialize a JSON-RPC request and attach HTTP extensions to it.
pub(crate) mod deserialize {
    /// Deserializes a [`jsonrpsee_types::Request`] from the JSON bytes in
    /// `data` and replaces its extensions with `extensions`.
    ///
    /// # Errors
    ///
    /// Returns the `serde_json` error if `data` cannot be deserialized.
    pub(crate) fn from_slice_with_extensions(
        data: &[u8],
        extensions: http::Extensions,
    ) -> Result<jsonrpsee_types::Request, serde_json::Error> {
        serde_json::from_slice::<jsonrpsee_types::Request>(data).map(|mut request| {
            *request.extensions_mut() = extensions;
            request
        })
    }

    /// Deserializes a [`jsonrpsee_types::Request`] from the JSON string
    /// `data` and replaces its extensions with `extensions`.
    ///
    /// # Errors
    ///
    /// Returns the `serde_json` error if `data` cannot be deserialized.
    pub(crate) fn from_str_with_extensions(
        data: &str,
        extensions: http::Extensions,
    ) -> Result<jsonrpsee_types::Request, serde_json::Error> {
        serde_json::from_str::<jsonrpsee_types::Request>(data).map(|mut request| {
            *request.extensions_mut() = extensions;
            request
        })
    }
}