use crate::api::timestamp;
use bytes::buf::{Buf, Reader};
use fnv::FnvHashMap;
use futures::{channel::mpsc, future, prelude::*};
use http_body_util::{combinators::BoxBody, StreamBody};
use hyper::body::Body as _;
use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder};
use hyper_util::{client::legacy as client, rt::TokioExecutor};
use once_cell::sync::Lazy;
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use sp_core::offchain::{HttpError, HttpRequestId, HttpRequestStatus, Timestamp};
use std::{
fmt,
io::Read as _,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
const LOG_TARGET: &str = "offchain-worker::http";
pub type Body = BoxBody<hyper::body::Bytes, hyper::Error>;
type Sender = mpsc::Sender<Result<hyper::body::Frame<hyper::body::Bytes>, hyper::Error>>;
type Receiver = mpsc::Receiver<Result<hyper::body::Frame<hyper::body::Bytes>, hyper::Error>>;
type HyperClient = client::Client<HttpsConnector<client::connect::HttpConnector>, Body>;
type LazyClient = Lazy<HyperClient, Box<dyn FnOnce() -> HyperClient + Send>>;
#[derive(Clone)]
pub struct SharedClient(Arc<LazyClient>);
impl SharedClient {
pub fn new() -> std::io::Result<Self> {
let builder = HttpsConnectorBuilder::new()
.with_provider_and_native_roots(rustls::crypto::ring::default_provider())?;
Ok(Self(Arc::new(Lazy::new(Box::new(|| {
let connector = builder.https_or_http().enable_http1().enable_http2().build();
client::Client::builder(TokioExecutor::new()).build(connector)
})))))
}
}
pub fn http(shared_client: SharedClient) -> (HttpApi, HttpWorker) {
let (to_worker, from_api) = tracing_unbounded("mpsc_ocw_to_worker", 100_000);
let (to_api, from_worker) = tracing_unbounded("mpsc_ocw_to_api", 100_000);
let api = HttpApi {
to_worker,
from_worker: from_worker.fuse(),
next_id: HttpRequestId(rand::random::<u16>() % 2000),
requests: FnvHashMap::default(),
};
let engine =
HttpWorker { to_api, from_api, http_client: shared_client.0, requests: Vec::new() };
(api, engine)
}
pub struct HttpApi {
to_worker: TracingUnboundedSender<ApiToWorker>,
from_worker: stream::Fuse<TracingUnboundedReceiver<WorkerToApi>>,
next_id: HttpRequestId,
requests: FnvHashMap<HttpRequestId, HttpApiRequest>,
}
enum HttpApiRequest {
NotDispatched(hyper::Request<Body>, Sender),
Dispatched(Option<Sender>),
Response(HttpApiRequestRp),
Fail(client::Error),
}
struct HttpApiRequestRp {
sending_body: Option<Sender>,
status_code: hyper::StatusCode,
headers: hyper::HeaderMap,
body: stream::Fuse<Receiver>,
current_read_chunk: Option<Reader<hyper::body::Bytes>>,
}
impl HttpApi {
pub fn request_start(&mut self, method: &str, uri: &str) -> Result<HttpRequestId, ()> {
let (body_sender, receiver) = mpsc::channel(0);
let body = StreamBody::new(receiver);
let body = BoxBody::new(body);
let mut request = hyper::Request::new(body);
*request.method_mut() = hyper::Method::from_bytes(method.as_bytes()).map_err(|_| ())?;
*request.uri_mut() = hyper::Uri::from_maybe_shared(uri.to_owned()).map_err(|_| ())?;
let new_id = self.next_id;
debug_assert!(!self.requests.contains_key(&new_id));
match self.next_id.0.checked_add(1) {
Some(new_id) => self.next_id.0 = new_id,
None => {
tracing::error!(
target: LOG_TARGET,
"Overflow in offchain worker HTTP request ID assignment"
);
return Err(());
},
};
self.requests
.insert(new_id, HttpApiRequest::NotDispatched(request, body_sender));
tracing::trace!(
target: LOG_TARGET,
id = %new_id.0,
%method,
%uri,
"Requested started",
);
Ok(new_id)
}
pub fn request_add_header(
&mut self,
request_id: HttpRequestId,
name: &str,
value: &str,
) -> Result<(), ()> {
let request = match self.requests.get_mut(&request_id) {
Some(&mut HttpApiRequest::NotDispatched(ref mut rq, _)) => rq,
_ => return Err(()),
};
let header_name = hyper::header::HeaderName::try_from(name).map_err(drop)?;
let header_value = hyper::header::HeaderValue::try_from(value).map_err(drop)?;
request.headers_mut().append(header_name, header_value);
tracing::debug!(target: LOG_TARGET, id = %request_id.0, %name, %value, "Added header to request");
Ok(())
}
pub fn request_write_body(
&mut self,
request_id: HttpRequestId,
chunk: &[u8],
deadline: Option<Timestamp>,
) -> Result<(), HttpError> {
let mut request = self.requests.remove(&request_id).ok_or(HttpError::Invalid)?;
let mut deadline = timestamp::deadline_to_future(deadline);
let mut poll_sender = move |sender: &mut Sender| -> Result<(), HttpError> {
let mut when_ready = future::maybe_done(future::poll_fn(|cx| sender.poll_ready(cx)));
futures::executor::block_on(future::select(&mut when_ready, &mut deadline));
match when_ready {
future::MaybeDone::Done(Ok(())) => {},
future::MaybeDone::Done(Err(_)) => return Err(HttpError::IoError),
future::MaybeDone::Future(_) | future::MaybeDone::Gone => {
debug_assert!(matches!(deadline, future::MaybeDone::Done(..)));
return Err(HttpError::DeadlineReached);
},
};
futures::executor::block_on(
async {
future::poll_fn(|cx| sender.poll_ready(cx)).await?;
sender.start_send(Ok(hyper::body::Frame::data(hyper::body::Bytes::from(chunk.to_owned()))))
}
)
.map_err(|_| {
tracing::error!(target: "offchain-worker::http", "HTTP sender refused data despite being ready");
HttpError::IoError
})
};
loop {
request = match request {
HttpApiRequest::NotDispatched(request, sender) => {
tracing::debug!(target: LOG_TARGET, id = %request_id.0, "Added new body chunk");
let _ = self
.to_worker
.unbounded_send(ApiToWorker::Dispatch { id: request_id, request });
HttpApiRequest::Dispatched(Some(sender))
},
HttpApiRequest::Dispatched(Some(mut sender)) => {
if !chunk.is_empty() {
match poll_sender(&mut sender) {
Err(HttpError::IoError) => {
tracing::debug!(target: LOG_TARGET, id = %request_id.0, "Encountered io error while trying to add new chunk to body");
return Err(HttpError::IoError);
},
other => {
tracing::debug!(target: LOG_TARGET, id = %request_id.0, res = ?other, "Added chunk to body");
self.requests
.insert(request_id, HttpApiRequest::Dispatched(Some(sender)));
return other;
},
}
} else {
tracing::debug!(target: LOG_TARGET, id = %request_id.0, "Finished writing body");
self.requests.insert(request_id, HttpApiRequest::Dispatched(None));
return Ok(());
}
},
HttpApiRequest::Response(
mut response @ HttpApiRequestRp { sending_body: Some(_), .. },
) => {
if !chunk.is_empty() {
match poll_sender(
response
.sending_body
.as_mut()
.expect("Can only enter this match branch if Some; qed"),
) {
Err(HttpError::IoError) => {
tracing::debug!(target: LOG_TARGET, id = %request_id.0, "Encountered io error while trying to add new chunk to body");
return Err(HttpError::IoError);
},
other => {
tracing::debug!(target: LOG_TARGET, id = %request_id.0, res = ?other, "Added chunk to body");
self.requests
.insert(request_id, HttpApiRequest::Response(response));
return other;
},
}
} else {
tracing::debug!(target: LOG_TARGET, id = %request_id.0, "Finished writing body");
self.requests.insert(
request_id,
HttpApiRequest::Response(HttpApiRequestRp {
sending_body: None,
..response
}),
);
return Ok(());
}
},
HttpApiRequest::Fail(error) => {
tracing::debug!(target: LOG_TARGET, id = %request_id.0, ?error, "Request failed");
return Err(HttpError::IoError);
},
v @ HttpApiRequest::Dispatched(None) |
v @ HttpApiRequest::Response(HttpApiRequestRp { sending_body: None, .. }) => {
tracing::debug!(target: LOG_TARGET, id = %request_id.0, "Body sending already finished");
self.requests.insert(request_id, v);
return Err(HttpError::Invalid);
},
}
}
}
pub fn response_wait(
&mut self,
ids: &[HttpRequestId],
deadline: Option<Timestamp>,
) -> Vec<HttpRequestStatus> {
for id in ids {
match self.requests.get_mut(id) {
Some(HttpApiRequest::NotDispatched(_, _)) => {},
Some(HttpApiRequest::Dispatched(sending_body)) |
Some(HttpApiRequest::Response(HttpApiRequestRp { sending_body, .. })) => {
let _ = sending_body.take();
continue;
},
_ => continue,
};
let (request, _sender) = match self.requests.remove(id) {
Some(HttpApiRequest::NotDispatched(rq, s)) => (rq, s),
_ => unreachable!("we checked for NotDispatched above; qed"),
};
let _ = self.to_worker.unbounded_send(ApiToWorker::Dispatch { id: *id, request });
self.requests.insert(*id, HttpApiRequest::Dispatched(None));
}
let mut deadline = timestamp::deadline_to_future(deadline);
loop {
{
let mut output = Vec::with_capacity(ids.len());
let mut must_wait_more = false;
for id in ids {
output.push(match self.requests.get(id) {
None => HttpRequestStatus::Invalid,
Some(HttpApiRequest::NotDispatched(_, _)) => unreachable!(
"we replaced all the NotDispatched with Dispatched earlier; qed"
),
Some(HttpApiRequest::Dispatched(_)) => {
must_wait_more = true;
HttpRequestStatus::DeadlineReached
},
Some(HttpApiRequest::Fail(_)) => HttpRequestStatus::IoError,
Some(HttpApiRequest::Response(HttpApiRequestRp {
status_code, ..
})) => HttpRequestStatus::Finished(status_code.as_u16()),
});
}
debug_assert_eq!(output.len(), ids.len());
let is_done =
if let future::MaybeDone::Done(_) = deadline { true } else { !must_wait_more };
if is_done {
debug_assert_eq!(output.len(), ids.len());
for n in (0..ids.len()).rev() {
match output[n] {
HttpRequestStatus::IoError => {
self.requests.remove(&ids[n]);
},
HttpRequestStatus::Invalid => {
tracing::debug!(target: LOG_TARGET, id = %ids[n].0, "Unknown request");
},
HttpRequestStatus::DeadlineReached => {
tracing::debug!(target: LOG_TARGET, id = %ids[n].0, "Deadline reached");
},
HttpRequestStatus::Finished(_) => {
tracing::debug!(target: LOG_TARGET, id = %ids[n].0, "Request finished");
},
}
}
return output;
}
}
let next_message = {
let mut next_msg = future::maybe_done(self.from_worker.next());
futures::executor::block_on(future::select(&mut next_msg, &mut deadline));
if let future::MaybeDone::Done(msg) = next_msg {
msg
} else {
debug_assert!(matches!(deadline, future::MaybeDone::Done(..)));
continue;
}
};
match next_message {
Some(WorkerToApi::Response { id, status_code, headers, body }) => {
match self.requests.remove(&id) {
Some(HttpApiRequest::Dispatched(sending_body)) => {
self.requests.insert(
id,
HttpApiRequest::Response(HttpApiRequestRp {
sending_body,
status_code,
headers,
body: body.fuse(),
current_read_chunk: None,
}),
);
},
None => {}, _ => {
tracing::error!(target: "offchain-worker::http", "State mismatch between the API and worker")
},
}
},
Some(WorkerToApi::Fail { id, error }) => match self.requests.remove(&id) {
Some(HttpApiRequest::Dispatched(_)) => {
tracing::debug!(target: LOG_TARGET, id = %id.0, ?error, "Request failed");
self.requests.insert(id, HttpApiRequest::Fail(error));
},
None => {}, _ => {
tracing::error!(target: "offchain-worker::http", "State mismatch between the API and worker")
},
},
None => {
tracing::error!(target: "offchain-worker::http", "Worker has crashed");
return ids.iter().map(|_| HttpRequestStatus::IoError).collect();
},
}
}
}
pub fn response_headers(&mut self, request_id: HttpRequestId) -> Vec<(Vec<u8>, Vec<u8>)> {
let _ = self.response_wait(&[request_id], Some(timestamp::now()));
let headers = match self.requests.get(&request_id) {
Some(HttpApiRequest::Response(HttpApiRequestRp { headers, .. })) => headers,
_ => return Vec::new(),
};
headers
.iter()
.map(|(name, value)| (name.as_str().as_bytes().to_owned(), value.as_bytes().to_owned()))
.collect()
}
pub fn response_read_body(
&mut self,
request_id: HttpRequestId,
buffer: &mut [u8],
deadline: Option<Timestamp>,
) -> Result<usize, HttpError> {
let _ = self.response_wait(&[request_id], deadline);
let mut response = match self.requests.remove(&request_id) {
Some(HttpApiRequest::Response(r)) => r,
Some(rq @ HttpApiRequest::Dispatched(_)) => {
self.requests.insert(request_id, rq);
return Err(HttpError::DeadlineReached);
},
Some(HttpApiRequest::Fail { .. }) => return Err(HttpError::IoError),
Some(rq @ HttpApiRequest::NotDispatched(_, _)) => {
self.requests.insert(request_id, rq);
return Err(HttpError::Invalid);
},
None => return Err(HttpError::Invalid),
};
let mut deadline = timestamp::deadline_to_future(deadline);
loop {
if let Some(mut current_read_chunk) = response.current_read_chunk.take() {
match current_read_chunk.read(buffer) {
Ok(0) => {},
Ok(n) => {
self.requests.insert(
request_id,
HttpApiRequest::Response(HttpApiRequestRp {
current_read_chunk: Some(current_read_chunk),
..response
}),
);
return Ok(n);
},
Err(err) => {
tracing::error!(target: "offchain-worker::http", "Failed to read from current read chunk: {:?}", err);
return Err(HttpError::IoError);
},
}
}
let mut next_body = future::maybe_done(response.body.next());
futures::executor::block_on(future::select(&mut next_body, &mut deadline));
if let future::MaybeDone::Done(next_body) = next_body {
match next_body {
Some(Ok(chunk)) =>
if let Ok(chunk) = chunk.into_data() {
response.current_read_chunk = Some(chunk.reader());
},
Some(Err(_)) => return Err(HttpError::IoError),
None => return Ok(0), }
}
if let future::MaybeDone::Done(_) = deadline {
self.requests.insert(request_id, HttpApiRequest::Response(response));
return Err(HttpError::DeadlineReached);
}
}
}
}
impl fmt::Debug for HttpApi {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_list().entries(self.requests.iter()).finish()
}
}
impl fmt::Debug for HttpApiRequest {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
HttpApiRequest::NotDispatched(_, _) =>
f.debug_tuple("HttpApiRequest::NotDispatched").finish(),
HttpApiRequest::Dispatched(_) => f.debug_tuple("HttpApiRequest::Dispatched").finish(),
HttpApiRequest::Response(HttpApiRequestRp { status_code, headers, .. }) => f
.debug_tuple("HttpApiRequest::Response")
.field(status_code)
.field(headers)
.finish(),
HttpApiRequest::Fail(err) => f.debug_tuple("HttpApiRequest::Fail").field(err).finish(),
}
}
}
enum ApiToWorker {
Dispatch {
id: HttpRequestId,
request: hyper::Request<Body>,
},
}
enum WorkerToApi {
Response {
id: HttpRequestId,
status_code: hyper::StatusCode,
headers: hyper::HeaderMap,
body: Receiver,
},
Fail {
id: HttpRequestId,
error: client::Error,
},
}
pub struct HttpWorker {
to_api: TracingUnboundedSender<WorkerToApi>,
from_api: TracingUnboundedReceiver<ApiToWorker>,
http_client: Arc<LazyClient>,
requests: Vec<(HttpRequestId, HttpWorkerRequest)>,
}
enum HttpWorkerRequest {
Dispatched(client::ResponseFuture),
ReadBody {
body: Body,
tx: Sender,
},
}
impl Future for HttpWorker {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let me = &mut *self;
for n in (0..me.requests.len()).rev() {
let (id, request) = me.requests.swap_remove(n);
match request {
HttpWorkerRequest::Dispatched(mut future) => {
let response = match Future::poll(Pin::new(&mut future), cx) {
Poll::Pending => {
me.requests.push((id, HttpWorkerRequest::Dispatched(future)));
continue;
},
Poll::Ready(Ok(response)) => response,
Poll::Ready(Err(error)) => {
let _ = me.to_api.unbounded_send(WorkerToApi::Fail { id, error });
continue; },
};
let (head, body) = response.into_parts();
let (status_code, headers) = (head.status, head.headers);
let (body_tx, body_rx) = mpsc::channel(3);
let _ = me.to_api.unbounded_send(WorkerToApi::Response {
id,
status_code,
headers,
body: body_rx,
});
me.requests.push((
id,
HttpWorkerRequest::ReadBody { body: Body::new(body), tx: body_tx },
));
cx.waker().wake_by_ref(); continue;
},
HttpWorkerRequest::ReadBody { mut body, mut tx } => {
match tx.poll_ready(cx) {
Poll::Ready(Ok(())) => {},
Poll::Ready(Err(_)) => continue, Poll::Pending => {
me.requests.push((id, HttpWorkerRequest::ReadBody { body, tx }));
continue;
},
}
match Pin::new(&mut body).poll_frame(cx) {
Poll::Ready(Some(Ok(chunk))) => {
let _ = tx.start_send(Ok(chunk));
me.requests.push((id, HttpWorkerRequest::ReadBody { body, tx }));
cx.waker().wake_by_ref(); },
Poll::Ready(Some(Err(err))) => {
let _ = tx.start_send(Err(err));
},
Poll::Ready(None) => {}, Poll::Pending => {
me.requests.push((id, HttpWorkerRequest::ReadBody { body, tx }));
},
}
},
}
}
match Stream::poll_next(Pin::new(&mut me.from_api), cx) {
Poll::Pending => {},
Poll::Ready(None) => return Poll::Ready(()), Poll::Ready(Some(ApiToWorker::Dispatch { id, request })) => {
let future = me.http_client.request(request);
debug_assert!(me.requests.iter().all(|(i, _)| *i != id));
me.requests.push((id, HttpWorkerRequest::Dispatched(future)));
cx.waker().wake_by_ref(); },
}
Poll::Pending
}
}
impl fmt::Debug for HttpWorker {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_list().entries(self.requests.iter()).finish()
}
}
impl fmt::Debug for HttpWorkerRequest {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
HttpWorkerRequest::Dispatched(_) =>
f.debug_tuple("HttpWorkerRequest::Dispatched").finish(),
HttpWorkerRequest::ReadBody { .. } =>
f.debug_tuple("HttpWorkerRequest::Response").finish(),
}
}
}
#[cfg(test)]
mod tests {
use super::{
super::{tests::TestNetwork, AsyncApi},
*,
};
use crate::api::timestamp;
use core::convert::Infallible;
use futures::future;
use http_body_util::BodyExt;
use sp_core::offchain::{Duration, Externalities, HttpError, HttpRequestId, HttpRequestStatus};
use std::sync::LazyLock;
static SHARED_CLIENT: LazyLock<SharedClient> = LazyLock::new(|| SharedClient::new().unwrap());
macro_rules! build_api_server {
() => {
build_api_server!(hyper::Response::new(http_body_util::Full::new(
hyper::body::Bytes::from("Hello World!")
)))
};
( $response:expr ) => {{
let hyper_client = SHARED_CLIENT.clone();
let (api, worker) = http(hyper_client.clone());
let (addr_tx, addr_rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().unwrap();
let worker = rt.spawn(worker);
let server = rt.spawn(async move {
let addr = std::net::SocketAddr::from(([127, 0, 0, 1], 0));
let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
let _ = addr_tx.send(listener.local_addr().unwrap());
loop {
let (stream, _) = listener.accept().await.unwrap();
let io = hyper_util::rt::TokioIo::new(stream);
tokio::task::spawn(async move {
if let Err(err) = hyper::server::conn::http1::Builder::new()
.serve_connection(
io,
hyper::service::service_fn(
move |req: hyper::Request<hyper::body::Incoming>| async move {
let _ = req.into_body().collect().await;
Ok::<_, Infallible>($response)
},
),
)
.await
{
eprintln!("Error serving connection: {:?}", err);
}
});
}
});
let _ = rt.block_on(future::join(worker, server));
});
(api, addr_rx.recv().unwrap())
}};
}
#[test]
fn basic_localhost() {
let deadline = timestamp::now().add(Duration::from_millis(10_000));
let (mut api, addr) = build_api_server!();
let id = api.request_start("POST", &format!("http://{}", addr)).unwrap();
api.request_write_body(id, &[], Some(deadline)).unwrap();
match api.response_wait(&[id], Some(deadline))[0] {
HttpRequestStatus::Finished(200) => {},
v => panic!("Connecting to localhost failed: {:?}", v),
}
let headers = api.response_headers(id);
assert!(headers.iter().any(|(h, _)| h.eq_ignore_ascii_case(b"Date")));
let mut buf = vec![0; 2048];
let n = api.response_read_body(id, &mut buf, Some(deadline)).unwrap();
assert_eq!(&buf[..n], b"Hello World!");
}
#[test]
fn basic_http2_localhost() {
let deadline = timestamp::now().add(Duration::from_millis(10_000));
let (mut api, addr) = build_api_server!(hyper::Response::builder()
.version(hyper::Version::HTTP_2)
.body(http_body_util::Full::new(hyper::body::Bytes::from("Hello World!")))
.unwrap());
let id = api.request_start("POST", &format!("http://{}", addr)).unwrap();
api.request_write_body(id, &[], Some(deadline)).unwrap();
match api.response_wait(&[id], Some(deadline))[0] {
HttpRequestStatus::Finished(200) => {},
v => panic!("Connecting to localhost failed: {:?}", v),
}
let headers = api.response_headers(id);
assert!(headers.iter().any(|(h, _)| h.eq_ignore_ascii_case(b"Date")));
let mut buf = vec![0; 2048];
let n = api.response_read_body(id, &mut buf, Some(deadline)).unwrap();
assert_eq!(&buf[..n], b"Hello World!");
}
#[test]
fn request_start_invalid_call() {
let (mut api, addr) = build_api_server!();
match api.request_start("\0", &format!("http://{}", addr)) {
Err(()) => {},
Ok(_) => panic!(),
};
match api.request_start("GET", "http://\0localhost") {
Err(()) => {},
Ok(_) => panic!(),
};
}
#[test]
fn request_add_header_invalid_call() {
let (mut api, addr) = build_api_server!();
match api.request_add_header(HttpRequestId(0xdead), "Foo", "bar") {
Err(()) => {},
Ok(_) => panic!(),
};
let id = api.request_start("GET", &format!("http://{}", addr)).unwrap();
match api.request_add_header(id, "\0", "bar") {
Err(()) => {},
Ok(_) => panic!(),
};
let id = api.request_start("POST", &format!("http://{}", addr)).unwrap();
match api.request_add_header(id, "Foo", "\0") {
Err(()) => {},
Ok(_) => panic!(),
};
let id = api.request_start("POST", &format!("http://{}", addr)).unwrap();
api.request_add_header(id, "Foo", "Bar").unwrap();
api.request_write_body(id, &[1, 2, 3, 4], None).unwrap();
match api.request_add_header(id, "Foo2", "Bar") {
Err(()) => {},
Ok(_) => panic!(),
};
let id = api.request_start("GET", &format!("http://{}", addr)).unwrap();
api.response_headers(id);
match api.request_add_header(id, "Foo2", "Bar") {
Err(()) => {},
Ok(_) => panic!(),
};
let id = api.request_start("GET", &format!("http://{}", addr)).unwrap();
api.response_read_body(id, &mut [], None).unwrap();
match api.request_add_header(id, "Foo2", "Bar") {
Err(()) => {},
Ok(_) => panic!(),
};
}
#[test]
fn request_write_body_invalid_call() {
let (mut api, addr) = build_api_server!();
match api.request_write_body(HttpRequestId(0xdead), &[1, 2, 3], None) {
Err(HttpError::Invalid) => {},
_ => panic!(),
};
match api.request_write_body(HttpRequestId(0xdead), &[], None) {
Err(HttpError::Invalid) => {},
_ => panic!(),
};
let id = api.request_start("POST", &format!("http://{}", addr)).unwrap();
api.request_write_body(id, &[1, 2, 3, 4], None).unwrap();
api.request_write_body(id, &[1, 2, 3, 4], None).unwrap();
api.request_write_body(id, &[], None).unwrap();
match api.request_write_body(id, &[], None) {
Err(HttpError::Invalid) => {},
_ => panic!(),
};
let id = api.request_start("POST", &format!("http://{}", addr)).unwrap();
api.request_write_body(id, &[1, 2, 3, 4], None).unwrap();
api.request_write_body(id, &[1, 2, 3, 4], None).unwrap();
api.request_write_body(id, &[], None).unwrap();
match api.request_write_body(id, &[1, 2, 3, 4], None) {
Err(HttpError::Invalid) => {},
_ => panic!(),
};
let id = api.request_start("POST", &format!("http://{}", addr)).unwrap();
api.request_write_body(id, &[1, 2, 3, 4], None).unwrap();
api.response_wait(&[id], None);
match api.request_write_body(id, &[], None) {
Err(HttpError::Invalid) => {},
_ => panic!(),
};
let id = api.request_start("POST", &format!("http://{}", addr)).unwrap();
api.request_write_body(id, &[1, 2, 3, 4], None).unwrap();
api.response_wait(&[id], None);
match api.request_write_body(id, &[1, 2, 3, 4], None) {
Err(HttpError::Invalid) => {},
_ => panic!(),
};
let id = api.request_start("POST", &format!("http://{}", addr)).unwrap();
api.response_headers(id);
match api.request_write_body(id, &[1, 2, 3, 4], None) {
Err(HttpError::Invalid) => {},
_ => panic!(),
};
let id = api.request_start("GET", &format!("http://{}", addr)).unwrap();
api.response_headers(id);
match api.request_write_body(id, &[], None) {
Err(HttpError::Invalid) => {},
_ => panic!(),
};
let id = api.request_start("POST", &format!("http://{}", addr)).unwrap();
api.response_read_body(id, &mut [], None).unwrap();
match api.request_write_body(id, &[1, 2, 3, 4], None) {
Err(HttpError::Invalid) => {},
_ => panic!(),
};
let id = api.request_start("POST", &format!("http://{}", addr)).unwrap();
api.response_read_body(id, &mut [], None).unwrap();
match api.request_write_body(id, &[], None) {
Err(HttpError::Invalid) => {},
_ => panic!(),
};
}
#[test]
fn response_headers_invalid_call() {
let (mut api, addr) = build_api_server!();
assert_eq!(api.response_headers(HttpRequestId(0xdead)), &[]);
let id = api.request_start("POST", &format!("http://{}", addr)).unwrap();
assert_eq!(api.response_headers(id), &[]);
let id = api.request_start("POST", &format!("http://{}", addr)).unwrap();
api.request_write_body(id, &[], None).unwrap();
while api.response_headers(id).is_empty() {
std::thread::sleep(std::time::Duration::from_millis(100));
}
let id = api.request_start("GET", &format!("http://{}", addr)).unwrap();
api.response_wait(&[id], None);
assert_ne!(api.response_headers(id), &[]);
let id = api.request_start("GET", &format!("http://{}", addr)).unwrap();
let mut buf = [0; 128];
while api.response_read_body(id, &mut buf, None).unwrap() != 0 {}
assert_eq!(api.response_headers(id), &[]);
}
#[test]
fn response_header_invalid_call() {
let (mut api, addr) = build_api_server!();
let id = api.request_start("POST", &format!("http://{}", addr)).unwrap();
assert_eq!(api.response_headers(id), &[]);
let id = api.request_start("POST", &format!("http://{}", addr)).unwrap();
api.request_add_header(id, "Foo", "Bar").unwrap();
assert_eq!(api.response_headers(id), &[]);
let id = api.request_start("GET", &format!("http://{}", addr)).unwrap();
api.request_add_header(id, "Foo", "Bar").unwrap();
api.request_write_body(id, &[], None).unwrap();
assert_eq!(api.response_headers(id), &[]);
}
#[test]
fn response_read_body_invalid_call() {
let (mut api, addr) = build_api_server!();
let mut buf = [0; 512];
match api.response_read_body(HttpRequestId(0xdead), &mut buf, None) {
Err(HttpError::Invalid) => {},
_ => panic!(),
}
let id = api.request_start("GET", &format!("http://{}", addr)).unwrap();
while api.response_read_body(id, &mut buf, None).unwrap() != 0 {}
match api.response_read_body(id, &mut buf, None) {
Err(HttpError::Invalid) => {},
_ => panic!(),
}
}
#[test]
fn fuzzing() {
let (mut api, addr) = build_api_server!();
for _ in 0..50 {
let id = api.request_start("POST", &format!("http://{}", addr)).unwrap();
for _ in 0..250 {
match rand::random::<u8>() % 6 {
0 => {
let _ = api.request_add_header(id, "Foo", "Bar");
},
1 => {
let _ = api.request_write_body(id, &[1, 2, 3, 4], None);
},
2 => {
let _ = api.request_write_body(id, &[], None);
},
3 => {
let _ = api.response_wait(&[id], None);
},
4 => {
let _ = api.response_headers(id);
},
5 => {
let mut buf = [0; 512];
let _ = api.response_read_body(id, &mut buf, None);
},
6..=255 => unreachable!(),
}
}
}
}
#[test]
fn shared_http_client_is_only_initialized_on_access() {
let shared_client = SharedClient::new().unwrap();
{
let mock = Arc::new(TestNetwork());
let (mut api, async_api) = AsyncApi::new(mock, false, shared_client.clone());
api.timestamp();
futures::executor::block_on(async move {
assert!(futures::poll!(async_api.process()).is_pending());
});
}
assert!(Lazy::into_value(Arc::try_unwrap(shared_client.0).unwrap()).is_err());
let shared_client = SharedClient::new().unwrap();
{
let mock = Arc::new(TestNetwork());
let (mut api, async_api) = AsyncApi::new(mock, false, shared_client.clone());
let id = api.http_request_start("lol", "nope", &[]).unwrap();
api.http_request_write_body(id, &[], None).unwrap();
futures::executor::block_on(async move {
assert!(futures::poll!(async_api.process()).is_pending());
});
}
assert!(Lazy::into_value(Arc::try_unwrap(shared_client.0).unwrap()).is_ok());
}
}