hyper/proto/h2/
client.rs

1use std::{
2    convert::Infallible,
3    future::Future,
4    marker::PhantomData,
5    pin::Pin,
6    task::{Context, Poll},
7    time::Duration,
8};
9
10use crate::rt::{Read, Write};
11use bytes::Bytes;
12use futures_channel::mpsc::{Receiver, Sender};
13use futures_channel::{mpsc, oneshot};
14use futures_util::future::{Either, FusedFuture, FutureExt as _};
15use futures_util::ready;
16use futures_util::stream::{StreamExt as _, StreamFuture};
17use h2::client::{Builder, Connection, SendRequest};
18use h2::SendStream;
19use http::{Method, StatusCode};
20use pin_project_lite::pin_project;
21
22use super::ping::{Ponger, Recorder};
23use super::{ping, H2Upgraded, PipeToSendStream, SendBuf};
24use crate::body::{Body, Incoming as IncomingBody};
25use crate::client::dispatch::{Callback, SendWhen, TrySendError};
26use crate::common::io::Compat;
27use crate::common::time::Time;
28use crate::ext::Protocol;
29use crate::headers;
30use crate::proto::h2::UpgradedSendStream;
31use crate::proto::Dispatched;
32use crate::rt::bounds::Http2ClientConnExec;
33use crate::upgrade::Upgraded;
34use crate::{Request, Response};
35use h2::client::ResponseFuture;
36
37type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, Response<IncomingBody>>;
38
39///// An mpsc channel is used to help notify the `Connection` task when *all*
40///// other handles to it have been dropped, so that it can shutdown.
41type ConnDropRef = mpsc::Sender<Infallible>;
42
43///// A oneshot channel watches the `Connection` task, and when it completes,
44///// the "dispatch" task will be notified and can shutdown sooner.
45type ConnEof = oneshot::Receiver<Infallible>;
46
// These defaults target the "majority" case, which is usually not resource
// constrained; the spec's default window of 64 KiB can be too limiting for
// performance there.
const DEFAULT_CONN_WINDOW: u32 = 5 * 1024 * 1024; // 5 MiB
const DEFAULT_STREAM_WINDOW: u32 = 2 * 1024 * 1024; // 2 MiB
const DEFAULT_MAX_FRAME_SIZE: u32 = 16 * 1024; // 16 KiB
const DEFAULT_MAX_SEND_BUF_SIZE: usize = 1024 * 1024; // 1 MiB
const DEFAULT_MAX_HEADER_LIST_SIZE: u32 = 16 * 1024; // 16 KiB
55
// Upper bound on the number of concurrent streams the client may open before
// the server's initial SETTINGS frame has arrived.
//
// The value mirrors what the HTTP/2 spec recommends as the minimum that
// endpoints advertise to their peers. Staying at or below it minimizes the
// chance that the local endpoint opens too many streams and has them rejected
// by the remote peer with `REFUSED_STREAM`.
const DEFAULT_INITIAL_MAX_SEND_STREAMS: usize = 100;
64
/// Tunables for an HTTP/2 client connection.
///
/// Most fields are forwarded to the `h2` client builder by `new_builder`; the
/// keep-alive and adaptive-window fields feed `new_ping_config` instead.
#[derive(Clone, Debug)]
pub(crate) struct Config {
    /// When `true`, the ping machinery is given `initial_stream_window_size`
    /// as its BDP starting window; when `false`, BDP sizing is left off.
    pub(crate) adaptive_window: bool,
    /// Initial connection-level flow-control window.
    pub(crate) initial_conn_window_size: u32,
    /// Initial stream-level flow-control window.
    pub(crate) initial_stream_window_size: u32,
    /// Concurrent streams allowed before the peer's SETTINGS are received.
    pub(crate) initial_max_send_streams: usize,
    /// Maximum frame size handed to the builder.
    pub(crate) max_frame_size: u32,
    /// Maximum header list size handed to the builder.
    pub(crate) max_header_list_size: u32,
    /// Keep-alive ping interval; `None` means no interval is configured.
    pub(crate) keep_alive_interval: Option<Duration>,
    /// Keep-alive timeout passed through to the ping configuration.
    pub(crate) keep_alive_timeout: Duration,
    /// Keep-alive-while-idle flag passed through to the ping configuration.
    pub(crate) keep_alive_while_idle: bool,
    /// Optional override for the builder's concurrent reset-stream limit.
    pub(crate) max_concurrent_reset_streams: Option<usize>,
    /// Maximum send buffer size handed to the builder.
    pub(crate) max_send_buffer_size: usize,
    /// Optional override for the builder's pending-accept reset-stream limit.
    pub(crate) max_pending_accept_reset_streams: Option<usize>,
}
80
81impl Default for Config {
82    fn default() -> Config {
83        Config {
84            adaptive_window: false,
85            initial_conn_window_size: DEFAULT_CONN_WINDOW,
86            initial_stream_window_size: DEFAULT_STREAM_WINDOW,
87            initial_max_send_streams: DEFAULT_INITIAL_MAX_SEND_STREAMS,
88            max_frame_size: DEFAULT_MAX_FRAME_SIZE,
89            max_header_list_size: DEFAULT_MAX_HEADER_LIST_SIZE,
90            keep_alive_interval: None,
91            keep_alive_timeout: Duration::from_secs(20),
92            keep_alive_while_idle: false,
93            max_concurrent_reset_streams: None,
94            max_send_buffer_size: DEFAULT_MAX_SEND_BUF_SIZE,
95            max_pending_accept_reset_streams: None,
96        }
97    }
98}
99
100fn new_builder(config: &Config) -> Builder {
101    let mut builder = Builder::default();
102    builder
103        .initial_max_send_streams(config.initial_max_send_streams)
104        .initial_window_size(config.initial_stream_window_size)
105        .initial_connection_window_size(config.initial_conn_window_size)
106        .max_frame_size(config.max_frame_size)
107        .max_header_list_size(config.max_header_list_size)
108        .max_send_buffer_size(config.max_send_buffer_size)
109        .enable_push(false);
110    if let Some(max) = config.max_concurrent_reset_streams {
111        builder.max_concurrent_reset_streams(max);
112    }
113    if let Some(max) = config.max_pending_accept_reset_streams {
114        builder.max_pending_accept_reset_streams(max);
115    }
116    builder
117}
118
119fn new_ping_config(config: &Config) -> ping::Config {
120    ping::Config {
121        bdp_initial_window: if config.adaptive_window {
122            Some(config.initial_stream_window_size)
123        } else {
124            None
125        },
126        keep_alive_interval: config.keep_alive_interval,
127        keep_alive_timeout: config.keep_alive_timeout,
128        keep_alive_while_idle: config.keep_alive_while_idle,
129    }
130}
131
132pub(crate) async fn handshake<T, B, E>(
133    io: T,
134    req_rx: ClientRx<B>,
135    config: &Config,
136    mut exec: E,
137    timer: Time,
138) -> crate::Result<ClientTask<B, E, T>>
139where
140    T: Read + Write + Unpin,
141    B: Body + 'static,
142    B::Data: Send + 'static,
143    E: Http2ClientConnExec<B, T> + Unpin,
144    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
145{
146    let (h2_tx, mut conn) = new_builder(config)
147        .handshake::<_, SendBuf<B::Data>>(Compat::new(io))
148        .await
149        .map_err(crate::Error::new_h2)?;
150
151    // An mpsc channel is used entirely to detect when the
152    // 'Client' has been dropped. This is to get around a bug
153    // in h2 where dropping all SendRequests won't notify a
154    // parked Connection.
155    let (conn_drop_ref, rx) = mpsc::channel(1);
156    let (cancel_tx, conn_eof) = oneshot::channel();
157
158    let conn_drop_rx = rx.into_future();
159
160    let ping_config = new_ping_config(config);
161
162    let (conn, ping) = if ping_config.is_enabled() {
163        let pp = conn.ping_pong().expect("conn.ping_pong");
164        let (recorder, ponger) = ping::channel(pp, ping_config, timer);
165
166        let conn: Conn<_, B> = Conn::new(ponger, conn);
167        (Either::Left(conn), recorder)
168    } else {
169        (Either::Right(conn), ping::disabled())
170    };
171    let conn: ConnMapErr<T, B> = ConnMapErr {
172        conn,
173        is_terminated: false,
174    };
175
176    exec.execute_h2_future(H2ClientFuture::Task {
177        task: ConnTask::new(conn, conn_drop_rx, cancel_tx),
178    });
179
180    Ok(ClientTask {
181        ping,
182        conn_drop_ref,
183        conn_eof,
184        executor: exec,
185        h2_tx,
186        req_rx,
187        fut_ctx: None,
188        marker: PhantomData,
189    })
190}
191
192pin_project! {
193    struct Conn<T, B>
194    where
195        B: Body,
196    {
197        #[pin]
198        ponger: Ponger,
199        #[pin]
200        conn: Connection<Compat<T>, SendBuf<<B as Body>::Data>>,
201    }
202}
203
204impl<T, B> Conn<T, B>
205where
206    B: Body,
207    T: Read + Write + Unpin,
208{
209    fn new(ponger: Ponger, conn: Connection<Compat<T>, SendBuf<<B as Body>::Data>>) -> Self {
210        Conn { ponger, conn }
211    }
212}
213
214impl<T, B> Future for Conn<T, B>
215where
216    B: Body,
217    T: Read + Write + Unpin,
218{
219    type Output = Result<(), h2::Error>;
220
221    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
222        let mut this = self.project();
223        match this.ponger.poll(cx) {
224            Poll::Ready(ping::Ponged::SizeUpdate(wnd)) => {
225                this.conn.set_target_window_size(wnd);
226                this.conn.set_initial_window_size(wnd)?;
227            }
228            Poll::Ready(ping::Ponged::KeepAliveTimedOut) => {
229                debug!("connection keep-alive timed out");
230                return Poll::Ready(Ok(()));
231            }
232            Poll::Pending => {}
233        }
234
235        Pin::new(&mut this.conn).poll(cx)
236    }
237}
238
239pin_project! {
240    struct ConnMapErr<T, B>
241    where
242        B: Body,
243        T: Read,
244        T: Write,
245        T: Unpin,
246    {
247        #[pin]
248        conn: Either<Conn<T, B>, Connection<Compat<T>, SendBuf<<B as Body>::Data>>>,
249        #[pin]
250        is_terminated: bool,
251    }
252}
253
254impl<T, B> Future for ConnMapErr<T, B>
255where
256    B: Body,
257    T: Read + Write + Unpin,
258{
259    type Output = Result<(), ()>;
260
261    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
262        let mut this = self.project();
263
264        if *this.is_terminated {
265            return Poll::Pending;
266        }
267        let polled = this.conn.poll(cx);
268        if polled.is_ready() {
269            *this.is_terminated = true;
270        }
271        polled.map_err(|_e| {
272            debug!(error = %_e, "connection error");
273        })
274    }
275}
276
277impl<T, B> FusedFuture for ConnMapErr<T, B>
278where
279    B: Body,
280    T: Read + Write + Unpin,
281{
282    fn is_terminated(&self) -> bool {
283        self.is_terminated
284    }
285}
286
287pin_project! {
288    pub struct ConnTask<T, B>
289    where
290        B: Body,
291        T: Read,
292        T: Write,
293        T: Unpin,
294    {
295        #[pin]
296        drop_rx: StreamFuture<Receiver<Infallible>>,
297        #[pin]
298        cancel_tx: Option<oneshot::Sender<Infallible>>,
299        #[pin]
300        conn: ConnMapErr<T, B>,
301    }
302}
303
304impl<T, B> ConnTask<T, B>
305where
306    B: Body,
307    T: Read + Write + Unpin,
308{
309    fn new(
310        conn: ConnMapErr<T, B>,
311        drop_rx: StreamFuture<Receiver<Infallible>>,
312        cancel_tx: oneshot::Sender<Infallible>,
313    ) -> Self {
314        Self {
315            drop_rx,
316            cancel_tx: Some(cancel_tx),
317            conn,
318        }
319    }
320}
321
322impl<T, B> Future for ConnTask<T, B>
323where
324    B: Body,
325    T: Read + Write + Unpin,
326{
327    type Output = ();
328
329    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
330        let mut this = self.project();
331
332        if !this.conn.is_terminated() && this.conn.poll_unpin(cx).is_ready() {
333            // ok or err, the `conn` has finished.
334            return Poll::Ready(());
335        }
336
337        if !this.drop_rx.is_terminated() && this.drop_rx.poll_unpin(cx).is_ready() {
338            // mpsc has been dropped, hopefully polling
339            // the connection some more should start shutdown
340            // and then close.
341            trace!("send_request dropped, starting conn shutdown");
342            drop(this.cancel_tx.take().expect("ConnTask Future polled twice"));
343        }
344
345        Poll::Pending
346    }
347}
348
349pin_project! {
350    #[project = H2ClientFutureProject]
351    pub enum H2ClientFuture<B, T>
352    where
353        B: http_body::Body,
354        B: 'static,
355        B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
356        T: Read,
357        T: Write,
358        T: Unpin,
359    {
360        Pipe {
361            #[pin]
362            pipe: PipeMap<B>,
363        },
364        Send {
365            #[pin]
366            send_when: SendWhen<B>,
367        },
368        Task {
369            #[pin]
370            task: ConnTask<T, B>,
371        },
372    }
373}
374
375impl<B, T> Future for H2ClientFuture<B, T>
376where
377    B: http_body::Body + 'static,
378    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
379    T: Read + Write + Unpin,
380{
381    type Output = ();
382
383    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
384        let this = self.project();
385
386        match this {
387            H2ClientFutureProject::Pipe { pipe } => pipe.poll(cx),
388            H2ClientFutureProject::Send { send_when } => send_when.poll(cx),
389            H2ClientFutureProject::Task { task } => task.poll(cx),
390        }
391    }
392}
393
394struct FutCtx<B>
395where
396    B: Body,
397{
398    is_connect: bool,
399    eos: bool,
400    fut: ResponseFuture,
401    body_tx: SendStream<SendBuf<B::Data>>,
402    body: B,
403    cb: Callback<Request<B>, Response<IncomingBody>>,
404}
405
406impl<B: Body> Unpin for FutCtx<B> {}
407
408pub(crate) struct ClientTask<B, E, T>
409where
410    B: Body,
411    E: Unpin,
412{
413    ping: ping::Recorder,
414    conn_drop_ref: ConnDropRef,
415    conn_eof: ConnEof,
416    executor: E,
417    h2_tx: SendRequest<SendBuf<B::Data>>,
418    req_rx: ClientRx<B>,
419    fut_ctx: Option<FutCtx<B>>,
420    marker: PhantomData<T>,
421}
422
423impl<B, E, T> ClientTask<B, E, T>
424where
425    B: Body + 'static,
426    E: Http2ClientConnExec<B, T> + Unpin,
427    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
428    T: Read + Write + Unpin,
429{
430    pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool {
431        self.h2_tx.is_extended_connect_protocol_enabled()
432    }
433}
434
435pin_project! {
436    pub struct PipeMap<S>
437    where
438        S: Body,
439    {
440        #[pin]
441        pipe: PipeToSendStream<S>,
442        #[pin]
443        conn_drop_ref: Option<Sender<Infallible>>,
444        #[pin]
445        ping: Option<Recorder>,
446    }
447}
448
449impl<B> Future for PipeMap<B>
450where
451    B: http_body::Body,
452    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
453{
454    type Output = ();
455
456    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
457        let mut this = self.project();
458
459        match this.pipe.poll_unpin(cx) {
460            Poll::Ready(result) => {
461                if let Err(_e) = result {
462                    debug!("client request body error: {}", _e);
463                }
464                drop(this.conn_drop_ref.take().expect("Future polled twice"));
465                drop(this.ping.take().expect("Future polled twice"));
466                return Poll::Ready(());
467            }
468            Poll::Pending => (),
469        };
470        Poll::Pending
471    }
472}
473
474impl<B, E, T> ClientTask<B, E, T>
475where
476    B: Body + 'static + Unpin,
477    B::Data: Send,
478    E: Http2ClientConnExec<B, T> + Unpin,
479    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
480    T: Read + Write + Unpin,
481{
482    fn poll_pipe(&mut self, f: FutCtx<B>, cx: &mut Context<'_>) {
483        let ping = self.ping.clone();
484
485        let send_stream = if !f.is_connect {
486            if !f.eos {
487                let mut pipe = PipeToSendStream::new(f.body, f.body_tx);
488
489                // eagerly see if the body pipe is ready and
490                // can thus skip allocating in the executor
491                match Pin::new(&mut pipe).poll(cx) {
492                    Poll::Ready(_) => (),
493                    Poll::Pending => {
494                        let conn_drop_ref = self.conn_drop_ref.clone();
495                        // keep the ping recorder's knowledge of an
496                        // "open stream" alive while this body is
497                        // still sending...
498                        let ping = ping.clone();
499
500                        let pipe = PipeMap {
501                            pipe,
502                            conn_drop_ref: Some(conn_drop_ref),
503                            ping: Some(ping),
504                        };
505                        // Clear send task
506                        self.executor
507                            .execute_h2_future(H2ClientFuture::Pipe { pipe });
508                    }
509                }
510            }
511
512            None
513        } else {
514            Some(f.body_tx)
515        };
516
517        self.executor.execute_h2_future(H2ClientFuture::Send {
518            send_when: SendWhen {
519                when: ResponseFutMap {
520                    fut: f.fut,
521                    ping: Some(ping),
522                    send_stream: Some(send_stream),
523                },
524                call_back: Some(f.cb),
525            },
526        });
527    }
528}
529
530pin_project! {
531    pub(crate) struct ResponseFutMap<B>
532    where
533        B: Body,
534        B: 'static,
535    {
536        #[pin]
537        fut: ResponseFuture,
538        #[pin]
539        ping: Option<Recorder>,
540        #[pin]
541        send_stream: Option<Option<SendStream<SendBuf<<B as Body>::Data>>>>,
542    }
543}
544
545impl<B> Future for ResponseFutMap<B>
546where
547    B: Body + 'static,
548{
549    type Output = Result<Response<crate::body::Incoming>, (crate::Error, Option<Request<B>>)>;
550
551    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
552        let mut this = self.project();
553
554        let result = ready!(this.fut.poll(cx));
555
556        let ping = this.ping.take().expect("Future polled twice");
557        let send_stream = this.send_stream.take().expect("Future polled twice");
558
559        match result {
560            Ok(res) => {
561                // record that we got the response headers
562                ping.record_non_data();
563
564                let content_length = headers::content_length_parse_all(res.headers());
565                if let (Some(mut send_stream), StatusCode::OK) = (send_stream, res.status()) {
566                    if content_length.map_or(false, |len| len != 0) {
567                        warn!("h2 connect response with non-zero body not supported");
568
569                        send_stream.send_reset(h2::Reason::INTERNAL_ERROR);
570                        return Poll::Ready(Err((
571                            crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()),
572                            None::<Request<B>>,
573                        )));
574                    }
575                    let (parts, recv_stream) = res.into_parts();
576                    let mut res = Response::from_parts(parts, IncomingBody::empty());
577
578                    let (pending, on_upgrade) = crate::upgrade::pending();
579                    let io = H2Upgraded {
580                        ping,
581                        send_stream: unsafe { UpgradedSendStream::new(send_stream) },
582                        recv_stream,
583                        buf: Bytes::new(),
584                    };
585                    let upgraded = Upgraded::new(io, Bytes::new());
586
587                    pending.fulfill(upgraded);
588                    res.extensions_mut().insert(on_upgrade);
589
590                    Poll::Ready(Ok(res))
591                } else {
592                    let res = res.map(|stream| {
593                        let ping = ping.for_stream(&stream);
594                        IncomingBody::h2(stream, content_length.into(), ping)
595                    });
596                    Poll::Ready(Ok(res))
597                }
598            }
599            Err(err) => {
600                ping.ensure_not_timed_out().map_err(|e| (e, None))?;
601
602                debug!("client response error: {}", err);
603                Poll::Ready(Err((crate::Error::new_h2(err), None::<Request<B>>)))
604            }
605        }
606    }
607}
608
609impl<B, E, T> Future for ClientTask<B, E, T>
610where
611    B: Body + 'static + Unpin,
612    B::Data: Send,
613    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
614    E: Http2ClientConnExec<B, T> + Unpin,
615    T: Read + Write + Unpin,
616{
617    type Output = crate::Result<Dispatched>;
618
619    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
620        loop {
621            match ready!(self.h2_tx.poll_ready(cx)) {
622                Ok(()) => (),
623                Err(err) => {
624                    self.ping.ensure_not_timed_out()?;
625                    return if err.reason() == Some(::h2::Reason::NO_ERROR) {
626                        trace!("connection gracefully shutdown");
627                        Poll::Ready(Ok(Dispatched::Shutdown))
628                    } else {
629                        Poll::Ready(Err(crate::Error::new_h2(err)))
630                    };
631                }
632            };
633
634            // If we were waiting on pending open
635            // continue where we left off.
636            if let Some(f) = self.fut_ctx.take() {
637                self.poll_pipe(f, cx);
638                continue;
639            }
640
641            match self.req_rx.poll_recv(cx) {
642                Poll::Ready(Some((req, cb))) => {
643                    // check that future hasn't been canceled already
644                    if cb.is_canceled() {
645                        trace!("request callback is canceled");
646                        continue;
647                    }
648                    let (head, body) = req.into_parts();
649                    let mut req = ::http::Request::from_parts(head, ());
650                    super::strip_connection_headers(req.headers_mut(), true);
651                    if let Some(len) = body.size_hint().exact() {
652                        if len != 0 || headers::method_has_defined_payload_semantics(req.method()) {
653                            headers::set_content_length_if_missing(req.headers_mut(), len);
654                        }
655                    }
656
657                    let is_connect = req.method() == Method::CONNECT;
658                    let eos = body.is_end_stream();
659
660                    if is_connect
661                        && headers::content_length_parse_all(req.headers())
662                            .map_or(false, |len| len != 0)
663                    {
664                        warn!("h2 connect request with non-zero body not supported");
665                        cb.send(Err(TrySendError {
666                            error: crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()),
667                            message: None,
668                        }));
669                        continue;
670                    }
671
672                    if let Some(protocol) = req.extensions_mut().remove::<Protocol>() {
673                        req.extensions_mut().insert(protocol.into_inner());
674                    }
675
676                    let (fut, body_tx) = match self.h2_tx.send_request(req, !is_connect && eos) {
677                        Ok(ok) => ok,
678                        Err(err) => {
679                            debug!("client send request error: {}", err);
680                            cb.send(Err(TrySendError {
681                                error: crate::Error::new_h2(err),
682                                message: None,
683                            }));
684                            continue;
685                        }
686                    };
687
688                    let f = FutCtx {
689                        is_connect,
690                        eos,
691                        fut,
692                        body_tx,
693                        body,
694                        cb,
695                    };
696
697                    // Check poll_ready() again.
698                    // If the call to send_request() resulted in the new stream being pending open
699                    // we have to wait for the open to complete before accepting new requests.
700                    match self.h2_tx.poll_ready(cx) {
701                        Poll::Pending => {
702                            // Save Context
703                            self.fut_ctx = Some(f);
704                            return Poll::Pending;
705                        }
706                        Poll::Ready(Ok(())) => (),
707                        Poll::Ready(Err(err)) => {
708                            f.cb.send(Err(TrySendError {
709                                error: crate::Error::new_h2(err),
710                                message: None,
711                            }));
712                            continue;
713                        }
714                    }
715                    self.poll_pipe(f, cx);
716                    continue;
717                }
718
719                Poll::Ready(None) => {
720                    trace!("client::dispatch::Sender dropped");
721                    return Poll::Ready(Ok(Dispatched::Shutdown));
722                }
723
724                Poll::Pending => match ready!(Pin::new(&mut self.conn_eof).poll(cx)) {
725                    Ok(never) => match never {},
726                    Err(_conn_is_eof) => {
727                        trace!("connection task is closed, closing dispatch task");
728                        return Poll::Ready(Ok(Dispatched::Shutdown));
729                    }
730                },
731            }
732        }
733    }
734}