1use std::error::Error;
4use std::fmt;
5use std::future::Future;
6use std::marker::PhantomData;
7use std::pin::Pin;
8use std::sync::Arc;
9use std::task::{Context, Poll};
10use std::time::Duration;
11
12use crate::rt::{Read, Write};
13use futures_util::ready;
14use http::{Request, Response};
15
16use super::super::dispatch::{self, TrySendError};
17use crate::body::{Body, Incoming as IncomingBody};
18use crate::common::time::Time;
19use crate::proto;
20use crate::rt::bounds::Http2ClientConnExec;
21use crate::rt::Timer;
22
23pub struct SendRequest<B> {
25 dispatch: dispatch::UnboundedSender<Request<B>, Response<IncomingBody>>,
26}
27
28impl<B> Clone for SendRequest<B> {
29 fn clone(&self) -> SendRequest<B> {
30 SendRequest {
31 dispatch: self.dispatch.clone(),
32 }
33 }
34}
35
36#[must_use = "futures do nothing unless polled"]
41pub struct Connection<T, B, E>
42where
43 T: Read + Write + Unpin,
44 B: Body + 'static,
45 E: Http2ClientConnExec<B, T> + Unpin,
46 B::Error: Into<Box<dyn Error + Send + Sync>>,
47{
48 inner: (PhantomData<T>, proto::h2::ClientTask<B, E, T>),
49}
50
51#[derive(Clone, Debug)]
58pub struct Builder<Ex> {
59 pub(super) exec: Ex,
60 pub(super) timer: Time,
61 h2_builder: proto::h2::client::Config,
62}
63
64pub async fn handshake<E, T, B>(
69 exec: E,
70 io: T,
71) -> crate::Result<(SendRequest<B>, Connection<T, B, E>)>
72where
73 T: Read + Write + Unpin,
74 B: Body + 'static,
75 B::Data: Send,
76 B::Error: Into<Box<dyn Error + Send + Sync>>,
77 E: Http2ClientConnExec<B, T> + Unpin + Clone,
78{
79 Builder::new(exec).handshake(io).await
80}
81
82impl<B> SendRequest<B> {
85 pub fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
89 if self.is_closed() {
90 Poll::Ready(Err(crate::Error::new_closed()))
91 } else {
92 Poll::Ready(Ok(()))
93 }
94 }
95
96 pub async fn ready(&mut self) -> crate::Result<()> {
100 futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await
101 }
102
103 pub fn is_ready(&self) -> bool {
111 self.dispatch.is_ready()
112 }
113
114 pub fn is_closed(&self) -> bool {
116 self.dispatch.is_closed()
117 }
118}
119
120impl<B> SendRequest<B>
121where
122 B: Body + 'static,
123{
124 pub fn send_request(
133 &mut self,
134 req: Request<B>,
135 ) -> impl Future<Output = crate::Result<Response<IncomingBody>>> {
136 let sent = self.dispatch.send(req);
137
138 async move {
139 match sent {
140 Ok(rx) => match rx.await {
141 Ok(Ok(resp)) => Ok(resp),
142 Ok(Err(err)) => Err(err),
143 Err(_canceled) => panic!("dispatch dropped without returning error"),
145 },
146 Err(_req) => {
147 debug!("connection was not ready");
148
149 Err(crate::Error::new_canceled().with("connection was not ready"))
150 }
151 }
152 }
153 }
154
155 pub fn try_send_request(
164 &mut self,
165 req: Request<B>,
166 ) -> impl Future<Output = Result<Response<IncomingBody>, TrySendError<Request<B>>>> {
167 let sent = self.dispatch.try_send(req);
168 async move {
169 match sent {
170 Ok(rx) => match rx.await {
171 Ok(Ok(res)) => Ok(res),
172 Ok(Err(err)) => Err(err),
173 Err(_) => panic!("dispatch dropped without returning error"),
175 },
176 Err(req) => {
177 debug!("connection was not ready");
178 let error = crate::Error::new_canceled().with("connection was not ready");
179 Err(TrySendError {
180 error,
181 message: Some(req),
182 })
183 }
184 }
185 }
186 }
187}
188
189impl<B> fmt::Debug for SendRequest<B> {
190 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
191 f.debug_struct("SendRequest").finish()
192 }
193}
194
195impl<T, B, E> Connection<T, B, E>
198where
199 T: Read + Write + Unpin + 'static,
200 B: Body + Unpin + 'static,
201 B::Data: Send,
202 B::Error: Into<Box<dyn Error + Send + Sync>>,
203 E: Http2ClientConnExec<B, T> + Unpin,
204{
205 pub fn is_extended_connect_protocol_enabled(&self) -> bool {
215 self.inner.1.is_extended_connect_protocol_enabled()
216 }
217}
218
219impl<T, B, E> fmt::Debug for Connection<T, B, E>
220where
221 T: Read + Write + fmt::Debug + 'static + Unpin,
222 B: Body + 'static,
223 E: Http2ClientConnExec<B, T> + Unpin,
224 B::Error: Into<Box<dyn Error + Send + Sync>>,
225{
226 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
227 f.debug_struct("Connection").finish()
228 }
229}
230
231impl<T, B, E> Future for Connection<T, B, E>
232where
233 T: Read + Write + Unpin + 'static,
234 B: Body + 'static + Unpin,
235 B::Data: Send,
236 E: Unpin,
237 B::Error: Into<Box<dyn Error + Send + Sync>>,
238 E: Http2ClientConnExec<B, T> + Unpin,
239{
240 type Output = crate::Result<()>;
241
242 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
243 match ready!(Pin::new(&mut self.inner.1).poll(cx))? {
244 proto::Dispatched::Shutdown => Poll::Ready(Ok(())),
245 #[cfg(feature = "http1")]
246 proto::Dispatched::Upgrade(_pending) => unreachable!("http2 cannot upgrade"),
247 }
248 }
249}
250
251impl<Ex> Builder<Ex>
254where
255 Ex: Clone,
256{
257 #[inline]
259 pub fn new(exec: Ex) -> Builder<Ex> {
260 Builder {
261 exec,
262 timer: Time::Empty,
263 h2_builder: Default::default(),
264 }
265 }
266
267 pub fn timer<M>(&mut self, timer: M) -> &mut Builder<Ex>
269 where
270 M: Timer + Send + Sync + 'static,
271 {
272 self.timer = Time::Timer(Arc::new(timer));
273 self
274 }
275
276 pub fn initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
285 if let Some(sz) = sz.into() {
286 self.h2_builder.adaptive_window = false;
287 self.h2_builder.initial_stream_window_size = sz;
288 }
289 self
290 }
291
292 pub fn initial_connection_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
298 if let Some(sz) = sz.into() {
299 self.h2_builder.adaptive_window = false;
300 self.h2_builder.initial_conn_window_size = sz;
301 }
302 self
303 }
304
305 pub fn initial_max_send_streams(&mut self, initial: impl Into<Option<usize>>) -> &mut Self {
316 if let Some(initial) = initial.into() {
317 self.h2_builder.initial_max_send_streams = initial;
318 }
319 self
320 }
321
322 pub fn adaptive_window(&mut self, enabled: bool) -> &mut Self {
328 use proto::h2::SPEC_WINDOW_SIZE;
329
330 self.h2_builder.adaptive_window = enabled;
331 if enabled {
332 self.h2_builder.initial_conn_window_size = SPEC_WINDOW_SIZE;
333 self.h2_builder.initial_stream_window_size = SPEC_WINDOW_SIZE;
334 }
335 self
336 }
337
338 pub fn max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
344 if let Some(sz) = sz.into() {
345 self.h2_builder.max_frame_size = sz;
346 }
347 self
348 }
349
350 pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
354 self.h2_builder.max_header_list_size = max;
355 self
356 }
357
358 pub fn keep_alive_interval(&mut self, interval: impl Into<Option<Duration>>) -> &mut Self {
365 self.h2_builder.keep_alive_interval = interval.into();
366 self
367 }
368
369 pub fn keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
376 self.h2_builder.keep_alive_timeout = timeout;
377 self
378 }
379
380 pub fn keep_alive_while_idle(&mut self, enabled: bool) -> &mut Self {
389 self.h2_builder.keep_alive_while_idle = enabled;
390 self
391 }
392
393 pub fn max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
402 self.h2_builder.max_concurrent_reset_streams = Some(max);
403 self
404 }
405
406 pub fn max_send_buf_size(&mut self, max: usize) -> &mut Self {
414 assert!(max <= u32::MAX as usize);
415 self.h2_builder.max_send_buffer_size = max;
416 self
417 }
418
419 pub fn max_pending_accept_reset_streams(&mut self, max: impl Into<Option<usize>>) -> &mut Self {
426 self.h2_builder.max_pending_accept_reset_streams = max.into();
427 self
428 }
429
430 pub fn handshake<T, B>(
436 &self,
437 io: T,
438 ) -> impl Future<Output = crate::Result<(SendRequest<B>, Connection<T, B, Ex>)>>
439 where
440 T: Read + Write + Unpin,
441 B: Body + 'static,
442 B::Data: Send,
443 B::Error: Into<Box<dyn Error + Send + Sync>>,
444 Ex: Http2ClientConnExec<B, T> + Unpin,
445 {
446 let opts = self.clone();
447
448 async move {
449 trace!("client handshake HTTP/2");
450
451 let (tx, rx) = dispatch::channel();
452 let h2 = proto::h2::client::handshake(io, rx, &opts.h2_builder, opts.exec, opts.timer)
453 .await?;
454 Ok((
455 SendRequest {
456 dispatch: tx.unbound(),
457 },
458 Connection {
459 inner: (PhantomData, h2),
460 },
461 ))
462 }
463 }
464}
465
#[cfg(test)]
mod tests {
    //! Compile-time checks that `handshake` accepts executors with different
    //! `Send`/`Sync` properties. Every test is `#[ignore]`d and its `run`
    //! helper is never called: the value of these tests is that they build.

    /// Request body type used by every handshake below.
    type EmptyBody = http_body_util::Empty<bytes::Bytes>;

    #[tokio::test]
    #[ignore] // only compilation is checked
    async fn send_sync_executor_of_non_send_futures() {
        // Unit struct: `Send + Sync`.
        #[derive(Clone)]
        struct LocalTokioExecutor;

        impl<F> crate::rt::Executor<F> for LocalTokioExecutor
        where
            F: std::future::Future + 'static, // `Send` is deliberately not required
        {
            fn execute(&self, fut: F) {
                tokio::task::spawn_local(fut);
            }
        }

        // Never called; it only has to type-check.
        #[allow(unused)]
        async fn run(io: impl crate::rt::Read + crate::rt::Write + Unpin + 'static) {
            let (_tx, connection) =
                crate::client::conn::http2::handshake::<_, _, EmptyBody>(LocalTokioExecutor, io)
                    .await
                    .unwrap();

            tokio::task::spawn_local(async move {
                connection.await.unwrap();
            });
        }
    }

    #[tokio::test]
    #[ignore] // only compilation is checked
    async fn not_send_not_sync_executor_of_not_send_futures() {
        #[derive(Clone)]
        struct LocalTokioExecutor {
            // `Rc` makes the executor `!Send` and `!Sync`.
            _x: std::marker::PhantomData<std::rc::Rc<()>>,
        }

        impl<F> crate::rt::Executor<F> for LocalTokioExecutor
        where
            F: std::future::Future + 'static, // `Send` is deliberately not required
        {
            fn execute(&self, fut: F) {
                tokio::task::spawn_local(fut);
            }
        }

        // Never called; it only has to type-check.
        #[allow(unused)]
        async fn run(io: impl crate::rt::Read + crate::rt::Write + Unpin + 'static) {
            let executor = LocalTokioExecutor {
                _x: std::marker::PhantomData,
            };
            let (_tx, connection) =
                crate::client::conn::http2::handshake::<_, _, EmptyBody>(executor, io)
                    .await
                    .unwrap();

            tokio::task::spawn_local(async move {
                connection.await.unwrap();
            });
        }
    }

    #[tokio::test]
    #[ignore] // only compilation is checked
    async fn send_not_sync_executor_of_not_send_futures() {
        #[derive(Clone)]
        struct LocalTokioExecutor {
            // `Cell` keeps the executor `Send` but makes it `!Sync`.
            _x: std::marker::PhantomData<std::cell::Cell<()>>,
        }

        impl<F> crate::rt::Executor<F> for LocalTokioExecutor
        where
            F: std::future::Future + 'static, // `Send` is deliberately not required
        {
            fn execute(&self, fut: F) {
                tokio::task::spawn_local(fut);
            }
        }

        // Never called; it only has to type-check.
        #[allow(unused)]
        async fn run(io: impl crate::rt::Read + crate::rt::Write + Unpin + 'static) {
            let executor = LocalTokioExecutor {
                _x: std::marker::PhantomData,
            };
            let (_tx, connection) =
                crate::client::conn::http2::handshake::<_, _, EmptyBody>(executor, io)
                    .await
                    .unwrap();

            tokio::task::spawn_local(async move {
                connection.await.unwrap();
            });
        }
    }

    #[tokio::test]
    #[ignore] // only compilation is checked
    async fn send_sync_executor_of_send_futures() {
        // Unit struct: `Send + Sync`.
        #[derive(Clone)]
        struct TokioExecutor;

        impl<F> crate::rt::Executor<F> for TokioExecutor
        where
            F: std::future::Future + 'static + Send,
            F::Output: Send + 'static,
        {
            fn execute(&self, fut: F) {
                tokio::task::spawn(fut);
            }
        }

        // Never called; it only has to type-check.
        #[allow(unused)]
        async fn run(io: impl crate::rt::Read + crate::rt::Write + Send + Unpin + 'static) {
            let (_tx, connection) =
                crate::client::conn::http2::handshake::<_, _, EmptyBody>(TokioExecutor, io)
                    .await
                    .unwrap();

            // The connection is handed to the multi-threaded `spawn` here.
            tokio::task::spawn(async move {
                connection.await.unwrap();
            });
        }
    }

    #[tokio::test]
    #[ignore] // only compilation is checked
    async fn not_send_not_sync_executor_of_send_futures() {
        #[derive(Clone)]
        struct TokioExecutor {
            // `Rc` makes the executor `!Send` and `!Sync`.
            _x: std::marker::PhantomData<std::rc::Rc<()>>,
        }

        impl<F> crate::rt::Executor<F> for TokioExecutor
        where
            F: std::future::Future + 'static + Send,
            F::Output: Send + 'static,
        {
            fn execute(&self, fut: F) {
                tokio::task::spawn(fut);
            }
        }

        // Never called; it only has to type-check.
        #[allow(unused)]
        async fn run(io: impl crate::rt::Read + crate::rt::Write + Send + Unpin + 'static) {
            let executor = TokioExecutor {
                _x: std::marker::PhantomData,
            };
            let (_tx, connection) =
                crate::client::conn::http2::handshake::<_, _, EmptyBody>(executor, io)
                    .await
                    .unwrap();

            // NOTE(review): `spawn_local` rather than `spawn` -- presumably
            // because the connection is not `Send` with this executor; confirm.
            tokio::task::spawn_local(async move {
                connection.await.unwrap();
            });
        }
    }

    #[tokio::test]
    #[ignore] // only compilation is checked
    async fn send_not_sync_executor_of_send_futures() {
        #[derive(Clone)]
        struct TokioExecutor {
            // `Cell` keeps the executor `Send` but makes it `!Sync`.
            _x: std::marker::PhantomData<std::cell::Cell<()>>,
        }

        impl<F> crate::rt::Executor<F> for TokioExecutor
        where
            F: std::future::Future + 'static + Send,
            F::Output: Send + 'static,
        {
            fn execute(&self, fut: F) {
                tokio::task::spawn(fut);
            }
        }

        // Never called; it only has to type-check.
        #[allow(unused)]
        async fn run(io: impl crate::rt::Read + crate::rt::Write + Send + Unpin + 'static) {
            let executor = TokioExecutor {
                _x: std::marker::PhantomData,
            };
            let (_tx, connection) =
                crate::client::conn::http2::handshake::<_, _, EmptyBody>(executor, io)
                    .await
                    .unwrap();

            // NOTE(review): `spawn_local` rather than `spawn` -- presumably
            // because the connection is not `Send` with this executor; confirm.
            tokio::task::spawn_local(async move {
                connection.await.unwrap();
            });
        }
    }
}