hyper/client/conn/
http1.rs

1//! HTTP/1 client connections
2
3use std::error::Error as StdError;
4use std::fmt;
5use std::future::Future;
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9use crate::rt::{Read, Write};
10use bytes::Bytes;
11use futures_util::ready;
12use http::{Request, Response};
13use httparse::ParserConfig;
14
15use super::super::dispatch::{self, TrySendError};
16use crate::body::{Body, Incoming as IncomingBody};
17use crate::proto;
18
19type Dispatcher<T, B> =
20    proto::dispatch::Dispatcher<proto::dispatch::Client<B>, B, T, proto::h1::ClientTransaction>;
21
22/// The sender side of an established connection.
23pub struct SendRequest<B> {
24    dispatch: dispatch::Sender<Request<B>, Response<IncomingBody>>,
25}
26
27/// Deconstructed parts of a `Connection`.
28///
29/// This allows taking apart a `Connection` at a later time, in order to
30/// reclaim the IO object, and additional related pieces.
31#[derive(Debug)]
32#[non_exhaustive]
33pub struct Parts<T> {
34    /// The original IO object used in the handshake.
35    pub io: T,
36    /// A buffer of bytes that have been read but not processed as HTTP.
37    ///
38    /// For instance, if the `Connection` is used for an HTTP upgrade request,
39    /// it is possible the server sent back the first bytes of the new protocol
40    /// along with the response upgrade.
41    ///
42    /// You will want to check for any existing bytes if you plan to continue
43    /// communicating on the IO object.
44    pub read_buf: Bytes,
45}
46
47/// A future that processes all HTTP state for the IO object.
48///
49/// In most cases, this should just be spawned into an executor, so that it
50/// can process incoming and outgoing messages, notice hangups, and the like.
51#[must_use = "futures do nothing unless polled"]
52pub struct Connection<T, B>
53where
54    T: Read + Write,
55    B: Body + 'static,
56{
57    inner: Dispatcher<T, B>,
58}
59
60impl<T, B> Connection<T, B>
61where
62    T: Read + Write + Unpin,
63    B: Body + 'static,
64    B::Error: Into<Box<dyn StdError + Send + Sync>>,
65{
66    /// Return the inner IO object, and additional information.
67    ///
68    /// Only works for HTTP/1 connections. HTTP/2 connections will panic.
69    pub fn into_parts(self) -> Parts<T> {
70        let (io, read_buf, _) = self.inner.into_inner();
71        Parts { io, read_buf }
72    }
73
74    /// Poll the connection for completion, but without calling `shutdown`
75    /// on the underlying IO.
76    ///
77    /// This is useful to allow running a connection while doing an HTTP
78    /// upgrade. Once the upgrade is completed, the connection would be "done",
79    /// but it is not desired to actually shutdown the IO object. Instead you
80    /// would take it back using `into_parts`.
81    ///
82    /// Use [`poll_fn`](https://docs.rs/futures/0.1.25/futures/future/fn.poll_fn.html)
83    /// and [`try_ready!`](https://docs.rs/futures/0.1.25/futures/macro.try_ready.html)
84    /// to work with this function; or use the `without_shutdown` wrapper.
85    pub fn poll_without_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
86        self.inner.poll_without_shutdown(cx)
87    }
88
89    /// Prevent shutdown of the underlying IO object at the end of service the request,
90    /// instead run `into_parts`. This is a convenience wrapper over `poll_without_shutdown`.
91    pub async fn without_shutdown(self) -> crate::Result<Parts<T>> {
92        let mut conn = Some(self);
93        futures_util::future::poll_fn(move |cx| -> Poll<crate::Result<Parts<T>>> {
94            ready!(conn.as_mut().unwrap().poll_without_shutdown(cx))?;
95            Poll::Ready(Ok(conn.take().unwrap().into_parts()))
96        })
97        .await
98    }
99}
100
101/// A builder to configure an HTTP connection.
102///
103/// After setting options, the builder is used to create a handshake future.
104///
105/// **Note**: The default values of options are *not considered stable*. They
106/// are subject to change at any time.
107#[derive(Clone, Debug)]
108pub struct Builder {
109    h09_responses: bool,
110    h1_parser_config: ParserConfig,
111    h1_writev: Option<bool>,
112    h1_title_case_headers: bool,
113    h1_preserve_header_case: bool,
114    h1_max_headers: Option<usize>,
115    #[cfg(feature = "ffi")]
116    h1_preserve_header_order: bool,
117    h1_read_buf_exact_size: Option<usize>,
118    h1_max_buf_size: Option<usize>,
119}
120
121/// Returns a handshake future over some IO.
122///
123/// This is a shortcut for `Builder::new().handshake(io)`.
124/// See [`client::conn`](crate::client::conn) for more.
125pub async fn handshake<T, B>(io: T) -> crate::Result<(SendRequest<B>, Connection<T, B>)>
126where
127    T: Read + Write + Unpin,
128    B: Body + 'static,
129    B::Data: Send,
130    B::Error: Into<Box<dyn StdError + Send + Sync>>,
131{
132    Builder::new().handshake(io).await
133}
134
135// ===== impl SendRequest
136
137impl<B> SendRequest<B> {
138    /// Polls to determine whether this sender can be used yet for a request.
139    ///
140    /// If the associated connection is closed, this returns an Error.
141    pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
142        self.dispatch.poll_ready(cx)
143    }
144
145    /// Waits until the dispatcher is ready
146    ///
147    /// If the associated connection is closed, this returns an Error.
148    pub async fn ready(&mut self) -> crate::Result<()> {
149        futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await
150    }
151
152    /// Checks if the connection is currently ready to send a request.
153    ///
154    /// # Note
155    ///
156    /// This is mostly a hint. Due to inherent latency of networks, it is
157    /// possible that even after checking this is ready, sending a request
158    /// may still fail because the connection was closed in the meantime.
159    pub fn is_ready(&self) -> bool {
160        self.dispatch.is_ready()
161    }
162
163    /// Checks if the connection side has been closed.
164    pub fn is_closed(&self) -> bool {
165        self.dispatch.is_closed()
166    }
167}
168
169impl<B> SendRequest<B>
170where
171    B: Body + 'static,
172{
173    /// Sends a `Request` on the associated connection.
174    ///
175    /// Returns a future that if successful, yields the `Response`.
176    ///
177    /// `req` must have a `Host` header.
178    ///
179    /// # Uri
180    ///
181    /// The `Uri` of the request is serialized as-is.
182    ///
183    /// - Usually you want origin-form (`/path?query`).
184    /// - For sending to an HTTP proxy, you want to send in absolute-form
185    ///   (`https://hyper.rs/guides`).
186    ///
187    /// This is however not enforced or validated and it is up to the user
188    /// of this method to ensure the `Uri` is correct for their intended purpose.
189    pub fn send_request(
190        &mut self,
191        req: Request<B>,
192    ) -> impl Future<Output = crate::Result<Response<IncomingBody>>> {
193        let sent = self.dispatch.send(req);
194
195        async move {
196            match sent {
197                Ok(rx) => match rx.await {
198                    Ok(Ok(resp)) => Ok(resp),
199                    Ok(Err(err)) => Err(err),
200                    // this is definite bug if it happens, but it shouldn't happen!
201                    Err(_canceled) => panic!("dispatch dropped without returning error"),
202                },
203                Err(_req) => {
204                    debug!("connection was not ready");
205                    Err(crate::Error::new_canceled().with("connection was not ready"))
206                }
207            }
208        }
209    }
210
211    /// Sends a `Request` on the associated connection.
212    ///
213    /// Returns a future that if successful, yields the `Response`.
214    ///
215    /// # Error
216    ///
217    /// If there was an error before trying to serialize the request to the
218    /// connection, the message will be returned as part of this error.
219    pub fn try_send_request(
220        &mut self,
221        req: Request<B>,
222    ) -> impl Future<Output = Result<Response<IncomingBody>, TrySendError<Request<B>>>> {
223        let sent = self.dispatch.try_send(req);
224        async move {
225            match sent {
226                Ok(rx) => match rx.await {
227                    Ok(Ok(res)) => Ok(res),
228                    Ok(Err(err)) => Err(err),
229                    // this is definite bug if it happens, but it shouldn't happen!
230                    Err(_) => panic!("dispatch dropped without returning error"),
231                },
232                Err(req) => {
233                    debug!("connection was not ready");
234                    let error = crate::Error::new_canceled().with("connection was not ready");
235                    Err(TrySendError {
236                        error,
237                        message: Some(req),
238                    })
239                }
240            }
241        }
242    }
243}
244
245impl<B> fmt::Debug for SendRequest<B> {
246    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
247        f.debug_struct("SendRequest").finish()
248    }
249}
250
251// ===== impl Connection
252
253impl<T, B> Connection<T, B>
254where
255    T: Read + Write + Unpin + Send,
256    B: Body + 'static,
257    B::Error: Into<Box<dyn StdError + Send + Sync>>,
258{
259    /// Enable this connection to support higher-level HTTP upgrades.
260    ///
261    /// See [the `upgrade` module](crate::upgrade) for more.
262    pub fn with_upgrades(self) -> upgrades::UpgradeableConnection<T, B> {
263        upgrades::UpgradeableConnection { inner: Some(self) }
264    }
265}
266
267impl<T, B> fmt::Debug for Connection<T, B>
268where
269    T: Read + Write + fmt::Debug,
270    B: Body + 'static,
271{
272    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
273        f.debug_struct("Connection").finish()
274    }
275}
276
277impl<T, B> Future for Connection<T, B>
278where
279    T: Read + Write + Unpin,
280    B: Body + 'static,
281    B::Data: Send,
282    B::Error: Into<Box<dyn StdError + Send + Sync>>,
283{
284    type Output = crate::Result<()>;
285
286    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
287        match ready!(Pin::new(&mut self.inner).poll(cx))? {
288            proto::Dispatched::Shutdown => Poll::Ready(Ok(())),
289            proto::Dispatched::Upgrade(pending) => {
290                // With no `Send` bound on `I`, we can't try to do
291                // upgrades here. In case a user was trying to use
292                // `upgrade` with this API, send a special
293                // error letting them know about that.
294                pending.manual();
295                Poll::Ready(Ok(()))
296            }
297        }
298    }
299}
300
301// ===== impl Builder
302
303impl Builder {
304    /// Creates a new connection builder.
305    #[inline]
306    pub fn new() -> Builder {
307        Builder {
308            h09_responses: false,
309            h1_writev: None,
310            h1_read_buf_exact_size: None,
311            h1_parser_config: Default::default(),
312            h1_title_case_headers: false,
313            h1_preserve_header_case: false,
314            h1_max_headers: None,
315            #[cfg(feature = "ffi")]
316            h1_preserve_header_order: false,
317            h1_max_buf_size: None,
318        }
319    }
320
321    /// Set whether HTTP/0.9 responses should be tolerated.
322    ///
323    /// Default is false.
324    pub fn http09_responses(&mut self, enabled: bool) -> &mut Builder {
325        self.h09_responses = enabled;
326        self
327    }
328
329    /// Set whether HTTP/1 connections will accept spaces between header names
330    /// and the colon that follow them in responses.
331    ///
332    /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
333    /// to say about it:
334    ///
335    /// > No whitespace is allowed between the header field-name and colon. In
336    /// > the past, differences in the handling of such whitespace have led to
337    /// > security vulnerabilities in request routing and response handling. A
338    /// > server MUST reject any received request message that contains
339    /// > whitespace between a header field-name and colon with a response code
340    /// > of 400 (Bad Request). A proxy MUST remove any such whitespace from a
341    /// > response message before forwarding the message downstream.
342    ///
343    /// Default is false.
344    ///
345    /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
346    pub fn allow_spaces_after_header_name_in_responses(&mut self, enabled: bool) -> &mut Builder {
347        self.h1_parser_config
348            .allow_spaces_after_header_name_in_responses(enabled);
349        self
350    }
351
352    /// Set whether HTTP/1 connections will accept obsolete line folding for
353    /// header values.
354    ///
355    /// Newline codepoints (`\r` and `\n`) will be transformed to spaces when
356    /// parsing.
357    ///
358    /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
359    /// to say about it:
360    ///
361    /// > A server that receives an obs-fold in a request message that is not
362    /// > within a message/http container MUST either reject the message by
363    /// > sending a 400 (Bad Request), preferably with a representation
364    /// > explaining that obsolete line folding is unacceptable, or replace
365    /// > each received obs-fold with one or more SP octets prior to
366    /// > interpreting the field value or forwarding the message downstream.
367    ///
368    /// > A proxy or gateway that receives an obs-fold in a response message
369    /// > that is not within a message/http container MUST either discard the
370    /// > message and replace it with a 502 (Bad Gateway) response, preferably
371    /// > with a representation explaining that unacceptable line folding was
372    /// > received, or replace each received obs-fold with one or more SP
373    /// > octets prior to interpreting the field value or forwarding the
374    /// > message downstream.
375    ///
376    /// > A user agent that receives an obs-fold in a response message that is
377    /// > not within a message/http container MUST replace each received
378    /// > obs-fold with one or more SP octets prior to interpreting the field
379    /// > value.
380    ///
381    /// Default is false.
382    ///
383    /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
384    pub fn allow_obsolete_multiline_headers_in_responses(&mut self, enabled: bool) -> &mut Builder {
385        self.h1_parser_config
386            .allow_obsolete_multiline_headers_in_responses(enabled);
387        self
388    }
389
390    /// Set whether HTTP/1 connections will silently ignored malformed header lines.
391    ///
392    /// If this is enabled and a header line does not start with a valid header
393    /// name, or does not include a colon at all, the line will be silently ignored
394    /// and no error will be reported.
395    ///
396    /// Default is false.
397    pub fn ignore_invalid_headers_in_responses(&mut self, enabled: bool) -> &mut Builder {
398        self.h1_parser_config
399            .ignore_invalid_headers_in_responses(enabled);
400        self
401    }
402
403    /// Set whether HTTP/1 connections should try to use vectored writes,
404    /// or always flatten into a single buffer.
405    ///
406    /// Note that setting this to false may mean more copies of body data,
407    /// but may also improve performance when an IO transport doesn't
408    /// support vectored writes well, such as most TLS implementations.
409    ///
410    /// Setting this to true will force hyper to use queued strategy
411    /// which may eliminate unnecessary cloning on some TLS backends
412    ///
413    /// Default is `auto`. In this mode hyper will try to guess which
414    /// mode to use
415    pub fn writev(&mut self, enabled: bool) -> &mut Builder {
416        self.h1_writev = Some(enabled);
417        self
418    }
419
420    /// Set whether HTTP/1 connections will write header names as title case at
421    /// the socket level.
422    ///
423    /// Default is false.
424    pub fn title_case_headers(&mut self, enabled: bool) -> &mut Builder {
425        self.h1_title_case_headers = enabled;
426        self
427    }
428
429    /// Set whether to support preserving original header cases.
430    ///
431    /// Currently, this will record the original cases received, and store them
432    /// in a private extension on the `Response`. It will also look for and use
433    /// such an extension in any provided `Request`.
434    ///
435    /// Since the relevant extension is still private, there is no way to
436    /// interact with the original cases. The only effect this can have now is
437    /// to forward the cases in a proxy-like fashion.
438    ///
439    /// Default is false.
440    pub fn preserve_header_case(&mut self, enabled: bool) -> &mut Builder {
441        self.h1_preserve_header_case = enabled;
442        self
443    }
444
445    /// Set the maximum number of headers.
446    ///
447    /// When a response is received, the parser will reserve a buffer to store headers for optimal
448    /// performance.
449    ///
450    /// If client receives more headers than the buffer size, the error "message header too large"
451    /// is returned.
452    ///
453    /// Note that headers is allocated on the stack by default, which has higher performance. After
454    /// setting this value, headers will be allocated in heap memory, that is, heap memory
455    /// allocation will occur for each response, and there will be a performance drop of about 5%.
456    ///
457    /// Default is 100.
458    pub fn max_headers(&mut self, val: usize) -> &mut Self {
459        self.h1_max_headers = Some(val);
460        self
461    }
462
463    /// Set whether to support preserving original header order.
464    ///
465    /// Currently, this will record the order in which headers are received, and store this
466    /// ordering in a private extension on the `Response`. It will also look for and use
467    /// such an extension in any provided `Request`.
468    ///
469    /// Default is false.
470    #[cfg(feature = "ffi")]
471    pub fn preserve_header_order(&mut self, enabled: bool) -> &mut Builder {
472        self.h1_preserve_header_order = enabled;
473        self
474    }
475
476    /// Sets the exact size of the read buffer to *always* use.
477    ///
478    /// Note that setting this option unsets the `max_buf_size` option.
479    ///
480    /// Default is an adaptive read buffer.
481    pub fn read_buf_exact_size(&mut self, sz: Option<usize>) -> &mut Builder {
482        self.h1_read_buf_exact_size = sz;
483        self.h1_max_buf_size = None;
484        self
485    }
486
487    /// Set the maximum buffer size for the connection.
488    ///
489    /// Default is ~400kb.
490    ///
491    /// Note that setting this option unsets the `read_exact_buf_size` option.
492    ///
493    /// # Panics
494    ///
495    /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum.
496    pub fn max_buf_size(&mut self, max: usize) -> &mut Self {
497        assert!(
498            max >= proto::h1::MINIMUM_MAX_BUFFER_SIZE,
499            "the max_buf_size cannot be smaller than the minimum that h1 specifies."
500        );
501
502        self.h1_max_buf_size = Some(max);
503        self.h1_read_buf_exact_size = None;
504        self
505    }
506
507    /// Constructs a connection with the configured options and IO.
508    /// See [`client::conn`](crate::client::conn) for more.
509    ///
510    /// Note, if [`Connection`] is not `await`-ed, [`SendRequest`] will
511    /// do nothing.
512    pub fn handshake<T, B>(
513        &self,
514        io: T,
515    ) -> impl Future<Output = crate::Result<(SendRequest<B>, Connection<T, B>)>>
516    where
517        T: Read + Write + Unpin,
518        B: Body + 'static,
519        B::Data: Send,
520        B::Error: Into<Box<dyn StdError + Send + Sync>>,
521    {
522        let opts = self.clone();
523
524        async move {
525            trace!("client handshake HTTP/1");
526
527            let (tx, rx) = dispatch::channel();
528            let mut conn = proto::Conn::new(io);
529            conn.set_h1_parser_config(opts.h1_parser_config);
530            if let Some(writev) = opts.h1_writev {
531                if writev {
532                    conn.set_write_strategy_queue();
533                } else {
534                    conn.set_write_strategy_flatten();
535                }
536            }
537            if opts.h1_title_case_headers {
538                conn.set_title_case_headers();
539            }
540            if opts.h1_preserve_header_case {
541                conn.set_preserve_header_case();
542            }
543            if let Some(max_headers) = opts.h1_max_headers {
544                conn.set_http1_max_headers(max_headers);
545            }
546            #[cfg(feature = "ffi")]
547            if opts.h1_preserve_header_order {
548                conn.set_preserve_header_order();
549            }
550
551            if opts.h09_responses {
552                conn.set_h09_responses();
553            }
554
555            if let Some(sz) = opts.h1_read_buf_exact_size {
556                conn.set_read_buf_exact_size(sz);
557            }
558            if let Some(max) = opts.h1_max_buf_size {
559                conn.set_max_buf_size(max);
560            }
561            let cd = proto::h1::dispatch::Client::new(rx);
562            let proto = proto::h1::Dispatcher::new(cd, conn);
563
564            Ok((SendRequest { dispatch: tx }, Connection { inner: proto }))
565        }
566    }
567}
568
569mod upgrades {
570    use crate::upgrade::Upgraded;
571
572    use super::*;
573
574    // A future binding a connection with a Service with Upgrade support.
575    //
576    // This type is unnameable outside the crate.
577    #[must_use = "futures do nothing unless polled"]
578    #[allow(missing_debug_implementations)]
579    pub struct UpgradeableConnection<T, B>
580    where
581        T: Read + Write + Unpin + Send + 'static,
582        B: Body + 'static,
583        B::Error: Into<Box<dyn StdError + Send + Sync>>,
584    {
585        pub(super) inner: Option<Connection<T, B>>,
586    }
587
588    impl<I, B> Future for UpgradeableConnection<I, B>
589    where
590        I: Read + Write + Unpin + Send + 'static,
591        B: Body + 'static,
592        B::Data: Send,
593        B::Error: Into<Box<dyn StdError + Send + Sync>>,
594    {
595        type Output = crate::Result<()>;
596
597        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
598            match ready!(Pin::new(&mut self.inner.as_mut().unwrap().inner).poll(cx)) {
599                Ok(proto::Dispatched::Shutdown) => Poll::Ready(Ok(())),
600                Ok(proto::Dispatched::Upgrade(pending)) => {
601                    let Parts { io, read_buf } = self.inner.take().unwrap().into_parts();
602                    pending.fulfill(Upgraded::new(io, read_buf));
603                    Poll::Ready(Ok(()))
604                }
605                Err(e) => Poll::Ready(Err(e)),
606            }
607        }
608    }
609}