referrerpolicy=no-referrer-when-downgrade

sc_offchain/api/
http.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! This module is composed of two structs: [`HttpApi`] and [`HttpWorker`]. Calling the [`http`]
20//! function returns a pair of [`HttpApi`] and [`HttpWorker`] that share some state.
21//!
22//! The [`HttpApi`] is (indirectly) passed to the runtime when calling an offchain worker, while
23//! the [`HttpWorker`] must be processed in the background. The [`HttpApi`] mimics the API of the
24//! HTTP-related methods available to offchain workers.
25//!
26//! The reason for this design is driven by the fact that HTTP requests should continue running
27//! (i.e.: the socket should continue being processed) in the background even if the runtime isn't
28//! actively calling any function.
29
30use crate::api::timestamp;
31use bytes::buf::{Buf, Reader};
32use fnv::FnvHashMap;
33use futures::{channel::mpsc, future, prelude::*};
34use http_body_util::{combinators::BoxBody, StreamBody};
35use hyper::body::Body as _;
36use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder};
37use hyper_util::{client::legacy as client, rt::TokioExecutor};
38use once_cell::sync::Lazy;
39use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
40use sp_core::offchain::{HttpError, HttpRequestId, HttpRequestStatus, Timestamp};
41use std::{
42	fmt,
43	io::Read as _,
44	pin::Pin,
45	sync::Arc,
46	task::{Context, Poll},
47};
48
49const LOG_TARGET: &str = "offchain-worker::http";
50
51pub type Body = BoxBody<hyper::body::Bytes, hyper::Error>;
52
53type Sender = mpsc::Sender<Result<hyper::body::Frame<hyper::body::Bytes>, hyper::Error>>;
54type Receiver = mpsc::Receiver<Result<hyper::body::Frame<hyper::body::Bytes>, hyper::Error>>;
55
56type HyperClient = client::Client<HttpsConnector<client::connect::HttpConnector>, Body>;
57type LazyClient = Lazy<HyperClient, Box<dyn FnOnce() -> HyperClient + Send>>;
58
59/// Wrapper struct used for keeping the hyper_rustls client running.
60#[derive(Clone)]
61pub struct SharedClient(Arc<LazyClient>);
62
63impl SharedClient {
64	pub fn new() -> std::io::Result<Self> {
65		let builder = HttpsConnectorBuilder::new()
66			.with_provider_and_native_roots(rustls::crypto::ring::default_provider())?;
67		Ok(Self(Arc::new(Lazy::new(Box::new(|| {
68			let connector = builder.https_or_http().enable_http1().enable_http2().build();
69			client::Client::builder(TokioExecutor::new()).build(connector)
70		})))))
71	}
72}
73
74/// Creates a pair of [`HttpApi`] and [`HttpWorker`].
75pub fn http(shared_client: SharedClient) -> (HttpApi, HttpWorker) {
76	let (to_worker, from_api) = tracing_unbounded("mpsc_ocw_to_worker", 100_000);
77	let (to_api, from_worker) = tracing_unbounded("mpsc_ocw_to_api", 100_000);
78
79	let api = HttpApi {
80		to_worker,
81		from_worker: from_worker.fuse(),
82		// We start with a random ID for the first HTTP request, to prevent mischievous people from
83		// writing runtime code with hardcoded IDs.
84		next_id: HttpRequestId(rand::random::<u16>() % 2000),
85		requests: FnvHashMap::default(),
86	};
87
88	let engine =
89		HttpWorker { to_api, from_api, http_client: shared_client.0, requests: Vec::new() };
90
91	(api, engine)
92}
93
94/// Provides HTTP capabilities.
95///
96/// Since this struct is a helper for offchain workers, its API is mimicking the API provided
97/// to offchain workers.
98pub struct HttpApi {
99	/// Used to sends messages to the worker.
100	to_worker: TracingUnboundedSender<ApiToWorker>,
101	/// Used to receive messages from the worker.
102	/// We use a `Fuse` in order to have an extra protection against panicking.
103	from_worker: stream::Fuse<TracingUnboundedReceiver<WorkerToApi>>,
104	/// Id to assign to the next HTTP request that is started.
105	next_id: HttpRequestId,
106	/// List of HTTP requests in preparation or in progress.
107	requests: FnvHashMap<HttpRequestId, HttpApiRequest>,
108}
109
110/// One active request within `HttpApi`.
111enum HttpApiRequest {
112	/// The request object is being constructed locally and not started yet.
113	NotDispatched(hyper::Request<Body>, Sender),
114	/// The request has been dispatched and we're in the process of sending out the body (if the
115	/// field is `Some`) or waiting for a response (if the field is `None`).
116	Dispatched(Option<Sender>),
117	/// Received a response.
118	Response(HttpApiRequestRp),
119	/// A request has been dispatched but the worker notified us of an error. We report this
120	/// failure to the user as an `IoError` and remove the request from the list as soon as
121	/// possible.
122	Fail(client::Error),
123}
124
125/// A request within `HttpApi` that has received a response.
126struct HttpApiRequestRp {
127	/// We might still be writing the request's body when the response comes.
128	/// This field allows to continue writing that body.
129	sending_body: Option<Sender>,
130	/// Status code of the response.
131	status_code: hyper::StatusCode,
132	/// Headers of the response.
133	headers: hyper::HeaderMap,
134	/// Body of the response, as a channel of `Chunk` objects.
135	/// While the code is designed to drop the `Receiver` once it ends, we wrap it within a
136	/// `Fuse` in order to be extra precautious about panics.
137	/// Elements extracted from the channel are first put into `current_read_chunk`.
138	/// If the channel produces an error, then that is translated into an `IoError` and the request
139	/// is removed from the list.
140	body: stream::Fuse<Receiver>,
141	/// Chunk that has been extracted from the channel and that is currently being read.
142	/// Reading data from the response should read from this field in priority.
143	current_read_chunk: Option<Reader<hyper::body::Bytes>>,
144}
145
146impl HttpApi {
147	/// Mimics the corresponding method in the offchain API.
148	pub fn request_start(&mut self, method: &str, uri: &str) -> Result<HttpRequestId, ()> {
149		// Start by building the prototype of the request.
150		// We do this first so that we don't touch anything in `self` if building the prototype
151		// fails.
152		let (body_sender, receiver) = mpsc::channel(0);
153		let body = StreamBody::new(receiver);
154		let body = BoxBody::new(body);
155		let mut request = hyper::Request::new(body);
156		*request.method_mut() = hyper::Method::from_bytes(method.as_bytes()).map_err(|_| ())?;
157		*request.uri_mut() = hyper::Uri::from_maybe_shared(uri.to_owned()).map_err(|_| ())?;
158
159		let new_id = self.next_id;
160		debug_assert!(!self.requests.contains_key(&new_id));
161		match self.next_id.0.checked_add(1) {
162			Some(new_id) => self.next_id.0 = new_id,
163			None => {
164				tracing::error!(
165					target: LOG_TARGET,
166					"Overflow in offchain worker HTTP request ID assignment"
167				);
168				return Err(());
169			},
170		};
171		self.requests
172			.insert(new_id, HttpApiRequest::NotDispatched(request, body_sender));
173
174		tracing::trace!(
175			target: LOG_TARGET,
176			id = %new_id.0,
177			%method,
178			%uri,
179			"Requested started",
180		);
181
182		Ok(new_id)
183	}
184
185	/// Mimics the corresponding method in the offchain API.
186	pub fn request_add_header(
187		&mut self,
188		request_id: HttpRequestId,
189		name: &str,
190		value: &str,
191	) -> Result<(), ()> {
192		let request = match self.requests.get_mut(&request_id) {
193			Some(&mut HttpApiRequest::NotDispatched(ref mut rq, _)) => rq,
194			_ => return Err(()),
195		};
196
197		let header_name = hyper::header::HeaderName::try_from(name).map_err(drop)?;
198		let header_value = hyper::header::HeaderValue::try_from(value).map_err(drop)?;
199		// Note that we're always appending headers and never replacing old values.
200		// We assume here that the user knows what they're doing.
201		request.headers_mut().append(header_name, header_value);
202
203		tracing::debug!(target: LOG_TARGET, id = %request_id.0, %name, %value, "Added header to request");
204
205		Ok(())
206	}
207
208	/// Mimics the corresponding method in the offchain API.
209	pub fn request_write_body(
210		&mut self,
211		request_id: HttpRequestId,
212		chunk: &[u8],
213		deadline: Option<Timestamp>,
214	) -> Result<(), HttpError> {
215		// Extract the request from the list.
216		// Don't forget to add it back if necessary when returning.
217		let mut request = self.requests.remove(&request_id).ok_or(HttpError::Invalid)?;
218
219		let mut deadline = timestamp::deadline_to_future(deadline);
220		// Closure that writes data to a sender, taking the deadline into account. Can return `Ok`
221		// (if the body has been written), or `DeadlineReached`, or `IoError`.
222		// If `IoError` is returned, don't forget to remove the request from the list.
223		let mut poll_sender = move |sender: &mut Sender| -> Result<(), HttpError> {
224			let mut when_ready = future::maybe_done(future::poll_fn(|cx| sender.poll_ready(cx)));
225			futures::executor::block_on(future::select(&mut when_ready, &mut deadline));
226			match when_ready {
227				future::MaybeDone::Done(Ok(())) => {},
228				future::MaybeDone::Done(Err(_)) => return Err(HttpError::IoError),
229				future::MaybeDone::Future(_) | future::MaybeDone::Gone => {
230					debug_assert!(matches!(deadline, future::MaybeDone::Done(..)));
231					return Err(HttpError::DeadlineReached);
232				},
233			};
234
235			futures::executor::block_on(
236				async {
237					future::poll_fn(|cx|  sender.poll_ready(cx)).await?;
238					sender.start_send(Ok(hyper::body::Frame::data(hyper::body::Bytes::from(chunk.to_owned()))))
239				}
240			)
241			.map_err(|_| {
242				tracing::error!(target: "offchain-worker::http", "HTTP sender refused data despite being ready");
243				HttpError::IoError
244			})
245		};
246
247		loop {
248			request = match request {
249				HttpApiRequest::NotDispatched(request, sender) => {
250					tracing::debug!(target: LOG_TARGET, id = %request_id.0, "Added new body chunk");
251					// If the request is not dispatched yet, dispatch it and loop again.
252					let _ = self
253						.to_worker
254						.unbounded_send(ApiToWorker::Dispatch { id: request_id, request });
255					HttpApiRequest::Dispatched(Some(sender))
256				},
257
258				HttpApiRequest::Dispatched(Some(mut sender)) => {
259					if !chunk.is_empty() {
260						match poll_sender(&mut sender) {
261							Err(HttpError::IoError) => {
262								tracing::debug!(target: LOG_TARGET, id = %request_id.0, "Encountered io error while trying to add new chunk to body");
263								return Err(HttpError::IoError);
264							},
265							other => {
266								tracing::debug!(target: LOG_TARGET, id = %request_id.0, res = ?other, "Added chunk to body");
267								self.requests
268									.insert(request_id, HttpApiRequest::Dispatched(Some(sender)));
269								return other;
270							},
271						}
272					} else {
273						tracing::debug!(target: LOG_TARGET, id = %request_id.0, "Finished writing body");
274
275						// Writing an empty body is a hint that we should stop writing. Dropping
276						// the sender.
277						self.requests.insert(request_id, HttpApiRequest::Dispatched(None));
278						return Ok(());
279					}
280				},
281
282				HttpApiRequest::Response(
283					mut response @ HttpApiRequestRp { sending_body: Some(_), .. },
284				) => {
285					if !chunk.is_empty() {
286						match poll_sender(
287							response
288								.sending_body
289								.as_mut()
290								.expect("Can only enter this match branch if Some; qed"),
291						) {
292							Err(HttpError::IoError) => {
293								tracing::debug!(target: LOG_TARGET, id = %request_id.0, "Encountered io error while trying to add new chunk to body");
294								return Err(HttpError::IoError);
295							},
296							other => {
297								tracing::debug!(target: LOG_TARGET, id = %request_id.0, res = ?other, "Added chunk to body");
298								self.requests
299									.insert(request_id, HttpApiRequest::Response(response));
300								return other;
301							},
302						}
303					} else {
304						tracing::debug!(target: LOG_TARGET, id = %request_id.0, "Finished writing body");
305
306						// Writing an empty body is a hint that we should stop writing. Dropping
307						// the sender.
308						self.requests.insert(
309							request_id,
310							HttpApiRequest::Response(HttpApiRequestRp {
311								sending_body: None,
312								..response
313							}),
314						);
315						return Ok(());
316					}
317				},
318
319				HttpApiRequest::Fail(error) => {
320					tracing::debug!(target: LOG_TARGET, id = %request_id.0, ?error, "Request failed");
321
322					// If the request has already failed, return without putting back the request
323					// in the list.
324					return Err(HttpError::IoError);
325				},
326
327				v @ HttpApiRequest::Dispatched(None) |
328				v @ HttpApiRequest::Response(HttpApiRequestRp { sending_body: None, .. }) => {
329					tracing::debug!(target: LOG_TARGET, id = %request_id.0, "Body sending already finished");
330
331					// We have already finished sending this body.
332					self.requests.insert(request_id, v);
333					return Err(HttpError::Invalid);
334				},
335			}
336		}
337	}
338
339	/// Mimics the corresponding method in the offchain API.
340	pub fn response_wait(
341		&mut self,
342		ids: &[HttpRequestId],
343		deadline: Option<Timestamp>,
344	) -> Vec<HttpRequestStatus> {
345		// First of all, dispatch all the non-dispatched requests and drop all senders so that the
346		// user can't write anymore data.
347		for id in ids {
348			match self.requests.get_mut(id) {
349				Some(HttpApiRequest::NotDispatched(_, _)) => {},
350				Some(HttpApiRequest::Dispatched(sending_body)) |
351				Some(HttpApiRequest::Response(HttpApiRequestRp { sending_body, .. })) => {
352					let _ = sending_body.take();
353					continue;
354				},
355				_ => continue,
356			};
357
358			let (request, _sender) = match self.requests.remove(id) {
359				Some(HttpApiRequest::NotDispatched(rq, s)) => (rq, s),
360				_ => unreachable!("we checked for NotDispatched above; qed"),
361			};
362
363			let _ = self.to_worker.unbounded_send(ApiToWorker::Dispatch { id: *id, request });
364
365			// We also destroy the sender in order to forbid writing more data.
366			self.requests.insert(*id, HttpApiRequest::Dispatched(None));
367		}
368
369		let mut deadline = timestamp::deadline_to_future(deadline);
370
371		loop {
372			// Within that loop, first try to see if we have all the elements for a response.
373			// This includes the situation where the deadline is reached.
374			{
375				let mut output = Vec::with_capacity(ids.len());
376				let mut must_wait_more = false;
377				for id in ids {
378					output.push(match self.requests.get(id) {
379						None => HttpRequestStatus::Invalid,
380						Some(HttpApiRequest::NotDispatched(_, _)) => unreachable!(
381							"we replaced all the NotDispatched with Dispatched earlier; qed"
382						),
383						Some(HttpApiRequest::Dispatched(_)) => {
384							must_wait_more = true;
385							HttpRequestStatus::DeadlineReached
386						},
387						Some(HttpApiRequest::Fail(_)) => HttpRequestStatus::IoError,
388						Some(HttpApiRequest::Response(HttpApiRequestRp {
389							status_code, ..
390						})) => HttpRequestStatus::Finished(status_code.as_u16()),
391					});
392				}
393				debug_assert_eq!(output.len(), ids.len());
394
395				// Are we ready to call `return`?
396				let is_done =
397					if let future::MaybeDone::Done(_) = deadline { true } else { !must_wait_more };
398
399				if is_done {
400					// Requests in "fail" mode are purged before returning.
401					debug_assert_eq!(output.len(), ids.len());
402					for n in (0..ids.len()).rev() {
403						match output[n] {
404							HttpRequestStatus::IoError => {
405								self.requests.remove(&ids[n]);
406							},
407							HttpRequestStatus::Invalid => {
408								tracing::debug!(target: LOG_TARGET, id = %ids[n].0, "Unknown request");
409							},
410							HttpRequestStatus::DeadlineReached => {
411								tracing::debug!(target: LOG_TARGET, id = %ids[n].0, "Deadline reached");
412							},
413							HttpRequestStatus::Finished(_) => {
414								tracing::debug!(target: LOG_TARGET, id = %ids[n].0, "Request finished");
415							},
416						}
417					}
418					return output;
419				}
420			}
421
422			// Grab next message from the worker. We call `continue` if deadline is reached so that
423			// we loop back and `return`.
424			let next_message = {
425				let mut next_msg = future::maybe_done(self.from_worker.next());
426				futures::executor::block_on(future::select(&mut next_msg, &mut deadline));
427				if let future::MaybeDone::Done(msg) = next_msg {
428					msg
429				} else {
430					debug_assert!(matches!(deadline, future::MaybeDone::Done(..)));
431					continue;
432				}
433			};
434
435			// Update internal state based on received message.
436			match next_message {
437				Some(WorkerToApi::Response { id, status_code, headers, body }) => {
438					match self.requests.remove(&id) {
439						Some(HttpApiRequest::Dispatched(sending_body)) => {
440							self.requests.insert(
441								id,
442								HttpApiRequest::Response(HttpApiRequestRp {
443									sending_body,
444									status_code,
445									headers,
446									body: body.fuse(),
447									current_read_chunk: None,
448								}),
449							);
450						},
451						None => {}, // can happen if we detected an IO error when sending the body
452						_ => {
453							tracing::error!(target: "offchain-worker::http", "State mismatch between the API and worker")
454						},
455					}
456				},
457
458				Some(WorkerToApi::Fail { id, error }) => match self.requests.remove(&id) {
459					Some(HttpApiRequest::Dispatched(_)) => {
460						tracing::debug!(target: LOG_TARGET, id = %id.0, ?error, "Request failed");
461						self.requests.insert(id, HttpApiRequest::Fail(error));
462					},
463					None => {}, // can happen if we detected an IO error when sending the body
464					_ => {
465						tracing::error!(target: "offchain-worker::http", "State mismatch between the API and worker")
466					},
467				},
468
469				None => {
470					tracing::error!(target: "offchain-worker::http", "Worker has crashed");
471					return ids.iter().map(|_| HttpRequestStatus::IoError).collect();
472				},
473			}
474		}
475	}
476
477	/// Mimics the corresponding method in the offchain API.
478	pub fn response_headers(&mut self, request_id: HttpRequestId) -> Vec<(Vec<u8>, Vec<u8>)> {
479		// Do an implicit non-blocking wait on the request.
480		let _ = self.response_wait(&[request_id], Some(timestamp::now()));
481
482		let headers = match self.requests.get(&request_id) {
483			Some(HttpApiRequest::Response(HttpApiRequestRp { headers, .. })) => headers,
484			_ => return Vec::new(),
485		};
486
487		headers
488			.iter()
489			.map(|(name, value)| (name.as_str().as_bytes().to_owned(), value.as_bytes().to_owned()))
490			.collect()
491	}
492
493	/// Mimics the corresponding method in the offchain API.
494	pub fn response_read_body(
495		&mut self,
496		request_id: HttpRequestId,
497		buffer: &mut [u8],
498		deadline: Option<Timestamp>,
499	) -> Result<usize, HttpError> {
500		// Do an implicit wait on the request.
501		let _ = self.response_wait(&[request_id], deadline);
502
503		// Remove the request from the list and handle situations where the request is invalid or
504		// in the wrong state.
505		let mut response = match self.requests.remove(&request_id) {
506			Some(HttpApiRequest::Response(r)) => r,
507			// Because we called `response_wait` above, we know that the deadline has been reached
508			// and we still haven't received a response.
509			Some(rq @ HttpApiRequest::Dispatched(_)) => {
510				self.requests.insert(request_id, rq);
511				return Err(HttpError::DeadlineReached);
512			},
513			// The request has failed.
514			Some(HttpApiRequest::Fail { .. }) => return Err(HttpError::IoError),
515			// Request hasn't been dispatched yet; reading the body is invalid.
516			Some(rq @ HttpApiRequest::NotDispatched(_, _)) => {
517				self.requests.insert(request_id, rq);
518				return Err(HttpError::Invalid);
519			},
520			None => return Err(HttpError::Invalid),
521		};
522
523		// Convert the deadline into a `Future` that resolves when the deadline is reached.
524		let mut deadline = timestamp::deadline_to_future(deadline);
525
526		loop {
527			// First read from `current_read_chunk`.
528			if let Some(mut current_read_chunk) = response.current_read_chunk.take() {
529				match current_read_chunk.read(buffer) {
530					Ok(0) => {},
531					Ok(n) => {
532						self.requests.insert(
533							request_id,
534							HttpApiRequest::Response(HttpApiRequestRp {
535								current_read_chunk: Some(current_read_chunk),
536								..response
537							}),
538						);
539						return Ok(n);
540					},
541					Err(err) => {
542						// This code should never be reached unless there's a logic error somewhere.
543						tracing::error!(target: "offchain-worker::http", "Failed to read from current read chunk: {:?}", err);
544						return Err(HttpError::IoError);
545					},
546				}
547			}
548
549			// If we reach here, that means the `current_read_chunk` is empty and needs to be
550			// filled with a new chunk from `body`. We block on either the next body or the
551			// deadline.
552			let mut next_body = future::maybe_done(response.body.next());
553			futures::executor::block_on(future::select(&mut next_body, &mut deadline));
554
555			if let future::MaybeDone::Done(next_body) = next_body {
556				match next_body {
557					Some(Ok(chunk)) =>
558						if let Ok(chunk) = chunk.into_data() {
559							response.current_read_chunk = Some(chunk.reader());
560						},
561					Some(Err(_)) => return Err(HttpError::IoError),
562					None => return Ok(0), // eof
563				}
564			}
565
566			if let future::MaybeDone::Done(_) = deadline {
567				self.requests.insert(request_id, HttpApiRequest::Response(response));
568				return Err(HttpError::DeadlineReached);
569			}
570		}
571	}
572}
573
574impl fmt::Debug for HttpApi {
575	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
576		f.debug_list().entries(self.requests.iter()).finish()
577	}
578}
579
580impl fmt::Debug for HttpApiRequest {
581	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
582		match self {
583			HttpApiRequest::NotDispatched(_, _) =>
584				f.debug_tuple("HttpApiRequest::NotDispatched").finish(),
585			HttpApiRequest::Dispatched(_) => f.debug_tuple("HttpApiRequest::Dispatched").finish(),
586			HttpApiRequest::Response(HttpApiRequestRp { status_code, headers, .. }) => f
587				.debug_tuple("HttpApiRequest::Response")
588				.field(status_code)
589				.field(headers)
590				.finish(),
591			HttpApiRequest::Fail(err) => f.debug_tuple("HttpApiRequest::Fail").field(err).finish(),
592		}
593	}
594}
595
596/// Message send from the API to the worker.
597enum ApiToWorker {
598	/// Dispatches a new HTTP request.
599	Dispatch {
600		/// ID to send back when the response comes back.
601		id: HttpRequestId,
602		/// Request to start executing.
603		request: hyper::Request<Body>,
604	},
605}
606
607/// Message send from the API to the worker.
608enum WorkerToApi {
609	/// A request has succeeded.
610	Response {
611		/// The ID that was passed to the worker.
612		id: HttpRequestId,
613		/// Status code of the response.
614		status_code: hyper::StatusCode,
615		/// Headers of the response.
616		headers: hyper::HeaderMap,
617		/// Body of the response, as a channel of `Chunk` objects.
618		/// We send the body back through a channel instead of returning the hyper `Body` object
619		/// because we don't want the `HttpApi` to have to drive the reading.
620		/// Instead, reading an item from the channel will notify the worker task, which will push
621		/// the next item.
622		/// Can also be used to send an error, in case an error happened on the HTTP socket. After
623		/// an error is sent, the channel will close.
624		body: Receiver,
625	},
626	/// A request has failed because of an error. The request is then no longer valid.
627	Fail {
628		/// The ID that was passed to the worker.
629		id: HttpRequestId,
630		/// Error that happened.
631		error: client::Error,
632	},
633}
634
635/// Must be continuously polled for the [`HttpApi`] to properly work.
636pub struct HttpWorker {
637	/// Used to sends messages to the `HttpApi`.
638	to_api: TracingUnboundedSender<WorkerToApi>,
639	/// Used to receive messages from the `HttpApi`.
640	from_api: TracingUnboundedReceiver<ApiToWorker>,
641	/// The engine that runs HTTP requests.
642	http_client: Arc<LazyClient>,
643	/// HTTP requests that are being worked on by the engine.
644	requests: Vec<(HttpRequestId, HttpWorkerRequest)>,
645}
646
647/// HTTP request being processed by the worker.
648enum HttpWorkerRequest {
649	/// Request has been dispatched and is waiting for a response from the Internet.
650	Dispatched(client::ResponseFuture),
651	/// Progressively reading the body of the response and sending it to the channel.
652	ReadBody {
653		/// Body to read `Chunk`s from. Only used if the channel is ready to accept data.
654		body: Body,
655		/// Channel to the [`HttpApi`] where we send the chunks to.
656		tx: Sender,
657	},
658}
659
660impl Future for HttpWorker {
661	type Output = ();
662
663	fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
664		// Reminder: this is continuously run in the background.
665
666		// We use a `me` variable because the compiler isn't smart enough to allow borrowing
667		// multiple fields at once through a `Deref`.
668		let me = &mut *self;
669
670		// We remove each element from `requests` one by one and add them back only if necessary.
671		for n in (0..me.requests.len()).rev() {
672			let (id, request) = me.requests.swap_remove(n);
673			match request {
674				HttpWorkerRequest::Dispatched(mut future) => {
675					// Check for an HTTP response from the Internet.
676					let response = match Future::poll(Pin::new(&mut future), cx) {
677						Poll::Pending => {
678							me.requests.push((id, HttpWorkerRequest::Dispatched(future)));
679							continue;
680						},
681						Poll::Ready(Ok(response)) => response,
682						Poll::Ready(Err(error)) => {
683							let _ = me.to_api.unbounded_send(WorkerToApi::Fail { id, error });
684							continue; // don't insert the request back
685						},
686					};
687
688					// We received a response! Decompose it into its parts.
689					let (head, body) = response.into_parts();
690					let (status_code, headers) = (head.status, head.headers);
691
692					let (body_tx, body_rx) = mpsc::channel(3);
693					let _ = me.to_api.unbounded_send(WorkerToApi::Response {
694						id,
695						status_code,
696						headers,
697						body: body_rx,
698					});
699
700					me.requests.push((
701						id,
702						HttpWorkerRequest::ReadBody { body: Body::new(body), tx: body_tx },
703					));
704					cx.waker().wake_by_ref(); // reschedule in order to poll the new future
705					continue;
706				},
707
708				HttpWorkerRequest::ReadBody { mut body, mut tx } => {
709					// Before reading from the HTTP response, check that `tx` is ready to accept
710					// a new chunk.
711					match tx.poll_ready(cx) {
712						Poll::Ready(Ok(())) => {},
713						Poll::Ready(Err(_)) => continue, // don't insert the request back
714						Poll::Pending => {
715							me.requests.push((id, HttpWorkerRequest::ReadBody { body, tx }));
716							continue;
717						},
718					}
719
720					match Pin::new(&mut body).poll_frame(cx) {
721						Poll::Ready(Some(Ok(chunk))) => {
722							let _ = tx.start_send(Ok(chunk));
723							me.requests.push((id, HttpWorkerRequest::ReadBody { body, tx }));
724							cx.waker().wake_by_ref(); // reschedule in order to continue reading
725						},
726						Poll::Ready(Some(Err(err))) => {
727							let _ = tx.start_send(Err(err));
728							// don't insert the request back
729						},
730						Poll::Ready(None) => {}, // EOF; don't insert the request back
731						Poll::Pending => {
732							me.requests.push((id, HttpWorkerRequest::ReadBody { body, tx }));
733						},
734					}
735				},
736			}
737		}
738
739		// Check for messages coming from the [`HttpApi`].
740		match Stream::poll_next(Pin::new(&mut me.from_api), cx) {
741			Poll::Pending => {},
742			Poll::Ready(None) => return Poll::Ready(()), // stops the worker
743			Poll::Ready(Some(ApiToWorker::Dispatch { id, request })) => {
744				let future = me.http_client.request(request);
745				debug_assert!(me.requests.iter().all(|(i, _)| *i != id));
746				me.requests.push((id, HttpWorkerRequest::Dispatched(future)));
747				cx.waker().wake_by_ref(); // reschedule the task to poll the request
748			},
749		}
750
751		Poll::Pending
752	}
753}
754
755impl fmt::Debug for HttpWorker {
756	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
757		f.debug_list().entries(self.requests.iter()).finish()
758	}
759}
760
761impl fmt::Debug for HttpWorkerRequest {
762	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
763		match self {
764			HttpWorkerRequest::Dispatched(_) =>
765				f.debug_tuple("HttpWorkerRequest::Dispatched").finish(),
766			HttpWorkerRequest::ReadBody { .. } =>
767				f.debug_tuple("HttpWorkerRequest::Response").finish(),
768		}
769	}
770}
771
772#[cfg(test)]
773mod tests {
774	use super::{
775		super::{tests::TestNetwork, AsyncApi},
776		*,
777	};
778	use crate::api::timestamp;
779	use core::convert::Infallible;
780	use futures::future;
781	use http_body_util::BodyExt;
782	use sp_core::offchain::{Duration, Externalities, HttpError, HttpRequestId, HttpRequestStatus};
783	use std::sync::LazyLock;
784
785	// Using LazyLock to avoid spawning lots of different SharedClients,
786	// as spawning a SharedClient is CPU-intensive and opens lots of fds.
787	static SHARED_CLIENT: LazyLock<SharedClient> = LazyLock::new(|| SharedClient::new().unwrap());
788
789	// Returns an `HttpApi` whose worker is ran in the background, and a `SocketAddr` to an HTTP
790	// server that runs in the background as well.
791	macro_rules! build_api_server {
792		() => {
793			build_api_server!(hyper::Response::new(http_body_util::Full::new(
794				hyper::body::Bytes::from("Hello World!")
795			)))
796		};
797		( $response:expr ) => {{
798			let hyper_client = SHARED_CLIENT.clone();
799			let (api, worker) = http(hyper_client.clone());
800
801			let (addr_tx, addr_rx) = std::sync::mpsc::channel();
802			std::thread::spawn(move || {
803				let rt = tokio::runtime::Runtime::new().unwrap();
804				let worker = rt.spawn(worker);
805				let server = rt.spawn(async move {
806					let addr = std::net::SocketAddr::from(([127, 0, 0, 1], 0));
807					let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
808					let _ = addr_tx.send(listener.local_addr().unwrap());
809					loop {
810						let (stream, _) = listener.accept().await.unwrap();
811						let io = hyper_util::rt::TokioIo::new(stream);
812						tokio::task::spawn(async move {
813							if let Err(err) = hyper::server::conn::http1::Builder::new()
814								.serve_connection(
815									io,
816									hyper::service::service_fn(
817										move |req: hyper::Request<hyper::body::Incoming>| async move {
818											// Wait until the complete request was received and
819											// processed, otherwise the tests are flaky.
820											let _ = req.into_body().collect().await;
821
822											Ok::<_, Infallible>($response)
823										},
824									),
825								)
826								.await
827							{
828								eprintln!("Error serving connection: {:?}", err);
829							}
830						});
831					}
832				});
833				let _ = rt.block_on(future::join(worker, server));
834			});
835			(api, addr_rx.recv().unwrap())
836		}};
837	}
838
839	#[test]
840	fn basic_localhost() {
841		let deadline = timestamp::now().add(Duration::from_millis(10_000));
842
843		// Performs an HTTP query to a background HTTP server.
844
845		let (mut api, addr) = build_api_server!();
846
847		let id = api.request_start("POST", &format!("http://{}", addr)).unwrap();
848		api.request_write_body(id, &[], Some(deadline)).unwrap();
849
850		match api.response_wait(&[id], Some(deadline))[0] {
851			HttpRequestStatus::Finished(200) => {},
852			v => panic!("Connecting to localhost failed: {:?}", v),
853		}
854
855		let headers = api.response_headers(id);
856		assert!(headers.iter().any(|(h, _)| h.eq_ignore_ascii_case(b"Date")));
857
858		let mut buf = vec![0; 2048];
859		let n = api.response_read_body(id, &mut buf, Some(deadline)).unwrap();
860		assert_eq!(&buf[..n], b"Hello World!");
861	}
862
863	#[test]
864	fn basic_http2_localhost() {
865		let deadline = timestamp::now().add(Duration::from_millis(10_000));
866
867		// Performs an HTTP query to a background HTTP server.
868
869		let (mut api, addr) = build_api_server!(hyper::Response::builder()
870			.version(hyper::Version::HTTP_2)
871			.body(http_body_util::Full::new(hyper::body::Bytes::from("Hello World!")))
872			.unwrap());
873
874		let id = api.request_start("POST", &format!("http://{}", addr)).unwrap();
875		api.request_write_body(id, &[], Some(deadline)).unwrap();
876
877		match api.response_wait(&[id], Some(deadline))[0] {
878			HttpRequestStatus::Finished(200) => {},
879			v => panic!("Connecting to localhost failed: {:?}", v),
880		}
881
882		let headers = api.response_headers(id);
883		assert!(headers.iter().any(|(h, _)| h.eq_ignore_ascii_case(b"Date")));
884
885		let mut buf = vec![0; 2048];
886		let n = api.response_read_body(id, &mut buf, Some(deadline)).unwrap();
887		assert_eq!(&buf[..n], b"Hello World!");
888	}
889
890	#[test]
891	fn request_start_invalid_call() {
892		let (mut api, addr) = build_api_server!();
893
894		match api.request_start("\0", &format!("http://{}", addr)) {
895			Err(()) => {},
896			Ok(_) => panic!(),
897		};
898
899		match api.request_start("GET", "http://\0localhost") {
900			Err(()) => {},
901			Ok(_) => panic!(),
902		};
903	}
904
905	#[test]
906	fn request_add_header_invalid_call() {
907		let (mut api, addr) = build_api_server!();
908
909		match api.request_add_header(HttpRequestId(0xdead), "Foo", "bar") {
910			Err(()) => {},
911			Ok(_) => panic!(),
912		};
913
914		let id = api.request_start("GET", &format!("http://{}", addr)).unwrap();
915		match api.request_add_header(id, "\0", "bar") {
916			Err(()) => {},
917			Ok(_) => panic!(),
918		};
919
920		let id = api.request_start("POST", &format!("http://{}", addr)).unwrap();
921		match api.request_add_header(id, "Foo", "\0") {
922			Err(()) => {},
923			Ok(_) => panic!(),
924		};
925
926		let id = api.request_start("POST", &format!("http://{}", addr)).unwrap();
927		api.request_add_header(id, "Foo", "Bar").unwrap();
928		api.request_write_body(id, &[1, 2, 3, 4], None).unwrap();
929		match api.request_add_header(id, "Foo2", "Bar") {
930			Err(()) => {},
931			Ok(_) => panic!(),
932		};
933
934		let id = api.request_start("GET", &format!("http://{}", addr)).unwrap();
935		api.response_headers(id);
936		match api.request_add_header(id, "Foo2", "Bar") {
937			Err(()) => {},
938			Ok(_) => panic!(),
939		};
940
941		let id = api.request_start("GET", &format!("http://{}", addr)).unwrap();
942		api.response_read_body(id, &mut [], None).unwrap();
943		match api.request_add_header(id, "Foo2", "Bar") {
944			Err(()) => {},
945			Ok(_) => panic!(),
946		};
947	}
948
949	#[test]
950	fn request_write_body_invalid_call() {
951		let (mut api, addr) = build_api_server!();
952
953		match api.request_write_body(HttpRequestId(0xdead), &[1, 2, 3], None) {
954			Err(HttpError::Invalid) => {},
955			_ => panic!(),
956		};
957
958		match api.request_write_body(HttpRequestId(0xdead), &[], None) {
959			Err(HttpError::Invalid) => {},
960			_ => panic!(),
961		};
962
963		let id = api.request_start("POST", &format!("http://{}", addr)).unwrap();
964		api.request_write_body(id, &[1, 2, 3, 4], None).unwrap();
965		api.request_write_body(id, &[1, 2, 3, 4], None).unwrap();
966		api.request_write_body(id, &[], None).unwrap();
967		match api.request_write_body(id, &[], None) {
968			Err(HttpError::Invalid) => {},
969			_ => panic!(),
970		};
971
972		let id = api.request_start("POST", &format!("http://{}", addr)).unwrap();
973		api.request_write_body(id, &[1, 2, 3, 4], None).unwrap();
974		api.request_write_body(id, &[1, 2, 3, 4], None).unwrap();
975		api.request_write_body(id, &[], None).unwrap();
976		match api.request_write_body(id, &[1, 2, 3, 4], None) {
977			Err(HttpError::Invalid) => {},
978			_ => panic!(),
979		};
980
981		let id = api.request_start("POST", &format!("http://{}", addr)).unwrap();
982		api.request_write_body(id, &[1, 2, 3, 4], None).unwrap();
983		api.response_wait(&[id], None);
984		match api.request_write_body(id, &[], None) {
985			Err(HttpError::Invalid) => {},
986			_ => panic!(),
987		};
988
989		let id = api.request_start("POST", &format!("http://{}", addr)).unwrap();
990		api.request_write_body(id, &[1, 2, 3, 4], None).unwrap();
991		api.response_wait(&[id], None);
992		match api.request_write_body(id, &[1, 2, 3, 4], None) {
993			Err(HttpError::Invalid) => {},
994			_ => panic!(),
995		};
996
997		let id = api.request_start("POST", &format!("http://{}", addr)).unwrap();
998		api.response_headers(id);
999		match api.request_write_body(id, &[1, 2, 3, 4], None) {
1000			Err(HttpError::Invalid) => {},
1001			_ => panic!(),
1002		};
1003
1004		let id = api.request_start("GET", &format!("http://{}", addr)).unwrap();
1005		api.response_headers(id);
1006		match api.request_write_body(id, &[], None) {
1007			Err(HttpError::Invalid) => {},
1008			_ => panic!(),
1009		};
1010
1011		let id = api.request_start("POST", &format!("http://{}", addr)).unwrap();
1012		api.response_read_body(id, &mut [], None).unwrap();
1013		match api.request_write_body(id, &[1, 2, 3, 4], None) {
1014			Err(HttpError::Invalid) => {},
1015			_ => panic!(),
1016		};
1017
1018		let id = api.request_start("POST", &format!("http://{}", addr)).unwrap();
1019		api.response_read_body(id, &mut [], None).unwrap();
1020		match api.request_write_body(id, &[], None) {
1021			Err(HttpError::Invalid) => {},
1022			_ => panic!(),
1023		};
1024	}
1025
1026	#[test]
1027	fn response_headers_invalid_call() {
1028		let (mut api, addr) = build_api_server!();
1029		assert_eq!(api.response_headers(HttpRequestId(0xdead)), &[]);
1030
1031		let id = api.request_start("POST", &format!("http://{}", addr)).unwrap();
1032		assert_eq!(api.response_headers(id), &[]);
1033
1034		let id = api.request_start("POST", &format!("http://{}", addr)).unwrap();
1035		api.request_write_body(id, &[], None).unwrap();
1036		while api.response_headers(id).is_empty() {
1037			std::thread::sleep(std::time::Duration::from_millis(100));
1038		}
1039
1040		let id = api.request_start("GET", &format!("http://{}", addr)).unwrap();
1041		api.response_wait(&[id], None);
1042		assert_ne!(api.response_headers(id), &[]);
1043
1044		let id = api.request_start("GET", &format!("http://{}", addr)).unwrap();
1045		let mut buf = [0; 128];
1046		while api.response_read_body(id, &mut buf, None).unwrap() != 0 {}
1047		assert_eq!(api.response_headers(id), &[]);
1048	}
1049
1050	#[test]
1051	fn response_header_invalid_call() {
1052		let (mut api, addr) = build_api_server!();
1053
1054		let id = api.request_start("POST", &format!("http://{}", addr)).unwrap();
1055		assert_eq!(api.response_headers(id), &[]);
1056
1057		let id = api.request_start("POST", &format!("http://{}", addr)).unwrap();
1058		api.request_add_header(id, "Foo", "Bar").unwrap();
1059		assert_eq!(api.response_headers(id), &[]);
1060
1061		let id = api.request_start("GET", &format!("http://{}", addr)).unwrap();
1062		api.request_add_header(id, "Foo", "Bar").unwrap();
1063		api.request_write_body(id, &[], None).unwrap();
1064		// Note: this test actually sends out the request, and is supposed to test a situation
1065		// where we haven't received any response yet. This test can theoretically fail if the
1066		// HTTP response comes back faster than the kernel schedules our thread, but that is highly
1067		// unlikely.
1068		assert_eq!(api.response_headers(id), &[]);
1069	}
1070
1071	#[test]
1072	fn response_read_body_invalid_call() {
1073		let (mut api, addr) = build_api_server!();
1074		let mut buf = [0; 512];
1075
1076		match api.response_read_body(HttpRequestId(0xdead), &mut buf, None) {
1077			Err(HttpError::Invalid) => {},
1078			_ => panic!(),
1079		}
1080
1081		let id = api.request_start("GET", &format!("http://{}", addr)).unwrap();
1082		while api.response_read_body(id, &mut buf, None).unwrap() != 0 {}
1083		match api.response_read_body(id, &mut buf, None) {
1084			Err(HttpError::Invalid) => {},
1085			_ => panic!(),
1086		}
1087	}
1088
1089	#[test]
1090	fn fuzzing() {
1091		// Uses the API in random ways to try to trigger panics.
1092		// Doesn't test some paths, such as waiting for multiple requests. Also doesn't test what
1093		// happens if the server force-closes our socket.
1094
1095		let (mut api, addr) = build_api_server!();
1096
1097		for _ in 0..50 {
1098			let id = api.request_start("POST", &format!("http://{}", addr)).unwrap();
1099
1100			for _ in 0..250 {
1101				match rand::random::<u8>() % 6 {
1102					0 => {
1103						let _ = api.request_add_header(id, "Foo", "Bar");
1104					},
1105					1 => {
1106						let _ = api.request_write_body(id, &[1, 2, 3, 4], None);
1107					},
1108					2 => {
1109						let _ = api.request_write_body(id, &[], None);
1110					},
1111					3 => {
1112						let _ = api.response_wait(&[id], None);
1113					},
1114					4 => {
1115						let _ = api.response_headers(id);
1116					},
1117					5 => {
1118						let mut buf = [0; 512];
1119						let _ = api.response_read_body(id, &mut buf, None);
1120					},
1121					6..=255 => unreachable!(),
1122				}
1123			}
1124		}
1125	}
1126
1127	#[test]
1128	fn shared_http_client_is_only_initialized_on_access() {
1129		let shared_client = SharedClient::new().unwrap();
1130
1131		{
1132			let mock = Arc::new(TestNetwork());
1133			let (mut api, async_api) = AsyncApi::new(mock, false, shared_client.clone());
1134			api.timestamp();
1135
1136			futures::executor::block_on(async move {
1137				assert!(futures::poll!(async_api.process()).is_pending());
1138			});
1139		}
1140
1141		// Check that the http client wasn't initialized, because it wasn't used.
1142		assert!(Lazy::into_value(Arc::try_unwrap(shared_client.0).unwrap()).is_err());
1143
1144		let shared_client = SharedClient::new().unwrap();
1145
1146		{
1147			let mock = Arc::new(TestNetwork());
1148			let (mut api, async_api) = AsyncApi::new(mock, false, shared_client.clone());
1149			let id = api.http_request_start("lol", "nope", &[]).unwrap();
1150			api.http_request_write_body(id, &[], None).unwrap();
1151			futures::executor::block_on(async move {
1152				assert!(futures::poll!(async_api.process()).is_pending());
1153			});
1154		}
1155
1156		// Check that the http client initialized, because it was used.
1157		assert!(Lazy::into_value(Arc::try_unwrap(shared_client.0).unwrap()).is_ok());
1158	}
1159}