1use std::fmt;
2#[cfg(feature = "server")]
3use std::future::Future;
4use std::io;
5use std::marker::{PhantomData, Unpin};
6use std::pin::Pin;
7use std::task::{Context, Poll};
8#[cfg(feature = "server")]
9use std::time::{Duration, Instant};
10
11use crate::rt::{Read, Write};
12use bytes::{Buf, Bytes};
13use futures_util::ready;
14use http::header::{HeaderValue, CONNECTION, TE};
15use http::{HeaderMap, Method, Version};
16use http_body::Frame;
17use httparse::ParserConfig;
18
19use super::io::Buffered;
20use super::{Decoder, Encode, EncodedBuf, Encoder, Http1Transaction, ParseContext, Wants};
21use crate::body::DecodedLength;
22#[cfg(feature = "server")]
23use crate::common::time::Time;
24use crate::headers::connection_keep_alive;
25use crate::proto::{BodyLength, MessageHead};
26#[cfg(feature = "server")]
27use crate::rt::Sleep;
28
/// The 24-byte HTTP/2 client connection preface. `has_h2_prefix` compares the
/// start of the read buffer against it to recognise an HTTP/2 client.
const H2_PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
30
/// State machine for one HTTP/1 connection.
///
/// `I` is the IO object, `B` the outgoing body buffer type, and `T` the
/// `Http1Transaction` role used for parsing and encoding message heads.
pub(crate) struct Conn<I, B, T> {
    /// Buffered wrapper around the IO object.
    io: Buffered<I, EncodedBuf<B>>,
    /// Read/write/keep-alive progress plus per-connection options.
    state: State,
    /// Ties `T` to the connection without storing a value of that type.
    _marker: PhantomData<fn(T)>,
}
43
44impl<I, B, T> Conn<I, B, T>
45where
46 I: Read + Write + Unpin,
47 B: Buf,
48 T: Http1Transaction,
49{
50 pub(crate) fn new(io: I) -> Conn<I, B, T> {
51 Conn {
52 io: Buffered::new(io),
53 state: State {
54 allow_half_close: false,
55 cached_headers: None,
56 error: None,
57 keep_alive: KA::Busy,
58 method: None,
59 h1_parser_config: ParserConfig::default(),
60 h1_max_headers: None,
61 #[cfg(feature = "server")]
62 h1_header_read_timeout: None,
63 #[cfg(feature = "server")]
64 h1_header_read_timeout_fut: None,
65 #[cfg(feature = "server")]
66 h1_header_read_timeout_running: false,
67 #[cfg(feature = "server")]
68 date_header: true,
69 #[cfg(feature = "server")]
70 timer: Time::Empty,
71 preserve_header_case: false,
72 #[cfg(feature = "ffi")]
73 preserve_header_order: false,
74 title_case_headers: false,
75 h09_responses: false,
76 #[cfg(feature = "ffi")]
77 on_informational: None,
78 notify_read: false,
79 reading: Reading::Init,
80 writing: Writing::Init,
81 upgrade: None,
82 version: Version::HTTP_11,
85 allow_trailer_fields: false,
86 },
87 _marker: PhantomData,
88 }
89 }
90
/// Installs the timer that `poll_read_head` uses to create and reset the
/// header read timeout.
#[cfg(feature = "server")]
pub(crate) fn set_timer(&mut self, timer: Time) {
    self.state.timer = timer;
}
95
/// Forwards the pipeline-flush flag to the buffered IO layer.
#[cfg(feature = "server")]
pub(crate) fn set_flush_pipeline(&mut self, enabled: bool) {
    self.io.set_flush_pipeline(enabled);
}
100
/// Switches the buffered IO layer to its queue write strategy.
pub(crate) fn set_write_strategy_queue(&mut self) {
    self.io.set_write_strategy_queue();
}
104
/// Forwards the maximum buffer size, `max`, to the buffered IO layer.
pub(crate) fn set_max_buf_size(&mut self, max: usize) {
    self.io.set_max_buf_size(max);
}
108
/// Forwards an exact read buffer size, `sz`, to the buffered IO layer.
#[cfg(feature = "client")]
pub(crate) fn set_read_buf_exact_size(&mut self, sz: usize) {
    self.io.set_read_buf_exact_size(sz);
}
113
/// Switches the buffered IO layer to its flatten write strategy.
pub(crate) fn set_write_strategy_flatten(&mut self) {
    self.io.set_write_strategy_flatten();
}
117
/// Replaces the parser configuration handed to the parser each time a
/// message head is read.
#[cfg(feature = "client")]
pub(crate) fn set_h1_parser_config(&mut self, parser_config: ParserConfig) {
    self.state.h1_parser_config = parser_config;
}
122
/// Turns on the title-case flag that is passed to header and trailer
/// encoding.
pub(crate) fn set_title_case_headers(&mut self) {
    self.state.title_case_headers = true;
}
126
/// Turns on the preserve-header-case flag that is passed to the parser.
pub(crate) fn set_preserve_header_case(&mut self) {
    self.state.preserve_header_case = true;
}
130
/// Turns on the preserve-header-order flag that is passed to the parser.
#[cfg(feature = "ffi")]
pub(crate) fn set_preserve_header_order(&mut self) {
    self.state.preserve_header_order = true;
}
135
/// Turns on the HTTP/0.9 responses flag that is passed to the parser.
///
/// `poll_read_head` clears the flag again once a message head has been read.
#[cfg(feature = "client")]
pub(crate) fn set_h09_responses(&mut self) {
    self.state.h09_responses = true;
}
140
/// Sets the header-count limit that is passed to the parser and to the body
/// decoder.
pub(crate) fn set_http1_max_headers(&mut self, val: usize) {
    self.state.h1_max_headers = Some(val);
}
144
/// Sets how long `poll_read_head` waits for a complete message head before
/// reporting a header timeout error.
#[cfg(feature = "server")]
pub(crate) fn set_http1_header_read_timeout(&mut self, val: Duration) {
    self.state.h1_header_read_timeout = Some(val);
}
149
/// Allows half-closed connections: once set, `mid_message_detect_eof` no
/// longer probes the IO for an early EOF.
#[cfg(feature = "server")]
pub(crate) fn set_allow_half_close(&mut self) {
    self.state.allow_half_close = true;
}
154
/// Consumes the connection and returns the IO object together with the
/// `Bytes` handed back by the buffered layer.
// NOTE(review): the `Bytes` are presumably the unread part of the read
// buffer; confirm against `Buffered::into_inner`.
pub(crate) fn into_inner(self) -> (I, Bytes) {
    self.io.into_inner()
}
158
/// Takes the pending upgrade handle stored by `on_upgrade`, if any, leaving
/// `None` behind.
pub(crate) fn pending_upgrade(&mut self) -> Option<crate::upgrade::Pending> {
    self.state.upgrade.take()
}
162
/// Returns `true` when the read side is in the `Closed` state.
pub(crate) fn is_read_closed(&self) -> bool {
    self.state.is_read_closed()
}
166
/// Returns `true` when the write side is in the `Closed` state.
pub(crate) fn is_write_closed(&self) -> bool {
    self.state.is_write_closed()
}
170
171 pub(crate) fn can_read_head(&self) -> bool {
172 if !matches!(self.state.reading, Reading::Init) {
173 return false;
174 }
175
176 if T::should_read_first() {
177 return true;
178 }
179
180 !matches!(self.state.writing, Writing::Init)
181 }
182
183 pub(crate) fn can_read_body(&self) -> bool {
184 matches!(
185 self.state.reading,
186 Reading::Body(..) | Reading::Continue(..)
187 )
188 }
189
/// Returns `true` when both directions are in `Init` and the read buffer
/// holds no bytes.
#[cfg(feature = "server")]
pub(crate) fn has_initial_read_write_state(&self) -> bool {
    let both_init = matches!(
        (&self.state.reading, &self.state.writing),
        (&Reading::Init, &Writing::Init)
    );
    both_init && self.io.read_buf().is_empty()
}
196
/// Returns `true` when an EOF seen now must be reported as an error: the
/// role errors on EOF during parsing and the connection is not idle.
fn should_error_on_eof(&self) -> bool {
    T::should_error_on_parse_eof() && !self.state.is_idle()
}
201
202 fn has_h2_prefix(&self) -> bool {
203 let read_buf = self.io.read_buf();
204 read_buf.len() >= 24 && read_buf[..24] == *H2_PREFACE
205 }
206
/// Polls for the next incoming message head.
///
/// Returns `Ready(Some(Ok((head, body_length, wants))))` once a head has been
/// parsed, whatever `on_read_head_error` yields on a parse/IO error,
/// `Ready(Some(Err(..)))` when the header read timeout fires, and `Pending`
/// while more bytes are needed.
///
/// Callers must ensure `can_read_head()` holds.
pub(super) fn poll_read_head(
    &mut self,
    cx: &mut Context<'_>,
) -> Poll<Option<crate::Result<(MessageHead<T::Incoming>, DecodedLength, Wants)>>> {
    debug_assert!(self.can_read_head());
    trace!("Conn::read_head");

    // Arm the header read timeout once per message head; the running flag
    // stops it being re-armed on every poll.
    #[cfg(feature = "server")]
    if !self.state.h1_header_read_timeout_running {
        if let Some(h1_header_read_timeout) = self.state.h1_header_read_timeout {
            let deadline = Instant::now() + h1_header_read_timeout;
            self.state.h1_header_read_timeout_running = true;
            match self.state.h1_header_read_timeout_fut {
                // Reuse the existing sleep future when there is one.
                Some(ref mut h1_header_read_timeout_fut) => {
                    trace!("resetting h1 header read timeout timer");
                    self.state.timer.reset(h1_header_read_timeout_fut, deadline);
                }
                None => {
                    trace!("setting h1 header read timeout timer");
                    self.state.h1_header_read_timeout_fut =
                        Some(self.state.timer.sleep_until(deadline));
                }
            }
        }
    }

    let msg = match self.io.parse::<T>(
        cx,
        ParseContext {
            cached_headers: &mut self.state.cached_headers,
            req_method: &mut self.state.method,
            h1_parser_config: self.state.h1_parser_config.clone(),
            h1_max_headers: self.state.h1_max_headers,
            preserve_header_case: self.state.preserve_header_case,
            #[cfg(feature = "ffi")]
            preserve_header_order: self.state.preserve_header_order,
            h09_responses: self.state.h09_responses,
            #[cfg(feature = "ffi")]
            on_informational: &mut self.state.on_informational,
        },
    ) {
        Poll::Ready(Ok(msg)) => msg,
        Poll::Ready(Err(e)) => return self.on_read_head_error(e),
        Poll::Pending => {
            // No complete head yet: check whether the timeout has expired.
            #[cfg(feature = "server")]
            if self.state.h1_header_read_timeout_running {
                if let Some(ref mut h1_header_read_timeout_fut) =
                    self.state.h1_header_read_timeout_fut
                {
                    if Pin::new(h1_header_read_timeout_fut).poll(cx).is_ready() {
                        self.state.h1_header_read_timeout_running = false;

                        warn!("read header from client timeout");
                        return Poll::Ready(Some(Err(crate::Error::new_header_timeout())));
                    }
                }
            }

            return Poll::Pending;
        }
    };

    // A head was parsed, so the timeout no longer applies.
    #[cfg(feature = "server")]
    {
        self.state.h1_header_read_timeout_running = false;
        self.state.h1_header_read_timeout_fut = None;
    }

    debug!("incoming body is {}", msg.decode);

    // HTTP/0.9 responses are only tolerated for the first message.
    self.state.h09_responses = false;

    // The informational callback is dropped once a head has been read.
    #[cfg(feature = "ffi")]
    {
        self.state.on_informational = None;
    }

    self.state.busy();
    self.state.keep_alive &= msg.keep_alive;
    self.state.version = msg.head.version;

    let mut wants = if msg.wants_upgrade {
        Wants::UPGRADE
    } else {
        Wants::EMPTY
    };

    if msg.decode == DecodedLength::ZERO {
        if msg.expect_continue {
            debug!("ignoring expect-continue since body is empty");
        }
        // No body to read: the read side is already finished.
        self.state.reading = Reading::KeepAlive;
        if !T::should_read_first() {
            self.try_keep_alive(cx);
        }
    } else if msg.expect_continue && msg.head.version.gt(&Version::HTTP_10) {
        // `Expect: 100-continue` is only honoured for versions above HTTP/1.0.
        let h1_max_header_size = None;
        self.state.reading = Reading::Continue(Decoder::new(
            msg.decode,
            self.state.h1_max_headers,
            h1_max_header_size,
        ));
        wants = wants.add(Wants::EXPECT);
    } else {
        let h1_max_header_size = None;
        self.state.reading = Reading::Body(Decoder::new(
            msg.decode,
            self.state.h1_max_headers,
            h1_max_header_size,
        ));
    }

    // Trailers may be written only if the `TE` header is exactly `trailers`.
    self.state.allow_trailer_fields = msg
        .head
        .headers
        .get(TE)
        .map_or(false, |te_header| te_header == "trailers");

    Poll::Ready(Some(Ok((msg.head, msg.decode, wants))))
}
332
333 fn on_read_head_error<Z>(&mut self, e: crate::Error) -> Poll<Option<crate::Result<Z>>> {
334 let must_error = self.should_error_on_eof();
338 self.close_read();
339 self.io.consume_leading_lines();
340 let was_mid_parse = e.is_parse() || !self.io.read_buf().is_empty();
341 if was_mid_parse || must_error {
342 debug!(
344 "parse error ({}) with {} bytes",
345 e,
346 self.io.read_buf().len()
347 );
348 match self.on_parse_error(e) {
349 Ok(()) => Poll::Pending, Err(e) => Poll::Ready(Some(Err(e))),
351 }
352 } else {
353 debug!("read eof");
354 self.close_write();
355 Poll::Ready(None)
356 }
357 }
358
/// Polls for the next frame of the incoming body.
///
/// Returns `Ready(Some(Ok(frame)))` for a data or trailers frame,
/// `Ready(None)` when the body ends without a further frame, and
/// `Ready(Some(Err(..)))` on a decode error. The read state is advanced to
/// `KeepAlive` or `Closed` accordingly.
///
/// Callers must ensure `can_read_body()` holds.
pub(crate) fn poll_read_body(
    &mut self,
    cx: &mut Context<'_>,
) -> Poll<Option<io::Result<Frame<Bytes>>>> {
    debug_assert!(self.can_read_body());

    let (reading, ret) = match self.state.reading {
        Reading::Body(ref mut decoder) => {
            match ready!(decoder.decode(cx, &mut self.io)) {
                Ok(frame) => {
                    if frame.is_data() {
                        let slice = frame.data_ref().unwrap_or_else(|| unreachable!());
                        let (reading, maybe_frame) = if decoder.is_eof() {
                            debug!("incoming body completed");
                            (
                                Reading::KeepAlive,
                                // An empty final chunk is not surfaced as a frame.
                                if !slice.is_empty() {
                                    Some(Ok(frame))
                                } else {
                                    None
                                },
                            )
                        } else if slice.is_empty() {
                            // Empty data without EOF: treat the body as ended
                            // and close the read side.
                            error!("incoming body unexpectedly ended");
                            (Reading::Closed, None)
                        } else {
                            // Mid-body data: the read state stays `Body`.
                            return Poll::Ready(Some(Ok(frame)));
                        };
                        (reading, Poll::Ready(maybe_frame))
                    } else if frame.is_trailers() {
                        (Reading::Closed, Poll::Ready(Some(Ok(frame))))
                    } else {
                        trace!("discarding unknown frame");
                        (Reading::Closed, Poll::Ready(None))
                    }
                }
                Err(e) => {
                    debug!("incoming body decode error: {}", e);
                    (Reading::Closed, Poll::Ready(Some(Err(e))))
                }
            }
        }
        Reading::Continue(ref decoder) => {
            // Queue `100 Continue` only if no response head has been written.
            if let Writing::Init = self.state.writing {
                trace!("automatically sending 100 Continue");
                let cont = b"HTTP/1.1 100 Continue\r\n\r\n";
                self.io.headers_buf().extend_from_slice(cont);
            }

            // Switch to `Body` and recurse once to do the actual read.
            self.state.reading = Reading::Body(decoder.clone());
            return self.poll_read_body(cx);
        }
        _ => unreachable!("poll_read_body invalid state: {:?}", self.state.reading),
    };

    self.state.reading = reading;
    self.try_keep_alive(cx);
    ret
}
423
424 pub(crate) fn wants_read_again(&mut self) -> bool {
425 let ret = self.state.notify_read;
426 self.state.notify_read = false;
427 ret
428 }
429
430 pub(crate) fn poll_read_keep_alive(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
431 debug_assert!(!self.can_read_head() && !self.can_read_body());
432
433 if self.is_read_closed() {
434 Poll::Pending
435 } else if self.is_mid_message() {
436 self.mid_message_detect_eof(cx)
437 } else {
438 self.require_empty_read(cx)
439 }
440 }
441
442 fn is_mid_message(&self) -> bool {
443 !matches!(
444 (&self.state.reading, &self.state.writing),
445 (&Reading::Init, &Writing::Init)
446 )
447 }
448
449 fn require_empty_read(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
454 debug_assert!(!self.can_read_head() && !self.can_read_body() && !self.is_read_closed());
455 debug_assert!(!self.is_mid_message());
456 debug_assert!(T::is_client());
457
458 if !self.io.read_buf().is_empty() {
459 debug!("received an unexpected {} bytes", self.io.read_buf().len());
460 return Poll::Ready(Err(crate::Error::new_unexpected_message()));
461 }
462
463 let num_read = ready!(self.force_io_read(cx)).map_err(crate::Error::new_io)?;
464
465 if num_read == 0 {
466 let ret = if self.should_error_on_eof() {
467 trace!("found unexpected EOF on busy connection: {:?}", self.state);
468 Poll::Ready(Err(crate::Error::new_incomplete()))
469 } else {
470 trace!("found EOF on idle connection, closing");
471 Poll::Ready(Ok(()))
472 };
473
474 self.state.close_read();
476 return ret;
477 }
478
479 debug!(
480 "received unexpected {} bytes on an idle connection",
481 num_read
482 );
483 Poll::Ready(Err(crate::Error::new_unexpected_message()))
484 }
485
486 fn mid_message_detect_eof(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
487 debug_assert!(!self.can_read_head() && !self.can_read_body() && !self.is_read_closed());
488 debug_assert!(self.is_mid_message());
489
490 if self.state.allow_half_close || !self.io.read_buf().is_empty() {
491 return Poll::Pending;
492 }
493
494 let num_read = ready!(self.force_io_read(cx)).map_err(crate::Error::new_io)?;
495
496 if num_read == 0 {
497 trace!("found unexpected EOF on busy connection: {:?}", self.state);
498 self.state.close_read();
499 Poll::Ready(Err(crate::Error::new_incomplete()))
500 } else {
501 Poll::Ready(Ok(()))
502 }
503 }
504
505 fn force_io_read(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
506 debug_assert!(!self.state.is_read_closed());
507
508 let result = ready!(self.io.poll_read_from_io(cx));
509 Poll::Ready(result.map_err(|e| {
510 trace!(error = %e, "force_io_read; io error");
511 self.state.close();
512 e
513 }))
514 }
515
516 fn maybe_notify(&mut self, cx: &mut Context<'_>) {
517 match self.state.reading {
523 Reading::Continue(..) | Reading::Body(..) | Reading::KeepAlive | Reading::Closed => {
524 return
525 }
526 Reading::Init => (),
527 };
528
529 match self.state.writing {
530 Writing::Body(..) => return,
531 Writing::Init | Writing::KeepAlive | Writing::Closed => (),
532 }
533
534 if !self.io.is_read_blocked() {
535 if self.io.read_buf().is_empty() {
536 match self.io.poll_read_from_io(cx) {
537 Poll::Ready(Ok(n)) => {
538 if n == 0 {
539 trace!("maybe_notify; read eof");
540 if self.state.is_idle() {
541 self.state.close();
542 } else {
543 self.close_read()
544 }
545 return;
546 }
547 }
548 Poll::Pending => {
549 trace!("maybe_notify; read_from_io blocked");
550 return;
551 }
552 Poll::Ready(Err(e)) => {
553 trace!("maybe_notify; read_from_io error: {}", e);
554 self.state.close();
555 self.state.error = Some(crate::Error::new_io(e));
556 }
557 }
558 }
559 self.state.notify_read = true;
560 }
561 }
562
/// Lets the state machine move to idle or closed if both directions have
/// finished, then requests another read if a new head could be read.
fn try_keep_alive(&mut self, cx: &mut Context<'_>) {
    self.state.try_keep_alive::<T>();
    self.maybe_notify(cx);
}
567
568 pub(crate) fn can_write_head(&self) -> bool {
569 if !T::should_read_first() && matches!(self.state.reading, Reading::Closed) {
570 return false;
571 }
572
573 match self.state.writing {
574 Writing::Init => self.io.can_headers_buf(),
575 _ => false,
576 }
577 }
578
579 pub(crate) fn can_write_body(&self) -> bool {
580 match self.state.writing {
581 Writing::Body(..) => true,
582 Writing::Init | Writing::KeepAlive | Writing::Closed => false,
583 }
584 }
585
/// Returns whether the buffered IO layer can accept more body data.
pub(crate) fn can_buffer_body(&self) -> bool {
    self.io.can_buffer()
}
589
590 pub(crate) fn write_head(&mut self, head: MessageHead<T::Outgoing>, body: Option<BodyLength>) {
591 if let Some(encoder) = self.encode_head(head, body) {
592 self.state.writing = if !encoder.is_eof() {
593 Writing::Body(encoder)
594 } else if encoder.is_last() {
595 Writing::Closed
596 } else {
597 Writing::KeepAlive
598 };
599 }
600 }
601
602 fn encode_head(
603 &mut self,
604 mut head: MessageHead<T::Outgoing>,
605 body: Option<BodyLength>,
606 ) -> Option<Encoder> {
607 debug_assert!(self.can_write_head());
608
609 if !T::should_read_first() {
610 self.state.busy();
611 }
612
613 self.enforce_version(&mut head);
614
615 let buf = self.io.headers_buf();
616 match super::role::encode_headers::<T>(
617 Encode {
618 head: &mut head,
619 body,
620 #[cfg(feature = "server")]
621 keep_alive: self.state.wants_keep_alive(),
622 req_method: &mut self.state.method,
623 title_case_headers: self.state.title_case_headers,
624 #[cfg(feature = "server")]
625 date_header: self.state.date_header,
626 },
627 buf,
628 ) {
629 Ok(encoder) => {
630 debug_assert!(self.state.cached_headers.is_none());
631 debug_assert!(head.headers.is_empty());
632 self.state.cached_headers = Some(head.headers);
633
634 #[cfg(feature = "ffi")]
635 {
636 self.state.on_informational =
637 head.extensions.remove::<crate::ffi::OnInformational>();
638 }
639
640 Some(encoder)
641 }
642 Err(err) => {
643 self.state.error = Some(err);
644 self.state.writing = Writing::Closed;
645 None
646 }
647 }
648 }
649
650 fn fix_keep_alive(&mut self, head: &mut MessageHead<T::Outgoing>) {
652 let outgoing_is_keep_alive = head
653 .headers
654 .get(CONNECTION)
655 .map_or(false, connection_keep_alive);
656
657 if !outgoing_is_keep_alive {
658 match head.version {
659 Version::HTTP_10 => self.state.disable_keep_alive(),
662 Version::HTTP_11 => {
665 if self.state.wants_keep_alive() {
666 head.headers
667 .insert(CONNECTION, HeaderValue::from_static("keep-alive"));
668 }
669 }
670 _ => (),
671 }
672 }
673 }
674
675 fn enforce_version(&mut self, head: &mut MessageHead<T::Outgoing>) {
678 if let Version::HTTP_10 = self.state.version {
679 self.fix_keep_alive(head);
681 head.version = Version::HTTP_10;
684 }
685 }
689
690 pub(crate) fn write_body(&mut self, chunk: B) {
691 debug_assert!(self.can_write_body() && self.can_buffer_body());
692 debug_assert!(chunk.remaining() != 0);
694
695 let state = match self.state.writing {
696 Writing::Body(ref mut encoder) => {
697 self.io.buffer(encoder.encode(chunk));
698
699 if !encoder.is_eof() {
700 return;
701 }
702
703 if encoder.is_last() {
704 Writing::Closed
705 } else {
706 Writing::KeepAlive
707 }
708 }
709 _ => unreachable!("write_body invalid state: {:?}", self.state.writing),
710 };
711
712 self.state.writing = state;
713 }
714
715 pub(crate) fn write_trailers(&mut self, trailers: HeaderMap) {
716 if T::is_server() && !self.state.allow_trailer_fields {
717 debug!("trailers not allowed to be sent");
718 return;
719 }
720 debug_assert!(self.can_write_body() && self.can_buffer_body());
721
722 match self.state.writing {
723 Writing::Body(ref encoder) => {
724 if let Some(enc_buf) =
725 encoder.encode_trailers(trailers, self.state.title_case_headers)
726 {
727 self.io.buffer(enc_buf);
728
729 self.state.writing = if encoder.is_last() || encoder.is_close_delimited() {
730 Writing::Closed
731 } else {
732 Writing::KeepAlive
733 };
734 }
735 }
736 _ => unreachable!("write_trailers invalid state: {:?}", self.state.writing),
737 }
738 }
739
740 pub(crate) fn write_body_and_end(&mut self, chunk: B) {
741 debug_assert!(self.can_write_body() && self.can_buffer_body());
742 debug_assert!(chunk.remaining() != 0);
744
745 let state = match self.state.writing {
746 Writing::Body(ref encoder) => {
747 let can_keep_alive = encoder.encode_and_end(chunk, self.io.write_buf());
748 if can_keep_alive {
749 Writing::KeepAlive
750 } else {
751 Writing::Closed
752 }
753 }
754 _ => unreachable!("write_body invalid state: {:?}", self.state.writing),
755 };
756
757 self.state.writing = state;
758 }
759
760 pub(crate) fn end_body(&mut self) -> crate::Result<()> {
761 debug_assert!(self.can_write_body());
762
763 let encoder = match self.state.writing {
764 Writing::Body(ref mut enc) => enc,
765 _ => return Ok(()),
766 };
767
768 match encoder.end() {
770 Ok(end) => {
771 if let Some(end) = end {
772 self.io.buffer(end);
773 }
774
775 self.state.writing = if encoder.is_last() || encoder.is_close_delimited() {
776 Writing::Closed
777 } else {
778 Writing::KeepAlive
779 };
780
781 Ok(())
782 }
783 Err(not_eof) => {
784 self.state.writing = Writing::Closed;
785 Err(crate::Error::new_body_write_aborted().with(not_eof))
786 }
787 }
788 }
789
790 fn on_parse_error(&mut self, err: crate::Error) -> crate::Result<()> {
796 if let Writing::Init = self.state.writing {
797 if self.has_h2_prefix() {
798 return Err(crate::Error::new_version_h2());
799 }
800 if let Some(msg) = T::on_error(&err) {
801 self.state.cached_headers.take();
804 self.write_head(msg, None);
805 self.state.error = Some(err);
806 return Ok(());
807 }
808 }
809
810 Err(err)
812 }
813
/// Flushes the buffered IO layer, then gives the state machine a chance to
/// go idle or closed. A flush error is returned as-is.
pub(crate) fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
    ready!(Pin::new(&mut self.io).poll_flush(cx))?;
    self.try_keep_alive(cx);
    trace!("flushed({}): {:?}", T::LOG, self.state);
    Poll::Ready(Ok(()))
}
820
821 pub(crate) fn poll_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
822 match ready!(Pin::new(self.io.io_mut()).poll_shutdown(cx)) {
823 Ok(()) => {
824 trace!("shut down IO complete");
825 Poll::Ready(Ok(()))
826 }
827 Err(e) => {
828 debug!("error shutting down IO: {}", e);
829 Poll::Ready(Err(e))
830 }
831 }
832 }
833
834 pub(super) fn poll_drain_or_close_read(&mut self, cx: &mut Context<'_>) {
836 if let Reading::Continue(ref decoder) = self.state.reading {
837 self.state.reading = Reading::Body(decoder.clone());
840 }
841
842 let _ = self.poll_read_body(cx);
843
844 match self.state.reading {
846 Reading::Init | Reading::KeepAlive => {
847 trace!("body drained")
848 }
849 _ => self.close_read(),
850 }
851 }
852
/// Closes the read side; this also disables keep-alive.
pub(crate) fn close_read(&mut self) {
    self.state.close_read();
}
856
/// Closes the write side; this also disables keep-alive.
pub(crate) fn close_write(&mut self) {
    self.state.close_write();
}
860
/// Disables keep-alive for this connection.
///
/// An idle connection is closed right away; a connection with a message in
/// progress only has keep-alive switched off.
#[cfg(feature = "server")]
pub(crate) fn disable_keep_alive(&mut self) {
    if !self.state.is_idle() {
        trace!("disable_keep_alive; in-progress connection");
        self.state.disable_keep_alive();
        return;
    }

    trace!("disable_keep_alive; closing idle connection");
    self.state.close();
}
871
872 pub(crate) fn take_error(&mut self) -> crate::Result<()> {
873 if let Some(err) = self.state.error.take() {
874 Err(err)
875 } else {
876 Ok(())
877 }
878 }
879
/// Prepares a possible HTTP upgrade: stores the pending half in the state
/// (retrievable through `pending_upgrade`) and returns the `OnUpgrade` half.
pub(super) fn on_upgrade(&mut self) -> crate::upgrade::OnUpgrade {
    trace!("{}: prepare possible HTTP upgrade", T::LOG);
    self.state.prepare_upgrade()
}
884}
885
886impl<I, B: Buf, T> fmt::Debug for Conn<I, B, T> {
887 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
888 f.debug_struct("Conn")
889 .field("state", &self.state)
890 .field("io", &self.io)
891 .finish()
892 }
893}
894
// `Conn` is `Unpin` whenever the IO type `I` is; no bound is placed on `B`
// or `T`.
impl<I: Unpin, B, T> Unpin for Conn<I, B, T> {}
897
/// Progress and configuration of a single HTTP/1 connection.
struct State {
    /// When set, `mid_message_detect_eof` does not probe the IO for EOF.
    allow_half_close: bool,
    /// Spare header map lent to the parser and refilled from the outgoing
    /// head after it has been encoded.
    cached_headers: Option<HeaderMap>,
    /// Error recorded for later retrieval through `take_error`.
    error: Option<crate::Error>,
    /// Keep-alive status of the connection.
    keep_alive: KA,
    /// Method slot shared with the parser and the header encoder; cleared
    /// when the connection goes idle.
    method: Option<Method>,
    /// Parser configuration passed along when reading a message head.
    h1_parser_config: ParserConfig,
    /// Optional header-count limit passed to the parser and body decoder.
    h1_max_headers: Option<usize>,
    /// How long to wait for a complete message head, if limited.
    #[cfg(feature = "server")]
    h1_header_read_timeout: Option<Duration>,
    /// Sleep future backing the header read timeout.
    #[cfg(feature = "server")]
    h1_header_read_timeout_fut: Option<Pin<Box<dyn Sleep>>>,
    /// Whether the header read timeout is armed for the current head.
    #[cfg(feature = "server")]
    h1_header_read_timeout_running: bool,
    /// Flag passed to the header encoder as `date_header`.
    #[cfg(feature = "server")]
    date_header: bool,
    /// Timer used to create and reset the header read timeout.
    #[cfg(feature = "server")]
    timer: Time,
    /// Flag passed to the parser as `preserve_header_case`.
    preserve_header_case: bool,
    /// Flag passed to the parser as `preserve_header_order`.
    #[cfg(feature = "ffi")]
    preserve_header_order: bool,
    /// Flag passed to header and trailer encoding.
    title_case_headers: bool,
    /// Flag passed to the parser; cleared after the first head is read.
    h09_responses: bool,
    /// Callback taken from the outgoing head's extensions and lent to the
    /// parser; dropped once a head has been read.
    #[cfg(feature = "ffi")]
    on_informational: Option<crate::ffi::OnInformational>,
    /// Set when another read attempt should be made; consumed by
    /// `Conn::wants_read_again`.
    notify_read: bool,
    /// State of the read side.
    reading: Reading,
    /// State of the write side.
    writing: Writing,
    /// Pending half of a prepared upgrade, if any.
    upgrade: Option<crate::upgrade::Pending>,
    /// Version of the most recently read message head.
    version: Version,
    /// Whether the most recently read head carried `TE: trailers`.
    allow_trailer_fields: bool,
}
948
/// State of the read side of the connection.
#[derive(Debug)]
enum Reading {
    /// Ready to read a message head.
    Init,
    /// A body is expected, and the head carried `Expect: 100-continue`.
    Continue(Decoder),
    /// A body is being read with the given decoder.
    Body(Decoder),
    /// The message was read completely; the connection may be reused.
    KeepAlive,
    /// Nothing more will be read.
    Closed,
}
957
/// State of the write side of the connection.
enum Writing {
    /// Ready to write a message head.
    Init,
    /// A body is being written with the given encoder.
    Body(Encoder),
    /// The message was written completely; the connection may be reused.
    KeepAlive,
    /// Nothing more will be written.
    Closed,
}
964
965impl fmt::Debug for State {
966 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
967 let mut builder = f.debug_struct("State");
968 builder
969 .field("reading", &self.reading)
970 .field("writing", &self.writing)
971 .field("keep_alive", &self.keep_alive);
972
973 if let Some(ref error) = self.error {
975 builder.field("error", error);
976 }
977
978 if self.allow_half_close {
979 builder.field("allow_half_close", &true);
980 }
981
982 builder.finish()
985 }
986}
987
988impl fmt::Debug for Writing {
989 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
990 match *self {
991 Writing::Init => f.write_str("Init"),
992 Writing::Body(ref enc) => f.debug_tuple("Body").field(enc).finish(),
993 Writing::KeepAlive => f.write_str("KeepAlive"),
994 Writing::Closed => f.write_str("Closed"),
995 }
996 }
997}
998
999impl std::ops::BitAndAssign<bool> for KA {
1000 fn bitand_assign(&mut self, enabled: bool) {
1001 if !enabled {
1002 trace!("remote disabling keep-alive");
1003 *self = KA::Disabled;
1004 }
1005 }
1006}
1007
/// Keep-alive status of a connection.
#[derive(Clone, Copy, Debug, Default)]
enum KA {
    /// No message is in flight.
    Idle,
    /// A message is in flight (the default).
    #[default]
    Busy,
    /// Keep-alive has been turned off.
    Disabled,
}

impl KA {
    /// Marks the connection as idle.
    fn idle(&mut self) {
        *self = Self::Idle;
    }

    /// Marks the connection as busy.
    fn busy(&mut self) {
        *self = Self::Busy;
    }

    /// Turns keep-alive off.
    fn disable(&mut self) {
        *self = Self::Disabled;
    }

    /// Returns a copy of the current status.
    fn status(&self) -> KA {
        *self
    }
}
1033
1034impl State {
1035 fn close(&mut self) {
1036 trace!("State::close()");
1037 self.reading = Reading::Closed;
1038 self.writing = Writing::Closed;
1039 self.keep_alive.disable();
1040 }
1041
1042 fn close_read(&mut self) {
1043 trace!("State::close_read()");
1044 self.reading = Reading::Closed;
1045 self.keep_alive.disable();
1046 }
1047
1048 fn close_write(&mut self) {
1049 trace!("State::close_write()");
1050 self.writing = Writing::Closed;
1051 self.keep_alive.disable();
1052 }
1053
1054 fn wants_keep_alive(&self) -> bool {
1055 !matches!(self.keep_alive.status(), KA::Disabled)
1056 }
1057
1058 fn try_keep_alive<T: Http1Transaction>(&mut self) {
1059 match (&self.reading, &self.writing) {
1060 (&Reading::KeepAlive, &Writing::KeepAlive) => {
1061 if let KA::Busy = self.keep_alive.status() {
1062 self.idle::<T>();
1063 } else {
1064 trace!(
1065 "try_keep_alive({}): could keep-alive, but status = {:?}",
1066 T::LOG,
1067 self.keep_alive
1068 );
1069 self.close();
1070 }
1071 }
1072 (&Reading::Closed, &Writing::KeepAlive) | (&Reading::KeepAlive, &Writing::Closed) => {
1073 self.close()
1074 }
1075 _ => (),
1076 }
1077 }
1078
1079 fn disable_keep_alive(&mut self) {
1080 self.keep_alive.disable()
1081 }
1082
1083 fn busy(&mut self) {
1084 if let KA::Disabled = self.keep_alive.status() {
1085 return;
1086 }
1087 self.keep_alive.busy();
1088 }
1089
1090 fn idle<T: Http1Transaction>(&mut self) {
1091 debug_assert!(!self.is_idle(), "State::idle() called while idle");
1092
1093 self.method = None;
1094 self.keep_alive.idle();
1095
1096 if !self.is_idle() {
1097 self.close();
1098 return;
1099 }
1100
1101 self.reading = Reading::Init;
1102 self.writing = Writing::Init;
1103
1104 if !T::should_read_first() {
1110 self.notify_read = true;
1111 }
1112 }
1113
1114 fn is_idle(&self) -> bool {
1115 matches!(self.keep_alive.status(), KA::Idle)
1116 }
1117
1118 fn is_read_closed(&self) -> bool {
1119 matches!(self.reading, Reading::Closed)
1120 }
1121
1122 fn is_write_closed(&self) -> bool {
1123 matches!(self.writing, Writing::Closed)
1124 }
1125
1126 fn prepare_upgrade(&mut self) -> crate::upgrade::OnUpgrade {
1127 let (tx, rx) = crate::upgrade::pending();
1128 self.upgrade = Some(tx);
1129 rx
1130 }
1131}
1132
#[cfg(test)]
mod tests {
    /// Benchmarks `poll_read_head` on a short request whose bytes are
    /// preloaded into the read buffer.
    #[cfg(all(feature = "nightly", not(miri)))]
    #[bench]
    fn bench_read_head_short(b: &mut ::test::Bencher) {
        use super::*;
        use crate::common::io::Compat;
        let s = b"GET / HTTP/1.1\r\nHost: localhost:8080\r\n\r\n";
        let len = s.len();
        b.bytes = len as u64;

        let io = Compat(tokio_test::io::Builder::new().build());
        let mut conn = Conn::<_, bytes::Bytes, crate::proto::h1::ServerTransaction>::new(io);
        *conn.io.read_buf_mut() = ::bytes::BytesMut::from(&s[..]);
        conn.state.cached_headers = Some(HeaderMap::with_capacity(2));

        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();

        b.iter(|| {
            rt.block_on(futures_util::future::poll_fn(|cx| {
                match conn.poll_read_head(cx) {
                    Poll::Ready(Some(Ok(x))) => {
                        ::test::black_box(&x);
                        // Hand the cleared header map back for the next
                        // iteration.
                        let mut headers = x.0.headers;
                        headers.clear();
                        conn.state.cached_headers = Some(headers);
                    }
                    f => panic!("expected Ready(Some(Ok(..))): {:?}", f),
                }

                // Restore the read buffer to `len` bytes for the next run.
                conn.io.read_buf_mut().reserve(1);
                // SAFETY: NOTE(review): this presumably relies on `reserve`
                // handing back the original allocation, which still holds the
                // `len` initialised request bytes. Confirm against the
                // `BytesMut` implementation in use.
                unsafe {
                    conn.io.read_buf_mut().set_len(len);
                }
                conn.state.reading = Reading::Init;
                Poll::Ready(())
            }));
        });
    }

}