1use std::{
2 convert::Infallible,
3 future::Future,
4 marker::PhantomData,
5 pin::Pin,
6 task::{Context, Poll},
7 time::Duration,
8};
9
10use crate::rt::{Read, Write};
11use bytes::Bytes;
12use futures_channel::mpsc::{Receiver, Sender};
13use futures_channel::{mpsc, oneshot};
14use futures_util::future::{Either, FusedFuture, FutureExt as _};
15use futures_util::ready;
16use futures_util::stream::{StreamExt as _, StreamFuture};
17use h2::client::{Builder, Connection, SendRequest};
18use h2::SendStream;
19use http::{Method, StatusCode};
20use pin_project_lite::pin_project;
21
22use super::ping::{Ponger, Recorder};
23use super::{ping, H2Upgraded, PipeToSendStream, SendBuf};
24use crate::body::{Body, Incoming as IncomingBody};
25use crate::client::dispatch::{Callback, SendWhen, TrySendError};
26use crate::common::io::Compat;
27use crate::common::time::Time;
28use crate::ext::Protocol;
29use crate::headers;
30use crate::proto::h2::UpgradedSendStream;
31use crate::proto::Dispatched;
32use crate::rt::bounds::Http2ClientConnExec;
33use crate::upgrade::Upgraded;
34use crate::{Request, Response};
35use h2::client::ResponseFuture;
36
37type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, Response<IncomingBody>>;
38
39type ConnDropRef = mpsc::Sender<Infallible>;
42
43type ConnEof = oneshot::Receiver<Infallible>;
46
47const DEFAULT_CONN_WINDOW: u32 = 1024 * 1024 * 5; const DEFAULT_STREAM_WINDOW: u32 = 1024 * 1024 * 2; const DEFAULT_MAX_FRAME_SIZE: u32 = 1024 * 16; const DEFAULT_MAX_SEND_BUF_SIZE: usize = 1024 * 1024; const DEFAULT_MAX_HEADER_LIST_SIZE: u32 = 1024 * 16; const DEFAULT_INITIAL_MAX_SEND_STREAMS: usize = 100;
64
65#[derive(Clone, Debug)]
66pub(crate) struct Config {
67 pub(crate) adaptive_window: bool,
68 pub(crate) initial_conn_window_size: u32,
69 pub(crate) initial_stream_window_size: u32,
70 pub(crate) initial_max_send_streams: usize,
71 pub(crate) max_frame_size: u32,
72 pub(crate) max_header_list_size: u32,
73 pub(crate) keep_alive_interval: Option<Duration>,
74 pub(crate) keep_alive_timeout: Duration,
75 pub(crate) keep_alive_while_idle: bool,
76 pub(crate) max_concurrent_reset_streams: Option<usize>,
77 pub(crate) max_send_buffer_size: usize,
78 pub(crate) max_pending_accept_reset_streams: Option<usize>,
79}
80
81impl Default for Config {
82 fn default() -> Config {
83 Config {
84 adaptive_window: false,
85 initial_conn_window_size: DEFAULT_CONN_WINDOW,
86 initial_stream_window_size: DEFAULT_STREAM_WINDOW,
87 initial_max_send_streams: DEFAULT_INITIAL_MAX_SEND_STREAMS,
88 max_frame_size: DEFAULT_MAX_FRAME_SIZE,
89 max_header_list_size: DEFAULT_MAX_HEADER_LIST_SIZE,
90 keep_alive_interval: None,
91 keep_alive_timeout: Duration::from_secs(20),
92 keep_alive_while_idle: false,
93 max_concurrent_reset_streams: None,
94 max_send_buffer_size: DEFAULT_MAX_SEND_BUF_SIZE,
95 max_pending_accept_reset_streams: None,
96 }
97 }
98}
99
100fn new_builder(config: &Config) -> Builder {
101 let mut builder = Builder::default();
102 builder
103 .initial_max_send_streams(config.initial_max_send_streams)
104 .initial_window_size(config.initial_stream_window_size)
105 .initial_connection_window_size(config.initial_conn_window_size)
106 .max_frame_size(config.max_frame_size)
107 .max_header_list_size(config.max_header_list_size)
108 .max_send_buffer_size(config.max_send_buffer_size)
109 .enable_push(false);
110 if let Some(max) = config.max_concurrent_reset_streams {
111 builder.max_concurrent_reset_streams(max);
112 }
113 if let Some(max) = config.max_pending_accept_reset_streams {
114 builder.max_pending_accept_reset_streams(max);
115 }
116 builder
117}
118
119fn new_ping_config(config: &Config) -> ping::Config {
120 ping::Config {
121 bdp_initial_window: if config.adaptive_window {
122 Some(config.initial_stream_window_size)
123 } else {
124 None
125 },
126 keep_alive_interval: config.keep_alive_interval,
127 keep_alive_timeout: config.keep_alive_timeout,
128 keep_alive_while_idle: config.keep_alive_while_idle,
129 }
130}
131
132pub(crate) async fn handshake<T, B, E>(
133 io: T,
134 req_rx: ClientRx<B>,
135 config: &Config,
136 mut exec: E,
137 timer: Time,
138) -> crate::Result<ClientTask<B, E, T>>
139where
140 T: Read + Write + Unpin,
141 B: Body + 'static,
142 B::Data: Send + 'static,
143 E: Http2ClientConnExec<B, T> + Unpin,
144 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
145{
146 let (h2_tx, mut conn) = new_builder(config)
147 .handshake::<_, SendBuf<B::Data>>(Compat::new(io))
148 .await
149 .map_err(crate::Error::new_h2)?;
150
151 let (conn_drop_ref, rx) = mpsc::channel(1);
156 let (cancel_tx, conn_eof) = oneshot::channel();
157
158 let conn_drop_rx = rx.into_future();
159
160 let ping_config = new_ping_config(config);
161
162 let (conn, ping) = if ping_config.is_enabled() {
163 let pp = conn.ping_pong().expect("conn.ping_pong");
164 let (recorder, ponger) = ping::channel(pp, ping_config, timer);
165
166 let conn: Conn<_, B> = Conn::new(ponger, conn);
167 (Either::Left(conn), recorder)
168 } else {
169 (Either::Right(conn), ping::disabled())
170 };
171 let conn: ConnMapErr<T, B> = ConnMapErr {
172 conn,
173 is_terminated: false,
174 };
175
176 exec.execute_h2_future(H2ClientFuture::Task {
177 task: ConnTask::new(conn, conn_drop_rx, cancel_tx),
178 });
179
180 Ok(ClientTask {
181 ping,
182 conn_drop_ref,
183 conn_eof,
184 executor: exec,
185 h2_tx,
186 req_rx,
187 fut_ctx: None,
188 marker: PhantomData,
189 })
190}
191
192pin_project! {
193 struct Conn<T, B>
194 where
195 B: Body,
196 {
197 #[pin]
198 ponger: Ponger,
199 #[pin]
200 conn: Connection<Compat<T>, SendBuf<<B as Body>::Data>>,
201 }
202}
203
204impl<T, B> Conn<T, B>
205where
206 B: Body,
207 T: Read + Write + Unpin,
208{
209 fn new(ponger: Ponger, conn: Connection<Compat<T>, SendBuf<<B as Body>::Data>>) -> Self {
210 Conn { ponger, conn }
211 }
212}
213
214impl<T, B> Future for Conn<T, B>
215where
216 B: Body,
217 T: Read + Write + Unpin,
218{
219 type Output = Result<(), h2::Error>;
220
221 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
222 let mut this = self.project();
223 match this.ponger.poll(cx) {
224 Poll::Ready(ping::Ponged::SizeUpdate(wnd)) => {
225 this.conn.set_target_window_size(wnd);
226 this.conn.set_initial_window_size(wnd)?;
227 }
228 Poll::Ready(ping::Ponged::KeepAliveTimedOut) => {
229 debug!("connection keep-alive timed out");
230 return Poll::Ready(Ok(()));
231 }
232 Poll::Pending => {}
233 }
234
235 Pin::new(&mut this.conn).poll(cx)
236 }
237}
238
239pin_project! {
240 struct ConnMapErr<T, B>
241 where
242 B: Body,
243 T: Read,
244 T: Write,
245 T: Unpin,
246 {
247 #[pin]
248 conn: Either<Conn<T, B>, Connection<Compat<T>, SendBuf<<B as Body>::Data>>>,
249 #[pin]
250 is_terminated: bool,
251 }
252}
253
254impl<T, B> Future for ConnMapErr<T, B>
255where
256 B: Body,
257 T: Read + Write + Unpin,
258{
259 type Output = Result<(), ()>;
260
261 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
262 let mut this = self.project();
263
264 if *this.is_terminated {
265 return Poll::Pending;
266 }
267 let polled = this.conn.poll(cx);
268 if polled.is_ready() {
269 *this.is_terminated = true;
270 }
271 polled.map_err(|_e| {
272 debug!(error = %_e, "connection error");
273 })
274 }
275}
276
277impl<T, B> FusedFuture for ConnMapErr<T, B>
278where
279 B: Body,
280 T: Read + Write + Unpin,
281{
282 fn is_terminated(&self) -> bool {
283 self.is_terminated
284 }
285}
286
287pin_project! {
288 pub struct ConnTask<T, B>
289 where
290 B: Body,
291 T: Read,
292 T: Write,
293 T: Unpin,
294 {
295 #[pin]
296 drop_rx: StreamFuture<Receiver<Infallible>>,
297 #[pin]
298 cancel_tx: Option<oneshot::Sender<Infallible>>,
299 #[pin]
300 conn: ConnMapErr<T, B>,
301 }
302}
303
304impl<T, B> ConnTask<T, B>
305where
306 B: Body,
307 T: Read + Write + Unpin,
308{
309 fn new(
310 conn: ConnMapErr<T, B>,
311 drop_rx: StreamFuture<Receiver<Infallible>>,
312 cancel_tx: oneshot::Sender<Infallible>,
313 ) -> Self {
314 Self {
315 drop_rx,
316 cancel_tx: Some(cancel_tx),
317 conn,
318 }
319 }
320}
321
322impl<T, B> Future for ConnTask<T, B>
323where
324 B: Body,
325 T: Read + Write + Unpin,
326{
327 type Output = ();
328
329 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
330 let mut this = self.project();
331
332 if !this.conn.is_terminated() && this.conn.poll_unpin(cx).is_ready() {
333 return Poll::Ready(());
335 }
336
337 if !this.drop_rx.is_terminated() && this.drop_rx.poll_unpin(cx).is_ready() {
338 trace!("send_request dropped, starting conn shutdown");
342 drop(this.cancel_tx.take().expect("ConnTask Future polled twice"));
343 }
344
345 Poll::Pending
346 }
347}
348
349pin_project! {
350 #[project = H2ClientFutureProject]
351 pub enum H2ClientFuture<B, T>
352 where
353 B: http_body::Body,
354 B: 'static,
355 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
356 T: Read,
357 T: Write,
358 T: Unpin,
359 {
360 Pipe {
361 #[pin]
362 pipe: PipeMap<B>,
363 },
364 Send {
365 #[pin]
366 send_when: SendWhen<B>,
367 },
368 Task {
369 #[pin]
370 task: ConnTask<T, B>,
371 },
372 }
373}
374
375impl<B, T> Future for H2ClientFuture<B, T>
376where
377 B: http_body::Body + 'static,
378 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
379 T: Read + Write + Unpin,
380{
381 type Output = ();
382
383 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
384 let this = self.project();
385
386 match this {
387 H2ClientFutureProject::Pipe { pipe } => pipe.poll(cx),
388 H2ClientFutureProject::Send { send_when } => send_when.poll(cx),
389 H2ClientFutureProject::Task { task } => task.poll(cx),
390 }
391 }
392}
393
394struct FutCtx<B>
395where
396 B: Body,
397{
398 is_connect: bool,
399 eos: bool,
400 fut: ResponseFuture,
401 body_tx: SendStream<SendBuf<B::Data>>,
402 body: B,
403 cb: Callback<Request<B>, Response<IncomingBody>>,
404}
405
406impl<B: Body> Unpin for FutCtx<B> {}
407
408pub(crate) struct ClientTask<B, E, T>
409where
410 B: Body,
411 E: Unpin,
412{
413 ping: ping::Recorder,
414 conn_drop_ref: ConnDropRef,
415 conn_eof: ConnEof,
416 executor: E,
417 h2_tx: SendRequest<SendBuf<B::Data>>,
418 req_rx: ClientRx<B>,
419 fut_ctx: Option<FutCtx<B>>,
420 marker: PhantomData<T>,
421}
422
423impl<B, E, T> ClientTask<B, E, T>
424where
425 B: Body + 'static,
426 E: Http2ClientConnExec<B, T> + Unpin,
427 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
428 T: Read + Write + Unpin,
429{
430 pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool {
431 self.h2_tx.is_extended_connect_protocol_enabled()
432 }
433}
434
435pin_project! {
436 pub struct PipeMap<S>
437 where
438 S: Body,
439 {
440 #[pin]
441 pipe: PipeToSendStream<S>,
442 #[pin]
443 conn_drop_ref: Option<Sender<Infallible>>,
444 #[pin]
445 ping: Option<Recorder>,
446 }
447}
448
449impl<B> Future for PipeMap<B>
450where
451 B: http_body::Body,
452 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
453{
454 type Output = ();
455
456 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
457 let mut this = self.project();
458
459 match this.pipe.poll_unpin(cx) {
460 Poll::Ready(result) => {
461 if let Err(_e) = result {
462 debug!("client request body error: {}", _e);
463 }
464 drop(this.conn_drop_ref.take().expect("Future polled twice"));
465 drop(this.ping.take().expect("Future polled twice"));
466 return Poll::Ready(());
467 }
468 Poll::Pending => (),
469 };
470 Poll::Pending
471 }
472}
473
474impl<B, E, T> ClientTask<B, E, T>
475where
476 B: Body + 'static + Unpin,
477 B::Data: Send,
478 E: Http2ClientConnExec<B, T> + Unpin,
479 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
480 T: Read + Write + Unpin,
481{
482 fn poll_pipe(&mut self, f: FutCtx<B>, cx: &mut Context<'_>) {
483 let ping = self.ping.clone();
484
485 let send_stream = if !f.is_connect {
486 if !f.eos {
487 let mut pipe = PipeToSendStream::new(f.body, f.body_tx);
488
489 match Pin::new(&mut pipe).poll(cx) {
492 Poll::Ready(_) => (),
493 Poll::Pending => {
494 let conn_drop_ref = self.conn_drop_ref.clone();
495 let ping = ping.clone();
499
500 let pipe = PipeMap {
501 pipe,
502 conn_drop_ref: Some(conn_drop_ref),
503 ping: Some(ping),
504 };
505 self.executor
507 .execute_h2_future(H2ClientFuture::Pipe { pipe });
508 }
509 }
510 }
511
512 None
513 } else {
514 Some(f.body_tx)
515 };
516
517 self.executor.execute_h2_future(H2ClientFuture::Send {
518 send_when: SendWhen {
519 when: ResponseFutMap {
520 fut: f.fut,
521 ping: Some(ping),
522 send_stream: Some(send_stream),
523 },
524 call_back: Some(f.cb),
525 },
526 });
527 }
528}
529
530pin_project! {
531 pub(crate) struct ResponseFutMap<B>
532 where
533 B: Body,
534 B: 'static,
535 {
536 #[pin]
537 fut: ResponseFuture,
538 #[pin]
539 ping: Option<Recorder>,
540 #[pin]
541 send_stream: Option<Option<SendStream<SendBuf<<B as Body>::Data>>>>,
542 }
543}
544
545impl<B> Future for ResponseFutMap<B>
546where
547 B: Body + 'static,
548{
549 type Output = Result<Response<crate::body::Incoming>, (crate::Error, Option<Request<B>>)>;
550
551 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
552 let mut this = self.project();
553
554 let result = ready!(this.fut.poll(cx));
555
556 let ping = this.ping.take().expect("Future polled twice");
557 let send_stream = this.send_stream.take().expect("Future polled twice");
558
559 match result {
560 Ok(res) => {
561 ping.record_non_data();
563
564 let content_length = headers::content_length_parse_all(res.headers());
565 if let (Some(mut send_stream), StatusCode::OK) = (send_stream, res.status()) {
566 if content_length.map_or(false, |len| len != 0) {
567 warn!("h2 connect response with non-zero body not supported");
568
569 send_stream.send_reset(h2::Reason::INTERNAL_ERROR);
570 return Poll::Ready(Err((
571 crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()),
572 None::<Request<B>>,
573 )));
574 }
575 let (parts, recv_stream) = res.into_parts();
576 let mut res = Response::from_parts(parts, IncomingBody::empty());
577
578 let (pending, on_upgrade) = crate::upgrade::pending();
579 let io = H2Upgraded {
580 ping,
581 send_stream: unsafe { UpgradedSendStream::new(send_stream) },
582 recv_stream,
583 buf: Bytes::new(),
584 };
585 let upgraded = Upgraded::new(io, Bytes::new());
586
587 pending.fulfill(upgraded);
588 res.extensions_mut().insert(on_upgrade);
589
590 Poll::Ready(Ok(res))
591 } else {
592 let res = res.map(|stream| {
593 let ping = ping.for_stream(&stream);
594 IncomingBody::h2(stream, content_length.into(), ping)
595 });
596 Poll::Ready(Ok(res))
597 }
598 }
599 Err(err) => {
600 ping.ensure_not_timed_out().map_err(|e| (e, None))?;
601
602 debug!("client response error: {}", err);
603 Poll::Ready(Err((crate::Error::new_h2(err), None::<Request<B>>)))
604 }
605 }
606 }
607}
608
609impl<B, E, T> Future for ClientTask<B, E, T>
610where
611 B: Body + 'static + Unpin,
612 B::Data: Send,
613 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
614 E: Http2ClientConnExec<B, T> + Unpin,
615 T: Read + Write + Unpin,
616{
617 type Output = crate::Result<Dispatched>;
618
619 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
620 loop {
621 match ready!(self.h2_tx.poll_ready(cx)) {
622 Ok(()) => (),
623 Err(err) => {
624 self.ping.ensure_not_timed_out()?;
625 return if err.reason() == Some(::h2::Reason::NO_ERROR) {
626 trace!("connection gracefully shutdown");
627 Poll::Ready(Ok(Dispatched::Shutdown))
628 } else {
629 Poll::Ready(Err(crate::Error::new_h2(err)))
630 };
631 }
632 };
633
634 if let Some(f) = self.fut_ctx.take() {
637 self.poll_pipe(f, cx);
638 continue;
639 }
640
641 match self.req_rx.poll_recv(cx) {
642 Poll::Ready(Some((req, cb))) => {
643 if cb.is_canceled() {
645 trace!("request callback is canceled");
646 continue;
647 }
648 let (head, body) = req.into_parts();
649 let mut req = ::http::Request::from_parts(head, ());
650 super::strip_connection_headers(req.headers_mut(), true);
651 if let Some(len) = body.size_hint().exact() {
652 if len != 0 || headers::method_has_defined_payload_semantics(req.method()) {
653 headers::set_content_length_if_missing(req.headers_mut(), len);
654 }
655 }
656
657 let is_connect = req.method() == Method::CONNECT;
658 let eos = body.is_end_stream();
659
660 if is_connect
661 && headers::content_length_parse_all(req.headers())
662 .map_or(false, |len| len != 0)
663 {
664 warn!("h2 connect request with non-zero body not supported");
665 cb.send(Err(TrySendError {
666 error: crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()),
667 message: None,
668 }));
669 continue;
670 }
671
672 if let Some(protocol) = req.extensions_mut().remove::<Protocol>() {
673 req.extensions_mut().insert(protocol.into_inner());
674 }
675
676 let (fut, body_tx) = match self.h2_tx.send_request(req, !is_connect && eos) {
677 Ok(ok) => ok,
678 Err(err) => {
679 debug!("client send request error: {}", err);
680 cb.send(Err(TrySendError {
681 error: crate::Error::new_h2(err),
682 message: None,
683 }));
684 continue;
685 }
686 };
687
688 let f = FutCtx {
689 is_connect,
690 eos,
691 fut,
692 body_tx,
693 body,
694 cb,
695 };
696
697 match self.h2_tx.poll_ready(cx) {
701 Poll::Pending => {
702 self.fut_ctx = Some(f);
704 return Poll::Pending;
705 }
706 Poll::Ready(Ok(())) => (),
707 Poll::Ready(Err(err)) => {
708 f.cb.send(Err(TrySendError {
709 error: crate::Error::new_h2(err),
710 message: None,
711 }));
712 continue;
713 }
714 }
715 self.poll_pipe(f, cx);
716 continue;
717 }
718
719 Poll::Ready(None) => {
720 trace!("client::dispatch::Sender dropped");
721 return Poll::Ready(Ok(Dispatched::Shutdown));
722 }
723
724 Poll::Pending => match ready!(Pin::new(&mut self.conn_eof).poll(cx)) {
725 Ok(never) => match never {},
726 Err(_conn_is_eof) => {
727 trace!("connection task is closed, closing dispatch task");
728 return Poll::Ready(Ok(Dispatched::Shutdown));
729 }
730 },
731 }
732 }
733 }
734}