1use crate::api::timestamp;
31use bytes::buf::{Buf, Reader};
32use fnv::FnvHashMap;
33use futures::{channel::mpsc, future, prelude::*};
34use http_body_util::{combinators::BoxBody, StreamBody};
35use hyper::body::Body as _;
36use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder};
37use hyper_util::{client::legacy as client, rt::TokioExecutor};
38use once_cell::sync::Lazy;
39use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
40use sp_core::offchain::{HttpError, HttpRequestId, HttpRequestStatus, Timestamp};
41use std::{
42 fmt,
43 io::Read as _,
44 pin::Pin,
45 sync::Arc,
46 task::{Context, Poll},
47};
48
/// Log target passed to the `tracing` macros in this file.
const LOG_TARGET: &str = "offchain-worker::http";

/// Boxed HTTP body whose data chunks are `hyper::body::Bytes` and whose error type is
/// `hyper::Error`.
pub type Body = BoxBody<hyper::body::Bytes, hyper::Error>;

/// Sending half of a `futures` mpsc channel carrying body frames (or a `hyper::Error`).
type Sender = mpsc::Sender<Result<hyper::body::Frame<hyper::body::Bytes>, hyper::Error>>;
/// Receiving half of a `futures` mpsc channel carrying body frames (or a `hyper::Error`).
type Receiver = mpsc::Receiver<Result<hyper::body::Frame<hyper::body::Bytes>, hyper::Error>>;

/// Hyper client using an `HttpsConnector` on top of an `HttpConnector`, sending [`Body`] bodies.
type HyperClient = client::Client<HttpsConnector<client::connect::HttpConnector>, Body>;
/// A [`HyperClient`] wrapped in a `Lazy` cell whose initializer is a boxed `FnOnce` closure.
type LazyClient = Lazy<HyperClient, Box<dyn FnOnce() -> HyperClient + Send>>;
58
59#[derive(Clone)]
61pub struct SharedClient(Arc<LazyClient>);
62
63impl SharedClient {
64 pub fn new() -> std::io::Result<Self> {
65 let builder = HttpsConnectorBuilder::new()
66 .with_provider_and_native_roots(rustls::crypto::ring::default_provider())?;
67 Ok(Self(Arc::new(Lazy::new(Box::new(|| {
68 let connector = builder.https_or_http().enable_http1().enable_http2().build();
69 client::Client::builder(TokioExecutor::new()).build(connector)
70 })))))
71 }
72}
73
74pub fn http(shared_client: SharedClient) -> (HttpApi, HttpWorker) {
76 let (to_worker, from_api) = tracing_unbounded("mpsc_ocw_to_worker", 100_000);
77 let (to_api, from_worker) = tracing_unbounded("mpsc_ocw_to_api", 100_000);
78
79 let api = HttpApi {
80 to_worker,
81 from_worker: from_worker.fuse(),
82 next_id: HttpRequestId(rand::random::<u16>() % 2000),
85 requests: FnvHashMap::default(),
86 };
87
88 let engine =
89 HttpWorker { to_api, from_api, http_client: shared_client.0, requests: Vec::new() };
90
91 (api, engine)
92}
93
/// API half of the HTTP machinery; created by [`http`] together with its [`HttpWorker`].
pub struct HttpApi {
    /// Channel used to hand requests over to the worker.
    to_worker: TracingUnboundedSender<ApiToWorker>,
    /// Fused stream of messages coming back from the worker.
    from_worker: stream::Fuse<TracingUnboundedReceiver<WorkerToApi>>,
    /// Id that the next successful `request_start` call returns.
    next_id: HttpRequestId,
    /// Requests currently tracked by the API, keyed by id.
    requests: FnvHashMap<HttpRequestId, HttpApiRequest>,
}
109
/// State of one request as tracked by [`HttpApi`].
enum HttpApiRequest {
    /// Built but not yet handed to the worker. Holds the request and the sender feeding its body.
    NotDispatched(hyper::Request<Body>, Sender),
    /// Handed to the worker, no response received so far. The sender is `Some` while body
    /// chunks can still be written and `None` once writing is finished.
    Dispatched(Option<Sender>),
    /// The worker reported a response for this request.
    Response(HttpApiRequestRp),
    /// The worker reported a failure for this request.
    Fail(client::Error),
}
124
/// Payload of [`HttpApiRequest::Response`].
struct HttpApiRequestRp {
    /// Sender for the request body; `Some` while body chunks can still be written.
    sending_body: Option<Sender>,
    /// Status code of the response.
    status_code: hyper::StatusCode,
    /// Headers of the response.
    headers: hyper::HeaderMap,
    /// Fused stream of response body frames forwarded by the worker.
    body: stream::Fuse<Receiver>,
    /// Chunk pulled from `body` that `response_read_body` is currently reading from, if any.
    current_read_chunk: Option<Reader<hyper::body::Bytes>>,
}
145
146impl HttpApi {
147 pub fn request_start(&mut self, method: &str, uri: &str) -> Result<HttpRequestId, ()> {
149 let (body_sender, receiver) = mpsc::channel(0);
153 let body = StreamBody::new(receiver);
154 let body = BoxBody::new(body);
155 let mut request = hyper::Request::new(body);
156 *request.method_mut() = hyper::Method::from_bytes(method.as_bytes()).map_err(|_| ())?;
157 *request.uri_mut() = hyper::Uri::from_maybe_shared(uri.to_owned()).map_err(|_| ())?;
158
159 let new_id = self.next_id;
160 debug_assert!(!self.requests.contains_key(&new_id));
161 match self.next_id.0.checked_add(1) {
162 Some(new_id) => self.next_id.0 = new_id,
163 None => {
164 tracing::error!(
165 target: LOG_TARGET,
166 "Overflow in offchain worker HTTP request ID assignment"
167 );
168 return Err(());
169 },
170 };
171 self.requests
172 .insert(new_id, HttpApiRequest::NotDispatched(request, body_sender));
173
174 tracing::trace!(
175 target: LOG_TARGET,
176 id = %new_id.0,
177 %method,
178 %uri,
179 "Requested started",
180 );
181
182 Ok(new_id)
183 }
184
185 pub fn request_add_header(
187 &mut self,
188 request_id: HttpRequestId,
189 name: &str,
190 value: &str,
191 ) -> Result<(), ()> {
192 let request = match self.requests.get_mut(&request_id) {
193 Some(&mut HttpApiRequest::NotDispatched(ref mut rq, _)) => rq,
194 _ => return Err(()),
195 };
196
197 let header_name = hyper::header::HeaderName::try_from(name).map_err(drop)?;
198 let header_value = hyper::header::HeaderValue::try_from(value).map_err(drop)?;
199 request.headers_mut().append(header_name, header_value);
202
203 tracing::debug!(target: LOG_TARGET, id = %request_id.0, %name, %value, "Added header to request");
204
205 Ok(())
206 }
207
208 pub fn request_write_body(
210 &mut self,
211 request_id: HttpRequestId,
212 chunk: &[u8],
213 deadline: Option<Timestamp>,
214 ) -> Result<(), HttpError> {
215 let mut request = self.requests.remove(&request_id).ok_or(HttpError::Invalid)?;
218
219 let mut deadline = timestamp::deadline_to_future(deadline);
220 let mut poll_sender = move |sender: &mut Sender| -> Result<(), HttpError> {
224 let mut when_ready = future::maybe_done(future::poll_fn(|cx| sender.poll_ready(cx)));
225 futures::executor::block_on(future::select(&mut when_ready, &mut deadline));
226 match when_ready {
227 future::MaybeDone::Done(Ok(())) => {},
228 future::MaybeDone::Done(Err(_)) => return Err(HttpError::IoError),
229 future::MaybeDone::Future(_) | future::MaybeDone::Gone => {
230 debug_assert!(matches!(deadline, future::MaybeDone::Done(..)));
231 return Err(HttpError::DeadlineReached);
232 },
233 };
234
235 futures::executor::block_on(
236 async {
237 future::poll_fn(|cx| sender.poll_ready(cx)).await?;
238 sender.start_send(Ok(hyper::body::Frame::data(hyper::body::Bytes::from(chunk.to_owned()))))
239 }
240 )
241 .map_err(|_| {
242 tracing::error!(target: "offchain-worker::http", "HTTP sender refused data despite being ready");
243 HttpError::IoError
244 })
245 };
246
247 loop {
248 request = match request {
249 HttpApiRequest::NotDispatched(request, sender) => {
250 tracing::debug!(target: LOG_TARGET, id = %request_id.0, "Added new body chunk");
251 let _ = self
253 .to_worker
254 .unbounded_send(ApiToWorker::Dispatch { id: request_id, request });
255 HttpApiRequest::Dispatched(Some(sender))
256 },
257
258 HttpApiRequest::Dispatched(Some(mut sender)) => {
259 if !chunk.is_empty() {
260 match poll_sender(&mut sender) {
261 Err(HttpError::IoError) => {
262 tracing::debug!(target: LOG_TARGET, id = %request_id.0, "Encountered io error while trying to add new chunk to body");
263 return Err(HttpError::IoError);
264 },
265 other => {
266 tracing::debug!(target: LOG_TARGET, id = %request_id.0, res = ?other, "Added chunk to body");
267 self.requests
268 .insert(request_id, HttpApiRequest::Dispatched(Some(sender)));
269 return other;
270 },
271 }
272 } else {
273 tracing::debug!(target: LOG_TARGET, id = %request_id.0, "Finished writing body");
274
275 self.requests.insert(request_id, HttpApiRequest::Dispatched(None));
278 return Ok(());
279 }
280 },
281
282 HttpApiRequest::Response(
283 mut response @ HttpApiRequestRp { sending_body: Some(_), .. },
284 ) => {
285 if !chunk.is_empty() {
286 match poll_sender(
287 response
288 .sending_body
289 .as_mut()
290 .expect("Can only enter this match branch if Some; qed"),
291 ) {
292 Err(HttpError::IoError) => {
293 tracing::debug!(target: LOG_TARGET, id = %request_id.0, "Encountered io error while trying to add new chunk to body");
294 return Err(HttpError::IoError);
295 },
296 other => {
297 tracing::debug!(target: LOG_TARGET, id = %request_id.0, res = ?other, "Added chunk to body");
298 self.requests
299 .insert(request_id, HttpApiRequest::Response(response));
300 return other;
301 },
302 }
303 } else {
304 tracing::debug!(target: LOG_TARGET, id = %request_id.0, "Finished writing body");
305
306 self.requests.insert(
309 request_id,
310 HttpApiRequest::Response(HttpApiRequestRp {
311 sending_body: None,
312 ..response
313 }),
314 );
315 return Ok(());
316 }
317 },
318
319 HttpApiRequest::Fail(error) => {
320 tracing::debug!(target: LOG_TARGET, id = %request_id.0, ?error, "Request failed");
321
322 return Err(HttpError::IoError);
325 },
326
327 v @ HttpApiRequest::Dispatched(None) |
328 v @ HttpApiRequest::Response(HttpApiRequestRp { sending_body: None, .. }) => {
329 tracing::debug!(target: LOG_TARGET, id = %request_id.0, "Body sending already finished");
330
331 self.requests.insert(request_id, v);
333 return Err(HttpError::Invalid);
334 },
335 }
336 }
337 }
338
339 pub fn response_wait(
341 &mut self,
342 ids: &[HttpRequestId],
343 deadline: Option<Timestamp>,
344 ) -> Vec<HttpRequestStatus> {
345 for id in ids {
348 match self.requests.get_mut(id) {
349 Some(HttpApiRequest::NotDispatched(_, _)) => {},
350 Some(HttpApiRequest::Dispatched(sending_body)) |
351 Some(HttpApiRequest::Response(HttpApiRequestRp { sending_body, .. })) => {
352 let _ = sending_body.take();
353 continue;
354 },
355 _ => continue,
356 };
357
358 let (request, _sender) = match self.requests.remove(id) {
359 Some(HttpApiRequest::NotDispatched(rq, s)) => (rq, s),
360 _ => unreachable!("we checked for NotDispatched above; qed"),
361 };
362
363 let _ = self.to_worker.unbounded_send(ApiToWorker::Dispatch { id: *id, request });
364
365 self.requests.insert(*id, HttpApiRequest::Dispatched(None));
367 }
368
369 let mut deadline = timestamp::deadline_to_future(deadline);
370
371 loop {
372 {
375 let mut output = Vec::with_capacity(ids.len());
376 let mut must_wait_more = false;
377 for id in ids {
378 output.push(match self.requests.get(id) {
379 None => HttpRequestStatus::Invalid,
380 Some(HttpApiRequest::NotDispatched(_, _)) => unreachable!(
381 "we replaced all the NotDispatched with Dispatched earlier; qed"
382 ),
383 Some(HttpApiRequest::Dispatched(_)) => {
384 must_wait_more = true;
385 HttpRequestStatus::DeadlineReached
386 },
387 Some(HttpApiRequest::Fail(_)) => HttpRequestStatus::IoError,
388 Some(HttpApiRequest::Response(HttpApiRequestRp {
389 status_code, ..
390 })) => HttpRequestStatus::Finished(status_code.as_u16()),
391 });
392 }
393 debug_assert_eq!(output.len(), ids.len());
394
395 let is_done =
397 if let future::MaybeDone::Done(_) = deadline { true } else { !must_wait_more };
398
399 if is_done {
400 debug_assert_eq!(output.len(), ids.len());
402 for n in (0..ids.len()).rev() {
403 match output[n] {
404 HttpRequestStatus::IoError => {
405 self.requests.remove(&ids[n]);
406 },
407 HttpRequestStatus::Invalid => {
408 tracing::debug!(target: LOG_TARGET, id = %ids[n].0, "Unknown request");
409 },
410 HttpRequestStatus::DeadlineReached => {
411 tracing::debug!(target: LOG_TARGET, id = %ids[n].0, "Deadline reached");
412 },
413 HttpRequestStatus::Finished(_) => {
414 tracing::debug!(target: LOG_TARGET, id = %ids[n].0, "Request finished");
415 },
416 }
417 }
418 return output;
419 }
420 }
421
422 let next_message = {
425 let mut next_msg = future::maybe_done(self.from_worker.next());
426 futures::executor::block_on(future::select(&mut next_msg, &mut deadline));
427 if let future::MaybeDone::Done(msg) = next_msg {
428 msg
429 } else {
430 debug_assert!(matches!(deadline, future::MaybeDone::Done(..)));
431 continue;
432 }
433 };
434
435 match next_message {
437 Some(WorkerToApi::Response { id, status_code, headers, body }) => {
438 match self.requests.remove(&id) {
439 Some(HttpApiRequest::Dispatched(sending_body)) => {
440 self.requests.insert(
441 id,
442 HttpApiRequest::Response(HttpApiRequestRp {
443 sending_body,
444 status_code,
445 headers,
446 body: body.fuse(),
447 current_read_chunk: None,
448 }),
449 );
450 },
451 None => {}, _ => {
453 tracing::error!(target: "offchain-worker::http", "State mismatch between the API and worker")
454 },
455 }
456 },
457
458 Some(WorkerToApi::Fail { id, error }) => match self.requests.remove(&id) {
459 Some(HttpApiRequest::Dispatched(_)) => {
460 tracing::debug!(target: LOG_TARGET, id = %id.0, ?error, "Request failed");
461 self.requests.insert(id, HttpApiRequest::Fail(error));
462 },
463 None => {}, _ => {
465 tracing::error!(target: "offchain-worker::http", "State mismatch between the API and worker")
466 },
467 },
468
469 None => {
470 tracing::error!(target: "offchain-worker::http", "Worker has crashed");
471 return ids.iter().map(|_| HttpRequestStatus::IoError).collect();
472 },
473 }
474 }
475 }
476
477 pub fn response_headers(&mut self, request_id: HttpRequestId) -> Vec<(Vec<u8>, Vec<u8>)> {
479 let _ = self.response_wait(&[request_id], Some(timestamp::now()));
481
482 let headers = match self.requests.get(&request_id) {
483 Some(HttpApiRequest::Response(HttpApiRequestRp { headers, .. })) => headers,
484 _ => return Vec::new(),
485 };
486
487 headers
488 .iter()
489 .map(|(name, value)| (name.as_str().as_bytes().to_owned(), value.as_bytes().to_owned()))
490 .collect()
491 }
492
493 pub fn response_read_body(
495 &mut self,
496 request_id: HttpRequestId,
497 buffer: &mut [u8],
498 deadline: Option<Timestamp>,
499 ) -> Result<usize, HttpError> {
500 let _ = self.response_wait(&[request_id], deadline);
502
503 let mut response = match self.requests.remove(&request_id) {
506 Some(HttpApiRequest::Response(r)) => r,
507 Some(rq @ HttpApiRequest::Dispatched(_)) => {
510 self.requests.insert(request_id, rq);
511 return Err(HttpError::DeadlineReached);
512 },
513 Some(HttpApiRequest::Fail { .. }) => return Err(HttpError::IoError),
515 Some(rq @ HttpApiRequest::NotDispatched(_, _)) => {
517 self.requests.insert(request_id, rq);
518 return Err(HttpError::Invalid);
519 },
520 None => return Err(HttpError::Invalid),
521 };
522
523 let mut deadline = timestamp::deadline_to_future(deadline);
525
526 loop {
527 if let Some(mut current_read_chunk) = response.current_read_chunk.take() {
529 match current_read_chunk.read(buffer) {
530 Ok(0) => {},
531 Ok(n) => {
532 self.requests.insert(
533 request_id,
534 HttpApiRequest::Response(HttpApiRequestRp {
535 current_read_chunk: Some(current_read_chunk),
536 ..response
537 }),
538 );
539 return Ok(n);
540 },
541 Err(err) => {
542 tracing::error!(target: "offchain-worker::http", "Failed to read from current read chunk: {:?}", err);
544 return Err(HttpError::IoError);
545 },
546 }
547 }
548
549 let mut next_body = future::maybe_done(response.body.next());
553 futures::executor::block_on(future::select(&mut next_body, &mut deadline));
554
555 if let future::MaybeDone::Done(next_body) = next_body {
556 match next_body {
557 Some(Ok(chunk)) =>
558 if let Ok(chunk) = chunk.into_data() {
559 response.current_read_chunk = Some(chunk.reader());
560 },
561 Some(Err(_)) => return Err(HttpError::IoError),
562 None => return Ok(0), }
564 }
565
566 if let future::MaybeDone::Done(_) = deadline {
567 self.requests.insert(request_id, HttpApiRequest::Response(response));
568 return Err(HttpError::DeadlineReached);
569 }
570 }
571 }
572}
573
574impl fmt::Debug for HttpApi {
575 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
576 f.debug_list().entries(self.requests.iter()).finish()
577 }
578}
579
580impl fmt::Debug for HttpApiRequest {
581 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
582 match self {
583 HttpApiRequest::NotDispatched(_, _) =>
584 f.debug_tuple("HttpApiRequest::NotDispatched").finish(),
585 HttpApiRequest::Dispatched(_) => f.debug_tuple("HttpApiRequest::Dispatched").finish(),
586 HttpApiRequest::Response(HttpApiRequestRp { status_code, headers, .. }) => f
587 .debug_tuple("HttpApiRequest::Response")
588 .field(status_code)
589 .field(headers)
590 .finish(),
591 HttpApiRequest::Fail(err) => f.debug_tuple("HttpApiRequest::Fail").field(err).finish(),
592 }
593 }
594}
595
/// Message sent from the [`HttpApi`] to the [`HttpWorker`].
enum ApiToWorker {
    /// Asks the worker to send a request.
    Dispatch {
        /// Id under which the API tracks the request.
        id: HttpRequestId,
        /// The request to send.
        request: hyper::Request<Body>,
    },
}
606
/// Message sent from the [`HttpWorker`] to the [`HttpApi`].
enum WorkerToApi {
    /// A response head has been received for a request.
    Response {
        /// Id of the request the response belongs to.
        id: HttpRequestId,
        /// Status code of the response.
        status_code: hyper::StatusCode,
        /// Headers of the response.
        headers: hyper::HeaderMap,
        /// Channel on which the worker forwards the response body frames.
        body: Receiver,
    },
    /// A request has failed.
    Fail {
        /// Id of the request that failed.
        id: HttpRequestId,
        /// Error returned by the HTTP client.
        error: client::Error,
    },
}
634
/// Worker half of the HTTP machinery; a `Future` that drives the requests it was handed.
pub struct HttpWorker {
    /// Channel used to report responses and failures to the API.
    to_api: TracingUnboundedSender<WorkerToApi>,
    /// Channel on which the API hands over requests.
    from_api: TracingUnboundedReceiver<ApiToWorker>,
    /// Shared, lazily-built HTTP client used to send the requests.
    http_client: Arc<LazyClient>,
    /// Requests currently being driven, together with their ids.
    requests: Vec<(HttpRequestId, HttpWorkerRequest)>,
}
646
/// State of one request as tracked by the [`HttpWorker`].
enum HttpWorkerRequest {
    /// The request was passed to the HTTP client; holds the future resolving to the response.
    Dispatched(client::ResponseFuture),
    /// The response head was received; the body is being forwarded to the API.
    ReadBody {
        /// Response body that frames are polled from.
        body: Body,
        /// Channel on which the frames are sent to the API.
        tx: Sender,
    },
}
659
660impl Future for HttpWorker {
661 type Output = ();
662
663 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
664 let me = &mut *self;
669
670 for n in (0..me.requests.len()).rev() {
672 let (id, request) = me.requests.swap_remove(n);
673 match request {
674 HttpWorkerRequest::Dispatched(mut future) => {
675 let response = match Future::poll(Pin::new(&mut future), cx) {
677 Poll::Pending => {
678 me.requests.push((id, HttpWorkerRequest::Dispatched(future)));
679 continue;
680 },
681 Poll::Ready(Ok(response)) => response,
682 Poll::Ready(Err(error)) => {
683 let _ = me.to_api.unbounded_send(WorkerToApi::Fail { id, error });
684 continue; },
686 };
687
688 let (head, body) = response.into_parts();
690 let (status_code, headers) = (head.status, head.headers);
691
692 let (body_tx, body_rx) = mpsc::channel(3);
693 let _ = me.to_api.unbounded_send(WorkerToApi::Response {
694 id,
695 status_code,
696 headers,
697 body: body_rx,
698 });
699
700 me.requests.push((
701 id,
702 HttpWorkerRequest::ReadBody { body: Body::new(body), tx: body_tx },
703 ));
704 cx.waker().wake_by_ref(); continue;
706 },
707
708 HttpWorkerRequest::ReadBody { mut body, mut tx } => {
709 match tx.poll_ready(cx) {
712 Poll::Ready(Ok(())) => {},
713 Poll::Ready(Err(_)) => continue, Poll::Pending => {
715 me.requests.push((id, HttpWorkerRequest::ReadBody { body, tx }));
716 continue;
717 },
718 }
719
720 match Pin::new(&mut body).poll_frame(cx) {
721 Poll::Ready(Some(Ok(chunk))) => {
722 let _ = tx.start_send(Ok(chunk));
723 me.requests.push((id, HttpWorkerRequest::ReadBody { body, tx }));
724 cx.waker().wake_by_ref(); },
726 Poll::Ready(Some(Err(err))) => {
727 let _ = tx.start_send(Err(err));
728 },
730 Poll::Ready(None) => {}, Poll::Pending => {
732 me.requests.push((id, HttpWorkerRequest::ReadBody { body, tx }));
733 },
734 }
735 },
736 }
737 }
738
739 match Stream::poll_next(Pin::new(&mut me.from_api), cx) {
741 Poll::Pending => {},
742 Poll::Ready(None) => return Poll::Ready(()), Poll::Ready(Some(ApiToWorker::Dispatch { id, request })) => {
744 let future = me.http_client.request(request);
745 debug_assert!(me.requests.iter().all(|(i, _)| *i != id));
746 me.requests.push((id, HttpWorkerRequest::Dispatched(future)));
747 cx.waker().wake_by_ref(); },
749 }
750
751 Poll::Pending
752 }
753}
754
755impl fmt::Debug for HttpWorker {
756 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
757 f.debug_list().entries(self.requests.iter()).finish()
758 }
759}
760
761impl fmt::Debug for HttpWorkerRequest {
762 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
763 match self {
764 HttpWorkerRequest::Dispatched(_) =>
765 f.debug_tuple("HttpWorkerRequest::Dispatched").finish(),
766 HttpWorkerRequest::ReadBody { .. } =>
767 f.debug_tuple("HttpWorkerRequest::Response").finish(),
768 }
769 }
770}
771
#[cfg(test)]
mod tests {
    use super::{
        super::{tests::TestNetwork, AsyncApi},
        *,
    };
    use crate::api::timestamp;
    use core::convert::Infallible;
    use futures::future;
    use http_body_util::BodyExt;
    use sp_core::offchain::{Duration, Externalities, HttpError, HttpRequestId, HttpRequestStatus};
    use std::sync::LazyLock;

    // One client shared by every test server in this module.
    static SHARED_CLIENT: LazyLock<SharedClient> = LazyLock::new(|| SharedClient::new().unwrap());

    // Spawns a thread running the HTTP worker and a local HTTP/1 server, and evaluates to
    // `(api, server_address)`. The server drains each request body and answers with
    // `$response` ("Hello World!" when no response expression is given).
    macro_rules! build_api_server {
        () => {
            build_api_server!(hyper::Response::new(http_body_util::Full::new(
                hyper::body::Bytes::from("Hello World!")
            )))
        };
        ( $response:expr ) => {{
            let (api, worker) = http(SHARED_CLIENT.clone());

            let (addr_tx, addr_rx) = std::sync::mpsc::channel();
            std::thread::spawn(move || {
                let runtime = tokio::runtime::Runtime::new().unwrap();
                let worker_task = runtime.spawn(worker);
                let server_task = runtime.spawn(async move {
                    // Port 0: the OS picks a free port, which is reported through `addr_tx`.
                    let bind_addr = std::net::SocketAddr::from(([127, 0, 0, 1], 0));
                    let listener = tokio::net::TcpListener::bind(bind_addr).await.unwrap();
                    let _ = addr_tx.send(listener.local_addr().unwrap());
                    loop {
                        let (stream, _) = listener.accept().await.unwrap();
                        let io = hyper_util::rt::TokioIo::new(stream);
                        tokio::task::spawn(async move {
                            let service = hyper::service::service_fn(
                                move |req: hyper::Request<hyper::body::Incoming>| async move {
                                    let _ = req.into_body().collect().await;

                                    Ok::<_, Infallible>($response)
                                },
                            );
                            let served = hyper::server::conn::http1::Builder::new()
                                .serve_connection(io, service)
                                .await;
                            if let Err(err) = served {
                                eprintln!("Error serving connection: {:?}", err);
                            }
                        });
                    }
                });
                let _ = runtime.block_on(future::join(worker_task, server_task));
            });
            (api, addr_rx.recv().unwrap())
        }};
    }

    /// URL of the test server listening on `addr`.
    fn url(addr: &std::net::SocketAddr) -> String {
        format!("http://{}", addr)
    }

    /// Panics unless `result` is `Err(HttpError::Invalid)`.
    fn assert_invalid<T>(result: Result<T, HttpError>) {
        assert!(matches!(result, Err(HttpError::Invalid)));
    }

    /// Sends an empty POST and checks for a 200 response with a `Date` header and the
    /// "Hello World!" body.
    fn assert_hello_world(api: &mut HttpApi, addr: &std::net::SocketAddr, deadline: Timestamp) {
        let id = api.request_start("POST", &url(addr)).unwrap();
        api.request_write_body(id, &[], Some(deadline)).unwrap();

        let status = api.response_wait(&[id], Some(deadline))[0];
        assert!(
            matches!(status, HttpRequestStatus::Finished(200)),
            "Connecting to localhost failed: {:?}",
            status
        );

        let headers = api.response_headers(id);
        assert!(headers.iter().any(|(name, _)| name.eq_ignore_ascii_case(b"Date")));

        let mut buf = vec![0; 2048];
        let read = api.response_read_body(id, &mut buf, Some(deadline)).unwrap();
        assert_eq!(&buf[..read], b"Hello World!");
    }

    #[test]
    fn basic_localhost() {
        let deadline = timestamp::now().add(Duration::from_millis(10_000));

        let (mut api, addr) = build_api_server!();

        assert_hello_world(&mut api, &addr, deadline);
    }

    #[test]
    fn basic_http2_localhost() {
        let deadline = timestamp::now().add(Duration::from_millis(10_000));

        // Same as `basic_localhost`, but the response is built with version HTTP/2.
        let (mut api, addr) = build_api_server!(hyper::Response::builder()
            .version(hyper::Version::HTTP_2)
            .body(http_body_util::Full::new(hyper::body::Bytes::from("Hello World!")))
            .unwrap());

        assert_hello_world(&mut api, &addr, deadline);
    }

    #[test]
    fn request_start_invalid_call() {
        let (mut api, addr) = build_api_server!();

        // Invalid method.
        assert!(api.request_start("\0", &url(&addr)).is_err());

        // Invalid URI.
        assert!(api.request_start("GET", "http://\0localhost").is_err());
    }

    #[test]
    fn request_add_header_invalid_call() {
        let (mut api, addr) = build_api_server!();

        // Unknown request id.
        assert!(api.request_add_header(HttpRequestId(0xdead), "Foo", "bar").is_err());

        // Invalid header name.
        let id = api.request_start("GET", &url(&addr)).unwrap();
        assert!(api.request_add_header(id, "\0", "bar").is_err());

        // Invalid header value.
        let id = api.request_start("POST", &url(&addr)).unwrap();
        assert!(api.request_add_header(id, "Foo", "\0").is_err());

        // Headers cannot be added after a body chunk was written.
        let id = api.request_start("POST", &url(&addr)).unwrap();
        api.request_add_header(id, "Foo", "Bar").unwrap();
        api.request_write_body(id, &[1, 2, 3, 4], None).unwrap();
        assert!(api.request_add_header(id, "Foo2", "Bar").is_err());

        // ... nor after the response headers were requested.
        let id = api.request_start("GET", &url(&addr)).unwrap();
        api.response_headers(id);
        assert!(api.request_add_header(id, "Foo2", "Bar").is_err());

        // ... nor after the response body was read.
        let id = api.request_start("GET", &url(&addr)).unwrap();
        api.response_read_body(id, &mut [], None).unwrap();
        assert!(api.request_add_header(id, "Foo2", "Bar").is_err());
    }

    #[test]
    fn request_write_body_invalid_call() {
        let (mut api, addr) = build_api_server!();

        // Unknown request id, with and without data.
        assert_invalid(api.request_write_body(HttpRequestId(0xdead), &[1, 2, 3], None));
        assert_invalid(api.request_write_body(HttpRequestId(0xdead), &[], None));

        // Finishing the body twice.
        let id = api.request_start("POST", &url(&addr)).unwrap();
        api.request_write_body(id, &[1, 2, 3, 4], None).unwrap();
        api.request_write_body(id, &[1, 2, 3, 4], None).unwrap();
        api.request_write_body(id, &[], None).unwrap();
        assert_invalid(api.request_write_body(id, &[], None));

        // Writing data after the body was finished.
        let id = api.request_start("POST", &url(&addr)).unwrap();
        api.request_write_body(id, &[1, 2, 3, 4], None).unwrap();
        api.request_write_body(id, &[1, 2, 3, 4], None).unwrap();
        api.request_write_body(id, &[], None).unwrap();
        assert_invalid(api.request_write_body(id, &[1, 2, 3, 4], None));

        // Finishing the body after waiting for the response.
        let id = api.request_start("POST", &url(&addr)).unwrap();
        api.request_write_body(id, &[1, 2, 3, 4], None).unwrap();
        api.response_wait(&[id], None);
        assert_invalid(api.request_write_body(id, &[], None));

        // Writing data after waiting for the response.
        let id = api.request_start("POST", &url(&addr)).unwrap();
        api.request_write_body(id, &[1, 2, 3, 4], None).unwrap();
        api.response_wait(&[id], None);
        assert_invalid(api.request_write_body(id, &[1, 2, 3, 4], None));

        // Writing data after requesting the response headers.
        let id = api.request_start("POST", &url(&addr)).unwrap();
        api.response_headers(id);
        assert_invalid(api.request_write_body(id, &[1, 2, 3, 4], None));

        // Finishing the body after requesting the response headers.
        let id = api.request_start("GET", &url(&addr)).unwrap();
        api.response_headers(id);
        assert_invalid(api.request_write_body(id, &[], None));

        // Writing data after reading the response body.
        let id = api.request_start("POST", &url(&addr)).unwrap();
        api.response_read_body(id, &mut [], None).unwrap();
        assert_invalid(api.request_write_body(id, &[1, 2, 3, 4], None));

        // Finishing the body after reading the response body.
        let id = api.request_start("POST", &url(&addr)).unwrap();
        api.response_read_body(id, &mut [], None).unwrap();
        assert_invalid(api.request_write_body(id, &[], None));
    }

    #[test]
    fn response_headers_invalid_call() {
        let (mut api, addr) = build_api_server!();
        assert!(api.response_headers(HttpRequestId(0xdead)).is_empty());

        let id = api.request_start("POST", &url(&addr)).unwrap();
        assert!(api.response_headers(id).is_empty());

        // Headers show up eventually once the request has been sent.
        let id = api.request_start("POST", &url(&addr)).unwrap();
        api.request_write_body(id, &[], None).unwrap();
        while api.response_headers(id).is_empty() {
            std::thread::sleep(std::time::Duration::from_millis(100));
        }

        let id = api.request_start("GET", &url(&addr)).unwrap();
        api.response_wait(&[id], None);
        assert!(!api.response_headers(id).is_empty());

        // After the body has been read to the end, no headers are returned any more.
        let id = api.request_start("GET", &url(&addr)).unwrap();
        let mut buf = [0; 128];
        while api.response_read_body(id, &mut buf, None).unwrap() != 0 {}
        assert!(api.response_headers(id).is_empty());
    }

    #[test]
    fn response_header_invalid_call() {
        let (mut api, addr) = build_api_server!();

        let id = api.request_start("POST", &url(&addr)).unwrap();
        assert!(api.response_headers(id).is_empty());

        let id = api.request_start("POST", &url(&addr)).unwrap();
        api.request_add_header(id, "Foo", "Bar").unwrap();
        assert!(api.response_headers(id).is_empty());

        let id = api.request_start("GET", &url(&addr)).unwrap();
        api.request_add_header(id, "Foo", "Bar").unwrap();
        api.request_write_body(id, &[], None).unwrap();
        assert!(api.response_headers(id).is_empty());
    }

    #[test]
    fn response_read_body_invalid_call() {
        let (mut api, addr) = build_api_server!();
        let mut buf = [0; 512];

        // Unknown request id.
        assert_invalid(api.response_read_body(HttpRequestId(0xdead), &mut buf, None));

        // Reading again after the body was read to the end.
        let id = api.request_start("GET", &url(&addr)).unwrap();
        while api.response_read_body(id, &mut buf, None).unwrap() != 0 {}
        assert_invalid(api.response_read_body(id, &mut buf, None));
    }

    #[test]
    fn fuzzing() {
        let (mut api, addr) = build_api_server!();

        // Call the API in random order; the only expectation is that nothing panics.
        for _ in 0..50 {
            let id = api.request_start("POST", &url(&addr)).unwrap();

            for _ in 0..250 {
                let action = rand::random::<u8>() % 6;
                match action {
                    0 => {
                        let _ = api.request_add_header(id, "Foo", "Bar");
                    },
                    1 => {
                        let _ = api.request_write_body(id, &[1, 2, 3, 4], None);
                    },
                    2 => {
                        let _ = api.request_write_body(id, &[], None);
                    },
                    3 => {
                        let _ = api.response_wait(&[id], None);
                    },
                    4 => {
                        let _ = api.response_headers(id);
                    },
                    5 => {
                        let mut buf = [0; 512];
                        let _ = api.response_read_body(id, &mut buf, None);
                    },
                    6..=255 => unreachable!(),
                }
            }
        }
    }

    #[test]
    fn shared_http_client_is_only_initialized_on_access() {
        // No HTTP call is made: the lazy client must stay uninitialized.
        let untouched_client = SharedClient::new().unwrap();

        {
            let network = Arc::new(TestNetwork());
            let (mut api, async_api) = AsyncApi::new(network, false, untouched_client.clone());
            api.timestamp();

            futures::executor::block_on(async move {
                assert!(futures::poll!(async_api.process()).is_pending());
            });
        }

        assert!(Lazy::into_value(Arc::try_unwrap(untouched_client.0).unwrap()).is_err());

        // A request is dispatched: the lazy client must have been initialized.
        let used_client = SharedClient::new().unwrap();

        {
            let network = Arc::new(TestNetwork());
            let (mut api, async_api) = AsyncApi::new(network, false, used_client.clone());
            let id = api.http_request_start("lol", "nope", &[]).unwrap();
            api.http_request_write_body(id, &[], None).unwrap();
            futures::executor::block_on(async move {
                assert!(futures::poll!(async_api.process()).is_pending());
            });
        }

        assert!(Lazy::into_value(Arc::try_unwrap(used_client.0).unwrap()).is_ok());
    }
}