hyper/client/conn/
http2.rs

1//! HTTP/2 client connections
2
3use std::error::Error;
4use std::fmt;
5use std::future::Future;
6use std::marker::PhantomData;
7use std::pin::Pin;
8use std::sync::Arc;
9use std::task::{Context, Poll};
10use std::time::Duration;
11
12use crate::rt::{Read, Write};
13use futures_util::ready;
14use http::{Request, Response};
15
16use super::super::dispatch::{self, TrySendError};
17use crate::body::{Body, Incoming as IncomingBody};
18use crate::common::time::Time;
19use crate::proto;
20use crate::rt::bounds::Http2ClientConnExec;
21use crate::rt::Timer;
22
23/// The sender side of an established connection.
24pub struct SendRequest<B> {
25    dispatch: dispatch::UnboundedSender<Request<B>, Response<IncomingBody>>,
26}
27
28impl<B> Clone for SendRequest<B> {
29    fn clone(&self) -> SendRequest<B> {
30        SendRequest {
31            dispatch: self.dispatch.clone(),
32        }
33    }
34}
35
36/// A future that processes all HTTP state for the IO object.
37///
38/// In most cases, this should just be spawned into an executor, so that it
39/// can process incoming and outgoing messages, notice hangups, and the like.
40#[must_use = "futures do nothing unless polled"]
41pub struct Connection<T, B, E>
42where
43    T: Read + Write + Unpin,
44    B: Body + 'static,
45    E: Http2ClientConnExec<B, T> + Unpin,
46    B::Error: Into<Box<dyn Error + Send + Sync>>,
47{
48    inner: (PhantomData<T>, proto::h2::ClientTask<B, E, T>),
49}
50
51/// A builder to configure an HTTP connection.
52///
53/// After setting options, the builder is used to create a handshake future.
54///
55/// **Note**: The default values of options are *not considered stable*. They
56/// are subject to change at any time.
57#[derive(Clone, Debug)]
58pub struct Builder<Ex> {
59    pub(super) exec: Ex,
60    pub(super) timer: Time,
61    h2_builder: proto::h2::client::Config,
62}
63
64/// Returns a handshake future over some IO.
65///
66/// This is a shortcut for `Builder::new(exec).handshake(io)`.
67/// See [`client::conn`](crate::client::conn) for more.
68pub async fn handshake<E, T, B>(
69    exec: E,
70    io: T,
71) -> crate::Result<(SendRequest<B>, Connection<T, B, E>)>
72where
73    T: Read + Write + Unpin,
74    B: Body + 'static,
75    B::Data: Send,
76    B::Error: Into<Box<dyn Error + Send + Sync>>,
77    E: Http2ClientConnExec<B, T> + Unpin + Clone,
78{
79    Builder::new(exec).handshake(io).await
80}
81
82// ===== impl SendRequest
83
84impl<B> SendRequest<B> {
85    /// Polls to determine whether this sender can be used yet for a request.
86    ///
87    /// If the associated connection is closed, this returns an Error.
88    pub fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
89        if self.is_closed() {
90            Poll::Ready(Err(crate::Error::new_closed()))
91        } else {
92            Poll::Ready(Ok(()))
93        }
94    }
95
96    /// Waits until the dispatcher is ready
97    ///
98    /// If the associated connection is closed, this returns an Error.
99    pub async fn ready(&mut self) -> crate::Result<()> {
100        futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await
101    }
102
103    /// Checks if the connection is currently ready to send a request.
104    ///
105    /// # Note
106    ///
107    /// This is mostly a hint. Due to inherent latency of networks, it is
108    /// possible that even after checking this is ready, sending a request
109    /// may still fail because the connection was closed in the meantime.
110    pub fn is_ready(&self) -> bool {
111        self.dispatch.is_ready()
112    }
113
114    /// Checks if the connection side has been closed.
115    pub fn is_closed(&self) -> bool {
116        self.dispatch.is_closed()
117    }
118}
119
120impl<B> SendRequest<B>
121where
122    B: Body + 'static,
123{
124    /// Sends a `Request` on the associated connection.
125    ///
126    /// Returns a future that if successful, yields the `Response`.
127    ///
128    /// `req` must have a `Host` header.
129    ///
130    /// Absolute-form `Uri`s are not required. If received, they will be serialized
131    /// as-is.
132    pub fn send_request(
133        &mut self,
134        req: Request<B>,
135    ) -> impl Future<Output = crate::Result<Response<IncomingBody>>> {
136        let sent = self.dispatch.send(req);
137
138        async move {
139            match sent {
140                Ok(rx) => match rx.await {
141                    Ok(Ok(resp)) => Ok(resp),
142                    Ok(Err(err)) => Err(err),
143                    // this is definite bug if it happens, but it shouldn't happen!
144                    Err(_canceled) => panic!("dispatch dropped without returning error"),
145                },
146                Err(_req) => {
147                    debug!("connection was not ready");
148
149                    Err(crate::Error::new_canceled().with("connection was not ready"))
150                }
151            }
152        }
153    }
154
155    /// Sends a `Request` on the associated connection.
156    ///
157    /// Returns a future that if successful, yields the `Response`.
158    ///
159    /// # Error
160    ///
161    /// If there was an error before trying to serialize the request to the
162    /// connection, the message will be returned as part of this error.
163    pub fn try_send_request(
164        &mut self,
165        req: Request<B>,
166    ) -> impl Future<Output = Result<Response<IncomingBody>, TrySendError<Request<B>>>> {
167        let sent = self.dispatch.try_send(req);
168        async move {
169            match sent {
170                Ok(rx) => match rx.await {
171                    Ok(Ok(res)) => Ok(res),
172                    Ok(Err(err)) => Err(err),
173                    // this is definite bug if it happens, but it shouldn't happen!
174                    Err(_) => panic!("dispatch dropped without returning error"),
175                },
176                Err(req) => {
177                    debug!("connection was not ready");
178                    let error = crate::Error::new_canceled().with("connection was not ready");
179                    Err(TrySendError {
180                        error,
181                        message: Some(req),
182                    })
183                }
184            }
185        }
186    }
187}
188
189impl<B> fmt::Debug for SendRequest<B> {
190    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
191        f.debug_struct("SendRequest").finish()
192    }
193}
194
195// ===== impl Connection
196
197impl<T, B, E> Connection<T, B, E>
198where
199    T: Read + Write + Unpin + 'static,
200    B: Body + Unpin + 'static,
201    B::Data: Send,
202    B::Error: Into<Box<dyn Error + Send + Sync>>,
203    E: Http2ClientConnExec<B, T> + Unpin,
204{
205    /// Returns whether the [extended CONNECT protocol][1] is enabled or not.
206    ///
207    /// This setting is configured by the server peer by sending the
208    /// [`SETTINGS_ENABLE_CONNECT_PROTOCOL` parameter][2] in a `SETTINGS` frame.
209    /// This method returns the currently acknowledged value received from the
210    /// remote.
211    ///
212    /// [1]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
213    /// [2]: https://datatracker.ietf.org/doc/html/rfc8441#section-3
214    pub fn is_extended_connect_protocol_enabled(&self) -> bool {
215        self.inner.1.is_extended_connect_protocol_enabled()
216    }
217}
218
219impl<T, B, E> fmt::Debug for Connection<T, B, E>
220where
221    T: Read + Write + fmt::Debug + 'static + Unpin,
222    B: Body + 'static,
223    E: Http2ClientConnExec<B, T> + Unpin,
224    B::Error: Into<Box<dyn Error + Send + Sync>>,
225{
226    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
227        f.debug_struct("Connection").finish()
228    }
229}
230
231impl<T, B, E> Future for Connection<T, B, E>
232where
233    T: Read + Write + Unpin + 'static,
234    B: Body + 'static + Unpin,
235    B::Data: Send,
236    E: Unpin,
237    B::Error: Into<Box<dyn Error + Send + Sync>>,
238    E: Http2ClientConnExec<B, T> + Unpin,
239{
240    type Output = crate::Result<()>;
241
242    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
243        match ready!(Pin::new(&mut self.inner.1).poll(cx))? {
244            proto::Dispatched::Shutdown => Poll::Ready(Ok(())),
245            #[cfg(feature = "http1")]
246            proto::Dispatched::Upgrade(_pending) => unreachable!("http2 cannot upgrade"),
247        }
248    }
249}
250
251// ===== impl Builder
252
253impl<Ex> Builder<Ex>
254where
255    Ex: Clone,
256{
257    /// Creates a new connection builder.
258    #[inline]
259    pub fn new(exec: Ex) -> Builder<Ex> {
260        Builder {
261            exec,
262            timer: Time::Empty,
263            h2_builder: Default::default(),
264        }
265    }
266
267    /// Provide a timer to execute background HTTP2 tasks.
268    pub fn timer<M>(&mut self, timer: M) -> &mut Builder<Ex>
269    where
270        M: Timer + Send + Sync + 'static,
271    {
272        self.timer = Time::Timer(Arc::new(timer));
273        self
274    }
275
276    /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
277    /// stream-level flow control.
278    ///
279    /// Passing `None` will do nothing.
280    ///
281    /// If not set, hyper will use a default.
282    ///
283    /// [spec]: https://httpwg.org/specs/rfc9113.html#SETTINGS_INITIAL_WINDOW_SIZE
284    pub fn initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
285        if let Some(sz) = sz.into() {
286            self.h2_builder.adaptive_window = false;
287            self.h2_builder.initial_stream_window_size = sz;
288        }
289        self
290    }
291
292    /// Sets the max connection-level flow control for HTTP2
293    ///
294    /// Passing `None` will do nothing.
295    ///
296    /// If not set, hyper will use a default.
297    pub fn initial_connection_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
298        if let Some(sz) = sz.into() {
299            self.h2_builder.adaptive_window = false;
300            self.h2_builder.initial_conn_window_size = sz;
301        }
302        self
303    }
304
305    /// Sets the initial maximum of locally initiated (send) streams.
306    ///
307    /// This value will be overwritten by the value included in the initial
308    /// SETTINGS frame received from the peer as part of a [connection preface].
309    ///
310    /// Passing `None` will do nothing.
311    ///
312    /// If not set, hyper will use a default.
313    ///
314    /// [connection preface]: https://httpwg.org/specs/rfc9113.html#preface
315    pub fn initial_max_send_streams(&mut self, initial: impl Into<Option<usize>>) -> &mut Self {
316        if let Some(initial) = initial.into() {
317            self.h2_builder.initial_max_send_streams = initial;
318        }
319        self
320    }
321
322    /// Sets whether to use an adaptive flow control.
323    ///
324    /// Enabling this will override the limits set in
325    /// `initial_stream_window_size` and
326    /// `initial_connection_window_size`.
327    pub fn adaptive_window(&mut self, enabled: bool) -> &mut Self {
328        use proto::h2::SPEC_WINDOW_SIZE;
329
330        self.h2_builder.adaptive_window = enabled;
331        if enabled {
332            self.h2_builder.initial_conn_window_size = SPEC_WINDOW_SIZE;
333            self.h2_builder.initial_stream_window_size = SPEC_WINDOW_SIZE;
334        }
335        self
336    }
337
338    /// Sets the maximum frame size to use for HTTP2.
339    ///
340    /// Passing `None` will do nothing.
341    ///
342    /// If not set, hyper will use a default.
343    pub fn max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
344        if let Some(sz) = sz.into() {
345            self.h2_builder.max_frame_size = sz;
346        }
347        self
348    }
349
350    /// Sets the max size of received header frames.
351    ///
352    /// Default is currently 16KB, but can change.
353    pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
354        self.h2_builder.max_header_list_size = max;
355        self
356    }
357
358    /// Sets an interval for HTTP2 Ping frames should be sent to keep a
359    /// connection alive.
360    ///
361    /// Pass `None` to disable HTTP2 keep-alive.
362    ///
363    /// Default is currently disabled.
364    pub fn keep_alive_interval(&mut self, interval: impl Into<Option<Duration>>) -> &mut Self {
365        self.h2_builder.keep_alive_interval = interval.into();
366        self
367    }
368
369    /// Sets a timeout for receiving an acknowledgement of the keep-alive ping.
370    ///
371    /// If the ping is not acknowledged within the timeout, the connection will
372    /// be closed. Does nothing if `keep_alive_interval` is disabled.
373    ///
374    /// Default is 20 seconds.
375    pub fn keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
376        self.h2_builder.keep_alive_timeout = timeout;
377        self
378    }
379
380    /// Sets whether HTTP2 keep-alive should apply while the connection is idle.
381    ///
382    /// If disabled, keep-alive pings are only sent while there are open
383    /// request/responses streams. If enabled, pings are also sent when no
384    /// streams are active. Does nothing if `keep_alive_interval` is
385    /// disabled.
386    ///
387    /// Default is `false`.
388    pub fn keep_alive_while_idle(&mut self, enabled: bool) -> &mut Self {
389        self.h2_builder.keep_alive_while_idle = enabled;
390        self
391    }
392
393    /// Sets the maximum number of HTTP2 concurrent locally reset streams.
394    ///
395    /// See the documentation of [`h2::client::Builder::max_concurrent_reset_streams`] for more
396    /// details.
397    ///
398    /// The default value is determined by the `h2` crate.
399    ///
400    /// [`h2::client::Builder::max_concurrent_reset_streams`]: https://docs.rs/h2/client/struct.Builder.html#method.max_concurrent_reset_streams
401    pub fn max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
402        self.h2_builder.max_concurrent_reset_streams = Some(max);
403        self
404    }
405
406    /// Set the maximum write buffer size for each HTTP/2 stream.
407    ///
408    /// Default is currently 1MB, but may change.
409    ///
410    /// # Panics
411    ///
412    /// The value must be no larger than `u32::MAX`.
413    pub fn max_send_buf_size(&mut self, max: usize) -> &mut Self {
414        assert!(max <= u32::MAX as usize);
415        self.h2_builder.max_send_buffer_size = max;
416        self
417    }
418
419    /// Configures the maximum number of pending reset streams allowed before a GOAWAY will be sent.
420    ///
421    /// This will default to the default value set by the [`h2` crate](https://crates.io/crates/h2).
422    /// As of v0.4.0, it is 20.
423    ///
424    /// See <https://github.com/hyperium/hyper/issues/2877> for more information.
425    pub fn max_pending_accept_reset_streams(&mut self, max: impl Into<Option<usize>>) -> &mut Self {
426        self.h2_builder.max_pending_accept_reset_streams = max.into();
427        self
428    }
429
430    /// Constructs a connection with the configured options and IO.
431    /// See [`client::conn`](crate::client::conn) for more.
432    ///
433    /// Note, if [`Connection`] is not `await`-ed, [`SendRequest`] will
434    /// do nothing.
435    pub fn handshake<T, B>(
436        &self,
437        io: T,
438    ) -> impl Future<Output = crate::Result<(SendRequest<B>, Connection<T, B, Ex>)>>
439    where
440        T: Read + Write + Unpin,
441        B: Body + 'static,
442        B::Data: Send,
443        B::Error: Into<Box<dyn Error + Send + Sync>>,
444        Ex: Http2ClientConnExec<B, T> + Unpin,
445    {
446        let opts = self.clone();
447
448        async move {
449            trace!("client handshake HTTP/2");
450
451            let (tx, rx) = dispatch::channel();
452            let h2 = proto::h2::client::handshake(io, rx, &opts.h2_builder, opts.exec, opts.timer)
453                .await?;
454            Ok((
455                SendRequest {
456                    dispatch: tx.unbound(),
457                },
458                Connection {
459                    inner: (PhantomData, h2),
460                },
461            ))
462        }
463    }
464}
465
466#[cfg(test)]
467mod tests {
468
469    #[tokio::test]
470    #[ignore] // only compilation is checked
471    async fn send_sync_executor_of_non_send_futures() {
472        #[derive(Clone)]
473        struct LocalTokioExecutor;
474
475        impl<F> crate::rt::Executor<F> for LocalTokioExecutor
476        where
477            F: std::future::Future + 'static, // not requiring `Send`
478        {
479            fn execute(&self, fut: F) {
480                // This will spawn into the currently running `LocalSet`.
481                tokio::task::spawn_local(fut);
482            }
483        }
484
485        #[allow(unused)]
486        async fn run(io: impl crate::rt::Read + crate::rt::Write + Unpin + 'static) {
487            let (_sender, conn) = crate::client::conn::http2::handshake::<
488                _,
489                _,
490                http_body_util::Empty<bytes::Bytes>,
491            >(LocalTokioExecutor, io)
492            .await
493            .unwrap();
494
495            tokio::task::spawn_local(async move {
496                conn.await.unwrap();
497            });
498        }
499    }
500
501    #[tokio::test]
502    #[ignore] // only compilation is checked
503    async fn not_send_not_sync_executor_of_not_send_futures() {
504        #[derive(Clone)]
505        struct LocalTokioExecutor {
506            _x: std::marker::PhantomData<std::rc::Rc<()>>,
507        }
508
509        impl<F> crate::rt::Executor<F> for LocalTokioExecutor
510        where
511            F: std::future::Future + 'static, // not requiring `Send`
512        {
513            fn execute(&self, fut: F) {
514                // This will spawn into the currently running `LocalSet`.
515                tokio::task::spawn_local(fut);
516            }
517        }
518
519        #[allow(unused)]
520        async fn run(io: impl crate::rt::Read + crate::rt::Write + Unpin + 'static) {
521            let (_sender, conn) =
522                crate::client::conn::http2::handshake::<_, _, http_body_util::Empty<bytes::Bytes>>(
523                    LocalTokioExecutor {
524                        _x: Default::default(),
525                    },
526                    io,
527                )
528                .await
529                .unwrap();
530
531            tokio::task::spawn_local(async move {
532                conn.await.unwrap();
533            });
534        }
535    }
536
537    #[tokio::test]
538    #[ignore] // only compilation is checked
539    async fn send_not_sync_executor_of_not_send_futures() {
540        #[derive(Clone)]
541        struct LocalTokioExecutor {
542            _x: std::marker::PhantomData<std::cell::Cell<()>>,
543        }
544
545        impl<F> crate::rt::Executor<F> for LocalTokioExecutor
546        where
547            F: std::future::Future + 'static, // not requiring `Send`
548        {
549            fn execute(&self, fut: F) {
550                // This will spawn into the currently running `LocalSet`.
551                tokio::task::spawn_local(fut);
552            }
553        }
554
555        #[allow(unused)]
556        async fn run(io: impl crate::rt::Read + crate::rt::Write + Unpin + 'static) {
557            let (_sender, conn) =
558                crate::client::conn::http2::handshake::<_, _, http_body_util::Empty<bytes::Bytes>>(
559                    LocalTokioExecutor {
560                        _x: Default::default(),
561                    },
562                    io,
563                )
564                .await
565                .unwrap();
566
567            tokio::task::spawn_local(async move {
568                conn.await.unwrap();
569            });
570        }
571    }
572
573    #[tokio::test]
574    #[ignore] // only compilation is checked
575    async fn send_sync_executor_of_send_futures() {
576        #[derive(Clone)]
577        struct TokioExecutor;
578
579        impl<F> crate::rt::Executor<F> for TokioExecutor
580        where
581            F: std::future::Future + 'static + Send,
582            F::Output: Send + 'static,
583        {
584            fn execute(&self, fut: F) {
585                tokio::task::spawn(fut);
586            }
587        }
588
589        #[allow(unused)]
590        async fn run(io: impl crate::rt::Read + crate::rt::Write + Send + Unpin + 'static) {
591            let (_sender, conn) = crate::client::conn::http2::handshake::<
592                _,
593                _,
594                http_body_util::Empty<bytes::Bytes>,
595            >(TokioExecutor, io)
596            .await
597            .unwrap();
598
599            tokio::task::spawn(async move {
600                conn.await.unwrap();
601            });
602        }
603    }
604
605    #[tokio::test]
606    #[ignore] // only compilation is checked
607    async fn not_send_not_sync_executor_of_send_futures() {
608        #[derive(Clone)]
609        struct TokioExecutor {
610            // !Send, !Sync
611            _x: std::marker::PhantomData<std::rc::Rc<()>>,
612        }
613
614        impl<F> crate::rt::Executor<F> for TokioExecutor
615        where
616            F: std::future::Future + 'static + Send,
617            F::Output: Send + 'static,
618        {
619            fn execute(&self, fut: F) {
620                tokio::task::spawn(fut);
621            }
622        }
623
624        #[allow(unused)]
625        async fn run(io: impl crate::rt::Read + crate::rt::Write + Send + Unpin + 'static) {
626            let (_sender, conn) =
627                crate::client::conn::http2::handshake::<_, _, http_body_util::Empty<bytes::Bytes>>(
628                    TokioExecutor {
629                        _x: Default::default(),
630                    },
631                    io,
632                )
633                .await
634                .unwrap();
635
636            tokio::task::spawn_local(async move {
637                // can't use spawn here because when executor is !Send
638                conn.await.unwrap();
639            });
640        }
641    }
642
643    #[tokio::test]
644    #[ignore] // only compilation is checked
645    async fn send_not_sync_executor_of_send_futures() {
646        #[derive(Clone)]
647        struct TokioExecutor {
648            // !Sync
649            _x: std::marker::PhantomData<std::cell::Cell<()>>,
650        }
651
652        impl<F> crate::rt::Executor<F> for TokioExecutor
653        where
654            F: std::future::Future + 'static + Send,
655            F::Output: Send + 'static,
656        {
657            fn execute(&self, fut: F) {
658                tokio::task::spawn(fut);
659            }
660        }
661
662        #[allow(unused)]
663        async fn run(io: impl crate::rt::Read + crate::rt::Write + Send + Unpin + 'static) {
664            let (_sender, conn) =
665                crate::client::conn::http2::handshake::<_, _, http_body_util::Empty<bytes::Bytes>>(
666                    TokioExecutor {
667                        _x: Default::default(),
668                    },
669                    io,
670                )
671                .await
672                .unwrap();
673
674            tokio::task::spawn_local(async move {
675                // can't use spawn here because when executor is !Send
676                conn.await.unwrap();
677            });
678        }
679    }
680}